Merge Chromium + Blink git repositories
[chromium-blink-merge.git] / tools / telemetry / third_party / gsutilz / gslib / tests / test_cp.py
blob7c44c366a614f71eb3c020660c5bf6876e0764b9
1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Integration tests for cp command."""
17 from __future__ import absolute_import
19 import base64
20 import binascii
21 import datetime
22 import httplib
23 import logging
24 import os
25 import pickle
26 import pkgutil
27 import random
28 import re
29 import string
30 import sys
32 from apitools.base.py import exceptions as apitools_exceptions
33 import boto
34 from boto import storage_uri
35 from boto.exception import ResumableTransferDisposition
36 from boto.exception import ResumableUploadException
37 from boto.exception import StorageResponseError
38 from boto.storage_uri import BucketStorageUri
40 from gslib.cloud_api import ResumableDownloadException
41 from gslib.cloud_api import ResumableUploadException
42 from gslib.cloud_api import ResumableUploadStartOverException
43 from gslib.copy_helper import GetTrackerFilePath
44 from gslib.copy_helper import TrackerFileType
45 from gslib.cs_api_map import ApiSelector
46 from gslib.gcs_json_api import GcsJsonApi
47 from gslib.hashing_helper import CalculateMd5FromContents
48 from gslib.storage_url import StorageUrlFromString
49 import gslib.tests.testcase as testcase
50 from gslib.tests.testcase.base import NotParallelizable
51 from gslib.tests.testcase.integration_testcase import SkipForS3
52 from gslib.tests.util import GenerationFromURI as urigen
53 from gslib.tests.util import HAS_S3_CREDS
54 from gslib.tests.util import ObjectToURI as suri
55 from gslib.tests.util import PerformsFileToObjectUpload
56 from gslib.tests.util import SetBotoConfigForTest
57 from gslib.tests.util import unittest
58 from gslib.third_party.storage_apitools import storage_v1_messages as apitools_messages
59 from gslib.tracker_file import DeleteTrackerFile
60 from gslib.tracker_file import GetRewriteTrackerFilePath
61 from gslib.util import EIGHT_MIB
62 from gslib.util import IS_WINDOWS
63 from gslib.util import MakeHumanReadable
64 from gslib.util import ONE_KIB
65 from gslib.util import ONE_MIB
66 from gslib.util import Retry
67 from gslib.util import START_CALLBACK_PER_BYTES
68 from gslib.util import UTF8
71 # Custom test callbacks must be pickleable, and therefore at global scope.
72 class _HaltingCopyCallbackHandler(object):
73 """Test callback handler for intentionally stopping a resumable transfer."""
75 def __init__(self, is_upload, halt_at_byte):
76 self._is_upload = is_upload
77 self._halt_at_byte = halt_at_byte
79 # pylint: disable=invalid-name
80 def call(self, total_bytes_transferred, total_size):
81 """Forcibly exits if the transfer has passed the halting point."""
82 if total_bytes_transferred >= self._halt_at_byte:
83 sys.stderr.write(
84 'Halting transfer after byte %s. %s/%s transferred.\r\n' % (
85 self._halt_at_byte, MakeHumanReadable(total_bytes_transferred),
86 MakeHumanReadable(total_size)))
87 if self._is_upload:
88 raise ResumableUploadException('Artifically halting upload.')
89 else:
90 raise ResumableDownloadException('Artifically halting download.')
93 class _JSONForceHTTPErrorCopyCallbackHandler(object):
94 """Test callback handler that raises an arbitrary HTTP error exception."""
96 def __init__(self, startover_at_byte, http_error_num):
97 self._startover_at_byte = startover_at_byte
98 self._http_error_num = http_error_num
99 self.started_over_once = False
101 # pylint: disable=invalid-name
102 def call(self, total_bytes_transferred, total_size):
103 """Forcibly exits if the transfer has passed the halting point."""
104 if (total_bytes_transferred >= self._startover_at_byte
105 and not self.started_over_once):
106 sys.stderr.write(
107 'Forcing HTTP error %s after byte %s. '
108 '%s/%s transferred.\r\n' % (
109 self._http_error_num,
110 self._startover_at_byte,
111 MakeHumanReadable(total_bytes_transferred),
112 MakeHumanReadable(total_size)))
113 self.started_over_once = True
114 raise apitools_exceptions.HttpError(
115 {'status': self._http_error_num}, None, None)
118 class _XMLResumableUploadStartOverCopyCallbackHandler(object):
119 """Test callback handler that raises start-over exception during upload."""
121 def __init__(self, startover_at_byte):
122 self._startover_at_byte = startover_at_byte
123 self.started_over_once = False
125 # pylint: disable=invalid-name
126 def call(self, total_bytes_transferred, total_size):
127 """Forcibly exits if the transfer has passed the halting point."""
128 if (total_bytes_transferred >= self._startover_at_byte
129 and not self.started_over_once):
130 sys.stderr.write(
131 'Forcing ResumableUpload start over error after byte %s. '
132 '%s/%s transferred.\r\n' % (
133 self._startover_at_byte,
134 MakeHumanReadable(total_bytes_transferred),
135 MakeHumanReadable(total_size)))
136 self.started_over_once = True
137 raise boto.exception.ResumableUploadException(
138 'Forcing upload start over',
139 ResumableTransferDisposition.START_OVER)
142 class _DeleteBucketThenStartOverCopyCallbackHandler(object):
143 """Test callback handler that deletes bucket then raises start-over."""
145 def __init__(self, startover_at_byte, bucket_uri):
146 self._startover_at_byte = startover_at_byte
147 self._bucket_uri = bucket_uri
148 self.started_over_once = False
150 # pylint: disable=invalid-name
151 def call(self, total_bytes_transferred, total_size):
152 """Forcibly exits if the transfer has passed the halting point."""
153 if (total_bytes_transferred >= self._startover_at_byte
154 and not self.started_over_once):
155 sys.stderr.write('Deleting bucket (%s)' %(self._bucket_uri.bucket_name))
157 @Retry(StorageResponseError, tries=5, timeout_secs=1)
158 def DeleteBucket():
159 bucket_list = list(self._bucket_uri.list_bucket(all_versions=True))
160 for k in bucket_list:
161 self._bucket_uri.get_bucket().delete_key(k.name,
162 version_id=k.version_id)
163 self._bucket_uri.delete_bucket()
165 DeleteBucket()
166 sys.stderr.write(
167 'Forcing ResumableUpload start over error after byte %s. '
168 '%s/%s transferred.\r\n' % (
169 self._startover_at_byte,
170 MakeHumanReadable(total_bytes_transferred),
171 MakeHumanReadable(total_size)))
172 self.started_over_once = True
173 raise ResumableUploadStartOverException(
174 'Artificially forcing start-over')
177 class _RewriteHaltException(Exception):
178 pass
181 class _HaltingRewriteCallbackHandler(object):
182 """Test callback handler for intentionally stopping a rewrite operation."""
184 def __init__(self, halt_at_byte):
185 self._halt_at_byte = halt_at_byte
187 # pylint: disable=invalid-name
188 def call(self, total_bytes_rewritten, unused_total_size):
189 """Forcibly exits if the operation has passed the halting point."""
190 if total_bytes_rewritten >= self._halt_at_byte:
191 raise _RewriteHaltException('Artificially halting rewrite')
194 class _EnsureRewriteResumeCallbackHandler(object):
195 """Test callback handler for ensuring a rewrite operation resumed."""
197 def __init__(self, required_byte):
198 self._required_byte = required_byte
200 # pylint: disable=invalid-name
201 def call(self, total_bytes_rewritten, unused_total_size):
202 """Forcibly exits if the operation has passed the halting point."""
203 if total_bytes_rewritten <= self._required_byte:
204 raise _RewriteHaltException(
205 'Rewrite did not resume; %s bytes written, but %s bytes should '
206 'have already been written.' % (total_bytes_rewritten,
207 self._required_byte))
210 class _ResumableUploadRetryHandler(object):
211 """Test callback handler for causing retries during a resumable transfer."""
213 def __init__(self, retry_at_byte, exception_to_raise, exc_args,
214 num_retries=1):
215 self._retry_at_byte = retry_at_byte
216 self._exception_to_raise = exception_to_raise
217 self._exception_args = exc_args
218 self._num_retries = num_retries
220 self._retries_made = 0
222 # pylint: disable=invalid-name
223 def call(self, total_bytes_transferred, unused_total_size):
224 """Cause a single retry at the retry point."""
225 if (total_bytes_transferred >= self._retry_at_byte
226 and self._retries_made < self._num_retries):
227 self._retries_made += 1
228 raise self._exception_to_raise(*self._exception_args)
231 class TestCp(testcase.GsUtilIntegrationTestCase):
232 """Integration tests for cp command."""
234 # For tests that artificially halt, we need to ensure at least one callback
235 # occurs.
236 halt_size = START_CALLBACK_PER_BYTES * 2
def _get_test_file(self, name):
  """Writes a bundled gslib test data file to a temp file.

  Args:
    name: File name under gslib's tests/test_data directory.

  Returns:
    Whatever self.CreateTempFile returns for the new file (callers in this
    class pass it to gsutil as a local source path).
  """
  resource_path = 'tests/test_data/%s' % name
  file_contents = pkgutil.get_data('gslib', resource_path)
  return self.CreateTempFile(file_name=name, contents=file_contents)
@PerformsFileToObjectUpload
def test_noclobber(self):
  """Tests that cp -n skips existing destinations for upload and download."""
  object_uri = self.CreateObject(contents='foo')
  local_path = self.CreateTempFile(contents='bar')
  skip_message = 'Skipping existing item: %s'

  # Upload onto an existing object: object contents must be unchanged.
  upload_stderr = self.RunGsUtil(
      ['cp', '-n', local_path, suri(object_uri)], return_stderr=True)
  self.assertIn(skip_message % suri(object_uri), upload_stderr)
  self.assertEqual(object_uri.get_contents_as_string(), 'foo')

  # Download onto an existing file: file contents must be unchanged.
  download_stderr = self.RunGsUtil(
      ['cp', '-n', suri(object_uri), local_path], return_stderr=True)
  with open(local_path, 'r') as local_file:
    self.assertIn(skip_message % suri(local_file), download_stderr)
    self.assertEqual(local_file.read(), 'bar')
def test_dest_bucket_not_exist(self):
  """Tests that cp into a nonexistent bucket exits 1 and reports it."""
  source_path = self.CreateTempFile(contents='foo')
  missing_bucket_url = '%s://%s' % (self.default_provider,
                                    self.nonexistent_bucket_name)
  cp_stderr = self.RunGsUtil(['cp', source_path, missing_bucket_url],
                             expected_status=1, return_stderr=True)
  self.assertIn('does not exist.', cp_stderr)
def test_copy_in_cloud_noclobber(self):
  """Tests that cp -n skips an object already copied between buckets."""
  src_bucket_uri = self.CreateBucket()
  dst_bucket_uri = self.CreateBucket()
  object_uri = self.CreateObject(bucket_uri=src_bucket_uri, contents='foo')

  first_stderr = self.RunGsUtil(
      ['cp', suri(object_uri), suri(dst_bucket_uri)], return_stderr=True)
  # Rewrite API may output an additional 'Copying' progress notification.
  self.assertGreaterEqual(first_stderr.count('Copying'), 1)
  self.assertLessEqual(first_stderr.count('Copying'), 2)

  second_stderr = self.RunGsUtil(
      ['cp', '-n', suri(object_uri), suri(dst_bucket_uri)],
      return_stderr=True)
  expected_message = ('Skipping existing item: %s' %
                      suri(dst_bucket_uri, object_uri.object_name))
  self.assertIn(expected_message, second_stderr)
@PerformsFileToObjectUpload
def test_streaming(self):
  """Tests uploading stdin ('-') to an object."""
  bucket_uri = self.CreateBucket()
  destination = '%s' % suri(bucket_uri, 'foo')
  cp_stderr = self.RunGsUtil(['cp', '-', destination],
                             stdin='bar', return_stderr=True)
  self.assertIn('Copying from <STDIN>', cp_stderr)
  uploaded_uri = bucket_uri.clone_replace_name('foo')
  self.assertEqual(uploaded_uri.get_contents_as_string(), 'bar')
def test_streaming_multiple_arguments(self):
  """Tests that passing '-' as a source more than once is rejected."""
  bucket_uri = self.CreateBucket()
  cp_args = ['cp', '-', '-', suri(bucket_uri)]
  cp_stderr = self.RunGsUtil(cp_args, stdin='bar', return_stderr=True,
                             expected_status=1)
  expected_error = 'Multiple URL strings are not supported with streaming'
  self.assertIn(expected_error, cp_stderr)
294 # TODO: Implement a way to test both with and without using magic file.
@PerformsFileToObjectUpload
def test_detect_content_type(self):
  """Tests local detection of content type."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')

  def _LongListing():
    # Build a fresh argument list for every invocation.
    return self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)

  self.RunGsUtil(['cp', self._get_test_file('test.mp3'), dsturi])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckMp3():
    listing = _LongListing()
    if not IS_WINDOWS:
      self.assertRegexpMatches(listing, r'Content-Type:\s+audio/mpeg')
      return
    # On Windows either spelling of the MPEG audio type is accepted.
    self.assertTrue(
        re.search(r'Content-Type:\s+audio/x-mpg', listing) or
        re.search(r'Content-Type:\s+audio/mpeg', listing))
  _CheckMp3()

  self.RunGsUtil(['cp', self._get_test_file('test.gif'), dsturi])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckGif():
    self.assertRegexpMatches(_LongListing(), r'Content-Type:\s+image/gif')
  _CheckGif()
