# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""


_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""


_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:

  $ gsutil help crcmod

To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""


_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""


# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'
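
# These values correspond to the "check_hashes" option read from the boto
# config file via config.get('GSUtil', 'check_hashes', ...) below. An
# illustrative config snippet (editor's sketch, not from the original file):
#
#   [GSUtil]
#   check_hashes = if_fast_else_fail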


def _CalculateHashFromContents(fp, hash_alg):
  """Calculates a hash digest of the contents of a seekable stream.

  This function resets the file pointer to position 0.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()


def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class);
        hashing classes will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))
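
# Illustrative usage (editor's sketch, not part of the original module):
# computing MD5 and CRC32c digests of a local file in a single pass. The
# file path is a placeholder.
#
#   digesters = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
#   with open('/tmp/example.bin', 'rb') as stream:
#     CalculateHashesFromContents(stream, digesters)
#   for alg_name, digester in digesters.iteritems():
#     print '%s: %s' % (alg_name, digester.hexdigest())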


def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))


def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())


def CalculateMd5FromContents(fp):
  """Calculates a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())


def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')


def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))
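
# Illustrative round-trip (editor's sketch): hashlib reports digests in hex,
# while cloud object metadata typically carries them base64-encoded; these
# two helpers convert between the representations.
#
#   hex_digest = md5('hello world').hexdigest()
#   b64_digest = Base64EncodeHash(hex_digest)
#   assert Base64ToHexHash(b64_digest) == hex_digest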


def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))


def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}
  return {'md5': md5}


def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, source object has an md5 hash.
    src_has_crc32c: If True, source object has a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # If the cloud provider supplies a CRC and MD5 isn't offered as an
    # alternative, we'll compute a checksum to validate only when using a
    # native crcmod installation.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    elif not hash_algs:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs
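
# Illustrative sketch of consuming the returned dict (editor's example; the
# logger and flags are placeholders, not the original call sites): each value
# is a zero-argument factory, so fresh digesters can be created per download
# attempt.
#
#   import logging
#   hash_algs = GetDownloadHashAlgs(logging.getLogger(), src_has_md5=True)
#   digesters = dict((alg, hash_algs[alg]()) for alg in hash_algs)
#   # ...feed each downloaded chunk to every digester via update()...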


class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The number of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns True if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
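
# Illustrative usage of HashingFileUploadWrapper (editor's sketch; the file
# path is a placeholder and src_url construction is elided, since only its
# url_string attribute is used here, for logging). A resumable upload can
# seek backwards after a failure and the digesters catch up on re-read:
#
#   import logging
#   digesters = {'md5': md5()}
#   hash_algs = {'md5': md5}
#   with open('/tmp/example.bin', 'rb') as stream:
#     wrapper = HashingFileUploadWrapper(stream, digesters, hash_algs,
#                                        src_url, logging.getLogger())
#     first_chunk = wrapper.read(TRANSFER_BUFFER_SIZE)
#     wrapper.seek(0)       # rewind; digesters are reset and caught up
#     remainder = wrapper.read()
#   print digesters['md5'].hexdigest()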