tools/telemetry/third_party/gsutilz/gslib/commands/rsync.py

# -*- coding: utf-8 -*-
# Copyright 2014 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Implementation of Unix-like rsync command."""

from __future__ import absolute_import

import errno
import heapq
import io
from itertools import islice
import os
import re
import tempfile
import textwrap
import traceback
import urllib

from boto import config
import crcmod

from gslib import copy_helper
from gslib.cloud_api import NotFoundException
from gslib.command import Command
from gslib.command import DummyArgChecker
from gslib.command_argument import CommandArgument
from gslib.copy_helper import CreateCopyHelperOpts
from gslib.copy_helper import SkipUnsupportedObjectError
from gslib.cs_api_map import ApiSelector
from gslib.exception import CommandException
from gslib.hashing_helper import CalculateB64EncodedCrc32cFromContents
from gslib.hashing_helper import CalculateB64EncodedMd5FromContents
from gslib.hashing_helper import SLOW_CRCMOD_WARNING
from gslib.plurality_checkable_iterator import PluralityCheckableIterator
from gslib.sig_handling import GetCaughtSignals
from gslib.sig_handling import RegisterSignalHandler
from gslib.storage_url import StorageUrlFromString
from gslib.util import GetCloudApiInstance
from gslib.util import IsCloudSubdirPlaceholder
from gslib.util import TEN_MIB
from gslib.util import UsingCrcmodExtension
from gslib.util import UTF8
from gslib.wildcard_iterator import CreateWildcardIterator


_SYNOPSIS = """
  gsutil rsync [-c] [-C] [-d] [-e] [-n] [-p] [-r] [-U] [-x] src_url dst_url
"""

_DETAILED_HELP_TEXT = ("""
<B>SYNOPSIS</B>
""" + _SYNOPSIS + """


<B>DESCRIPTION</B>
  The gsutil rsync command makes the contents under dst_url the same as the
  contents under src_url, by copying any missing files/objects, and (if the
  -d option is specified) deleting any extra files/objects. For example, to
  make gs://mybucket/data match the contents of the local directory "data"
  you could do:

    gsutil rsync -d data gs://mybucket/data

  To recurse into directories use the -r option:

    gsutil rsync -d -r data gs://mybucket/data

  To copy only new/changed files without deleting extra files from
  gs://mybucket/data leave off the -d option:

    gsutil rsync -r data gs://mybucket/data

  If you have a large number of objects to synchronize you might want to use
  the gsutil -m option, to perform parallel (multi-threaded/multi-processing)
  synchronization:

    gsutil -m rsync -d -r data gs://mybucket/data

  The -m option typically will provide a large performance boost if either the
  source or destination (or both) is a cloud URL. If both source and
  destination are file URLs the -m option will typically thrash the disk and
  slow synchronization down.

  To make the local directory "data" the same as the contents of
  gs://mybucket/data:

    gsutil rsync -d -r gs://mybucket/data data

  To make the contents of gs://mybucket2 the same as gs://mybucket1:

    gsutil rsync -d -r gs://mybucket1 gs://mybucket2

  You can also mirror data across local directories:

    gsutil rsync -d -r dir1 dir2

  To mirror your content across clouds:

    gsutil rsync -d -r gs://my-gs-bucket s3://my-s3-bucket

  Note: If you are synchronizing a large amount of data between clouds you
  might consider setting up a
  `Google Compute Engine <https://cloud.google.com/products/compute-engine>`_
  account and running gsutil there. Since cross-provider gsutil data transfers
  flow through the machine where gsutil is running, doing this can make your
  transfer run significantly faster than running gsutil on your local
  workstation.


<B>BE CAREFUL WHEN USING -d OPTION!</B>
  The rsync -d option is very useful and commonly used, because it provides a
  means of making the contents of a destination bucket or directory match
  those of a source bucket or directory. However, please exercise caution when
  you use this option: It's possible to delete large amounts of data
  accidentally if, for example, you erroneously reverse source and
  destination. For example, if you meant to synchronize a local directory from
  a bucket in the cloud but instead run the command:

    gsutil -m rsync -r -d ./your-dir gs://your-bucket

  and your-dir is currently empty, you will quickly delete all of the objects
  in gs://your-bucket.

  You can also cause large amounts of data to be lost quickly by specifying a
  subdirectory of the destination as the source of an rsync. For example, the
  command:

    gsutil -m rsync -r -d gs://your-bucket/data gs://your-bucket

  would cause most or all of the objects in gs://your-bucket to be deleted
  (some objects may survive if there are any with names that sort lower than
  "data" under gs://your-bucket/data).

  In addition to paying careful attention to the source and destination you
  specify with the rsync command, there are two more safety measures you can
  take when using gsutil rsync -d:

  1. Try running the command with the rsync -n option first, to see what it
     would do without actually performing the operations. For example, if
     you run the command:

       gsutil -m rsync -r -d -n gs://your-bucket/data gs://your-bucket

     it will be immediately evident that running that command without the -n
     option would cause many objects to be deleted.

  2. Enable object versioning in your bucket, which will allow you to restore
     objects if you accidentally delete them. For more details see
     "gsutil help versions".


<B>IMPACT OF BUCKET LISTING EVENTUAL CONSISTENCY</B>
  The rsync command operates by listing the source and destination URLs, and
  then performing copy and remove operations according to the differences
  between these listings. Because bucket listing is eventually (not strongly)
  consistent, if you upload new objects or delete objects from a bucket and
  then immediately run gsutil rsync with that bucket as the source or
  destination, it's possible the rsync command will not see the recent updates
  and thus synchronize incorrectly. You can rerun the rsync operation later to
  correct the incorrect synchronization.


<B>CHECKSUM VALIDATION AND FAILURE HANDLING</B>
  At the end of every upload or download, the gsutil rsync command validates
  that the checksum of the source file/object matches the checksum of the
  destination file/object. If the checksums do not match, gsutil will delete
  the invalid copy and print a warning message. This very rarely happens, but
  if it does, please contact gs-team@google.com.

  The rsync command will retry when failures occur, but if enough failures
  happen during a particular copy or delete operation the command will skip
  that object and move on. At the end of the synchronization run, if any
  failures were not successfully retried, the rsync command will report the
  count of failures and exit with non-zero status. At this point you can run
  the rsync command again, and it will attempt any remaining needed copy
  and/or delete operations.

  Note that there are cases where retrying will never succeed, such as if you
  don't have write permission to the destination bucket or if the destination
  path for some objects is longer than the maximum allowed length.

  For more details about gsutil's retry handling, please see
  "gsutil help retries".


<B>CHANGE DETECTION ALGORITHM</B>
  To determine if a file or object has changed, gsutil rsync first checks
  whether the source and destination sizes match. If they match, it next
  checks if their checksums match, using checksums if available (see below).
  Unlike the Unix rsync command, gsutil rsync does not use timestamps to
  determine if the file/object changed, because the GCS API does not permit
  the caller to set an object's timestamp (hence, timestamps of identical
  files/objects cannot be made to match).

  Checksums will not be available in two cases:

  1. When synchronizing to or from a file system. By default, gsutil does not
     checksum files, because of the slowdown caused when working with large
     files. You can cause gsutil to checksum files by using the gsutil rsync
     -c option, at the cost of increased local disk I/O and run time when
     working with large files. You should consider using the -c option if your
     files can change without changing sizes (e.g., if you have files that
     contain fixed width data, such as timestamps).

  2. When comparing composite GCS objects with objects at a cloud provider
     that does not support CRC32C (which is the only checksum available for
     composite objects). See 'gsutil help compose' for details about composite
     objects.


<B>COPYING IN THE CLOUD AND METADATA PRESERVATION</B>
  If both the source and destination URL are cloud URLs from the same
  provider, gsutil copies data "in the cloud" (i.e., without downloading to
  and uploading from the machine where you run gsutil). In addition to the
  performance and cost advantages of doing this, copying in the cloud
  preserves metadata (like Content-Type and Cache-Control). In contrast, when
  you download data from the cloud it ends up in a file, which has no
  associated metadata. Thus, unless you have some way to hold on to or
  re-create that metadata, synchronizing a bucket to a directory in the local
  file system will not retain the metadata.

  Note that by default, the gsutil rsync command does not copy the ACLs of
  objects being synchronized and instead will use the default bucket ACL (see
  "gsutil help defacl"). You can override this behavior with the -p option
  (see OPTIONS below).


<B>SLOW CHECKSUMS</B>
  If you find that CRC32C checksum computation runs slowly, this is likely
  because you don't have a compiled CRC32C implementation on your system. Try
  running:

    gsutil ver -l

  If the output contains:

    compiled crcmod: False

  you are running a Python library for computing CRC32C, which is much slower
  than using the compiled code. For information on getting a compiled CRC32C
  implementation, see 'gsutil help crc32c'.


<B>LIMITATIONS</B>
  1. The gsutil rsync command doesn't make the destination object's timestamps
     match those of the source object (it can't; timestamp setting is not
     allowed by the GCS API).

  2. The gsutil rsync command ignores versioning, synchronizing only the live
     object versions in versioned buckets.


<B>OPTIONS</B>
  -c            Causes the rsync command to compute checksums for files if the
                size of source and destination match, and then compare
                checksums. This option increases local disk I/O and run time
                if either src_url or dst_url are on the local file system.

  -C            If an error occurs, continue to attempt to copy the remaining
                files. If errors occurred, gsutil's exit status will be
                non-zero even if this flag is set. This option is implicitly
                set when running "gsutil -m rsync...". Note: -C only applies
                to the actual copying operation. If an error occurs while
                iterating over the files in the local directory (e.g., invalid
                Unicode file name) gsutil will print an error message and
                abort.

  -d            Delete extra files under dst_url not found under src_url. By
                default extra files are not deleted. Note: this option can
                delete data quickly if you specify the wrong
                source/destination combination. See the help section above,
                "BE CAREFUL WHEN USING -d OPTION!".

  -e            Exclude symlinks. When specified, symbolic links will be
                ignored.

  -n            Causes rsync to run in "dry run" mode, i.e., just outputting
                what would be copied or deleted without actually doing any
                copying/deleting.

  -p            Causes ACLs to be preserved when synchronizing in the cloud.
                Note that this option has performance and cost implications
                when using the XML API, as it requires separate HTTP calls for
                interacting with ACLs. The performance issue can be mitigated
                to some degree by using gsutil -m rsync to cause parallel
                synchronization. Also, this option only works if you have
                OWNER access to all of the objects that are copied.

                You can avoid the additional performance and cost of using
                rsync -p if you want all objects in the destination bucket to
                end up with the same ACL, by setting a default object ACL on
                that bucket instead of using rsync -p. See
                'gsutil help defacl'.

  -R, -r        Causes directories, buckets, and bucket subdirectories to be
                synchronized recursively. If you neglect to use this option
                gsutil will make only the top-level directory in the source
                and destination URLs match, skipping any sub-directories.

  -U            Skip objects with unsupported object types instead of failing.
                Unsupported object types are S3 Glacier objects.

  -x pattern    Causes files/objects matching pattern to be excluded, i.e.,
                any matching files/objects will not be copied or deleted. Note
                that the pattern is a Python regular expression, not a
                wildcard (so, matching any string ending in 'abc' would be
                specified using '.*abc' rather than '*abc'). Note also that
                the exclude path is always relative (similar to Unix rsync or
                tar exclude options). For example, if you run the command:

                  gsutil rsync -x 'data./.*\\.txt' dir gs://my-bucket

                it will skip the file dir/data1/a.txt.

                You can use regex alternation to specify multiple exclusions,
                for example:

                  gsutil rsync -x '.*\\.txt|.*\\.jpg' dir gs://my-bucket
""")
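

# A short illustrative sketch (hypothetical, not called by gsutil) of how the
# -x exclude filter described above behaves: the argument is compiled with
# re.compile() and matched against each path relative to the source URL, and
# matching files/objects are neither copied nor deleted.
def _ExampleExcludeFilter():
  exclude_pattern = re.compile(r'data./.*\.txt|.*\.jpg')
  relative_paths = ['data1/a.txt', 'data2/b.txt', 'photos/c.jpg', 'notes.md']
  # Only 'notes.md' survives the filter and would be synchronized.
  return [p for p in relative_paths if not exclude_pattern.match(p)]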


class _DiffAction(object):
  COPY = 'copy'
  REMOVE = 'remove'


_NA = '-'
_OUTPUT_BUFFER_SIZE = 64 * 1024
_PROGRESS_REPORT_LISTING_COUNT = 10000


# Tracks files we need to clean up at end or if interrupted.
_tmp_files = []


# pylint: disable=unused-argument
def _HandleSignals(signal_num, cur_stack_frame):
  """Called when rsync command is killed with SIGINT, SIGQUIT or SIGTERM."""
  CleanUpTempFiles()


def CleanUpTempFiles():
  """Cleans up temp files.

  This function allows the main (RunCommand) function to clean up at end of
  operation, or if gsutil rsync is interrupted (e.g., via ^C). This is
  necessary because tempfile.NamedTemporaryFile doesn't allow the created file
  to be re-opened in read mode on Windows, so we have to use tempfile.mkstemp,
  which doesn't automatically delete temp files.
  """
  try:
    for fname in _tmp_files:
      os.unlink(fname)
  except:  # pylint: disable=bare-except
    pass
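

# A minimal illustrative sketch (hypothetical helper, not called anywhere in
# gsutil) of the mkstemp-then-unlink pattern that CleanUpTempFiles above
# exists to support: tempfile.mkstemp returns a low-level handle plus a name
# that can be re-opened on Windows, but nothing is deleted automatically, so
# explicit cleanup is required.
def _ExampleTempFileLifecycle():
  (fd, fname) = tempfile.mkstemp(prefix='gsutil-rsync-example-')
  os.close(fd)  # Close the OS-level handle; we only need the name.
  try:
    # Re-open by name in write mode (NamedTemporaryFile can't be re-opened
    # this way on Windows, which is why mkstemp is used in this module).
    with io.open(fname, mode='w', encoding=UTF8) as fp:
      fp.write(u'scratch listing data\n')
  finally:
    os.unlink(fname)  # mkstemp files must be removed explicitly.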


class _DiffToApply(object):
  """Class that encapsulates info needed to apply diff for one object."""

  def __init__(self, src_url_str, dst_url_str, diff_action):
    """Constructor.

    Args:
      src_url_str: The source URL string, or None if diff_action is REMOVE.
      dst_url_str: The destination URL string.
      diff_action: _DiffAction to be applied.
    """
    self.src_url_str = src_url_str
    self.dst_url_str = dst_url_str
    self.diff_action = diff_action


def _DiffToApplyArgChecker(command_instance, diff_to_apply):
  """Arg checker that skips symlinks if -e flag specified."""
  if (diff_to_apply.diff_action == _DiffAction.REMOVE
      or not command_instance.exclude_symlinks):
    # No src URL is populated for REMOVE actions.
    return True
  exp_src_url = StorageUrlFromString(diff_to_apply.src_url_str)
  if exp_src_url.IsFileUrl() and os.path.islink(exp_src_url.object_name):
    command_instance.logger.info('Skipping symbolic link %s...', exp_src_url)
    return False
  return True


def _ComputeNeededFileChecksums(logger, src_url_str, src_size, src_crc32c,
                                src_md5, dst_url_str, dst_size, dst_crc32c,
                                dst_md5):
  """Computes any file checksums needed by _ObjectsMatch.

  Args:
    logger: logging.logger for outputting log messages.
    src_url_str: Source URL string.
    src_size: Source size.
    src_crc32c: Source CRC32c.
    src_md5: Source MD5.
    dst_url_str: Destination URL string.
    dst_size: Destination size.
    dst_crc32c: Destination CRC32c.
    dst_md5: Destination MD5.

  Returns:
    (src_crc32c, src_md5, dst_crc32c, dst_md5)
  """
  src_url = StorageUrlFromString(src_url_str)
  dst_url = StorageUrlFromString(dst_url_str)
  if src_url.IsFileUrl():
    if dst_crc32c != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif dst_md5 != _NA or dst_url.IsFileUrl():
      if src_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', src_url_str)
      with open(src_url.object_name, 'rb') as fp:
        src_md5 = CalculateB64EncodedMd5FromContents(fp)
  if dst_url.IsFileUrl():
    if src_crc32c != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing CRC32C for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_crc32c = CalculateB64EncodedCrc32cFromContents(fp)
    elif src_md5 != _NA:
      if dst_size > TEN_MIB:
        logger.info('Computing MD5 for %s...', dst_url_str)
      with open(dst_url.object_name, 'rb') as fp:
        dst_md5 = CalculateB64EncodedMd5FromContents(fp)
  return (src_crc32c, src_md5, dst_crc32c, dst_md5)
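

# A minimal sketch (not part of gsutil; assumes only the crcmod package
# imported above plus the standard base64 and struct modules) of how a
# base64-encoded CRC32C like the one returned by
# CalculateB64EncodedCrc32cFromContents can be computed. The real helper in
# gslib.hashing_helper reads the file in chunks rather than taking a string.
def _ExampleB64Crc32c(data):
  import base64
  import struct
  import crcmod.predefined
  crc = crcmod.predefined.Crc('crc-32c')  # Castagnoli polynomial, as GCS uses.
  crc.update(data)
  # GCS expresses CRC32C as the big-endian 4-byte value, base64-encoded.
  return base64.b64encode(struct.pack('>I', crc.crcValue))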


def _ListUrlRootFunc(cls, args_tuple, thread_state=None):
  """Worker function for listing files/objects under a URL to be sync'd.

  Outputs sorted list to out_file_name, formatted per _BuildTmpOutputLine. We
  sort the listed URLs because we don't want to depend on consistent sort
  order across file systems and cloud providers.

  Args:
    cls: Command instance.
    args_tuple: (base_url_str, out_file_name, desc), where base_url_str is
                the top-level URL string to list; out_file_name is the name of
                the file to which the sorted output should be written; desc is
                'source' or 'destination'.
    thread_state: gsutil Cloud API instance to use.
  """
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  (base_url_str, out_filename, desc) = args_tuple
  # We sort while iterating over base_url_str, allowing parallelism of batched
  # sorting with collecting the listing.
  out_file = io.open(out_filename, mode='w', encoding=UTF8)
  try:
    _BatchSort(_FieldedListingIterator(cls, gsutil_api, base_url_str, desc),
               out_file)
  except Exception as e:  # pylint: disable=broad-except
    # Abandon rsync if an exception percolates up to this layer - retryable
    # exceptions are handled in the lower layers, so we got a non-retryable
    # exception (like 404 bucket not found) and proceeding would either be
    # futile or could result in data loss - for example:
    #   gsutil rsync -d gs://non-existent-bucket ./localdir
    # would delete files from localdir.
    cls.logger.error(
        'Caught non-retryable exception while listing %s: %s' %
        (base_url_str, e))
    cls.non_retryable_listing_failures = 1
  out_file.close()


def _FieldedListingIterator(cls, gsutil_api, base_url_str, desc):
  """Iterator over base_url_str formatting output per _BuildTmpOutputLine.

  Args:
    cls: Command instance.
    gsutil_api: gsutil Cloud API instance to use for bucket listing.
    base_url_str: The top-level URL string over which to iterate.
    desc: 'source' or 'destination'.

  Yields:
    Output line formatted per _BuildTmpOutputLine.
  """
  if cls.recursion_requested:
    wildcard = '%s/**' % base_url_str.rstrip('/\\')
  else:
    wildcard = '%s/*' % base_url_str.rstrip('/\\')
  i = 0
  for blr in CreateWildcardIterator(
      wildcard, gsutil_api, debug=cls.debug,
      project_id=cls.project_id).IterObjects(
          # Request just the needed fields, to reduce bandwidth usage.
          bucket_listing_fields=['crc32c', 'md5Hash', 'name', 'size']):
    # Various GUI tools (like the GCS web console) create placeholder objects
    # ending with '/' when the user creates an empty directory. Normally these
    # tools should delete those placeholders once objects have been written
    # "under" the directory, but sometimes the placeholders are left around.
    # We need to filter them out here, otherwise if the user tries to rsync
    # from GCS to a local directory it will result in a directory/file
    # conflict (e.g., trying to download an object called "mydata/" where the
    # local directory "mydata" exists).
    url = blr.storage_url
    if IsCloudSubdirPlaceholder(url, blr=blr):
      cls.logger.info('Skipping cloud sub-directory placeholder object (%s) '
                      'because such objects aren\'t needed in (and would '
                      'interfere with) directories in the local file system',
                      url)
      continue
    if (cls.exclude_symlinks and url.IsFileUrl()
        and os.path.islink(url.object_name)):
      continue
    if cls.exclude_pattern:
      str_to_check = url.url_string[len(base_url_str):]
      if str_to_check.startswith(url.delim):
        str_to_check = str_to_check[1:]
      if cls.exclude_pattern.match(str_to_check):
        continue
    i += 1
    if i % _PROGRESS_REPORT_LISTING_COUNT == 0:
      cls.logger.info('At %s listing %d...', desc, i)
    yield _BuildTmpOutputLine(blr)


def _BuildTmpOutputLine(blr):
  """Builds line to output to temp file for given BucketListingRef.

  Args:
    blr: The BucketListingRef.

  Returns:
    The output line, formatted as _EncodeUrl(URL)<sp>size<sp>crc32c<sp>md5
    where crc32c will only be present for GCS URLs, and md5 will only be
    present for cloud URLs that aren't composite objects. A missing field is
    populated with '-'.
  """
  crc32c = _NA
  md5 = _NA
  url = blr.storage_url
  if url.IsFileUrl():
    size = os.path.getsize(url.object_name)
  elif url.IsCloudUrl():
    size = blr.root_object.size
    crc32c = blr.root_object.crc32c or _NA
    md5 = blr.root_object.md5Hash or _NA
  else:
    raise CommandException('Got unexpected URL type (%s)' % url.scheme)
  return '%s %d %s %s\n' % (_EncodeUrl(url.url_string), size, crc32c, md5)
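

# A small illustrative sketch (hypothetical, never called by gsutil) of the
# listing line format produced above: quote-plus-encoded URL, size, crc32c and
# md5 separated by spaces, with '-' standing in for fields that aren't
# available (e.g., no crc32c for local files, no md5 for composite objects).
def _ExampleTmpOutputLine():
  # Returns 'gs%3A%2F%2Fbucket%2Fobj.txt 1024 AAAAAA== -\n': the URL is
  # quote-plus encoded, and the missing md5 field is recorded as '-'.
  return '%s %d %s %s\n' % (_EncodeUrl('gs://bucket/obj.txt'), 1024,
                            'AAAAAA==', _NA)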


def _EncodeUrl(url_string):
  """Encodes url_str with quote plus encoding and UTF8 character encoding.

  We use this for all URL encodings.

  Args:
    url_string: String URL to encode.

  Returns:
    encoded URL.
  """
  return urllib.quote_plus(url_string.encode(UTF8))


def _DecodeUrl(enc_url_string):
  """Inverts encoding from _EncodeUrl.

  Args:
    enc_url_string: String URL to decode.

  Returns:
    decoded URL.
  """
  return urllib.unquote_plus(enc_url_string).decode(UTF8)
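

# An illustrative sketch (hypothetical, not called by gsutil) showing that the
# two helpers above round-trip arbitrary (including non-ASCII) URL strings:
# the text is UTF-8 encoded, quote-plus escaped, and then reversed exactly.
def _ExampleUrlEncodingRoundTrip():
  original = u'gs://bucket/caf\u00e9 menu.txt'
  encoded = _EncodeUrl(original)  # 'gs%3A%2F%2Fbucket%2Fcaf%C3%A9+menu.txt'
  return _DecodeUrl(encoded) == original  # True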


# pylint: disable=bare-except
def _BatchSort(in_iter, out_file):
  """Sorts input lines from in_iter and outputs to out_file.

  Sorts in batches as input arrives, so input file does not need to be loaded
  into memory all at once. Derived from Python Recipe 466302: Sorting big
  files the Python 2.4 way by Nicolas Lehuen.

  Sorted format is per _BuildTmpOutputLine. We're sorting on the entire line
  when we could just sort on the first record (URL); but the sort order is
  identical either way.

  Args:
    in_iter: Input iterator.
    out_file: Output file.
  """
  # Note: If chunk_files gets very large we can run out of open FDs. See .boto
  # file comments about rsync_buffer_lines. If increasing rsync_buffer_lines
  # doesn't suffice (e.g., for someone synchronizing with a really large
  # bucket), an option would be to make gsutil merge in passes, never
  # opening all chunk files simultaneously.
  buffer_size = config.getint('GSUtil', 'rsync_buffer_lines', 32000)
  chunk_files = []
  try:
    while True:
      current_chunk = sorted(islice(in_iter, buffer_size))
      if not current_chunk:
        break
      output_chunk = io.open('%s-%06i' % (out_file.name, len(chunk_files)),
                             mode='w+', encoding=UTF8)
      chunk_files.append(output_chunk)
      output_chunk.writelines(unicode(''.join(current_chunk)))
      output_chunk.flush()
      output_chunk.seek(0)
    out_file.writelines(heapq.merge(*chunk_files))
  except IOError as e:
    if e.errno == errno.EMFILE:
      raise CommandException('\n'.join(textwrap.wrap(
          'Synchronization failed because too many open file handles were '
          'needed while building synchronization state. Please see the '
          'comments about rsync_buffer_lines in your .boto config file for a '
          'possible way to address this problem.')))
    raise
  finally:
    for chunk_file in chunk_files:
      try:
        chunk_file.close()
        os.remove(chunk_file.name)
      except:
        pass
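

# A self-contained sketch (hypothetical, not used by gsutil) of the same
# sort-in-batches-then-merge idea _BatchSort implements above, shown on plain
# in-memory chunk lists instead of temp files so the control flow is easy to
# follow: sort each bounded batch as it arrives, then lazily merge the sorted
# batches with heapq.merge.
def _ExampleBatchSort(lines, buffer_size=3):
  it = iter(lines)
  sorted_chunks = []
  while True:
    chunk = sorted(islice(it, buffer_size))  # Sort one bounded batch.
    if not chunk:
      break
    sorted_chunks.append(chunk)
  # heapq.merge streams the already-sorted batches in overall sorted order.
  return list(heapq.merge(*sorted_chunks))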


class _DiffIterator(object):
  """Iterator yielding sequence of _DiffToApply objects."""

  def __init__(self, command_obj, base_src_url, base_dst_url):
    self.command_obj = command_obj
    self.compute_file_checksums = command_obj.compute_file_checksums
    self.delete_extras = command_obj.delete_extras
    self.recursion_requested = command_obj.recursion_requested
    self.logger = self.command_obj.logger
    self.base_src_url = base_src_url
    self.base_dst_url = base_dst_url
    self.logger.info('Building synchronization state...')

    (src_fh, self.sorted_list_src_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-src-')
    _tmp_files.append(self.sorted_list_src_file_name)
    (dst_fh, self.sorted_list_dst_file_name) = tempfile.mkstemp(
        prefix='gsutil-rsync-dst-')
    _tmp_files.append(self.sorted_list_dst_file_name)
    # Close the file handles; the file will be opened in write mode by
    # _ListUrlRootFunc.
    os.close(src_fh)
    os.close(dst_fh)

    # Build sorted lists of src and dst URLs in parallel. To do this, pass
    # args to _ListUrlRootFunc as tuple (base_url_str, out_filename, desc)
    # where base_url_str is the starting URL string for listing.
    args_iter = iter([
        (self.base_src_url.url_string, self.sorted_list_src_file_name,
         'source'),
        (self.base_dst_url.url_string, self.sorted_list_dst_file_name,
         'destination')
    ])

    # Contains error message from non-retryable listing failure.
    command_obj.non_retryable_listing_failures = 0
    shared_attrs = ['non_retryable_listing_failures']
    command_obj.Apply(_ListUrlRootFunc, args_iter, _RootListingExceptionHandler,
                      shared_attrs, arg_checker=DummyArgChecker,
                      parallel_operations_override=True,
                      fail_on_error=True)

    if command_obj.non_retryable_listing_failures:
      raise CommandException('Caught non-retryable exception - aborting rsync')

    self.sorted_list_src_file = open(self.sorted_list_src_file_name, 'r')
    self.sorted_list_dst_file = open(self.sorted_list_dst_file_name, 'r')

    # Wrap iterators in PluralityCheckableIterator so we can check emptiness.
    self.sorted_src_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_src_file))
    self.sorted_dst_urls_it = PluralityCheckableIterator(
        iter(self.sorted_list_dst_file))

  def _ParseTmpFileLine(self, line):
    """Parses output from _BuildTmpOutputLine.

    Parses into tuple:
      (URL, size, crc32c, md5)
    where crc32c and/or md5 can be _NA.

    Args:
      line: The line to parse.

    Returns:
      Parsed tuple: (url, size, crc32c, md5)
    """
    (encoded_url, size, crc32c, md5) = line.split()
    return (_DecodeUrl(encoded_url), int(size), crc32c, md5.strip())

  def _WarnIfMissingCloudHash(self, url_str, crc32c, md5):
    """Warns if given url_str is a cloud URL and is missing both crc32c and md5.

    Args:
      url_str: Destination URL string.
      crc32c: Destination CRC32c.
      md5: Destination MD5.

    Returns:
      True if issued warning.
    """
    # One known way this can currently happen is when rsync'ing objects larger
    # than 5 GB from S3 (for which the etag is not an MD5).
    if (StorageUrlFromString(url_str).IsCloudUrl()
        and crc32c == _NA and md5 == _NA):
      self.logger.warn(
          'Found no hashes to validate %s. Integrity cannot be assured '
          'without hashes.', url_str)
      return True
    return False

  def _ObjectsMatch(self, src_url_str, src_size, src_crc32c, src_md5,
                    dst_url_str, dst_size, dst_crc32c, dst_md5):
    """Returns True if src and dst objects are the same.

    Uses size plus whatever checksums are available.

    Args:
      src_url_str: Source URL string.
      src_size: Source size.
      src_crc32c: Source CRC32c.
      src_md5: Source MD5.
      dst_url_str: Destination URL string.
      dst_size: Destination size.
      dst_crc32c: Destination CRC32c.
      dst_md5: Destination MD5.

    Returns:
      True/False.
    """
    # Note: This function is called from __iter__, which is called from the
    # Command.Apply driver. Thus, all checksum computation will be run in a
    # single thread, which is good (having multiple threads concurrently
    # computing checksums would thrash the disk).
    if src_size != dst_size:
      return False
    if self.compute_file_checksums:
      (src_crc32c, src_md5, dst_crc32c, dst_md5) = _ComputeNeededFileChecksums(
          self.logger, src_url_str, src_size, src_crc32c, src_md5, dst_url_str,
          dst_size, dst_crc32c, dst_md5)
    if src_md5 != _NA and dst_md5 != _NA:
      self.logger.debug('Comparing md5 for %s and %s', src_url_str, dst_url_str)
      return src_md5 == dst_md5
    if src_crc32c != _NA and dst_crc32c != _NA:
      self.logger.debug(
          'Comparing crc32c for %s and %s', src_url_str, dst_url_str)
      return src_crc32c == dst_crc32c
    if not self._WarnIfMissingCloudHash(src_url_str, src_crc32c, src_md5):
      self._WarnIfMissingCloudHash(dst_url_str, dst_crc32c, dst_md5)
    # Without checksums to compare we depend only on basic size comparison.
    return True

  def __iter__(self):
    """Iterates over src/dst URLs and produces a _DiffToApply sequence.

    Yields:
      The _DiffToApply.
    """
    # Strip trailing slashes, if any, so we compute tail length against
    # consistent position regardless of whether trailing slashes were included
    # or not in URL.
    base_src_url_len = len(self.base_src_url.url_string.rstrip('/\\'))
    base_dst_url_len = len(self.base_dst_url.url_string.rstrip('/\\'))
    src_url_str = dst_url_str = None
    # Invariant: After each yield, the URLs in src_url_str, dst_url_str,
    # self.sorted_src_urls_it, and self.sorted_dst_urls_it are not yet
    # processed. Each time we encounter None in src_url_str or dst_url_str we
    # populate from the respective iterator, and we reset one or the other
    # value to None after yielding an action that disposes of that URL.
    while not self.sorted_src_urls_it.IsEmpty() or src_url_str is not None:
      if src_url_str is None:
        (src_url_str, src_size, src_crc32c, src_md5) = self._ParseTmpFileLine(
            self.sorted_src_urls_it.next())
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        src_url_str_to_check = _EncodeUrl(
            src_url_str[base_src_url_len:].replace('\\', '/'))
        dst_url_str_would_copy_to = copy_helper.ConstructDstUrl(
            self.base_src_url, StorageUrlFromString(src_url_str), True, True,
            self.base_dst_url, False, self.recursion_requested).url_string
      if self.sorted_dst_urls_it.IsEmpty():
        # We've reached end of dst URLs, so copy src to dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
        continue
      if not dst_url_str:
        (dst_url_str, dst_size, dst_crc32c, dst_md5) = (
            self._ParseTmpFileLine(self.sorted_dst_urls_it.next()))
        # Skip past base URL and normalize slashes so we can compare across
        # clouds/file systems (including Windows).
        dst_url_str_to_check = _EncodeUrl(
            dst_url_str[base_dst_url_len:].replace('\\', '/'))

      if src_url_str_to_check < dst_url_str_to_check:
        # There's no dst object corresponding to src object, so copy src to
        # dst.
        yield _DiffToApply(
            src_url_str, dst_url_str_would_copy_to, _DiffAction.COPY)
        src_url_str = None
      elif src_url_str_to_check > dst_url_str_to_check:
        # dst object without a corresponding src object, so remove dst if -d
        # option was specified.
        if self.delete_extras:
          yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
        dst_url_str = None
      else:
        # There is a dst object corresponding to src object, so check if the
        # objects match.
        if self._ObjectsMatch(
            src_url_str, src_size, src_crc32c, src_md5,
            dst_url_str, dst_size, dst_crc32c, dst_md5):
          # Continue iterating without yielding a _DiffToApply.
          pass
        else:
          yield _DiffToApply(src_url_str, dst_url_str, _DiffAction.COPY)
        src_url_str = None
        dst_url_str = None

    # If -d option specified, any files/objects left in dst iteration should
    # be removed.
    if not self.delete_extras:
      return
    if dst_url_str:
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
      dst_url_str = None
    for line in self.sorted_dst_urls_it:
      (dst_url_str, _, _, _) = self._ParseTmpFileLine(line)
      yield _DiffToApply(None, dst_url_str, _DiffAction.REMOVE)
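

# A self-contained sketch (hypothetical, not used by gsutil) of the sorted
# merge-join performed by _DiffIterator.__iter__ above, reduced to two sorted
# name lists: names only in src are copied, names only in dst are removed
# (when delete_extras is set), and names in both are compared elsewhere.
def _ExampleSortedListDiff(src_names, dst_names, delete_extras=True):
  diffs = []  # (action, name) tuples, analogous to _DiffToApply.
  src_iter, dst_iter = iter(sorted(src_names)), iter(sorted(dst_names))
  src, dst = next(src_iter, None), next(dst_iter, None)
  while src is not None or dst is not None:
    if dst is None or (src is not None and src < dst):
      diffs.append((_DiffAction.COPY, src))      # Present only in source.
      src = next(src_iter, None)
    elif src is None or dst < src:
      if delete_extras:
        diffs.append((_DiffAction.REMOVE, dst))  # Present only in destination.
      dst = next(dst_iter, None)
    else:
      # Same name on both sides; _ObjectsMatch decides whether to re-copy.
      src, dst = next(src_iter, None), next(dst_iter, None)
  return diffs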


def _RsyncFunc(cls, diff_to_apply, thread_state=None):
  """Worker function for performing the actual copy and remove operations."""
  gsutil_api = GetCloudApiInstance(cls, thread_state=thread_state)
  dst_url_str = diff_to_apply.dst_url_str
  dst_url = StorageUrlFromString(dst_url_str)
  if diff_to_apply.diff_action == _DiffAction.REMOVE:
    if cls.dryrun:
      cls.logger.info('Would remove %s', dst_url)
    else:
      cls.logger.info('Removing %s', dst_url)
      if dst_url.IsFileUrl():
        os.unlink(dst_url.object_name)
      else:
        try:
          gsutil_api.DeleteObject(
              dst_url.bucket_name, dst_url.object_name,
              generation=dst_url.generation, provider=dst_url.scheme)
        except NotFoundException:
          # If the object happened to be deleted by an external process, this
          # is fine because it moves us closer to the desired state.
          pass
  elif diff_to_apply.diff_action == _DiffAction.COPY:
    src_url_str = diff_to_apply.src_url_str
    src_url = StorageUrlFromString(src_url_str)
    if cls.dryrun:
      cls.logger.info('Would copy %s to %s', src_url, dst_url)
    else:
      try:
        copy_helper.PerformCopy(cls.logger, src_url, dst_url, gsutil_api, cls,
                                _RsyncExceptionHandler,
                                headers=cls.headers)
      except SkipUnsupportedObjectError, e:
        cls.logger.info('Skipping item %s with unsupported object type %s',
                        src_url, e.unsupported_type)
  else:
    raise CommandException('Got unexpected DiffAction (%s)'
                           % diff_to_apply.diff_action)


def _RootListingExceptionHandler(cls, e):
  """Simple exception handler for exceptions during listing URLs to sync."""
  cls.logger.error(str(e))


def _RsyncExceptionHandler(cls, e):
  """Simple exception handler to allow post-completion status."""
  cls.logger.error(str(e))
  cls.op_failure_count += 1
  cls.logger.debug('\n\nEncountered exception while syncing:\n%s\n',
                   traceback.format_exc())
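

# A minimal sketch (hypothetical, not part of gsutil) of the comparison
# precedence used by _DiffIterator._ObjectsMatch above: size first, then MD5
# if both sides have one, then CRC32C if both sides have one, otherwise fall
# back to the size comparison alone.
def _ExampleObjectsMatch(src_size, src_crc32c, src_md5,
                         dst_size, dst_crc32c, dst_md5):
  if src_size != dst_size:
    return False
  if src_md5 != _NA and dst_md5 != _NA:
    return src_md5 == dst_md5
  if src_crc32c != _NA and dst_crc32c != _NA:
    return src_crc32c == dst_crc32c
  return True  # Sizes match and no common checksum is available.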


class RsyncCommand(Command):
  """Implementation of gsutil rsync command."""

  # Command specification. See base class for documentation.
  command_spec = Command.CreateCommandSpec(
      'rsync',
      command_name_aliases=[],
      usage_synopsis=_SYNOPSIS,
      min_args=2,
      max_args=2,
      supported_sub_args='cCdenprRUx:',
      file_url_ok=True,
      provider_url_ok=False,
      urls_start_arg=0,
      gs_api_support=[ApiSelector.XML, ApiSelector.JSON],
      gs_default_api=ApiSelector.JSON,
      argparse_arguments=[
          CommandArgument.MakeNCloudOrFileURLsArgument(2)
      ]
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='rsync',
      help_name_aliases=['sync', 'synchronize'],
      help_type='command_help',
      help_one_line_summary='Synchronize content of two buckets/directories',
      help_text=_DETAILED_HELP_TEXT,
      subcommand_help_text={},
  )

  total_bytes_transferred = 0

  def _InsistContainer(self, url_str, treat_nonexistent_object_as_subdir):
    """Sanity checks that URL names an existing container.

    Args:
      url_str: URL string to check.
      treat_nonexistent_object_as_subdir: indicates if should treat a
                                          non-existent object as a subdir.

    Returns:
      URL for checked string.

    Raises:
      CommandException if url_str doesn't name an existing container.
    """
    (url, have_existing_container) = (
        copy_helper.ExpandUrlToSingleBlr(url_str, self.gsutil_api, self.debug,
                                         self.project_id,
                                         treat_nonexistent_object_as_subdir))
    if not have_existing_container:
      raise CommandException(
          'arg (%s) does not name a directory, bucket, or bucket subdir.'
          % url_str)
    return url

  def RunCommand(self):
    """Command entry point for the rsync command."""
    self._ParseOpts()
    if self.compute_file_checksums and not UsingCrcmodExtension(crcmod):
      self.logger.warn(SLOW_CRCMOD_WARNING)

    src_url = self._InsistContainer(self.args[0], False)
    dst_url = self._InsistContainer(self.args[1], True)

    # Tracks if any copy or rm operations failed.
    self.op_failure_count = 0

    # List of attributes to share/manage across multiple processes in
    # parallel (-m) mode.
    shared_attrs = ['op_failure_count']

    for signal_num in GetCaughtSignals():
      RegisterSignalHandler(signal_num, _HandleSignals)

    # Perform sync requests in parallel (-m) mode, if requested, using
    # configured number of parallel processes and threads. Otherwise,
    # perform requests with sequential function calls in current process.
    diff_iterator = _DiffIterator(self, src_url, dst_url)
    self.logger.info('Starting synchronization')
    try:
      self.Apply(_RsyncFunc, diff_iterator, _RsyncExceptionHandler,
                 shared_attrs, arg_checker=_DiffToApplyArgChecker,
                 fail_on_error=True)
    finally:
      CleanUpTempFiles()

    if self.op_failure_count:
      plural_str = 's' if self.op_failure_count > 1 else ''
      raise CommandException(
          '%d file%s/object%s could not be copied/removed.' %
          (self.op_failure_count, plural_str, plural_str))

  def _ParseOpts(self):
    # exclude_symlinks is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.exclude_symlinks = False
    # continue_on_error is handled by Command parent class, so save in Command
    # state rather than CopyHelperOpts.
    self.continue_on_error = False
    self.delete_extras = False
    preserve_acl = False
    self.compute_file_checksums = False
    self.dryrun = False
    self.exclude_pattern = None
    self.skip_unsupported_objects = False
    # self.recursion_requested is initialized in command.py (so it can be
    # checked in parent class for all commands).

    if self.sub_opts:
      for o, a in self.sub_opts:
        if o == '-c':
          self.compute_file_checksums = True
        # Note: In gsutil cp command this is specified using -c but here we
        # use -C so we can use -c for checksum arg (to be consistent with Unix
        # rsync command options).
        elif o == '-C':
          self.continue_on_error = True
        elif o == '-d':
          self.delete_extras = True
        elif o == '-e':
          self.exclude_symlinks = True
        elif o == '-n':
          self.dryrun = True
        elif o == '-p':
          preserve_acl = True
        elif o == '-r' or o == '-R':
          self.recursion_requested = True
        elif o == '-U':
          self.skip_unsupported_objects = True
        elif o == '-x':
          if not a:
            raise CommandException('Invalid blank exclude filter')
          try:
            self.exclude_pattern = re.compile(a)
          except re.error:
            raise CommandException('Invalid exclude filter (%s)' % a)
    return CreateCopyHelperOpts(
        preserve_acl=preserve_acl,
        skip_unsupported_objects=self.skip_unsupported_objects)