def test_content_type_override_default(self):
  """Tests overriding content type with the default value."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')
  default_type_regex = r'Content-Type:\s+application/octet-stream'

  # Both uploads target the same destination; mp3 first, then gif.
  for test_file_name in ('test.mp3', 'test.gif'):
    self.RunGsUtil(['-h', 'Content-Type:', 'cp',
                    self._get_test_file(test_file_name), dsturi])

    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckDefaultType():
      listing = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
      self.assertRegexpMatches(listing, default_type_regex)
    _CheckDefaultType()
def test_content_type_override(self):
  """Tests overriding content type with a value."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')
  text_plain_regex = r'Content-Type:\s+text/plain'

  # Both uploads target the same destination; mp3 first, then gif.
  for test_file_name in ('test.mp3', 'test.gif'):
    self.RunGsUtil(['-h', 'Content-Type:text/plain', 'cp',
                    self._get_test_file(test_file_name), dsturi])

    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _CheckTextPlain():
      listing = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
      self.assertRegexpMatches(listing, text_plain_regex)
    _CheckTextPlain()
@unittest.skipIf(IS_WINDOWS, 'magicfile is not available on Windows.')
@PerformsFileToObjectUpload
def test_magicfile_override(self):
  """Tests content type override with magicfile value."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')
  local_path = self.CreateTempFile(contents='foo/bar\n')
  self.RunGsUtil(['cp', local_path, dsturi])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckContentType():
    listing = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
    # The expected type depends on the use_magicfile boto config setting.
    if boto.config.getbool('GSUtil', 'use_magicfile', False):
      expected_type = 'text/plain'
    else:
      expected_type = 'application/octet-stream'
    self.assertRegexpMatches(listing, r'Content-Type:\s+%s' % expected_type)
  _CheckContentType()
@PerformsFileToObjectUpload
def test_content_type_mismatches(self):
  """Tests overriding content type when it does not match the file type."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')
  text_path = self.CreateTempFile(contents='foo/bar\n')

  def _UploadAsGif(source_path):
    self.RunGsUtil(['-h', 'Content-Type:image/gif', 'cp', source_path,
                    dsturi])

  def _AssertListedAsGif():
    # Use @Retry as hedge against bucket listing eventual consistency.
    @Retry(AssertionError, tries=3, timeout_secs=1)
    def _Check():
      listing = self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)
      self.assertRegexpMatches(listing, r'Content-Type:\s+image/gif')
    _Check()

  # An mp3 file labelled as gif.
  _UploadAsGif(self._get_test_file('test.mp3'))
  _AssertListedAsGif()

  # A real gif file labelled as gif.
  _UploadAsGif(self._get_test_file('test.gif'))
  _AssertListedAsGif()

  # A text file labelled as gif.
  _UploadAsGif(text_path)
  _AssertListedAsGif()
@PerformsFileToObjectUpload
def test_content_type_header_case_insensitive(self):
  """Tests that content type header is treated with case insensitivity."""
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, 'foo')
  gif_path = self._get_test_file('test.gif')

  def _LongListing():
    return self.RunGsUtil(['ls', '-L', dsturi], return_stdout=True)

  # A header with unusual capitalization must still override the type.
  self.RunGsUtil(['-h', 'content-Type:text/plain', 'cp', gif_path, dsturi])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckOverridden():
    listing = _LongListing()
    self.assertRegexpMatches(listing, r'Content-Type:\s+text/plain')
    self.assertNotRegexpMatches(listing, r'image/gif')
  _CheckOverridden()

  # The same header given twice in different cases must not be duplicated.
  duplicate_headers = ['-h', 'CONTENT-TYPE:image/gif',
                       '-h', 'content-type:image/gif']
  self.RunGsUtil(duplicate_headers + ['cp', gif_path, dsturi])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckNotDuplicated():
    listing = _LongListing()
    self.assertRegexpMatches(listing, r'Content-Type:\s+image/gif')
    self.assertNotRegexpMatches(listing, r'image/gif,\s*image/gif')
  _CheckNotDuplicated()
@PerformsFileToObjectUpload
def test_other_headers(self):
  """Tests that non-content-type headers are applied successfully on copy."""
  bucket_uri = self.CreateBucket()
  uploaded_url = suri(bucket_uri, 'foo')
  gif_path = self._get_test_file('test.gif')

  def _AssertHeadersListed(object_url):
    listing = self.RunGsUtil(['ls', '-L', object_url], return_stdout=True)
    self.assertRegexpMatches(listing, r'Cache-Control\s*:\s*public,max-age=12')
    self.assertRegexpMatches(listing, r'Metadata:\s*1:\s*abcd')

  custom_meta_header = 'x-%s-meta-1:abcd' % self.provider_custom_meta
  self.RunGsUtil(['-h', 'Cache-Control:public,max-age=12',
                  '-h', custom_meta_header, 'cp',
                  gif_path, uploaded_url])
  _AssertHeadersListed(uploaded_url)

  copied_url = suri(bucket_uri, 'bar')
  self.RunGsUtil(['cp', uploaded_url, copied_url])
  # Ensure metadata was preserved across copy.
  _AssertHeadersListed(copied_url)
@PerformsFileToObjectUpload
def test_versioning(self):
  """Tests copy with versioning."""
  bucket_uri = self.CreateVersionedBucket()
  k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')
  k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
  # First generation of k2 ('data1').
  g1 = urigen(k2_uri)
  # Overwrite k2 with k1, producing the second generation ('data2').
  self.RunGsUtil(['cp', suri(k1_uri), suri(k2_uri)])
  k2_uri = bucket_uri.clone_replace_name(k2_uri.object_name)
  k2_uri = bucket_uri.clone_replace_key(k2_uri.get_key())
  g2 = urigen(k2_uri)
  # Write directly, producing the third generation ('data3').
  k2_uri.set_contents_from_string('data3')
  g3 = urigen(k2_uri)

  fpath = self.CreateTempFile()

  def _AssertDownloads(source_url, expected_contents):
    self.RunGsUtil(['cp', source_url, fpath])
    with open(fpath, 'r') as downloaded:
      self.assertEqual(downloaded.read(), expected_contents)

  # Check to make sure current version is data3.
  _AssertDownloads(k2_uri.versionless_uri, 'data3')

  # Check contents of all three versions.
  versions = ((g1, 'data1'), (g2, 'data2'), (g3, 'data3'))
  for generation, expected_contents in versions:
    _AssertDownloads('%s#%s' % (k2_uri.versionless_uri, generation),
                     expected_contents)

  # Copy first version to current and verify.
  self.RunGsUtil(['cp', '%s#%s' % (k2_uri.versionless_uri, g1),
                  k2_uri.versionless_uri])
  _AssertDownloads(k2_uri.versionless_uri, 'data1')

  # Attempt to specify a version-specific URI for destination.
  cp_stderr = self.RunGsUtil(['cp', fpath, k2_uri.uri], return_stderr=True,
                             expected_status=1)
  self.assertIn('cannot be the destination for gsutil cp', cp_stderr)
@SkipForS3('S3 lists versioned objects in reverse timestamp order.')
def test_recursive_copying_versioned_bucket(self):
  """Tests that cp -R with versioned buckets copies all versions in order."""
  bucket1_uri = self.CreateVersionedBucket()
  bucket2_uri = self.CreateVersionedBucket()

  # Write two versions of an object to the bucket1, oldest first.
  version_contents = ('data0', 'longer_data1')
  for contents in version_contents:
    self.CreateObject(bucket_uri=bucket1_uri, object_name='k',
                      contents=contents)

  self.AssertNObjectsInBucket(bucket1_uri, 2, versioned=True)
  self.AssertNObjectsInBucket(bucket2_uri, 0, versioned=True)

  # Recursively copy to second versioned bucket.
  self.RunGsUtil(['cp', '-R', suri(bucket1_uri, '*'), suri(bucket2_uri)])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _Check2():
    """Validates the results of the cp -R."""
    listing1 = self.RunGsUtil(['ls', '-la', suri(bucket1_uri)],
                              return_stdout=True).split('\n')
    listing2 = self.RunGsUtil(['ls', '-la', suri(bucket2_uri)],
                              return_stdout=True).split('\n')
    # 2 lines of listing output, 1 summary line, 1 empty line from \n split.
    self.assertEqual(len(listing1), 4)
    self.assertEqual(len(listing2), 4)

    # The N-th listed version in each bucket should match the N-th written
    # version in size and version-less name.
    for index, contents in enumerate(version_contents):
      for listing in (listing1, listing2):
        size, _, uri_str, _ = listing[index].split()
        self.assertEqual(size, str(len(contents)))
        self.assertEqual(storage_uri(uri_str).object_name, 'k')
  _Check2()
@PerformsFileToObjectUpload
@SkipForS3('Preconditions not supported for S3.')
def test_cp_generation_zero_match(self):
  """Tests that cp handles an object-not-exists precondition header."""
  bucket_uri = self.CreateBucket()
  source_path = self.CreateTempFile(contents='data1')
  # Match 0 means only write the object if it doesn't already exist.
  precondition_args = ['-h', 'x-goog-if-generation-match:0']

  # First copy should succeed.
  # TODO: This can fail (rarely) if the server returns a 5xx but actually
  # commits the bytes. If we add restarts on small uploads, handle this
  # case.
  self.RunGsUtil(precondition_args + ['cp', source_path, suri(bucket_uri)])

  # Second copy should fail with a precondition error.
  second_stderr = self.RunGsUtil(
      precondition_args + ['cp', source_path, suri(bucket_uri)],
      return_stderr=True, expected_status=1)
  self.assertIn('PreconditionException', second_stderr)
@PerformsFileToObjectUpload
@SkipForS3('Preconditions not supported for S3.')
def test_cp_v_generation_match(self):
  """Tests that cp -v option handles the if-generation-match header."""
  bucket_uri = self.CreateVersionedBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
  original_generation = object_uri.generation

  tmpdir = self.CreateTempDir()
  source_path = self.CreateTempFile(tmpdir=tmpdir, contents='data2')

  precondition_args = [
      '-h', 'x-goog-if-generation-match:%s' % original_generation]

  # First copy should succeed.
  self.RunGsUtil(precondition_args + ['cp', source_path, suri(object_uri)])

  # Second copy should fail the precondition.
  second_stderr = self.RunGsUtil(
      precondition_args + ['cp', source_path, suri(object_uri)],
      return_stderr=True, expected_status=1)
  self.assertIn('PreconditionException', second_stderr)

  # Specifying a generation with -n should fail before the request hits the
  # server.
  noclobber_stderr = self.RunGsUtil(
      precondition_args + ['cp', '-n', source_path, suri(object_uri)],
      return_stderr=True, expected_status=1)
  self.assertIn('ArgumentException', noclobber_stderr)
  self.assertIn('Specifying x-goog-if-generation-match is not supported '
                'with cp -n', noclobber_stderr)
