From d6a1c57c3c2f65e59c9d070449e2e7837fcabc80 Mon Sep 17 00:00:00 2001
From: David Heidelberg
Date: Sun, 22 Jan 2023 00:52:33 +0100
Subject: [PATCH] Implement verification of multi-part uploaded files

To be squashed with the previous commit.

Also increase the download chunk size to match the upload chunk size of
s3cp (currently 10M; even with 10 traces downloading in parallel, every
runner should have 100M available).

Reviewed-by: Guilherme Gallo
Signed-off-by: David Heidelberg
Part-of:
---
 framework/replay/download_utils.py | 45 +++++++++++++++++++++++++++-----------
 1 file changed, 32 insertions(+), 13 deletions(-)

diff --git a/framework/replay/download_utils.py b/framework/replay/download_utils.py
index 3fd5ce075..fce2a4cfe 100644
--- a/framework/replay/download_utils.py
+++ b/framework/replay/download_utils.py
@@ -30,7 +30,7 @@ import xml.etree.ElementTree as ET
 from email.utils import formatdate
 from os import path
 from pathlib import Path
-from typing import Any, Dict
+from typing import Any, Dict, List
 from urllib.parse import urlparse
 
 import requests
@@ -118,6 +118,20 @@ def get_jwt_authorization_headers(url, resource):
     return headers
 
 
+def calc_etags(inputfile: Path, partsize: int = 10 * 1024 * 1024) -> List[str]:
+    '''Calculate the ETags generated by the FDO upload script (s3cp).'''
+    md5 = hashlib.md5()
+    md5_digests = []
+    with open(inputfile, 'rb') as file:
+        for chunk in iter(lambda: file.read(partsize), b''):
+            md5.update(chunk)
+            md5_digests.append(hashlib.md5(chunk).digest())
+    return [
+        hashlib.md5(b''.join(md5_digests)).hexdigest() + '-' + str(len(md5_digests)),
+        md5.hexdigest()
+    ]
+
+
 @core.timer_ms
 def download(url: str, file_path: str, headers: Dict[str, str], attempts: int = 2) -> None:
     """Downloads a URL content into a file
@@ -142,32 +156,40 @@ def download(url: str, file_path: str, headers: Dict[str, str], attempts: int =
         file_adapter = LocalFileAdapter()
         session.mount(protocol, file_adapter)
 
-    local_file_checksum = hashlib.md5()
+    md5 = hashlib.md5()
+    local_file_checksums: List[Any] = []
+    md5_digests = []
     with session.get(url, allow_redirects=True, stream=True,
                      headers=headers) as response:
         with open(file_path, "wb") as file:
             response.raise_for_status()
-            for chunk in response.iter_content(chunk_size=1048576):
+            # chunk_size must be equal to the s3cp upload chunk size for the md5 digests to match
+            for chunk in response.iter_content(chunk_size=10 * 1024 * 1024):
                 if chunk:
                     file.write(chunk)
-                    local_file_checksum.update(chunk)
+                    md5.update(chunk)
+                    md5_digests.append(hashlib.md5(chunk).digest())
+            local_file_checksums = [
+                hashlib.md5(b''.join(md5_digests)).hexdigest() + '-' + str(len(md5_digests)),
+                md5.hexdigest()
+            ]
 
-    verify_file_integrity(file_path, response.headers, local_file_checksum.hexdigest())
+    verify_file_integrity(file_path, response.headers, local_file_checksums)
 
 
-def verify_file_integrity(file_path: str, headers: Any, local_file_checksum: str) -> None:
+def verify_file_integrity(file_path: str, headers: Any, local_file_checksums: Any) -> None:
     """
     :param file_path: path to the local file
     :param headers: reference to the request
-    :param local_file_checksum: already generated MD5
+    :param local_file_checksums: list of already generated MD5 checksums
     """
     try:
         remote_file_checksum: str = headers["etag"].strip('\"').lower()
-        if remote_file_checksum != local_file_checksum:
+        if remote_file_checksum not in local_file_checksums:
             raise exceptions.PiglitFatalError(
-                f"MD5 checksum {local_file_checksum} "
+                f"MD5 checksum {local_file_checksums} "
                 f"doesn't match remote ETag MD5 {remote_file_checksum}"
             )
     except KeyError:
@@ -196,11 +218,8 @@ def verify_local_file_checksum(url, file_path, headers, destination_file_path):
             end=" ",
             flush=True,
         )
-        local_file_checksum = hashlib.md5(
-            Path(destination_file_path).read_bytes()
-        ).hexdigest()
         verify_file_integrity(
-            destination_file_path, remote_headers, local_file_checksum
+            destination_file_path, remote_headers, calc_etags(destination_file_path)
         )
 
     print(f"[check_image] Requesting headers for {file_path}", end=" ", flush=True)
-- 
2.11.4.GIT
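
For illustration only (not part of the patch): a minimal standalone sketch of
the two checksum forms that calc_etags() computes for a multi-part upload, the
S3-style multipart ETag (MD5 of the concatenated per-part MD5 digests, plus a
"-<number of parts>" suffix) and the plain whole-file MD5. The file name
"trace.rdc" and the remote ETag value below are placeholders, and the sketch
assumes the object was uploaded with the same 10 MiB part size that s3cp
currently uses.

    import hashlib
    from pathlib import Path
    from typing import List

    PART_SIZE = 10 * 1024 * 1024  # must match the s3cp upload part size


    def calc_etags(inputfile: Path, partsize: int = PART_SIZE) -> List[str]:
        """Return [multipart ETag, plain MD5] for a locally stored file."""
        md5 = hashlib.md5()   # rolling MD5 of the whole file
        md5_digests = []      # raw MD5 digest of each part
        with open(inputfile, 'rb') as file:
            for chunk in iter(lambda: file.read(partsize), b''):
                md5.update(chunk)
                md5_digests.append(hashlib.md5(chunk).digest())
        # Multipart ETag: MD5 of the concatenated part digests plus "-<parts>".
        return [
            hashlib.md5(b''.join(md5_digests)).hexdigest() + '-' + str(len(md5_digests)),
            md5.hexdigest(),
        ]


    local_checksums = calc_etags(Path("trace.rdc"))        # placeholder file name
    remote_etag = '"0b1bf1d046f1b0f02946ae5b40a0a533-3"'   # placeholder ETag header
    if remote_etag.strip('"').lower() not in local_checksums:
        raise SystemExit("MD5 checksum doesn't match remote ETag")
    print("ETag verified")

Keeping both candidates and accepting either one reflects how S3-style storage
reports ETags: objects uploaded in multiple parts get the "-N" multipart form,
while small single-part uploads get a plain MD5, which is why the patch checks
`remote_file_checksum not in local_file_checksums` rather than a single
equality.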