# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper functions for hashing functionality."""

import base64
import binascii
from hashlib import md5
import os

from boto import config
import crcmod

from gslib.exception import CommandException
from gslib.util import DEFAULT_FILE_BUFFER_SIZE
from gslib.util import MIN_SIZE_COMPUTE_LOGGING
from gslib.util import TRANSFER_BUFFER_SIZE
from gslib.util import UsingCrcmodExtension


SLOW_CRCMOD_WARNING = """
WARNING: You have requested checksumming but your crcmod installation isn't
using the module's C extension, so checksumming will run very slowly. For help
installing the extension, please see:
  $ gsutil help crcmod
"""

_SLOW_CRCMOD_DOWNLOAD_WARNING = """
WARNING: Downloading this composite object requires integrity checking with
CRC32c, but your crcmod installation isn't using the module's C extension,
so the hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To disable slow integrity checking, see the "check_hashes" option in your
boto config file.
"""

_SLOW_CRC_EXCEPTION_TEXT = """
Downloading this composite object requires integrity checking with CRC32c,
but your crcmod installation isn't using the module's C extension, so the
hash computation will likely throttle download performance. For help
installing the extension, please see:
  $ gsutil help crcmod
To download regardless of crcmod performance or to skip slow integrity
checks, see the "check_hashes" option in your boto config file.

NOTE: It is strongly recommended that you not disable integrity checks. Doing so
could allow data corruption to go undetected during uploading/downloading."""

_NO_HASH_CHECK_WARNING = """
WARNING: This download will not be validated since your crcmod installation
doesn't use the module's C extension, so the hash computation would likely
throttle download performance. For help in installing the extension, please
see:
  $ gsutil help crcmod
To force integrity checking, see the "check_hashes" option in your boto config
file.
"""

# Configuration values for hashing.
CHECK_HASH_IF_FAST_ELSE_FAIL = 'if_fast_else_fail'
CHECK_HASH_IF_FAST_ELSE_SKIP = 'if_fast_else_skip'
CHECK_HASH_ALWAYS = 'always'
CHECK_HASH_NEVER = 'never'
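
# The constants above mirror the boto config setting read by the functions
# below; an illustrative ~/.boto stanza (shown purely as an example):
#
#   [GSUtil]
#   check_hashes = if_fast_else_fail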


def _CalculateHashFromContents(fp, hash_alg):
  """Calculates a hash digest of the contents of a seekable stream.

  This function resets the file pointer to position 0.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in hex string format.
  """
  hash_dict = {'placeholder': hash_alg}
  fp.seek(0)
  CalculateHashesFromContents(fp, hash_dict)
  fp.seek(0)
  return hash_dict['placeholder'].hexdigest()


def CalculateHashesFromContents(fp, hash_dict, callback_processor=None):
  """Calculates hashes of the contents of a file.

  Args:
    fp: An already-open file object (stream will be consumed).
    hash_dict: Dict of (string alg_name: initialized hashing class).
        Hashing class will be populated with digests upon return.
    callback_processor: Optional callback processing class that implements
        Progress(integer amount of bytes processed).
  """
  while True:
    data = fp.read(DEFAULT_FILE_BUFFER_SIZE)
    if not data:
      break
    for hash_alg in hash_dict.itervalues():
      hash_alg.update(data)
    if callback_processor:
      callback_processor.Progress(len(data))
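
# A minimal usage sketch for the function above (the file path is
# hypothetical; crcmod is imported at the top of this module):
#
#   digesters = {'md5': md5(), 'crc32c': crcmod.predefined.Crc('crc-32c')}
#   with open('/tmp/example.bin', 'rb') as fp:
#     CalculateHashesFromContents(fp, digesters)
#   hex_md5 = digesters['md5'].hexdigest()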


def CalculateB64EncodedCrc32cFromContents(fp):
  """Calculates a base64 CRC32c checksum of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    CRC32c checksum of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(
      fp, crcmod.predefined.Crc('crc-32c'))


def CalculateB64EncodedMd5FromContents(fp):
  """Calculates a base64 MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in base64 format.
  """
  return _CalculateB64EncodedHashFromContents(fp, md5())


def CalculateMd5FromContents(fp):
  """Calculates a hex MD5 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.

  Returns:
    MD5 digest of the file in hex format.
  """
  return _CalculateHashFromContents(fp, md5())
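
# The three wrappers above differ only in algorithm and output encoding;
# each resets the stream position, so sequential calls on the same file
# object are safe (hypothetical file path):
#
#   with open('/tmp/example.bin', 'rb') as fp:
#     b64_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
#     b64_md5 = CalculateB64EncodedMd5FromContents(fp)
#     hex_md5 = CalculateMd5FromContents(fp)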


def Base64EncodeHash(digest_value):
  """Returns the base64-encoded version of the input hex digest value."""
  return base64.encodestring(binascii.unhexlify(digest_value)).rstrip('\n')


def Base64ToHexHash(base64_hash):
  """Returns the hex digest value of the input base64-encoded hash.

  Args:
    base64_hash: Base64-encoded hash, which may contain newlines and single or
        double quotes.

  Returns:
    Hex digest of the input argument.
  """
  return binascii.hexlify(base64.decodestring(base64_hash.strip('\n"\'')))
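
# These two helpers are inverses of each other; a quick round-trip sketch:
#
#   hex_digest = md5('abc').hexdigest()
#   assert Base64ToHexHash(Base64EncodeHash(hex_digest)) == hex_digest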


def _CalculateB64EncodedHashFromContents(fp, hash_alg):
  """Calculates a base64 digest of the contents of a seekable stream.

  This function sets the stream position to 0 before and after calculation.

  Args:
    fp: An already-open file object.
    hash_alg: Instance of hashing class initialized to start state.

  Returns:
    Hash of the stream in base64 format.
  """
  return Base64EncodeHash(_CalculateHashFromContents(fp, hash_alg))


def GetUploadHashAlgs():
  """Returns a dict of hash algorithms for validating an uploaded object.

  This is for use only with single object uploads, not compose operations
  such as those used by parallel composite uploads (though it can be used to
  validate the individual components).

  Returns:
    dict of (algorithm_name: hash_algorithm)
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == 'never':
    return {}
  return {'md5': md5}
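
# Callers typically instantiate the returned constructors to get live
# digesters; a sketch (names here are illustrative):
#
#   digesters = dict(
#       (alg, cls()) for alg, cls in GetUploadHashAlgs().iteritems())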


def GetDownloadHashAlgs(logger, src_has_md5=False, src_has_crc32c=False):
  """Returns a dict of hash algorithms for validating an object.

  Args:
    logger: logging.Logger for outputting log messages.
    src_has_md5: If True, source object has an md5 hash.
    src_has_crc32c: If True, source object has a crc32c hash.

  Returns:
    Dict of (string, hash algorithm).

  Raises:
    CommandException if hash algorithms satisfying the boto config file
    cannot be returned.
  """
  check_hashes_config = config.get(
      'GSUtil', 'check_hashes', CHECK_HASH_IF_FAST_ELSE_FAIL)
  if check_hashes_config == CHECK_HASH_NEVER:
    return {}

  hash_algs = {}
  if src_has_md5:
    hash_algs['md5'] = md5
  elif src_has_crc32c:
    # If the cloud provider supplies a CRC, we'll compute a checksum to
    # validate if we're using a native crcmod installation and MD5 isn't
    # offered as an alternative.
    if UsingCrcmodExtension(crcmod):
      hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
    else:
      if check_hashes_config == CHECK_HASH_IF_FAST_ELSE_FAIL:
        raise CommandException(_SLOW_CRC_EXCEPTION_TEXT)
      elif check_hashes_config == CHECK_HASH_IF_FAST_ELSE_SKIP:
        logger.warn(_NO_HASH_CHECK_WARNING)
      elif check_hashes_config == CHECK_HASH_ALWAYS:
        logger.warn(_SLOW_CRCMOD_DOWNLOAD_WARNING)
        hash_algs['crc32c'] = lambda: crcmod.predefined.Crc('crc-32c')
      else:
        raise CommandException(
            'Your boto config \'check_hashes\' option is misconfigured.')

  return hash_algs
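
# Example of the selection logic above (hypothetical logger; the crc32c
# branches additionally depend on the boto "check_hashes" setting and on
# whether crcmod's C extension is installed):
#
#   import logging
#   algs = GetDownloadHashAlgs(logging.getLogger(), src_has_md5=True)
#   # algs == {'md5': md5}; MD5 is preferred whenever the source has one.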


class HashingFileUploadWrapper(object):
  """Wraps an input stream in a hash digester and exposes a stream interface.

  This class provides integrity checking during file uploads via the
  following properties:

  Calls to read will appropriately update digesters with all bytes read.
  Calls to seek (assuming it is supported by the wrapped stream) using
      os.SEEK_SET will catch up / reset the digesters to the specified
      position. If seek is called with a different os.SEEK mode, the caller
      must return to the original position using os.SEEK_SET before further
      reads.
  Calls to seek are fast if the desired position is equal to the position at
      the beginning of the last read call (we only need to re-hash bytes
      from that point on).
  """

  def __init__(self, stream, digesters, hash_algs, src_url, logger):
    """Initializes the wrapper.

    Args:
      stream: Input stream.
      digesters: dict of {string: hash digester} containing digesters, where
          string is the name of the hash algorithm.
      hash_algs: dict of {string: hash algorithm} for resetting and
          recalculating digesters. String is the name of the hash algorithm.
      src_url: Source FileUrl that is being copied.
      logger: For outputting log messages.
    """
    if not digesters:
      raise CommandException('HashingFileUploadWrapper used with no digesters.')
    elif not hash_algs:
      raise CommandException('HashingFileUploadWrapper used with no hash_algs.')

    self._orig_fp = stream
    self._digesters = digesters
    self._src_url = src_url
    self._logger = logger
    self._seek_away = None

    self._digesters_previous = {}
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
    self._digesters_previous_mark = 0
    self._digesters_current_mark = 0
    self._hash_algs = hash_algs

  def read(self, size=-1):  # pylint: disable=invalid-name
    """Reads from the wrapped file pointer and calculates hash digests.

    Args:
      size: The amount of bytes to read. If omitted or negative, the entire
          contents of the file will be read, hashed, and returned.

    Returns:
      Bytes from the wrapped stream.

    Raises:
      CommandException if the position of the wrapped stream is unknown.
    """
    if self._seek_away is not None:
      raise CommandException('Read called on hashing file pointer in an '
                             'unknown position; cannot correctly compute '
                             'digest.')

    data = self._orig_fp.read(size)
    self._digesters_previous_mark = self._digesters_current_mark
    for alg in self._digesters:
      self._digesters_previous[alg] = self._digesters[alg].copy()
      self._digesters[alg].update(data)
    self._digesters_current_mark += len(data)
    return data

  def tell(self):  # pylint: disable=invalid-name
    """Returns the current stream position."""
    return self._orig_fp.tell()

  def seekable(self):  # pylint: disable=invalid-name
    """Returns true if the stream is seekable."""
    return self._orig_fp.seekable()

  def seek(self, offset, whence=os.SEEK_SET):  # pylint: disable=invalid-name
    """Seeks in the wrapped file pointer and catches up hash digests.

    Args:
      offset: The offset to seek to.
      whence: os.SEEK_SET, os.SEEK_CUR, or os.SEEK_END.

    Returns:
      Return value from the wrapped stream's seek call.
    """
    if whence != os.SEEK_SET:
      # We do not catch up hashes for non-absolute seeks, and rely on the
      # caller to seek to an absolute position before reading.
      self._seek_away = self._orig_fp.tell()

    else:
      # Hashes will be correct and it's safe to call read().
      self._seek_away = None
      if offset < self._digesters_previous_mark:
        # This is earlier than our earliest saved digest, so we need to
        # reset the digesters and scan from the beginning.
        for alg in self._digesters:
          self._digesters[alg] = self._hash_algs[alg]()
        self._digesters_current_mark = 0
        self._orig_fp.seek(0)
        self._CatchUp(offset)

      elif offset == self._digesters_previous_mark:
        # Just load the saved digests.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]

      elif offset < self._digesters_current_mark:
        # Reset the position to our previous digest and scan forward.
        self._digesters_current_mark = self._digesters_previous_mark
        for alg in self._digesters:
          self._digesters[alg] = self._digesters_previous[alg]
        self._orig_fp.seek(self._digesters_previous_mark)
        self._CatchUp(offset - self._digesters_previous_mark)

      else:
        # Scan forward from our current digest and position.
        self._orig_fp.seek(self._digesters_current_mark)
        self._CatchUp(offset - self._digesters_current_mark)

    return self._orig_fp.seek(offset, whence)

  def _CatchUp(self, bytes_to_read):
    """Catches up hashes, but does not return data and uses little memory.

    Before calling this function, digesters_current_mark should be updated
    to the current location of the original stream and the self._digesters
    should be current to that point (but no further).

    Args:
      bytes_to_read: Number of bytes to catch up from the original stream.
    """
    if self._orig_fp.tell() != self._digesters_current_mark:
      raise CommandException(
          'Invalid mark when catching up hashes. Stream position %s, hash '
          'position %s' % (self._orig_fp.tell(), self._digesters_current_mark))

    for alg in self._digesters:
      if bytes_to_read >= MIN_SIZE_COMPUTE_LOGGING:
        self._logger.info('Catching up %s for %s...', alg,
                          self._src_url.url_string)
      self._digesters_previous[alg] = self._digesters[alg].copy()

    self._digesters_previous_mark = self._digesters_current_mark
    bytes_remaining = bytes_to_read
    bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    while bytes_this_round:
      data = self._orig_fp.read(bytes_this_round)
      bytes_remaining -= bytes_this_round
      for alg in self._digesters:
        self._digesters[alg].update(data)
      bytes_this_round = min(bytes_remaining, TRANSFER_BUFFER_SIZE)
    self._digesters_current_mark += bytes_to_read
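
# End-to-end sketch of the wrapper above (src_url and logger are hypothetical
# stand-ins for the FileUrl and logging.Logger a real caller would pass):
#
#   digesters = {'md5': md5()}
#   hash_algs = {'md5': md5}
#   wrapper = HashingFileUploadWrapper(
#       open('/tmp/example.bin', 'rb'), digesters, hash_algs, src_url, logger)
#   wrapper.read(TRANSFER_BUFFER_SIZE)  # digesters absorb the bytes read
#   wrapper.seek(0)                     # rewinds and resets the digesters
#   wrapper.read()                      # re-hashes the full stream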