@PerformsFileToObjectUpload
def test_cp_nv(self):
  """Tests that cp -nv works when skipping existing file."""
  bucket_uri = self.CreateVersionedBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')

  tmpdir = self.CreateTempDir()
  source_path = self.CreateTempFile(tmpdir=tmpdir, contents='data2')

  # First copy should succeed.
  self.RunGsUtil(['cp', '-nv', source_path, suri(object_uri)])

  # Second copy should skip copying.
  second_stderr = self.RunGsUtil(['cp', '-nv', source_path, suri(object_uri)],
                                 return_stderr=True)
  self.assertIn('Skipping existing item:', second_stderr)
@PerformsFileToObjectUpload
@SkipForS3('S3 lists versioned objects in reverse timestamp order.')
def test_cp_v_option(self):
  """Tests that cp -v returns the created object's version-specific URI."""
  bucket_uri = self.CreateVersionedBucket()
  k1_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data1')
  k2_uri = self.CreateObject(bucket_uri=bucket_uri, contents='data2')

  # Case 1: Upload file to object using one-shot PUT.
  small_file_dir = self.CreateTempDir()
  small_file_path = self.CreateTempFile(tmpdir=small_file_dir,
                                        contents='data1')
  self._run_cp_minus_v_test('-v', small_file_path, k2_uri.uri)

  # Case 2: Upload file to object using resumable upload.
  size_threshold = ONE_KIB
  resumable_config = ('GSUtil', 'resumable_threshold', str(size_threshold))
  with SetBotoConfigForTest([resumable_config]):
    # File size equals the threshold configured for this case.
    random_contents = os.urandom(size_threshold)
    large_file_dir = self.CreateTempDir()
    large_file_path = self.CreateTempFile(tmpdir=large_file_dir,
                                          contents=random_contents)
    self._run_cp_minus_v_test('-v', large_file_path, k2_uri.uri)

  # Case 3: Upload stream to object.
  self._run_cp_minus_v_test('-v', '-', k2_uri.uri)

  # Case 4: Download object to file. For this case we just expect output of
  # gsutil cp -v to be the URI of the file.
  download_dir = self.CreateTempDir()
  download_path = self.CreateTempFile(tmpdir=download_dir)
  dst_uri = storage_uri(download_path)
  download_stderr = self.RunGsUtil(['cp', '-v', suri(k1_uri), suri(dst_uri)],
                                   return_stderr=True)
  # The "Created:" line is expected second from the end of the split output.
  stderr_lines = download_stderr.split('\n')
  self.assertIn('Created: %s' % dst_uri.uri, stderr_lines[-2])

  # Case 5: Daisy-chain from object to object.
  self._run_cp_minus_v_test('-Dv', k1_uri.uri, k2_uri.uri)

  # Case 6: Copy object to object in-the-cloud.
  self._run_cp_minus_v_test('-v', k1_uri.uri, k2_uri.uri)
def _run_cp_minus_v_test(self, opt, src_str, dst_str):
  """Runs cp with the given option and checks the reported "Created:" URI.

  Args:
    opt: Option string passed to cp (for example '-v' or '-Dv').
    src_str: Source argument for cp.
    dst_str: Destination argument for cp; also listed with ls -a afterwards.
  """
  cp_stderr = self.RunGsUtil(['cp', opt, src_str, dst_str], return_stderr=True)
  created_match = re.search(r'Created: (.*)\n', cp_stderr)
  self.assertIsNotNone(created_match)
  created_uri = created_match.group(1)

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckNewestVersion():
    listing = self.RunGsUtil(['ls', '-a', dst_str], return_stdout=True)
    listing_lines = listing.split('\n')
    # Final (most recent) object should match the "Created:" URI. It is the
    # second-to-last element; the last one is what follows the final newline.
    self.assertGreater(len(listing_lines), 2)
    self.assertEqual(created_uri, listing_lines[-2])
  _CheckNewestVersion()
@PerformsFileToObjectUpload
def test_stdin_args(self):
  """Tests cp with the -I option."""
  tmpdir = self.CreateTempDir()
  first_path = self.CreateTempFile(tmpdir=tmpdir, contents='data1')
  second_path = self.CreateTempFile(tmpdir=tmpdir, contents='data2')
  bucket_uri = self.CreateBucket()
  # Source paths are supplied on stdin, one per line.
  source_list = '\n'.join((first_path, second_path))
  self.RunGsUtil(['cp', '-I', suri(bucket_uri)], stdin=source_list)

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckBothUploaded():
    listing = self.RunGsUtil(['ls', suri(bucket_uri)], return_stdout=True)
    for uploaded_path in (first_path, second_path):
      self.assertIn(os.path.basename(uploaded_path), listing)
    self.assertNumLines(listing, 2)
  _CheckBothUploaded()
def test_cross_storage_class_cloud_cp(self):
  """Tests copying an object between buckets of different storage classes."""
  standard_bucket_uri = self.CreateBucket(storage_class='STANDARD')
  dra_bucket_uri = self.CreateBucket(
      storage_class='DURABLE_REDUCED_AVAILABILITY')
  object_uri = self.CreateObject(bucket_uri=standard_bucket_uri,
                                 contents='foo')
  # Server now allows copy-in-the-cloud across storage classes.
  self.RunGsUtil(['cp', suri(object_uri), suri(dra_bucket_uri)])
@unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
def test_cross_provider_cp(self):
  """Tests copying an object from S3 to GS and from GS to S3."""
  s3_bucket = self.CreateBucket(provider='s3')
  gs_bucket = self.CreateBucket(provider='gs')
  s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='foo')
  gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='bar')
  # S3 -> GS first, then GS -> S3.
  for source_key, destination_bucket in ((s3_key, gs_bucket),
                                         (gs_key, s3_bucket)):
    self.RunGsUtil(['cp', suri(source_key), suri(destination_bucket)])
@unittest.skipUnless(HAS_S3_CREDS, 'Test requires both S3 and GS credentials')
@unittest.skip('This test performs a large copy but remains here for '
               'debugging purposes.')
def test_cross_provider_large_cp(self):
  """Tests copying 1 MiB objects between S3 and GS in both directions."""
  object_size = 1024 * 1024
  s3_bucket = self.CreateBucket(provider='s3')
  gs_bucket = self.CreateBucket(provider='gs')
  s3_key = self.CreateObject(bucket_uri=s3_bucket, contents='f' * object_size)
  gs_key = self.CreateObject(bucket_uri=gs_bucket, contents='b' * object_size)
  self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
  self.RunGsUtil(['cp', suri(gs_key), suri(s3_bucket)])

  small_chunk_config = [
      ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
      ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))]
  with SetBotoConfigForTest(small_chunk_config):
    # Ensure copy also works across json upload chunk boundaries.
    self.RunGsUtil(['cp', suri(s3_key), suri(gs_bucket)])
@unittest.skip('This test is slow due to creating many objects, '
               'but remains here for debugging purposes.')
def test_daisy_chain_cp_file_sizes(self):
  """Ensure daisy chain cp works with a wide range of file sizes."""
  bucket_uri = self.CreateBucket()
  bucket2_uri = self.CreateBucket()
  # Exponents 0..27, so the largest object is 2**27 + 1 bytes (128 MiB + 1).
  exponent_cap = 28
  for exponent in range(exponent_cap):
    power_of_two = 2**exponent
    # One object just below, exactly at, and just above each power of two.
    sized_fills = (('a', power_of_two - 1),
                   ('b', power_of_two),
                   ('c', power_of_two + 1))
    for fill_char, size in sized_fills:
      self.CreateObject(bucket_uri=bucket_uri, contents=fill_char * size)

  expected_object_count = exponent_cap * 3
  self.AssertNObjectsInBucket(bucket_uri, expected_object_count)
  self.RunGsUtil(['-m', 'cp', '-D', suri(bucket_uri, '**'),
                  suri(bucket2_uri)])

  self.AssertNObjectsInBucket(bucket2_uri, expected_object_count)
def test_daisy_chain_cp(self):
  """Tests cp with the -D option."""
  src_bucket_uri = self.CreateBucket(storage_class='STANDARD')
  dst_bucket_uri = self.CreateBucket(
      storage_class='DURABLE_REDUCED_AVAILABILITY')
  key_uri = self.CreateObject(bucket_uri=src_bucket_uri, contents='foo')

  # Set some headers on source object so we can verify that headers are
  # preserved by daisy-chain copy.
  setmeta_args = ['setmeta']
  source_headers = ('Cache-Control:public,max-age=12',
                    'Content-Type:image/gif',
                    'x-%s-meta-1:abcd' % self.provider_custom_meta)
  for header in source_headers:
    setmeta_args.extend(['-h', header])
  setmeta_args.append(suri(key_uri))
  self.RunGsUtil(setmeta_args)

  # Set public-read (non-default) ACL so we can verify that cp -D -p works.
  self.RunGsUtil(['acl', 'set', 'public-read', suri(key_uri)])
  source_acl_json = self.RunGsUtil(['acl', 'get', suri(key_uri)],
                                   return_stdout=True)

  # Perform daisy-chain copy and verify that source object headers and ACL
  # were preserved. Also specify -n option to test that gsutil correctly
  # removes the x-goog-if-generation-match:0 header that was set at uploading
  # time when updating the ACL.
  cp_stderr = self.RunGsUtil(
      ['cp', '-Dpn', suri(key_uri), suri(dst_bucket_uri)],
      return_stderr=True)
  self.assertNotIn('Copy-in-the-cloud disallowed', cp_stderr)

  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CheckCopiedObject():
    copied_url = suri(dst_bucket_uri, key_uri.object_name)
    listing = self.RunGsUtil(['ls', '-L', copied_url], return_stdout=True)
    expected_patterns = (r'Cache-Control:\s+public,max-age=12',
                         r'Content-Type:\s+image/gif',
                         r'Metadata:\s+1:\s+abcd')
    for pattern in expected_patterns:
      self.assertRegexpMatches(listing, pattern)
    copied_acl_json = self.RunGsUtil(['acl', 'get', copied_url],
                                     return_stdout=True)
    self.assertEqual(source_acl_json, copied_acl_json)
  _CheckCopiedObject()
def test_daisy_chain_cp_download_failure(self):
  """Tests cp with the -D option when the download thread dies."""
  src_bucket_uri = self.CreateBucket()
  dst_bucket_uri = self.CreateBucket()
  src_object_uri = self.CreateObject(bucket_uri=src_bucket_uri,
                                     contents='a' * self.halt_size)
  resumable_threshold = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  # The pickled handler is handed to gsutil through --testcallbackfile.
  halting_handler = _HaltingCopyCallbackHandler(False, 5)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(halting_handler))

  with SetBotoConfigForTest([resumable_threshold]):
    cp_args = ['cp', '--testcallbackfile', test_callback_file, '-D',
               suri(src_object_uri), suri(dst_bucket_uri)]
    stderr = self.RunGsUtil(cp_args, expected_status=1, return_stderr=True)
    # Two exception traces are expected: one from the download thread and
    # one from the upload thread.
    halt_count = stderr.count(
        'ResumableDownloadException: Artifically halting download')
    self.assertEqual(halt_count, 2)
