1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 """Unit tests for parallel upload functions in copy_helper."""
17 from apitools
.base
.py
import exceptions
as apitools_exceptions
19 from util
import GSMockBucketStorageUri
21 from gslib
.cloud_api
import ResumableUploadAbortException
22 from gslib
.cloud_api
import ResumableUploadException
23 from gslib
.cloud_api
import ResumableUploadStartOverException
24 from gslib
.cloud_api
import ServiceException
25 from gslib
.command
import CreateGsutilLogger
26 from gslib
.copy_helper
import _AppendComponentTrackerToParallelUploadTrackerFile
27 from gslib
.copy_helper
import _CreateParallelUploadTrackerFile
28 from gslib
.copy_helper
import _GetPartitionInfo
29 from gslib
.copy_helper
import _ParseParallelUploadTrackerFile
30 from gslib
.copy_helper
import FilterExistingComponents
31 from gslib
.copy_helper
import ObjectFromTracker
32 from gslib
.copy_helper
import PerformParallelUploadFileToObjectArgs
33 from gslib
.gcs_json_api
import GcsJsonApi
34 from gslib
.hashing_helper
import CalculateB64EncodedMd5FromContents
35 from gslib
.storage_url
import StorageUrlFromString
36 from gslib
.tests
.mock_cloud_api
import MockCloudApi
37 from gslib
.tests
.testcase
.unit_testcase
import GsUtilUnitTestCase
38 from gslib
.third_party
.storage_apitools
import storage_v1_messages
as apitools_messages
39 from gslib
.util
import CreateLock
class TestCpFuncs(GsUtilUnitTestCase):
  """Unit tests for parallel upload functions in cp command."""

  def test_GetPartitionInfo(self):
    """Tests the _GetPartitionInfo function."""
    # Simplest case - threshold divides file_size.
    (num_components, component_size) = _GetPartitionInfo(300, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # Threshold = 1 (mod file_size).
    (num_components, component_size) = _GetPartitionInfo(301, 200, 10)
    self.assertEqual(31, num_components)
    self.assertEqual(10, component_size)

    # Threshold = -1 (mod file_size).
    (num_components, component_size) = _GetPartitionInfo(299, 200, 10)
    self.assertEqual(30, num_components)
    self.assertEqual(10, component_size)

    # Too many components needed: the component count is capped at
    # max_components and the component size grows to compensate.
    (num_components, component_size) = _GetPartitionInfo(301, 2, 10)
    self.assertEqual(2, num_components)
    self.assertEqual(151, component_size)

    # Test num_components with huge numbers.
    # NOTE(review): this call was truncated in the source under review;
    # 10 ** 200 as max_components is inferred from the expected
    # (10 ** 149) + 1 components asserted below -- confirm against upstream.
    (num_components, component_size) = _GetPartitionInfo((10 ** 150) + 1,
                                                         10 ** 200, 10)
    self.assertEqual((10 ** 149) + 1, num_components)
    self.assertEqual(10, component_size)

    # Test component_size with huge numbers.
    # NOTE(review): call truncated in source; max_components=10 is inferred
    # from the expected component count of 10 asserted below.
    (num_components, component_size) = _GetPartitionInfo((10 ** 150) + 1,
                                                         10, 10)
    self.assertEqual(10, num_components)
    self.assertEqual((10 ** 149) + 1, component_size)

    # Test component_size > file_size (make sure we get at least two
    # components).
    (num_components, component_size) = _GetPartitionInfo(100, 500, 51)
    # Use assertEqual rather than the deprecated assertEquals alias, for
    # consistency with the rest of this file.
    self.assertEqual(2, num_components)
    self.assertEqual(50, component_size)

  def test_ParseParallelUploadTrackerFile(self):
    """Tests the _ParseParallelUploadTrackerFile function."""
    tracker_file_lock = CreateLock()
    # Restored: random_prefix was referenced below but its definition was
    # missing from the source under review (matches the sibling tests).
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    contents = '\n'.join([random_prefix] + objects)
    fpath = self.CreateTempFile(file_name='foo', contents=contents)
    # Consecutive (name, generation) pairs in the flat list become
    # ObjectFromTracker entries.
    expected_objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
                        for i in range(0, len(objects) // 2)]
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(random_prefix, actual_prefix)
    self.assertEqual(expected_objects, actual_objects)

  def test_ParseEmptyParallelUploadTrackerFile(self):
    """Tests _ParseParallelUploadTrackerFile with an empty tracker file."""
    tracker_file_lock = CreateLock()
    fpath = self.CreateTempFile(file_name='foo', contents='')
    expected_objects = []
    (actual_prefix, actual_objects) = _ParseParallelUploadTrackerFile(
        fpath, tracker_file_lock)
    self.assertEqual(actual_objects, expected_objects)
    # Even an empty tracker file must produce some (generated) prefix.
    self.assertIsNotNone(actual_prefix)

  def test_CreateParallelUploadTrackerFile(self):
    """Tests the _CreateParallelUploadTrackerFile function."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    # NOTE(review): trailing tracker_file_lock argument restored; the call
    # was truncated in the source under review.
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_AppendComponentTrackerToParallelUploadTrackerFile(self):
    """Tests the _CreateParallelUploadTrackerFile function with append."""
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()
    random_prefix = '123'
    objects = ['obj1', '42', 'obj2', '314159']
    expected_contents = [random_prefix] + objects
    objects = [ObjectFromTracker(objects[2 * i], objects[2 * i + 1])
               for i in range(0, len(objects) // 2)]
    # NOTE(review): trailing tracker_file_lock arguments on the two calls
    # below restored; both calls were truncated in the source under review.
    _CreateParallelUploadTrackerFile(tracker_file, random_prefix, objects,
                                     tracker_file_lock)

    new_object = ['obj2', '1234']
    expected_contents += new_object
    new_object = ObjectFromTracker(new_object[0], new_object[1])
    _AppendComponentTrackerToParallelUploadTrackerFile(tracker_file, new_object,
                                                       tracker_file_lock)
    with open(tracker_file, 'rb') as f:
      lines = f.read().splitlines()
    self.assertEqual(expected_contents, lines)

  def test_FilterExistingComponentsNonVersioned(self):
    """Tests upload with a variety of component states."""
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    # Already uploaded, contents still match, component still used.
    # NOTE(review): contents='1' restored on the truncated calls below,
    # inferred from the sibling foo2='2'/foo4='4'/foo5='5'/foo6='6' pattern.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')

    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Not yet uploaded, but needed.
    fpath_not_uploaded = self.CreateTempFile(file_name='foo2', contents='2')
    fpath_not_uploaded_url = StorageUrlFromString(str(fpath_not_uploaded))
    object_not_uploaded_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_not_uploaded))
    args_not_uploaded = PerformParallelUploadFileToObjectArgs(
        fpath_not_uploaded, 0, 1, fpath_not_uploaded_url,
        object_not_uploaded_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match. Even though the contents
    # differ, we don't delete this since the bucket is not versioned and it
    # will be overwritten anyway.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    object_wrong_contents_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='_')

    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        object_wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    # Exists in tracker file, but component object no longer exists.
    fpath_remote_deleted = self.CreateTempFile(file_name='foo5', contents='5')
    fpath_remote_deleted_url = StorageUrlFromString(
        str(fpath_remote_deleted))
    args_remote_deleted = PerformParallelUploadFileToObjectArgs(
        fpath_remote_deleted, 0, 1, fpath_remote_deleted_url, '', '',
        empty_object, tracker_file, tracker_file_lock)

    # Exists in tracker file and already uploaded, but no longer needed.
    fpath_no_longer_used = self.CreateTempFile(file_name='foo6', contents='6')
    with open(fpath_no_longer_used) as f_in:
      file_md5 = CalculateB64EncodedMd5FromContents(f_in)
    mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name='foo6', md5Hash=file_md5), contents='6')

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_not_uploaded: args_not_uploaded,
                fpath_wrong_contents: args_wrong_contents,
                fpath_remote_deleted: args_remote_deleted}

    existing_components = [ObjectFromTracker(fpath_uploaded_correctly, ''),
                           ObjectFromTracker(fpath_wrong_contents, ''),
                           ObjectFromTracker(fpath_remote_deleted, ''),
                           ObjectFromTracker(fpath_no_longer_used, '')]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

    # Everything not already present and matching must be (re)uploaded.
    for arg in [args_not_uploaded, args_wrong_contents, args_remote_deleted]:
      self.assertTrue(arg in components_to_upload)
    self.assertEqual(1, len(uploaded_components))
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    # Only the no-longer-needed component should be slated for deletion.
    self.assertEqual(1, len(existing_objects_to_delete))
    no_longer_used_url = StorageUrlFromString('%s://%s/%s' % (
        self.default_provider, bucket_name, fpath_no_longer_used))
    self.assertEqual(no_longer_used_url.url_string,
                     existing_objects_to_delete[0].url_string)

  def test_FilterExistingComponentsVersioned(self):
    """Tests upload with versioned parallel components."""
    mock_api = MockCloudApi()
    bucket_name = self.MakeTempName('bucket')
    mock_api.MockCreateVersionedBucket(bucket_name)

    # dst_obj_metadata used for passing content-type.
    empty_object = apitools_messages.Object()

    tracker_file = self.CreateTempFile(file_name='foo', contents='asdf')
    tracker_file_lock = CreateLock()

    # Already uploaded, contents still match, component still used.
    # NOTE(review): contents='1' restored on the truncated calls below,
    # matching the non-versioned sibling test.
    fpath_uploaded_correctly = self.CreateTempFile(file_name='foo1',
                                                   contents='1')
    fpath_uploaded_correctly_url = StorageUrlFromString(
        str(fpath_uploaded_correctly))
    with open(fpath_uploaded_correctly) as f_in:
      fpath_uploaded_correctly_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_uploaded_correctly,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    object_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, object_uploaded_correctly.generation))
    args_uploaded_correctly = PerformParallelUploadFileToObjectArgs(
        fpath_uploaded_correctly, 0, 1, fpath_uploaded_correctly_url,
        object_uploaded_correctly_url, object_uploaded_correctly.generation,
        empty_object, tracker_file, tracker_file_lock)

    # Duplicate object name in tracker file, but uploaded correctly.
    fpath_duplicate = fpath_uploaded_correctly
    fpath_duplicate_url = StorageUrlFromString(str(fpath_duplicate))
    duplicate_uploaded_correctly = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_duplicate,
                                 md5Hash=fpath_uploaded_correctly_md5),
        contents='1')
    duplicate_uploaded_correctly_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_uploaded_correctly, duplicate_uploaded_correctly.generation))
    args_duplicate = PerformParallelUploadFileToObjectArgs(
        fpath_duplicate, 0, 1, fpath_duplicate_url,
        duplicate_uploaded_correctly_url,
        duplicate_uploaded_correctly.generation, empty_object, tracker_file,
        tracker_file_lock)

    # Already uploaded, but contents no longer match.
    fpath_wrong_contents = self.CreateTempFile(file_name='foo4', contents='4')
    fpath_wrong_contents_url = StorageUrlFromString(str(fpath_wrong_contents))
    with open(self.CreateTempFile(contents='_')) as f_in:
      fpath_wrong_contents_md5 = CalculateB64EncodedMd5FromContents(f_in)
    object_wrong_contents = mock_api.MockCreateObjectWithMetadata(
        apitools_messages.Object(bucket=bucket_name,
                                 name=fpath_wrong_contents,
                                 md5Hash=fpath_wrong_contents_md5),
        contents='_')
    wrong_contents_url = StorageUrlFromString('%s://%s/%s#%s' % (
        self.default_provider, bucket_name,
        fpath_wrong_contents, object_wrong_contents.generation))
    args_wrong_contents = PerformParallelUploadFileToObjectArgs(
        fpath_wrong_contents, 0, 1, fpath_wrong_contents_url,
        wrong_contents_url, '', empty_object, tracker_file,
        tracker_file_lock)

    dst_args = {fpath_uploaded_correctly: args_uploaded_correctly,
                fpath_wrong_contents: args_wrong_contents}

    existing_components = [
        ObjectFromTracker(fpath_uploaded_correctly,
                          object_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_duplicate,
                          duplicate_uploaded_correctly_url.generation),
        ObjectFromTracker(fpath_wrong_contents,
                          wrong_contents_url.generation)]

    bucket_url = StorageUrlFromString('%s://%s' % (self.default_provider,
                                                   bucket_name))

    (components_to_upload, uploaded_components, existing_objects_to_delete) = (
        FilterExistingComponents(dst_args, existing_components,
                                 bucket_url, mock_api))

    self.assertEqual([args_wrong_contents], components_to_upload)
    self.assertEqual(args_uploaded_correctly.dst_url.url_string,
                     uploaded_components[0].url_string)
    # In a versioned bucket, both the stale component and the duplicate
    # generation must be deleted; compare as order-independent sets of
    # (object_name, generation) pairs.
    expected_to_delete = [(args_wrong_contents.dst_url.object_name,
                           args_wrong_contents.dst_url.generation),
                          (args_duplicate.dst_url.object_name,
                           args_duplicate.dst_url.generation)]
    for uri in existing_objects_to_delete:
      self.assertTrue((uri.object_name, uri.generation) in expected_to_delete)
    self.assertEqual(len(expected_to_delete), len(existing_objects_to_delete))

  # pylint: disable=protected-access
  def test_TranslateApitoolsResumableUploadException(self):
    """Tests that _TranslateApitoolsResumableUploadException works correctly."""
    gsutil_api = GcsJsonApi(
        GSMockBucketStorageUri,
        CreateGsutilLogger('copy_test'))

    # 503 with SSL certificate validation disabled maps to a generic
    # ServiceException (likely proxy interference).
    gsutil_api.http.disable_ssl_certificate_validation = True
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ServiceException))

    # 503 with validation enabled is retryable as a resumable upload.
    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 503}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadException))

    # 429 (rate limit) is also retryable.
    gsutil_api.http.disable_ssl_certificate_validation = False
    exc = apitools_exceptions.HttpError({'status': 429}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadException))

    # 410 and 404 mean the upload must start over from scratch.
    exc = apitools_exceptions.HttpError({'status': 410}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc,
                               ResumableUploadStartOverException))

    exc = apitools_exceptions.HttpError({'status': 404}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc,
                               ResumableUploadStartOverException))

    # 401 (unauthorized) and transfer errors abort the upload entirely.
    exc = apitools_exceptions.HttpError({'status': 401}, None, None)
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException))

    exc = apitools_exceptions.TransferError('Aborting transfer')
    translated_exc = gsutil_api._TranslateApitoolsResumableUploadException(exc)
    self.assertTrue(isinstance(translated_exc, ResumableUploadAbortException))