def test_canned_acl_cp(self):
  """Tests copying with a canned ACL."""
  src_bucket_uri = self.CreateBucket()
  dst_bucket_uri = self.CreateBucket()
  src_object_uri = self.CreateObject(bucket_uri=src_bucket_uri, contents='foo')
  self.RunGsUtil(
      ['cp', '-a', 'public-read', suri(src_object_uri), suri(dst_bucket_uri)])

  # Only now make the source object public-read as well, so that its ACL can
  # serve as the reference to compare the copy against.
  self.RunGsUtil(['acl', 'set', 'public-read', suri(src_object_uri)])
  expected_acl_json = self.RunGsUtil(['acl', 'get', suri(src_object_uri)],
                                     return_stdout=True)

  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _Check():
    """Compares the destination object's ACL with the reference ACL."""
    dst_object = suri(dst_bucket_uri, src_object_uri.object_name)
    actual_acl_json = self.RunGsUtil(['acl', 'get', dst_object],
                                     return_stdout=True)
    self.assertEqual(expected_acl_json, actual_acl_json)
  _Check()
@PerformsFileToObjectUpload
def test_canned_acl_upload(self):
  """Tests uploading a file with a canned ACL."""
  bucket_uri = self.CreateBucket()
  reference_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
  # Make the reference object public-read; its ACL is what the uploads below
  # are compared against.
  self.RunGsUtil(['acl', 'set', 'public-read', suri(reference_uri)])
  expected_acl_json = self.RunGsUtil(['acl', 'get', suri(reference_uri)],
                                     return_stdout=True)

  # First case: upload of a small file.
  small_name = 'bar'
  small_path = self.CreateTempFile(file_name=small_name, contents='foo')
  self.RunGsUtil(['cp', '-a', 'public-read', small_path, suri(bucket_uri)])
  small_acl_json = self.RunGsUtil(
      ['acl', 'get', suri(bucket_uri, small_name)], return_stdout=True)
  self.assertEqual(expected_acl_json, small_acl_json)

  # Second case: a file whose size equals the configured resumable
  # threshold.
  resumable_size = ONE_KIB
  threshold_config = ('GSUtil', 'resumable_threshold', str(resumable_size))
  with SetBotoConfigForTest([threshold_config]):
    resumable_name = 'resumable_bar'
    resumable_path = self.CreateTempFile(
        file_name=resumable_name, contents=os.urandom(resumable_size))
    self.RunGsUtil(
        ['cp', '-a', 'public-read', resumable_path, suri(bucket_uri)])
    resumable_acl_json = self.RunGsUtil(
        ['acl', 'get', suri(bucket_uri, resumable_name)],
        return_stdout=True)
    self.assertEqual(expected_acl_json, resumable_acl_json)
def test_cp_key_to_local_stream(self):
  """Tests that copying an object to '-' emits its contents on stdout."""
  expected_contents = 'foo'
  bucket_uri = self.CreateBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri,
                                 contents=expected_contents)
  cp_stdout = self.RunGsUtil(['cp', suri(object_uri), '-'],
                             return_stdout=True)
  self.assertIn(expected_contents, cp_stdout)
def test_cp_local_file_to_local_stream(self):
  """Tests that copying a local file to '-' emits its contents on stdout."""
  expected_contents = 'content'
  local_path = self.CreateTempFile(contents=expected_contents)
  cp_stdout = self.RunGsUtil(['cp', local_path, '-'], return_stdout=True)
  self.assertIn(expected_contents, cp_stdout)
@PerformsFileToObjectUpload
def test_cp_zero_byte_file(self):
  """Tests that a zero-byte file can be uploaded and downloaded again."""
  dst_bucket_uri = self.CreateBucket()
  src_dir = self.CreateTempDir()
  fpath = os.path.join(src_dir, 'zero_byte')
  with open(fpath, 'w') as unused_out_file:
    pass  # Write a zero byte file
  self.RunGsUtil(['cp', fpath, suri(dst_bucket_uri)])

  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _Check1():
    """Verifies that the uploaded object shows up in the bucket listing."""
    stdout = self.RunGsUtil(['ls', suri(dst_bucket_uri)], return_stdout=True)
    self.assertIn(os.path.basename(fpath), stdout)
  _Check1()

  download_path = os.path.join(src_dir, 'zero_byte_download')
  self.RunGsUtil(['cp', suri(dst_bucket_uri, 'zero_byte'), download_path])
  # An os.stat() result is always truthy, so assertTrue(os.stat(...)) could
  # only ever fail by raising on a missing file. Check explicitly that the
  # download exists and is empty.
  self.assertTrue(os.path.isfile(download_path))
  self.assertEqual(os.path.getsize(download_path), 0)
def test_copy_bucket_to_bucket(self):
  """Tests recursively copying from bucket to bucket.

  This should produce identically named objects (and not, in particular,
  destination objects named by the version-specific URI from source objects).
  """
  src_bucket_uri = self.CreateVersionedBucket()
  dst_bucket_uri = self.CreateVersionedBucket()
  for object_name, contents in (('obj0', 'abc'), ('obj1', 'def')):
    self.CreateObject(bucket_uri=src_bucket_uri, object_name=object_name,
                      contents=contents)

  # Retry as a hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CopyAndCheck():
    """Copies the bucket recursively and checks the destination listing."""
    self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), suri(dst_bucket_uri)])
    listing = self.RunGsUtil(['ls', '-R', dst_bucket_uri.uri],
                             return_stdout=True)
    src_bucket_name = src_bucket_uri.bucket_name
    self.assertIn('%s%s/obj0\n' % (dst_bucket_uri, src_bucket_name), listing)
    self.assertIn('%s%s/obj1\n' % (dst_bucket_uri, src_bucket_name), listing)
  _CopyAndCheck()
def test_copy_bucket_to_dir(self):
  """Tests recursively copying from bucket to a directory.

  This should produce identically named objects (and not, in particular,
  destination objects named by the version-specific URI from source objects).
  """
  src_bucket_uri = self.CreateBucket()
  dst_dir = self.CreateTempDir()
  for object_name, contents in (('obj0', 'abc'), ('obj1', 'def')):
    self.CreateObject(bucket_uri=src_bucket_uri, object_name=object_name,
                      contents=contents)

  # Retry as a hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _CopyAndCheck():
    """Copies the bucket recursively and validates the results."""
    self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir])
    downloaded = sorted(
        os.path.join(dirname, filename)
        for dirname, _, filenames in os.walk(dst_dir)
        for filename in filenames)
    self.assertEqual(len(downloaded), 2)
    expected_dir = os.path.join(dst_dir, src_bucket_uri.bucket_name)
    self.assertEqual(os.path.join(expected_dir, 'obj0'), downloaded[0])
    self.assertEqual(os.path.join(expected_dir, 'obj1'), downloaded[1])
  _CopyAndCheck()
def test_recursive_download_with_leftover_dir_placeholder(self):
  """Tests that we correctly handle leftover dir placeholders."""
  src_bucket_uri = self.CreateBucket()
  dst_dir = self.CreateTempDir()
  for object_name, contents in (('obj0', 'abc'), ('obj1', 'def')):
    self.CreateObject(bucket_uri=src_bucket_uri, object_name=object_name,
                      contents=contents)

  # Add an empty object named '/', like the placeholder that web GUI tools
  # can leave behind.
  placeholder_uri = src_bucket_uri.clone_replace_name('/')
  placeholder_uri.set_contents_from_string('')
  self.AssertNObjectsInBucket(src_bucket_uri, 3)

  cp_stderr = self.RunGsUtil(['cp', '-R', suri(src_bucket_uri), dst_dir],
                             return_stderr=True)
  self.assertIn('Skipping cloud sub-directory placeholder object', cp_stderr)

  # Only the two real objects should have been written to disk.
  downloaded = sorted(
      os.path.join(dirname, filename)
      for dirname, _, filenames in os.walk(dst_dir)
      for filename in filenames)
  self.assertEqual(len(downloaded), 2)
  expected_dir = os.path.join(dst_dir, src_bucket_uri.bucket_name)
  self.assertEqual(os.path.join(expected_dir, 'obj0'), downloaded[0])
  self.assertEqual(os.path.join(expected_dir, 'obj1'), downloaded[1])
def test_copy_quiet(self):
  """Tests that the -q option suppresses the 'Copying' progress output."""
  bucket_uri = self.CreateBucket()
  src_object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
  dst_object = suri(bucket_uri.clone_replace_name('o2'))
  cp_stderr = self.RunGsUtil(['-q', 'cp', suri(src_object_uri), dst_object],
                             return_stderr=True)
  self.assertEqual(cp_stderr.count('Copying '), 0)
def test_cp_md5_match(self):
  """Tests that the uploaded object has the expected MD5.

  Note that while this does perform a file to object upload, MD5's are
  not supported for composite objects so we don't use the decorator in this
  case.
  """
  bucket_uri = self.CreateBucket()
  fpath = self.CreateTempFile(contents='bar')
  # Hash the raw bytes: open in binary mode so that no platform newline
  # translation can alter what gets digested.
  with open(fpath, 'rb') as f_in:
    file_md5 = base64.encodestring(binascii.unhexlify(
        CalculateMd5FromContents(f_in))).rstrip('\n')
  self.RunGsUtil(['cp', fpath, suri(bucket_uri)])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _Check1():
    """Verifies that the listing reports the locally computed MD5."""
    stdout = self.RunGsUtil(['ls', '-L', suri(bucket_uri)],
                            return_stdout=True)
    self.assertRegexpMatches(stdout,
                             r'Hash\s+\(md5\):\s+%s' % re.escape(file_md5))
  _Check1()
@unittest.skipIf(IS_WINDOWS,
                 'Unicode handling on Windows requires mods to site-packages')
@PerformsFileToObjectUpload
def test_cp_manifest_upload_unicode(self):
  """Tests uploading with a manifest file using non-ASCII names."""
  file_name = 'foo-unicöde'
  object_name = 'bar-unicöde'
  manifest_name = 'manifest-unicöde'
  return self._ManifestUpload(file_name, object_name, manifest_name)
@PerformsFileToObjectUpload
def test_cp_manifest_upload(self):
  """Tests uploading with a manifest file."""
  file_name, object_name, manifest_name = 'foo', 'bar', 'manifest'
  return self._ManifestUpload(file_name, object_name, manifest_name)
def _ManifestUpload(self, file_name, object_name, manifest_name):
  """Tests uploading with a manifest file.

  Uploads a 3-byte file with cp -L and validates the header row and the
  single result row written to the manifest.

  Args:
    file_name: Name of the local source file to create.
    object_name: Name of the destination object.
    manifest_name: Name of the manifest (log) file passed to cp -L.
  """
  bucket_uri = self.CreateBucket()
  dsturi = suri(bucket_uri, object_name)

  fpath = self.CreateTempFile(file_name=file_name, contents='bar')
  logpath = self.CreateTempFile(file_name=manifest_name, contents='')
  # Ensure the file is empty.
  open(logpath, 'w').close()
  self.RunGsUtil(['cp', '-L', logpath, fpath, dsturi])
  with open(logpath, 'r') as f:
    lines = f.readlines()
  # One header row plus one row for the single copied file.
  self.assertEqual(len(lines), 2)

  expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
                      'UploadId', 'Source Size', 'Bytes Transferred',
                      'Result', 'Description']
  self.assertEqual(expected_headers, lines[0].strip().split(','))
  results = lines[1].strip().split(',')
  self.assertEqual(results[0][:7], 'file://')  # source
  self.assertEqual(results[1][:5], '%s://' %
                   self.default_provider)  # destination
  date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
  start_date = datetime.datetime.strptime(results[2], date_format)
  end_date = datetime.datetime.strptime(results[3], date_format)
  # assertGreater reports both timestamps on failure, unlike comparing a
  # boolean against True.
  self.assertGreater(end_date, start_date)
  if self.RunGsUtil == testcase.GsUtilIntegrationTestCase.RunGsUtil:
    # Check that we didn't do automatic parallel uploads - compose doesn't
    # calculate the MD5 hash. Since RunGsUtil is overriden in
    # TestCpParallelUploads to force parallel uploads, we can check which
    # method was used.
    self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==')  # md5
  self.assertEqual(int(results[6]), 3)  # Source Size
  self.assertEqual(int(results[7]), 3)  # Bytes Transferred
  self.assertEqual(results[8], 'OK')  # Result
@PerformsFileToObjectUpload
def test_cp_manifest_download(self):
  """Tests downloading with a manifest file."""
  key_uri = self.CreateObject(contents='foo')
  fpath = self.CreateTempFile(contents='')
  logpath = self.CreateTempFile(contents='')
  # Ensure the file is empty.
  open(logpath, 'w').close()
  self.RunGsUtil(['cp', '-L', logpath, suri(key_uri), fpath],
                 return_stdout=True)
  with open(logpath, 'r') as f:
    lines = f.readlines()
  # One header row plus one row for the single downloaded object.
  self.assertEqual(len(lines), 2)

  expected_headers = ['Source', 'Destination', 'Start', 'End', 'Md5',
                      'UploadId', 'Source Size', 'Bytes Transferred',
                      'Result', 'Description']
  self.assertEqual(expected_headers, lines[0].strip().split(','))
  results = lines[1].strip().split(',')
  self.assertEqual(results[0][:5], '%s://' %
                   self.default_provider)  # source
  self.assertEqual(results[1][:7], 'file://')  # destination
  date_format = '%Y-%m-%dT%H:%M:%S.%fZ'
  start_date = datetime.datetime.strptime(results[2], date_format)
  end_date = datetime.datetime.strptime(results[3], date_format)
  # assertGreater reports both timestamps on failure, unlike comparing a
  # boolean against True.
  self.assertGreater(end_date, start_date)
  self.assertEqual(results[4], 'rL0Y20zC+Fzt72VPzMSk2A==')  # md5
  self.assertEqual(int(results[6]), 3)  # Source Size
  # Bytes transferred might be more than 3 if the file was gzipped, since
  # the minimum gzip header is 10 bytes.
  self.assertGreaterEqual(int(results[7]), 3)  # Bytes Transferred
  self.assertEqual(results[8], 'OK')  # Result
@PerformsFileToObjectUpload
def test_copy_unicode_non_ascii_filename(self):
  """Tests uploading a local file whose name is non-ASCII unicode."""
  dst_object_uri = self.CreateObject(contents='foo')
  # The file is made large enough to cause a resumable upload, which hashes
  # the filename to construct the tracker filename.
  three_mib = 3 * 1024 * 1024
  unicode_path = self.CreateTempFile(file_name=u'Аудиоархив',
                                     contents='x' * three_mib)
  encoded_path = unicode_path.encode(UTF8)
  cp_stderr = self.RunGsUtil(['cp', encoded_path, suri(dst_object_uri)],
                             return_stderr=True)
  self.assertIn('Copying file:', cp_stderr)
# Note: We once implemented a test (test_copy_invalid_unicode_filename)
# asserting that files with invalid unicode filenames were skipped. However,
# it turns out that os.walk() on MacOS has no problem with such files, so
# the test failed there. Given that, we decided to remove the test.
def test_gzip_upload_and_download(self):
  """Tests that cp -z gzips only the listed extensions and round-trips."""
  bucket_uri = self.CreateBucket()
  contents = 'x' * 10000
  tmpdir = self.CreateTempDir()
  for file_name in ('test.html', 'test.js', 'test.txt'):
    self.CreateTempFile(file_name=file_name, tmpdir=tmpdir, contents=contents)

  # Name only 2 of the 3 extensions to check that the correct files are
  # gzipped, and include whitespace in the extension list to check that it
  # is tolerated.
  self.RunGsUtil(['cp', '-z', 'js, html',
                  os.path.join(tmpdir, 'test.*'), suri(bucket_uri)])
  self.AssertNObjectsInBucket(bucket_uri, 3)

  html_uri = suri(bucket_uri, 'test.html')
  js_uri = suri(bucket_uri, 'test.js')
  txt_uri = suri(bucket_uri, 'test.txt')
  gzip_encoding_re = r'Content-Encoding:\s+gzip'
  for gzipped_uri in (html_uri, js_uri):
    stat_stdout = self.RunGsUtil(['stat', gzipped_uri], return_stdout=True)
    self.assertRegexpMatches(stat_stdout, gzip_encoding_re)
  stat_stdout = self.RunGsUtil(['stat', txt_uri], return_stdout=True)
  self.assertNotRegexpMatches(stat_stdout, gzip_encoding_re)

  # Every object, gzipped or not, must download as the original contents.
  download_path = self.CreateTempFile()
  for object_uri in (html_uri, js_uri, txt_uri):
    self.RunGsUtil(['cp', object_uri, suri(download_path)])
    with open(download_path, 'r') as downloaded:
      self.assertEqual(downloaded.read(), contents)
def test_upload_with_subdir_and_unexpanded_wildcard(self):
  """Tests cp -R of a wildcard that matches a nested subdirectory."""
  nested_path = self.CreateTempFile(file_name=('tmp', 'x', 'y', 'z'))
  bucket_uri = self.CreateBucket()
  # Drop the final 5 characters of the path and append a wildcard in their
  # place.
  wildcard_uri = '%s*' % nested_path[:-5]
  cp_stderr = self.RunGsUtil(['cp', '-R', wildcard_uri, suri(bucket_uri)],
                             return_stderr=True)
  self.assertIn('Copying file:', cp_stderr)
  self.AssertNObjectsInBucket(bucket_uri, 1)
def test_cp_object_ending_with_slash(self):
  """Tests that cp works with object names ending with slash."""
  tmpdir = self.CreateTempDir()
  bucket_uri = self.CreateBucket()
  self.CreateObject(bucket_uri=bucket_uri,
                    object_name='abc/',
                    contents='dir')
  self.CreateObject(bucket_uri=bucket_uri,
                    object_name='abc/def',
                    contents='def')
  self.AssertNObjectsInBucket(bucket_uri, 2)
  self.RunGsUtil(['cp', '-R', suri(bucket_uri), tmpdir])
  # Check that files in the subdir got copied even though subdir object
  # download was skipped.
  with open(os.path.join(tmpdir, bucket_uri.bucket_name, 'abc', 'def')) as f:
    # assertEqual rather than the deprecated assertEquals alias, matching
    # the other tests in this file.
    self.assertEqual('def', '\n'.join(f.readlines()))
def test_cp_without_read_access(self):
  """Tests that cp fails without read access to the object."""
  # TODO: With 401's triggering retries in apitools, this test will take
  # a long time. Ideally, make apitools accept a num_retries config for this
  # until we stop retrying the 401's.
  bucket_uri = self.CreateBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')

  # Confirm the object is listed in the bucket before attempting the copy.
  self.AssertNObjectsInBucket(bucket_uri, 1)

  with self.SetAnonymousBotoCreds():
    cp_stderr = self.RunGsUtil(['cp', suri(object_uri), 'foo'],
                               return_stderr=True, expected_status=1)
    self.assertIn('AccessDenied', cp_stderr)
@unittest.skipIf(IS_WINDOWS, 'os.symlink() is not available on Windows.')
def test_cp_minus_e(self):
  """Tests that cp -e copies regular files and skips symbolic links."""
  src_dir = self.CreateTempDir()
  regular_file = self.CreateTempFile(tmpdir=src_dir)
  symlink_path = os.path.join(src_dir, 'cp_minus_e')
  bucket_uri = self.CreateBucket()
  os.symlink(regular_file, symlink_path)

  # The wildcard matches both the regular file and the symlink to it.
  wildcard = '%s%s*' % (src_dir, os.path.sep)
  cp_stderr = self.RunGsUtil(
      ['cp', '-e', wildcard, suri(bucket_uri, 'files')],
      return_stderr=True)
  self.assertIn('Copying file', cp_stderr)
  self.assertIn('Skipping symbolic link file', cp_stderr)
def test_cp_multithreaded_wildcard(self):
  """Tests that cp -m works with a wildcard."""
  file_count = 5
  src_dir = self.CreateTempDir(test_files=file_count)
  bucket_uri = self.CreateBucket()
  all_files_wildcard = '%s%s*' % (src_dir, os.sep)
  self.RunGsUtil(['-m', 'cp', all_files_wildcard, suri(bucket_uri)])
  self.AssertNObjectsInBucket(bucket_uri, file_count)
def test_cp_duplicate_source_args(self):
  """Tests that cp -m works when a source argument is provided twice."""
  expected_contents = 'edge'
  object_uri = self.CreateObject(object_name='foo',
                                 contents=expected_contents)
  download_dir = self.CreateTempDir()
  source = suri(object_uri)
  self.RunGsUtil(['-m', 'cp', source, source, download_dir])
  with open(os.path.join(download_dir, 'foo'), 'r') as downloaded:
    # Naming the source twice must not duplicate the contents.
    self.assertEqual(downloaded.read(), expected_contents)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_break(self):
  """Tests that an upload can be resumed after a connection break."""
  bucket_uri = self.CreateBucket()
  local_path = self.CreateTempFile(contents='a' * self.halt_size)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  halting_handler = _HaltingCopyCallbackHandler(True, 5)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(halting_handler))

  with SetBotoConfigForTest([threshold_config]):
    # First attempt: the injected callback halts the upload part-way.
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file, local_path,
         suri(bucket_uri)],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting upload', halted_stderr)
    # Second attempt, without the callback: the upload should resume.
    resumed_stderr = self.RunGsUtil(['cp', local_path, suri(bucket_uri)],
                                    return_stderr=True)
    self.assertIn('Resuming upload', resumed_stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_retry(self):
  """Tests that a resumable upload completes with one retry."""
  bucket_uri = self.CreateBucket()
  fpath = self.CreateTempFile(contents='a' * self.halt_size)
  # TODO: Raising an httplib or socket error blocks bucket teardown
  # in JSON for 60-120s on a multiprocessing lock acquire. Figure out why;
  # until then, raise an apitools retryable exception.
  if self.test_api == ApiSelector.XML:
    test_callback_file = self.CreateTempFile(
        contents=pickle.dumps(_ResumableUploadRetryHandler(
            5, httplib.BadStatusLine, ('unused',))))
  else:
    test_callback_file = self.CreateTempFile(
        contents=pickle.dumps(_ResumableUploadRetryHandler(
            5, apitools_exceptions.BadStatusCodeError,
            ('unused', 'unused', 'unused'))))
  boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([boto_config_for_test]):
    # return_stderr is a boolean flag; pass True (not 1) like every other
    # RunGsUtil call in this file.
    stderr = self.RunGsUtil(['-D', 'cp', '--testcallbackfile',
                             test_callback_file, fpath, suri(bucket_uri)],
                            return_stderr=True)
    # The retry message asserted on differs between the XML and JSON APIs.
    if self.test_api == ApiSelector.XML:
      self.assertIn('Got retryable failure', stderr)
    else:
      self.assertIn('Retrying', stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_streaming_upload_retry(self):
  """Tests that a streaming resumable upload completes with one retry."""
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it from a running
    # test makes the test count as passed. skipTest() raises SkipTest so the
    # test is actually reported as skipped.
    self.skipTest('XML does not support resumable streaming uploads.')
  bucket_uri = self.CreateBucket()

  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(_ResumableUploadRetryHandler(
          5, apitools_exceptions.BadStatusCodeError,
          ('unused', 'unused', 'unused'))))
  # Need to reduce the JSON chunk size since streaming uploads buffer a
  # full chunk.
  boto_configs_for_test = [('GSUtil', 'json_resumable_chunk_size',
                            str(256 * ONE_KIB)),
                           ('Boto', 'num_retries', '2')]
  with SetBotoConfigForTest(boto_configs_for_test):
    # Stream 512 KiB from stdin ('-' as the source), i.e. two full chunks.
    # return_stderr is a boolean flag; pass True rather than 1.
    stderr = self.RunGsUtil(
        ['-D', 'cp', '--testcallbackfile', test_callback_file, '-',
         suri(bucket_uri, 'foo')],
        stdin='a' * 512 * ONE_KIB, return_stderr=True)
    self.assertIn('Retrying', stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload(self):
  """Tests that a basic resumable upload completes successfully."""
  bucket_uri = self.CreateBucket()
  local_path = self.CreateTempFile(contents='a' * self.halt_size)
  # Lower the threshold to 1 KiB for the duration of the copy.
  threshold_config = [('GSUtil', 'resumable_threshold', str(ONE_KIB))]
  with SetBotoConfigForTest(threshold_config):
    self.RunGsUtil(['cp', local_path, suri(bucket_uri)])
@SkipForS3('No resumable upload support for S3.')
def test_resumable_upload_break_leaves_tracker(self):
  """Tests that a tracker file is created with a resumable upload."""
  bucket_uri = self.CreateBucket()
  local_path = self.CreateTempFile(file_name='foo',
                                   contents='a' * self.halt_size)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
    tracker_path = GetTrackerFilePath(
        dst_url, TrackerFileType.UPLOAD, self.test_api)
    halting_handler = _HaltingCopyCallbackHandler(True, 5)
    test_callback_file = self.CreateTempFile(
        contents=pickle.dumps(halting_handler))
    try:
      cp_stderr = self.RunGsUtil(
          ['cp', '--testcallbackfile', test_callback_file, local_path,
           suri(bucket_uri, 'foo')],
          expected_status=1, return_stderr=True)
      self.assertIn('Artifically halting upload', cp_stderr)
      self.assertTrue(os.path.exists(tracker_path),
                      'Tracker file %s not present.' % tracker_path)
    finally:
      # Always remove the tracker file, whether or not the assertions held.
      if os.path.exists(tracker_path):
        os.unlink(tracker_path)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_break_file_size_change(self):
  """Tests a resumable upload where the uploaded file changes size.

  This should fail when we read the tracker data.
  """
  bucket_uri = self.CreateBucket()
  src_dir = self.CreateTempDir()
  local_path = self.CreateTempFile(file_name='foo', tmpdir=src_dir,
                                   contents='a' * self.halt_size)
  halting_handler = _HaltingCopyCallbackHandler(True, 5)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(halting_handler))

  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file, local_path,
         suri(bucket_uri)],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting upload', halted_stderr)
    # Recreate the same file at twice its previous size, then retry.
    local_path = self.CreateTempFile(file_name='foo', tmpdir=src_dir,
                                     contents='a' * self.halt_size * 2)
    retry_stderr = self.RunGsUtil(['cp', local_path, suri(bucket_uri)],
                                  expected_status=1, return_stderr=True)
    self.assertIn('ResumableUploadAbortException', retry_stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_break_file_content_change(self):
  """Tests a resumable upload where the uploaded file changes content."""
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it from a running
    # test makes the test count as passed. skipTest() raises SkipTest so the
    # test is actually reported as skipped.
    self.skipTest(
        'XML doesn\'t make separate HTTP calls at fixed-size boundaries for '
        'resumable uploads, so we can\'t guarantee that the server saves a '
        'specific part of the upload.')
  bucket_uri = self.CreateBucket()
  tmp_dir = self.CreateTempDir()
  fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
                              contents='a' * ONE_KIB * 512)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(_HaltingCopyCallbackHandler(True,
                                                        int(ONE_KIB) * 384)))
  resumable_threshold_for_test = (
      'GSUtil', 'resumable_threshold', str(ONE_KIB))
  resumable_chunk_size_for_test = (
      'GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256))
  with SetBotoConfigForTest([resumable_threshold_for_test,
                             resumable_chunk_size_for_test]):
    stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
                             fpath, suri(bucket_uri)],
                            expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting upload', stderr)
    # Recreate the file with the same size but different bytes, then retry.
    fpath = self.CreateTempFile(file_name='foo', tmpdir=tmp_dir,
                                contents='b' * ONE_KIB * 512)
    stderr = self.RunGsUtil(['cp', fpath, suri(bucket_uri)],
                            expected_status=1, return_stderr=True)
    self.assertIn('doesn\'t match cloud-supplied digest', stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_break_file_smaller_size(self):
  """Tests a resumable upload where the uploaded file becomes smaller.

  The retried upload is expected to fail with a
  ResumableUploadAbortException.
  """
  bucket_uri = self.CreateBucket()
  src_dir = self.CreateTempDir()
  local_path = self.CreateTempFile(file_name='foo', tmpdir=src_dir,
                                   contents='a' * ONE_KIB * 512)
  halting_handler = _HaltingCopyCallbackHandler(True, int(ONE_KIB) * 384)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(halting_handler))
  boto_configs = [
      ('GSUtil', 'resumable_threshold', str(ONE_KIB)),
      ('GSUtil', 'json_resumable_chunk_size', str(ONE_KIB * 256)),
  ]
  with SetBotoConfigForTest(boto_configs):
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file, local_path,
         suri(bucket_uri)],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting upload', halted_stderr)
    # Recreate the file with only 1 KiB of data, then retry the upload.
    local_path = self.CreateTempFile(file_name='foo', tmpdir=src_dir,
                                     contents='a' * ONE_KIB)
    retry_stderr = self.RunGsUtil(['cp', local_path, suri(bucket_uri)],
                                  expected_status=1, return_stderr=True)
    self.assertIn('ResumableUploadAbortException', retry_stderr)
# Not parallelizable: the tracker directory is temporarily made unwritable,
# which interferes with any test running in parallel that uses it.
@NotParallelizable
@SkipForS3('No resumable upload support for S3.')
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
@PerformsFileToObjectUpload
def test_cp_unwritable_tracker_file(self):
  """Tests a resumable upload with an unwritable tracker file."""
  bucket_uri = self.CreateBucket()
  dst_url = StorageUrlFromString(suri(bucket_uri, 'foo'))
  tracker_path = GetTrackerFilePath(
      dst_url, TrackerFileType.UPLOAD, self.test_api)
  tracker_dir = os.path.dirname(tracker_path)
  local_path = self.CreateTempFile(file_name='foo', contents='a' * ONE_KIB)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  # Remember the directory's mode so the finally clause can restore it.
  original_mode = os.stat(tracker_dir).st_mode

  try:
    os.chmod(tracker_dir, 0)
    with SetBotoConfigForTest([threshold_config]):
      cp_stderr = self.RunGsUtil(['cp', local_path, suri(bucket_uri)],
                                 expected_status=1, return_stderr=True)
      self.assertIn('Couldn\'t write tracker file', cp_stderr)
  finally:
    os.chmod(tracker_dir, original_mode)
    if os.path.exists(tracker_path):
      os.unlink(tracker_path)
# Not parallelizable: the tracker directory is temporarily made unwritable,
# which interferes with any test running in parallel that uses it.
@NotParallelizable
@unittest.skipIf(IS_WINDOWS, 'chmod on dir unsupported on Windows.')
def test_cp_unwritable_tracker_file_download(self):
  """Tests downloads with an unwritable tracker file."""
  object_uri = self.CreateObject(contents='foo' * ONE_KIB)
  src_url = StorageUrlFromString(suri(object_uri))
  tracker_path = GetTrackerFilePath(
      src_url, TrackerFileType.DOWNLOAD, self.test_api)
  tracker_dir = os.path.dirname(tracker_path)
  download_path = self.CreateTempFile()
  # Remember the directory's mode so the finally clause can restore it.
  original_mode = os.stat(tracker_dir).st_mode

  try:
    os.chmod(tracker_dir, 0)
    high_threshold = ('GSUtil', 'resumable_threshold', str(EIGHT_MIB))
    with SetBotoConfigForTest([high_threshold]):
      # The object is below the threshold, so this copy should succeed.
      self.RunGsUtil(['cp', suri(object_uri), download_path])
    low_threshold = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
    with SetBotoConfigForTest([low_threshold]):
      cp_stderr = self.RunGsUtil(['cp', suri(object_uri), download_path],
                                 expected_status=1, return_stderr=True)
      self.assertIn('Couldn\'t write tracker file', cp_stderr)
  finally:
    os.chmod(tracker_dir, original_mode)
    if os.path.exists(tracker_path):
      os.unlink(tracker_path)
def test_cp_resumable_download_break(self):
  """Tests that a download can be resumed after a connection break."""
  bucket_uri = self.CreateBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                 contents='a' * self.halt_size)
  download_path = self.CreateTempFile()
  halting_handler = _HaltingCopyCallbackHandler(False, 5)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(halting_handler))

  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    # First attempt: the injected callback halts the download part-way.
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file, suri(object_uri),
         download_path],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting download.', halted_stderr)
    # The halted download must have left a tracker file behind.
    tracker_path = GetTrackerFilePath(
        StorageUrlFromString(download_path), TrackerFileType.DOWNLOAD,
        self.test_api)
    self.assertTrue(os.path.isfile(tracker_path))
    # Second attempt, without the callback: the download should resume.
    resumed_stderr = self.RunGsUtil(['cp', suri(object_uri), download_path],
                                    return_stderr=True)
    self.assertIn('Resuming download', resumed_stderr)
def test_cp_resumable_download_etag_differs(self):
  """Tests that a halted download is not resumed once the source changes.

  After the first cp is halted, the object 'foo' is re-created with different
  contents, so the second cp must not report that it is resuming.
  """
  bucket_uri = self.CreateBucket()
  original_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                   contents='a' * self.halt_size)
  dest_path = self.CreateTempFile()
  halting_handler = pickle.dumps(_HaltingCopyCallbackHandler(False, 5))
  test_callback_file = self.CreateTempFile(contents=halting_handler)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    # This will create a tracker file with an ETag.
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file,
         suri(original_uri), dest_path],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting download.', halted_stderr)

    # Create a new object with different contents - it should have a
    # different ETag since the content has changed.
    changed_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                    contents='b' * self.halt_size)
    second_stderr = self.RunGsUtil(['cp', suri(changed_uri), dest_path],
                                   return_stderr=True)
    self.assertNotIn('Resuming download', second_stderr)
def test_cp_resumable_download_file_larger(self):
  """Tests that cp fails and drops the tracker file if the local file grew.

  After a halted download, the destination file is overwritten with twice
  halt_size bytes. The next cp must exit with status 1, must not resume, and
  must report both that the file is larger and that the tracker file is
  being deleted.
  """
  bucket_uri = self.CreateBucket()
  dest_path = self.CreateTempFile()
  src_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                              contents='a' * self.halt_size)
  halting_handler = pickle.dumps(_HaltingCopyCallbackHandler(False, 5))
  test_callback_file = self.CreateTempFile(contents=halting_handler)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file,
         suri(src_uri), dest_path],
        expected_status=1, return_stderr=True)
    self.assertIn('Artifically halting download.', halted_stderr)

    # Make the partially downloaded file bigger than the source object.
    with open(dest_path, 'w') as larger_file:
      larger_file.write('a' * (self.halt_size * 2))

    retry_stderr = self.RunGsUtil(['cp', suri(src_uri), dest_path],
                                  expected_status=1, return_stderr=True)
    self.assertNotIn('Resuming download', retry_stderr)
    self.assertIn('is larger', retry_stderr)
    self.assertIn('Deleting tracker file', retry_stderr)
def test_cp_resumable_download_content_differs(self):
  """Tests that a size-matching local file is not re-downloaded.

  We only compare size, not contents, so re-download should not occur even
  though the contents are technically different. However, hash validation on
  the file should still occur and we will delete the file then because
  the hashes differ.
  """
  bucket_uri = self.CreateBucket()
  tmp_dir = self.CreateTempDir()
  dest_path = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
  src_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                              contents='efgh' * ONE_KIB)

  # Pull the object's ETag out of the long listing.
  listing = self.RunGsUtil(['ls', '-L', suri(src_uri)], return_stdout=True)
  etag_match = re.search(r'\s*ETag:\s*(.*)', listing)
  self.assertIsNotNone(etag_match, 'Could not get object ETag')
  self.assertEqual(len(etag_match.groups()), 1,
                   'Did not match expected single ETag')
  object_etag = etag_match.group(1)

  tracker_filename = GetTrackerFilePath(
      StorageUrlFromString(dest_path), TrackerFileType.DOWNLOAD,
      self.test_api)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  try:
    # Hand-write a tracker file containing the object's current ETag.
    with open(tracker_filename, 'w') as tracker_fp:
      tracker_fp.write(object_etag)
    with SetBotoConfigForTest([threshold_config]):
      cp_stderr = self.RunGsUtil(['cp', suri(src_uri), dest_path],
                                 return_stderr=True, expected_status=1)
      self.assertIn('Download already complete for file', cp_stderr)
      self.assertIn('doesn\'t match cloud-supplied digest', cp_stderr)
      # File and tracker file should be deleted.
      self.assertFalse(os.path.isfile(dest_path))
      self.assertFalse(os.path.isfile(tracker_filename))
  finally:
    if os.path.exists(tracker_filename):
      os.unlink(tracker_filename)
def test_cp_resumable_download_content_matches(self):
  """Tests that cp reports completion when local file and tracker match.

  The local file has the same contents as the object and the tracker file
  holds the object's ETag. cp must report the download as already complete
  and the tracker file must be gone afterwards.
  """
  bucket_uri = self.CreateBucket()
  tmp_dir = self.CreateTempDir()
  shared_contents = 'abcd' * ONE_KIB
  dest_path = self.CreateTempFile(tmpdir=tmp_dir, contents=shared_contents)
  src_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                              contents=shared_contents)

  # Pull the object's ETag out of the long listing.
  listing = self.RunGsUtil(['ls', '-L', suri(src_uri)], return_stdout=True)
  etag_match = re.search(r'\s*ETag:\s*(.*)', listing)
  self.assertIsNotNone(etag_match, 'Could not get object ETag')
  self.assertEqual(len(etag_match.groups()), 1,
                   'Did not match expected single ETag')
  object_etag = etag_match.group(1)

  tracker_filename = GetTrackerFilePath(
      StorageUrlFromString(dest_path), TrackerFileType.DOWNLOAD,
      self.test_api)
  with open(tracker_filename, 'w') as tracker_fp:
    tracker_fp.write(object_etag)

  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  try:
    with SetBotoConfigForTest([threshold_config]):
      cp_stderr = self.RunGsUtil(['cp', suri(src_uri), dest_path],
                                 return_stderr=True)
      self.assertIn('Download already complete for file', cp_stderr)
      # Tracker file should be removed after successful hash validation.
      self.assertFalse(os.path.isfile(tracker_filename))
  finally:
    if os.path.exists(tracker_filename):
      os.unlink(tracker_filename)
def test_cp_resumable_download_tracker_file_not_matches(self):
  """Tests that cp overwrites the file when the tracker ETag is stale.

  The tracker file is written with the object's ETag plus a bogus suffix.
  cp must not resume, must replace the local contents with the object's
  contents, and must leave no tracker file behind.
  """
  bucket_uri = self.CreateBucket()
  tmp_dir = self.CreateTempDir()
  dest_path = self.CreateTempFile(tmpdir=tmp_dir, contents='abcd' * ONE_KIB)
  src_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                              contents='efgh' * ONE_KIB)

  # Pull the object's ETag out of the long listing.
  listing = self.RunGsUtil(['ls', '-L', suri(src_uri)], return_stdout=True)
  etag_match = re.search(r'\s*ETag:\s*(.*)', listing)
  self.assertIsNotNone(etag_match, 'Could not get object ETag')
  self.assertEqual(len(etag_match.groups()), 1,
                   'Did not match regex for exactly one object ETag')
  # Corrupt the ETag so it cannot match the object's.
  stale_etag = etag_match.group(1) + 'nonmatching'

  tracker_filename = GetTrackerFilePath(
      StorageUrlFromString(dest_path), TrackerFileType.DOWNLOAD,
      self.test_api)
  with open(tracker_filename, 'w') as tracker_fp:
    tracker_fp.write(stale_etag)

  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  try:
    with SetBotoConfigForTest([threshold_config]):
      cp_stderr = self.RunGsUtil(['cp', suri(src_uri), dest_path],
                                 return_stderr=True)
      self.assertNotIn('Resuming download', cp_stderr)
      # Ensure the file was overwritten.
      with open(dest_path, 'r') as in_fp:
        downloaded = in_fp.read()
        self.assertEqual(downloaded, 'efgh' * ONE_KIB,
                         'File not overwritten when it should have been '
                         'due to a non-matching tracker file.')
      self.assertFalse(os.path.isfile(tracker_filename))
  finally:
    if os.path.exists(tracker_filename):
      os.unlink(tracker_filename)
def test_cp_resumable_download_gzip(self):
  """Tests that a halted download of a gzip-uploaded object can be resumed.

  The object is uploaded with 'cp -z txt'. The first download is halted and
  must leave a tracker file and a '_.gztmp' temp file. The second download
  must resume, produce the original contents, and remove both leftovers.
  """
  # Generate some reasonably incompressible data. This compresses to a bit
  # around 128K in practice, but we assert specifically below that it is
  # larger than self.halt_size to guarantee that we can halt the download
  # partway through.
  object_uri = self.CreateObject()
  random.seed(0)
  random_letters = [random.choice(string.ascii_letters)
                    for _ in xrange(ONE_KIB * 128)]
  contents = str(random_letters)
  random.seed()  # Reset the seed for any other tests.
  upload_path = self.CreateTempFile(file_name='unzipped.txt',
                                    contents=contents)
  self.RunGsUtil(['cp', '-z', 'txt', suri(upload_path), suri(object_uri)])

  # Use @Retry as hedge against bucket listing eventual consistency.
  @Retry(AssertionError, tries=3, timeout_secs=1)
  def _GetObjectSize():
    du_stdout = self.RunGsUtil(['du', suri(object_uri)], return_stdout=True)
    size_match = re.search(r'(\d+)\s+.*', du_stdout)
    self.assertIsNotNone(size_match, 'Could not get object size')
    self.assertEqual(len(size_match.groups()), 1,
                     'Did not match regex for exactly one object size.')
    return long(size_match.group(1))

  object_size = _GetObjectSize()
  self.assertGreaterEqual(object_size, self.halt_size,
                          'Compresed object size was not large enough to '
                          'allow for a halted download, so the test results '
                          'would be invalid. Please increase the compressed '
                          'object size in the test.')
  download_path = self.CreateTempFile()
  halting_handler = pickle.dumps(_HaltingCopyCallbackHandler(False, 5))
  test_callback_file = self.CreateTempFile(contents=halting_handler)

  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  with SetBotoConfigForTest([threshold_config]):
    halted_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file,
         suri(object_uri), suri(download_path)],
        return_stderr=True, expected_status=1)
    self.assertIn('Artifically halting download.', halted_stderr)
    tracker_filename = GetTrackerFilePath(
        StorageUrlFromString(download_path), TrackerFileType.DOWNLOAD,
        self.test_api)
    self.assertTrue(os.path.isfile(tracker_filename))
    self.assertIn('Downloading to temp gzip filename', halted_stderr)
    # We should have a temporary gzipped file, a tracker file, and no
    # final file yet.
    gzip_temp_path = '%s_.gztmp' % download_path
    self.assertTrue(os.path.isfile(gzip_temp_path))

    resumed_stderr = self.RunGsUtil(
        ['cp', suri(object_uri), suri(download_path)], return_stderr=True)
    self.assertIn('Resuming download', resumed_stderr)
    with open(download_path, 'r') as f:
      self.assertEqual(f.read(), contents, 'File contents did not match.')
    self.assertFalse(os.path.isfile(tracker_filename))
    self.assertFalse(os.path.isfile(gzip_temp_path))
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_bucket_deleted(self):
  """Tests that cp fails with a not-found error once the bucket is gone.

  The upload runs with a pickled _DeleteBucketThenStartOverCopyCallbackHandler
  and must exit with status 1, reporting both the bucket deletion and that
  the bucket does not exist.
  """
  bucket_uri = self.CreateBucket()
  upload_path = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
  threshold_config = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  deleting_handler = pickle.dumps(
      _DeleteBucketThenStartOverCopyCallbackHandler(5, bucket_uri))
  test_callback_file = self.CreateTempFile(contents=deleting_handler)

  with SetBotoConfigForTest([threshold_config]):
    cp_stderr = self.RunGsUtil(
        ['cp', '--testcallbackfile', test_callback_file,
         upload_path, suri(bucket_uri)],
        return_stderr=True, expected_status=1)
  self.assertIn('Deleting bucket', cp_stderr)
  self.assertIn('bucket does not exist', cp_stderr)
@SkipForS3('No resumable upload support for S3.')
def test_cp_resumable_upload_start_over_http_error(self):
  """Runs the start-over helper once for each of HTTP 404 and 410."""
  start_over_statuses = (404, 410)
  for http_status in start_over_statuses:
    self.start_over_error_test_helper(http_status)
def start_over_error_test_helper(self, http_error_num):
  """Uploads a file with an injected error and expects a restart from scratch.

  Args:
    http_error_num: HTTP status code handed to the JSON callback handler.
        The XML callback handler takes no status code, so the value is
        unused when testing the XML API.
  """
  bucket_uri = self.CreateBucket()
  fpath = self.CreateTempFile(contents='a' * 2 * ONE_KIB)
  boto_config_for_test = ('GSUtil', 'resumable_threshold', str(ONE_KIB))
  if self.test_api == ApiSelector.JSON:
    # Use the requested status; hard-coding 404 here meant the 410 case
    # requested by the caller was never exercised.
    callback_handler = _JSONForceHTTPErrorCopyCallbackHandler(
        5, http_error_num)
  elif self.test_api == ApiSelector.XML:
    callback_handler = _XMLResumableUploadStartOverCopyCallbackHandler(5)
  else:
    # Fail clearly rather than with a NameError on an unbound local below.
    self.fail('Unexpected test API: %s' % self.test_api)
  test_callback_file = self.CreateTempFile(
      contents=pickle.dumps(callback_handler))

  with SetBotoConfigForTest([boto_config_for_test]):
    stderr = self.RunGsUtil(['cp', '--testcallbackfile', test_callback_file,
                             fpath, suri(bucket_uri)], return_stderr=True)
    self.assertIn('Restarting upload from scratch', stderr)
def test_cp_minus_c(self):
  """Tests that cp -c copies the valid source despite a failing one.

  'foo2' is never created by this test, so the command is expected to exit
  with status 1, while 'foo' must still land under 'dir/'.
  """
  bucket_uri = self.CreateBucket()
  existing_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                   contents='foo')
  cp_command = ['cp', '-c', suri(bucket_uri) + '/foo2', suri(existing_uri),
                suri(bucket_uri) + '/dir/']
  self.RunGsUtil(cp_command, expected_status=1)
  self.RunGsUtil(['stat', '%s/dir/foo' % suri(bucket_uri)])
def test_rewrite_cp(self):
  """Tests the JSON Rewrite API.

  Copies an object within one bucket through GcsJsonApi.CopyObject and checks
  that the source and destination md5Hash values agree. Skipped when the XML
  API is under test.
  """
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it would make the
    # test count as passed. skipTest() actually records a skip.
    self.skipTest('Rewrite API is only supported in JSON.')
  bucket_uri = self.CreateBucket()
  object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                 contents='bar')
  gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                          self.default_provider)
  key = object_uri.get_key()
  src_obj_metadata = apitools_messages.Object(
      name=key.name, bucket=key.bucket.name, contentType=key.content_type)
  dst_obj_metadata = apitools_messages.Object(
      bucket=src_obj_metadata.bucket,
      name=self.MakeTempName('object'),
      contentType=src_obj_metadata.contentType)
  gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata)
  self.assertEqual(
      gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
                                   src_obj_metadata.name,
                                   fields=['md5Hash']).md5Hash,
      gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
                                   dst_obj_metadata.name,
                                   fields=['md5Hash']).md5Hash,
      'Error: Rewritten object\'s hash doesn\'t match source object.')
def test_rewrite_cp_resume(self):
  """Tests the JSON Rewrite API, breaking and resuming via a tracker file.

  The first CopyObject call is expected to raise _RewriteHaltException and
  leave a rewrite tracker file behind. The second call must complete, remove
  the tracker file, and produce a destination whose md5Hash matches the
  source. Skipped when the XML API is under test.
  """
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it would make the
    # test count as passed. skipTest() actually records a skip.
    self.skipTest('Rewrite API is only supported in JSON.')
  bucket_uri = self.CreateBucket()
  # Second bucket needs to be a different storage class so the service
  # actually rewrites the bytes.
  bucket_uri2 = self.CreateBucket(
      storage_class='DURABLE_REDUCED_AVAILABILITY')
  # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
  # need 2 response from the service: 1 success, 1 failure prior to
  # completion.
  object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                 contents=('12'*ONE_MIB) + 'bar',
                                 prefer_json_api=True)
  gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                          self.default_provider)
  key = object_uri.get_key()
  src_obj_metadata = apitools_messages.Object(
      name=key.name, bucket=key.bucket.name, contentType=key.content_type,
      etag=key.etag.strip('"\''))
  dst_obj_name = self.MakeTempName('object')
  dst_obj_metadata = apitools_messages.Object(
      bucket=bucket_uri2.bucket_name,
      name=dst_obj_name,
      contentType=src_obj_metadata.contentType)
  tracker_file_name = GetRewriteTrackerFilePath(
      src_obj_metadata.bucket, src_obj_metadata.name,
      dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
  try:
    try:
      gsutil_api.CopyObject(
          src_obj_metadata, dst_obj_metadata,
          progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
          max_bytes_per_call=ONE_MIB)
      self.fail('Expected _RewriteHaltException.')
    except _RewriteHaltException:
      pass

    # Tracker file should be left over.
    self.assertTrue(os.path.exists(tracker_file_name))

    # Now resume. Callback ensures we didn't start over.
    gsutil_api.CopyObject(
        src_obj_metadata, dst_obj_metadata,
        progress_callback=_EnsureRewriteResumeCallbackHandler(ONE_MIB*2).call,
        max_bytes_per_call=ONE_MIB)

    # Copy completed; tracker file should be deleted.
    self.assertFalse(os.path.exists(tracker_file_name))

    self.assertEqual(
        gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
                                     src_obj_metadata.name,
                                     fields=['md5Hash']).md5Hash,
        gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
                                     dst_obj_metadata.name,
                                     fields=['md5Hash']).md5Hash,
        'Error: Rewritten object\'s hash doesn\'t match source object.')
  finally:
    # Clean up if something went wrong.
    DeleteTrackerFile(tracker_file_name)
def test_rewrite_cp_resume_source_changed(self):
  """Tests that Rewrite starts over when the source object has changed.

  The first CopyObject call is expected to raise _RewriteHaltException. The
  source object is then overwritten, and a second CopyObject call using the
  new object's metadata must complete, remove the original tracker file, and
  produce a destination whose md5Hash matches the new source. Skipped when
  the XML API is under test.
  """
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it would make the
    # test count as passed. skipTest() actually records a skip.
    self.skipTest('Rewrite API is only supported in JSON.')
  bucket_uri = self.CreateBucket()
  # Second bucket needs to be a different storage class so the service
  # actually rewrites the bytes.
  bucket_uri2 = self.CreateBucket(
      storage_class='DURABLE_REDUCED_AVAILABILITY')
  # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
  # need 2 response from the service: 1 success, 1 failure prior to
  # completion.
  object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                 contents=('12'*ONE_MIB) + 'bar',
                                 prefer_json_api=True)
  gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                          self.default_provider)
  key = object_uri.get_key()
  src_obj_metadata = apitools_messages.Object(
      name=key.name, bucket=key.bucket.name, contentType=key.content_type,
      etag=key.etag.strip('"\''))
  dst_obj_name = self.MakeTempName('object')
  dst_obj_metadata = apitools_messages.Object(
      bucket=bucket_uri2.bucket_name,
      name=dst_obj_name,
      contentType=src_obj_metadata.contentType)
  tracker_file_name = GetRewriteTrackerFilePath(
      src_obj_metadata.bucket, src_obj_metadata.name,
      dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
  try:
    try:
      gsutil_api.CopyObject(
          src_obj_metadata, dst_obj_metadata,
          progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
          max_bytes_per_call=ONE_MIB)
      self.fail('Expected _RewriteHaltException.')
    except _RewriteHaltException:
      pass
    # Overwrite the original object.
    object_uri2 = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                    contents='bar', prefer_json_api=True)
    key2 = object_uri2.get_key()
    src_obj_metadata2 = apitools_messages.Object(
        name=key2.name, bucket=key2.bucket.name,
        contentType=key2.content_type, etag=key2.etag.strip('"\''))

    # Tracker file for original object should still exist.
    self.assertTrue(os.path.exists(tracker_file_name))

    # Copy the new object.
    gsutil_api.CopyObject(src_obj_metadata2, dst_obj_metadata,
                          max_bytes_per_call=ONE_MIB)

    # Copy completed; original tracker file should be deleted.
    self.assertFalse(os.path.exists(tracker_file_name))

    self.assertEqual(
        gsutil_api.GetObjectMetadata(src_obj_metadata2.bucket,
                                     src_obj_metadata2.name,
                                     fields=['md5Hash']).md5Hash,
        gsutil_api.GetObjectMetadata(dst_obj_metadata.bucket,
                                     dst_obj_metadata.name,
                                     fields=['md5Hash']).md5Hash,
        'Error: Rewritten object\'s hash doesn\'t match source object.')
  finally:
    # Clean up if something went wrong.
    DeleteTrackerFile(tracker_file_name)
def test_rewrite_cp_resume_command_changed(self):
  """Tests that Rewrite starts over when the arguments changed.

  The first CopyObject call (canned_acl='private') is expected to raise
  _RewriteHaltException. A second call with canned_acl='public-read' must
  complete, remove the original tracker file, and produce a destination with
  a matching md5Hash and an ACL entry for 'allUsers'. Skipped when the XML
  API is under test.
  """
  if self.test_api == ApiSelector.XML:
    # unittest.skip() merely builds a decorator; returning it would make the
    # test count as passed. skipTest() actually records a skip.
    self.skipTest('Rewrite API is only supported in JSON.')
  bucket_uri = self.CreateBucket()
  # Second bucket needs to be a different storage class so the service
  # actually rewrites the bytes.
  bucket_uri2 = self.CreateBucket(
      storage_class='DURABLE_REDUCED_AVAILABILITY')
  # maxBytesPerCall must be >= 1 MiB, so create an object > 2 MiB because we
  # need 2 response from the service: 1 success, 1 failure prior to
  # completion.
  object_uri = self.CreateObject(bucket_uri=bucket_uri, object_name='foo',
                                 contents=('12'*ONE_MIB) + 'bar',
                                 prefer_json_api=True)
  gsutil_api = GcsJsonApi(BucketStorageUri, logging.getLogger(),
                          self.default_provider)
  key = object_uri.get_key()
  src_obj_metadata = apitools_messages.Object(
      name=key.name, bucket=key.bucket.name, contentType=key.content_type,
      etag=key.etag.strip('"\''))
  dst_obj_name = self.MakeTempName('object')
  dst_obj_metadata = apitools_messages.Object(
      bucket=bucket_uri2.bucket_name,
      name=dst_obj_name,
      contentType=src_obj_metadata.contentType)
  tracker_file_name = GetRewriteTrackerFilePath(
      src_obj_metadata.bucket, src_obj_metadata.name,
      dst_obj_metadata.bucket, dst_obj_metadata.name, self.test_api)
  try:
    try:
      gsutil_api.CopyObject(
          src_obj_metadata, dst_obj_metadata, canned_acl='private',
          progress_callback=_HaltingRewriteCallbackHandler(ONE_MIB*2).call,
          max_bytes_per_call=ONE_MIB)
      self.fail('Expected _RewriteHaltException.')
    except _RewriteHaltException:
      pass

    # Tracker file for original object should still exist.
    self.assertTrue(os.path.exists(tracker_file_name))

    # Copy the same object but with different call parameters.
    gsutil_api.CopyObject(src_obj_metadata, dst_obj_metadata,
                          canned_acl='public-read',
                          max_bytes_per_call=ONE_MIB)

    # Copy completed; original tracker file should be deleted.
    self.assertFalse(os.path.exists(tracker_file_name))

    new_obj_metadata = gsutil_api.GetObjectMetadata(
        dst_obj_metadata.bucket, dst_obj_metadata.name,
        fields=['acl,md5Hash'])
    self.assertEqual(
        gsutil_api.GetObjectMetadata(src_obj_metadata.bucket,
                                     src_obj_metadata.name,
                                     fields=['md5Hash']).md5Hash,
        new_obj_metadata.md5Hash,
        'Error: Rewritten object\'s hash doesn\'t match source object.')
    # New object should have a public-read ACL from the second command.
    found_public_acl = any(acl_entry.entity == 'allUsers'
                           for acl_entry in new_obj_metadata.acl)
    self.assertTrue(found_public_acl,
                    'New object was not written with a public ACL.')
  finally:
    # Clean up if something went wrong.
    DeleteTrackerFile(tracker_file_name)
1978 class TestCpUnitTests(testcase.GsUtilUnitTestCase):
1979 """Unit tests for gsutil cp."""
def testDownloadWithNoHashAvailable(self):
  """Tests the warnings logged when the key's etag is not an MD5.

  Exactly two warnings are expected: one naming the non-MD5 etag and one
  stating that integrity cannot be assured.
  """
  # S3 should have a special message for non-MD5 etags.
  bucket_uri = self.CreateBucket(provider='s3')
  object_uri = self.CreateObject(bucket_uri=bucket_uri, contents='foo')
  object_uri.get_key().etag = '12345'  # Not an MD5
  dst_dir = self.CreateTempDir()

  log_handler = self.RunCommand(
      'cp', [suri(object_uri), dst_dir], return_log_handler=True)
  warnings_logged = log_handler.messages['warning']
  self.assertEqual(2, len(warnings_logged))
  etag_warning, integrity_warning = warnings_logged[0], warnings_logged[1]
  self.assertRegexpMatches(
      etag_warning,
      r'Non-MD5 etag \(12345\) present for key .*, '
      r'data integrity checks are not possible')
  self.assertIn('Integrity cannot be assured', integrity_warning)
def test_object_and_prefix_same_name(self):
  """Tests copying object 'foo' when an object 'foo/bar' also exists.

  The downloaded file must contain the contents of 'foo'.
  """
  bucket_uri = self.CreateBucket()
  plain_object_uri = self.CreateObject(bucket_uri=bucket_uri,
                                       object_name='foo', contents='foo')
  self.CreateObject(bucket_uri=bucket_uri,
                    object_name='foo/bar', contents='bar')
  dest_path = self.CreateTempFile()
  # MockKey doesn't support hash_algs, so the MD5 will not match.
  no_hash_check = ('GSUtil', 'check_hashes', 'never')
  with SetBotoConfigForTest([no_hash_check]):
    self.RunCommand('cp', [suri(plain_object_uri), dest_path])
  with open(dest_path, 'r') as f:
    downloaded = f.read()
    self.assertEqual(downloaded, 'foo')
def test_cp_upload_respects_no_hashes(self):
  """Tests that an upload with check_hashes=never logs a single warning.

  The warning must say that no hashes were found to validate the upload.
  """
  bucket_uri = self.CreateBucket()
  upload_path = self.CreateTempFile(contents='abcd')
  no_hash_check = ('GSUtil', 'check_hashes', 'never')
  with SetBotoConfigForTest([no_hash_check]):
    log_handler = self.RunCommand('cp', [upload_path, suri(bucket_uri)],
                                  return_log_handler=True)
  warnings_logged = log_handler.messages['warning']
  self.assertEqual(1, len(warnings_logged))
  self.assertIn('Found no hashes to validate object upload',
                warnings_logged[0])