1 # -*- coding: utf-8 -*-
2 # Copyright 2013 Google Inc. All Rights Reserved.
4 # Permission is hereby granted, free of charge, to any person obtaining a
5 # copy of this software and associated documentation files (the
6 # "Software"), to deal in the Software without restriction, including
7 # without limitation the rights to use, copy, modify, merge, publish, dis-
8 # tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
12 # The above copyright notice and this permission notice shall be included
13 # in all copies or substantial portions of the Software.
15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
22 """Unit tests for gsutil parallelism framework."""
24 from __future__
import absolute_import
29 from boto
.storage_uri
import BucketStorageUri
30 from gslib
import cs_api_map
31 from gslib
.command
import Command
32 from gslib
.command
import CreateGsutilLogger
33 from gslib
.command
import DummyArgChecker
34 import gslib
.tests
.testcase
as testcase
35 from gslib
.tests
.testcase
.base
import RequiresIsolation
36 from gslib
.tests
.util
import unittest
37 from gslib
.util
import IS_WINDOWS
38 from gslib
.util
import MultiprocessingIsAvailable
41 # Amount of time for an individual test to run before timing out. We need a
42 # reasonably high value since if many tests are running in parallel, an
43 # individual test may take a while to complete.
44 _TEST_TIMEOUT_SECONDS
= 120
# NOTE(review): the `def` line of the enclosing decorator and several of its
# lines (including whatever runs between arming and cancelling the alarm,
# presumably the call to `func`) are not visible in this view.
# NOTE(review): signal.SIGALRM / signal.alarm are Unix-only; confirm that the
# missing lines guard these calls on Windows (the file checks IS_WINDOWS
# elsewhere).
48 """Decorator used to provide a timeout for functions."""
49 @functools.wraps(func
)
50 def Wrapper(*args
, **kwargs
):
# Route SIGALRM to _HandleAlarm (which raises 'Test timed out.') and request
# a SIGALRM after _TEST_TIMEOUT_SECONDS seconds.
52 signal
.signal(signal
.SIGALRM
, _HandleAlarm
)
53 signal
.alarm(_TEST_TIMEOUT_SECONDS
)
# signal.alarm(0) cancels any pending alarm so it cannot fire later.
58 signal
.alarm(0) # Cancel the alarm.
62 # pylint: disable=unused-argument
63 def _HandleAlarm(signal_num
, cur_stack_frame
):
64 raise Exception('Test timed out.')
class CustomException(Exception):
  """Exception the test helpers in this module raise on purpose."""

  def __init__(self, exception_str):
    """Stores exception_str as the exception message.

    Args:
      exception_str: Human-readable description of the failure.
    """
    base = super(CustomException, self)
    base.__init__(exception_str)
# Work function passed to Apply throughout these tests. Its body is not
# visible in this view. NOTE(review): the skipped-arguments test asserts that
# sum(results) and len(results) are both n / 2 for this function, so it
# presumably returns 1 for every call -- confirm.
73 def _ReturnOneValue(cls
, args
, thread_state
=None):
def _FailureFunc(cls, args, thread_state=None):
  """Apply work function that fails unconditionally.

  Args:
    cls: The Command Apply was called on (unused).
    args: Arguments for this invocation (unused).
    thread_state: Unused, required by function signature.

  Raises:
    CustomException: Always.
  """
  failure = CustomException('Failing on purpose.')
  raise failure
81 def _FailingExceptionHandler(cls
, e
):
82 cls
.failure_count
+= 1
83 raise CustomException('Exception handler failing on purpose.')
86 def _ExceptionHandler(cls
, e
):
87 cls
.logger
.exception(e
)
88 cls
.failure_count
+= 1
91 def _IncrementByLength(cls
, args
, thread_state
=None):
92 cls
.arg_length_sum
+= len(args
)
# Body not visible in this view. NOTE(review): callers pass the requested
# process count (args[0]) and use the return value as Apply's process_count;
# the name suggests the count is changed only when running on Windows --
# confirm.
95 def _AdjustProcessCountIfWindows(process_count
):
def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
  """Fans args out to two nested Apply calls, seven copies each.

  args[0] and args[1] are the process and thread counts used for both nested
  calls; the process count is first passed through
  _AdjustProcessCountIfWindows. The first nested call runs
  _PerformNRecursiveCalls on each copy of args, the second runs
  _ReturnOneValue on each copy.

  Args:
    cls: The Command class to call Apply on.
    args: Arguments to replicate and pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    The sum of the values returned by the _PerformNRecursiveCalls Apply plus
    the number of values returned by the _ReturnOneValue Apply.
  """
  replicated = [args] * 7
  apply_options = dict(
      process_count=_AdjustProcessCountIfWindows(args[0]),
      thread_count=args[1],
      arg_checker=DummyArgChecker,
      should_return_results=True)

  nested_total = sum(cls.Apply(_PerformNRecursiveCalls, replicated,
                               _ExceptionHandler, **apply_options))
  one_value_count = len(cls.Apply(_ReturnOneValue, replicated,
                                  _ExceptionHandler, **apply_options))
  return one_value_count + nested_total
def _PerformNRecursiveCalls(cls, args, thread_state=None):
  """Runs _ReturnOneValue N times through a single nested Apply call.

  Args:
    cls: The Command class to call Apply on.
    args: Sequence whose first two elements are the process and thread counts
        for the nested call (the process count is passed through
        _AdjustProcessCountIfWindows) and whose third element is N, the
        number of calls to make.
    thread_state: Unused, required by function signature.

  Returns:
    Number of values returned by the nested Apply call.
  """
  nested_processes = _AdjustProcessCountIfWindows(args[0])
  nested_threads = args[1]
  empty_arg_list = [()] * args[2]
  nested_results = cls.Apply(
      _ReturnOneValue, empty_arg_list, _ExceptionHandler,
      arg_checker=DummyArgChecker,
      process_count=nested_processes,
      thread_count=nested_threads,
      should_return_results=True)
  return len(nested_results)
# Arg checker used by the skipped-arguments tests; its body is not visible in
# this view. NOTE(review): _TestSkippedArguments expects n / 2 of the args
# 1..n to be processed and none of an all-even list, so this presumably
# returns a true value only for odd `arg` -- confirm.
160 def _SkipEvenNumbersArgChecker(cls
, arg
):
# Produces the positions 0..size-1 in order, except that at each position
# listed in failure_indices it raises CustomException instead; the failing
# position is still consumed, so iteration can continue afterwards.
# NOTE(review): the assignment of self.size and the `def` lines of the
# iteration methods are not visible in this view.
164 class FailingIterator(object):
166 def __init__(self
, size
, failure_indices
):
168 self
.failure_indices
= failure_indices
169 self
.current_index
= 0
# Every position has been produced: signal exhaustion.
175 if self
.current_index
== self
.size
:
176 raise StopIteration('')
177 elif self
.current_index
in self
.failure_indices
:
# Consume the failing position before raising.
178 self
.current_index
+= 1
# NOTE(review): current_index was already incremented above, so the %d in
# this message is one greater than the failing position (a failure at
# position 0 reports "index 1"); likely unintended.
179 raise CustomException(
180 'Iterator failing on purpose at index %d.' % self
.current_index
)
# Normal case: advance, then return the position just consumed.
182 self
.current_index
+= 1
183 return self
.current_index
- 1
# Minimal Command subclass for these tests: its constructor sets only the
# instance attributes the parallelism tests and handlers here read.
# NOTE(review): several lines are not visible in this view, including the
# command name passed to CreateCommandSpec, the end of the spec call, and the
# definitions of support_map/default_map used in __init__.
186 class FakeCommand(Command
):
187 """Fake command class for overriding command instance state."""
188 command_spec
= Command
.CreateCommandSpec(
190 command_name_aliases
=[],
192 # Help specification. See help_provider.py for documentation.
193 help_spec
= Command
.HelpSpec(
195 help_name_aliases
=[],
196 help_type
='command_help',
197 help_one_line_summary
='Something to take up space.',
198 help_text
='Something else to take up space.',
199 subcommand_help_text
={},
# No call to Command.__init__ is visible here; attributes are set directly.
202 def __init__(self
, do_parallel
):
203 self
.bucket_storage_uri_class
= BucketStorageUri
212 self
.gsutil_api_map
= cs_api_map
.GsutilApiMapFactory
.GetApiMap(
213 cs_api_map
.GsutilApiClassMapFactory
, support_map
, default_map
)
214 self
.logger
= CreateGsutilLogger('FakeCommand')
# The tests construct this class as self.command_class(True).
215 self
.parallel_operations
= do_parallel
# Incremented by _ExceptionHandler and _FailingExceptionHandler.
216 self
.failure_count
= 0
# Element [0] of MultiprocessingIsAvailable()'s result; presumably a boolean
# flag -- confirm.
217 self
.multiprocessing_is_available
= MultiprocessingIsAvailable()[0]
class FakeCommandWithoutMultiprocessingModule(FakeCommand):
  """A FakeCommand that reports the multiprocessing module as unavailable."""

  def __init__(self, do_parallel):
    """Initializes like FakeCommand, then disables multiprocessing.

    Args:
      do_parallel: Forwarded unchanged to FakeCommand.__init__.
    """
    super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
    # Overwrite whatever FakeCommand.__init__ derived from
    # MultiprocessingIsAvailable().
    self.multiprocessing_is_available = False
228 # TODO: Figure out a good way to test that ctrl+C really stops execution,
229 # and also that ctrl+C works when there are still tasks enqueued.
230 class TestParallelismFramework(testcase
.GsUtilUnitTestCase
):
231 """gsutil parallelism framework test suite."""
# Command type the tests instantiate via self.command_class(True); subclasses
# override it to rerun this suite against a different fake command.
233 command_class
= FakeCommand
def _RunApply(self, func, args_iterator, process_count, thread_count,
              command_inst=None, shared_attrs=None, fail_on_error=False,
              thr_exc_handler=None, arg_checker=DummyArgChecker):
  """Runs func over args_iterator via Command.Apply and returns the results.

  Args:
    func: Work function passed to Apply.
    args_iterator: Arguments to apply func to.
    process_count: Number of processes for Apply.
    thread_count: Number of threads for Apply.
    command_inst: Command to call Apply on; if falsy, a fresh
        self.command_class(True) is created.
    shared_attrs: Passed through to Apply.
    fail_on_error: Passed through to Apply.
    thr_exc_handler: Exception handler for Apply; if falsy, _ExceptionHandler
        is used.
    arg_checker: Argument checker passed through to Apply.

  Returns:
    Whatever Apply returns when called with should_return_results=True.
  """
  if not command_inst:
    command_inst = self.command_class(True)
  handler = thr_exc_handler if thr_exc_handler else _ExceptionHandler

  return command_inst.Apply(
      func, args_iterator, handler,
      process_count=process_count,
      thread_count=thread_count,
      arg_checker=arg_checker,
      shared_attrs=shared_attrs,
      fail_on_error=fail_on_error,
      should_return_results=True)
# Basic Apply across the four (process, thread) configurations; the
# multi-process variants are skipped on Windows.
def testBasicApplySingleProcessSingleThread(self):
  self._TestBasicApply(process_count=1, thread_count=1)

def testBasicApplySingleProcessMultiThread(self):
  self._TestBasicApply(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testBasicApplyMultiProcessSingleThread(self):
  self._TestBasicApply(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testBasicApplyMultiProcessMultiThread(self):
  self._TestBasicApply(process_count=3, thread_count=3)
def _TestBasicApply(self, process_count, thread_count):
  """Checks that Apply returns exactly one result per argument.

  Args:
    process_count: Number of processes to use.
    thread_count: Number of threads to use.
  """
  # 17 arguments per worker plus one extra, so the work does not divide
  # evenly across workers.
  num_args = 17 * process_count * thread_count + 1
  args = [()] * num_args

  results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
  self.assertEqual(num_args, len(results))
# Failing-iterator handling across the four (process, thread) configurations.
def testIteratorFailureSingleProcessSingleThread(self):
  self._TestIteratorFailure(process_count=1, thread_count=1)

def testIteratorFailureSingleProcessMultiThread(self):
  self._TestIteratorFailure(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testIteratorFailureMultiProcessSingleThread(self):
  self._TestIteratorFailure(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testIteratorFailureMultiProcessMultiThread(self):
  self._TestIteratorFailure(process_count=3, thread_count=3)
def _TestIteratorFailure(self, process_count, thread_count):
  """Tests apply with a failing iterator."""
  # fail_on_error == False: one failing position (first, middle or last) out
  # of ten leaves nine results.
  for failure_index in (0, 5, 9):
    results = self._RunApply(_ReturnOneValue,
                             FailingIterator(10, [failure_index]),
                             process_count, thread_count)
    self.assertEqual(9, len(results))

  if process_count * thread_count > 1:
    # With more than one worker, the fail_on_error flag is expected to be
    # ignored, so the outcome matches the non-fail_on_error case.
    results = self._RunApply(_ReturnOneValue, FailingIterator(10, [9]),
                             process_count, thread_count, fail_on_error=True)
    self.assertEqual(9, len(results))

  # Every position fails: nothing is returned.
  results = self._RunApply(_ReturnOneValue, FailingIterator(10, range(10)),
                           process_count, thread_count)
  self.assertEqual(0, len(results))

  # Empty iterator: nothing is returned.
  results = self._RunApply(_ReturnOneValue, FailingIterator(0, []),
                           process_count, thread_count)
  self.assertEqual(0, len(results))
# shared_attrs handling across the four (process, thread) configurations.
def testTestSharedAttrsWorkSingleProcessSingleThread(self):
  self._TestSharedAttrsWork(process_count=1, thread_count=1)

def testTestSharedAttrsWorkSingleProcessMultiThread(self):
  self._TestSharedAttrsWork(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testTestSharedAttrsWorkMultiProcessSingleThread(self):
  self._TestSharedAttrsWork(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testTestSharedAttrsWorkMultiProcessMultiThread(self):
  self._TestSharedAttrsWork(process_count=3, thread_count=3)
# Checks that attributes listed in shared_attrs reflect updates made by the
# work function / exception handler across all workers.
343 def _TestSharedAttrsWork(self
, process_count
, thread_count
):
344 """Tests that Apply successfully uses shared_attrs."""
345 command_inst
= self
.command_class(True)
# Non-zero starting value for the shared counter.
346 command_inst
.arg_length_sum
= 19
347 args
= ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
348 self
._RunApply
(_IncrementByLength
, args
, process_count
,
349 thread_count
, command_inst
=command_inst
,
350 shared_attrs
=['arg_length_sum'])
# NOTE(review): the initialisation of expected_sum and the loop that binds
# `arg` are not visible in this view; presumably expected_sum starts at the
# same initial value (19) and accumulates len(arg) over args -- confirm.
353 expected_sum
+= len(arg
)
354 self
.assertEqual(expected_sum
, command_inst
.arg_length_sum
)
356 # Test that shared variables work when the iterator fails.
# Three failing positions; _RunApply's default handler (_ExceptionHandler)
# increments failure_count once per failure.
357 command_inst
= self
.command_class(True)
358 args
= FailingIterator(10, [1, 3, 5])
359 self
._RunApply
(_ReturnOneValue
, args
, process_count
, thread_count
,
360 command_inst
=command_inst
, shared_attrs
=['failure_count'])
361 self
.assertEqual(3, command_inst
.failure_count
)
# Work-function exceptions across the four (process, thread) configurations.
def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
  self._TestThreadsSurviveExceptionsInFunc(process_count=1, thread_count=1)

def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
  self._TestThreadsSurviveExceptionsInFunc(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
  self._TestThreadsSurviveExceptionsInFunc(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
  self._TestThreadsSurviveExceptionsInFunc(process_count=3, thread_count=3)
# Every _FailureFunc call raises; the handler increments failure_count once
# per failure, so failure_count should end up equal to len(args).
# NOTE(review): the line defining `args` is not visible in this view.
# NOTE(review): this passes _FailingExceptionHandler, exactly like
# _TestThreadsSurviveExceptionsInHandler, so both tests exercise the same
# scenario. To isolate failures in func alone it probably should use the
# default _ExceptionHandler (omit thr_exc_handler) -- confirm intent.
382 def _TestThreadsSurviveExceptionsInFunc(self
, process_count
, thread_count
):
383 command_inst
= self
.command_class(True)
385 self
._RunApply
(_FailureFunc
, args
, process_count
, thread_count
,
386 command_inst
=command_inst
, shared_attrs
=['failure_count'],
387 thr_exc_handler
=_FailingExceptionHandler
)
388 self
.assertEqual(len(args
), command_inst
.failure_count
)
# Exception-handler exceptions across the four (process, thread)
# configurations.
def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
  self._TestThreadsSurviveExceptionsInHandler(process_count=1, thread_count=1)

def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
  self._TestThreadsSurviveExceptionsInHandler(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
  self._TestThreadsSurviveExceptionsInHandler(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
  self._TestThreadsSurviveExceptionsInHandler(process_count=3, thread_count=3)
# Both the work function (_FailureFunc) and the handler
# (_FailingExceptionHandler) raise. The handler increments failure_count
# before raising, so failure_count should still equal len(args).
# NOTE(review): the line defining `args` is not visible in this view.
409 def _TestThreadsSurviveExceptionsInHandler(self
, process_count
, thread_count
):
410 command_inst
= self
.command_class(True)
412 self
._RunApply
(_FailureFunc
, args
, process_count
, thread_count
,
413 command_inst
=command_inst
, shared_attrs
=['failure_count'],
414 thr_exc_handler
=_FailingExceptionHandler
)
415 self
.assertEqual(len(args
), command_inst
.failure_count
)
# Runs several single-process, single-thread Apply calls with
# fail_on_error=True and expects each to surface a CustomException.
419 def testFailOnErrorFlag(self
):
420 """Tests that fail_on_error produces the correct exception on failure."""
# Runs test_func and expects CustomException. NOTE(review): the lines that
# call test_func inside a try and fail when nothing is raised, and the check
# leading to self.fail in the except branch, are not visible in this view.
# NOTE(review): `except CustomException, e:` is Python-2-only syntax;
# `except CustomException as e:` works on Python 2.6+ and Python 3.
421 def _ExpectCustomException(test_func
):
425 'Setting fail_on_error should raise any exception encountered.')
426 except CustomException
, e
:
429 self
.fail('Got unexpected error: ' + str(e
))
# NOTE(review): the definition of `args` used here is not visible in this view.
431 def _RunFailureFunc():
432 command_inst
= self
.command_class(True)
434 self
._RunApply
(_FailureFunc
, args
, 1, 1, command_inst
=command_inst
,
435 shared_attrs
=['failure_count'], fail_on_error
=True)
436 _ExpectCustomException(_RunFailureFunc
)
# NOTE(review): in each helper below, the assertEqual runs only if _RunApply
# returns normally. Since _ExpectCustomException expects a CustomException to
# propagate, these assertions are presumably never reached in a passing run.
438 def _RunFailingIteratorFirstPosition():
439 args
= FailingIterator(10, [0])
440 results
= self
._RunApply
(_ReturnOneValue
, args
, 1, 1, fail_on_error
=True)
441 self
.assertEqual(0, len(results
))
442 _ExpectCustomException(_RunFailingIteratorFirstPosition
)
444 def _RunFailingIteratorPositionMiddlePosition():
445 args
= FailingIterator(10, [5])
446 results
= self
._RunApply
(_ReturnOneValue
, args
, 1, 1, fail_on_error
=True)
447 self
.assertEqual(5, len(results
))
448 _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition
)
450 def _RunFailingIteratorLastPosition():
451 args
= FailingIterator(10, [9])
452 results
= self
._RunApply
(_ReturnOneValue
, args
, 1, 1, fail_on_error
=True)
453 self
.assertEqual(9, len(results
))
454 _ExpectCustomException(_RunFailingIteratorLastPosition
)
456 def _RunFailingIteratorMultiplePositions():
457 args
= FailingIterator(10, [1, 3, 5])
458 results
= self
._RunApply
(_ReturnOneValue
, args
, 1, 1, fail_on_error
=True)
459 self
.assertEqual(1, len(results
))
460 _ExpectCustomException(_RunFailingIteratorMultiplePositions
)
# Recursive Apply across the four (process, thread) configurations. Arguments
# are (process_count, thread_count), passed positionally.
def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
  processes, threads = 1, 1
  self._TestRecursiveDepthThreeDifferentFunctions(processes, threads)

def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
  processes, threads = 1, 3
  self._TestRecursiveDepthThreeDifferentFunctions(processes, threads)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
  processes, threads = 3, 1
  self._TestRecursiveDepthThreeDifferentFunctions(processes, threads)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
  processes, threads = 3, 3
  self._TestRecursiveDepthThreeDifferentFunctions(processes, threads)
# Expected value: for each count c in base_args, _ReApplyWithReplicatedArguments
# returns 7 * c (seven _PerformNRecursiveCalls results of c each) plus 7 (seven
# _ReturnOneValue results), assuming Apply returns one result per argument.
# Summing over base_args gives 7 * (sum(base_args) + len(base_args)).
# NOTE(review): the signature's second-parameter line, part of the docstring,
# and its closing quotes are not visible in this view.
481 def _TestRecursiveDepthThreeDifferentFunctions(self
, process_count
,
483 """Tests recursive application of Apply.
485 Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
489 process_count: Number of processes to use.
490 thread_count: Number of threads to use.
492 base_args
= [3, 1, 4, 1, 5]
493 args
= [[process_count
, thread_count
, count
] for count
in base_args
]
495 results
= self
._RunApply
(_ReApplyWithReplicatedArguments
, args
,
496 process_count
, thread_count
)
497 self
.assertEqual(7 * (sum(base_args
) + len(base_args
)), sum(results
))
# Producer-side exceptions across the four (process, thread) configurations.
# Arguments are (process_count, thread_count), passed positionally.
def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
  processes, threads = 1, 1
  self._TestExceptionInProducerRaisesAndTerminates(processes, threads)

def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
  processes, threads = 1, 3
  self._TestExceptionInProducerRaisesAndTerminates(processes, threads)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
  processes, threads = 3, 1
  self._TestExceptionInProducerRaisesAndTerminates(processes, threads)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
  processes, threads = 3, 3
  self._TestExceptionInProducerRaisesAndTerminates(processes, threads)
# Passes the test case object itself as args_iterator so that iterating over
# it fails inside Apply. The test fails if _RunApply returns normally.
# NOTE(review): the signature's second-parameter line, the `try:`, and the
# except clause that presumably catches and checks the expected exception are
# not visible in this view.
518 def _TestExceptionInProducerRaisesAndTerminates(self
, process_count
,
520 args
= self
# The ProducerThread will try and fail to iterate over this.
522 self
._RunApply
(_ReturnOneValue
, args
, process_count
, thread_count
)
523 self
.fail('Did not raise expected exception.')
# Arg-checker skipping across the four (process, thread) configurations.
def testSkippedArgumentsSingleThreadSingleProcess(self):
  self._TestSkippedArguments(process_count=1, thread_count=1)

def testSkippedArgumentsMultiThreadSingleProcess(self):
  self._TestSkippedArguments(process_count=1, thread_count=3)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsSingleThreadMultiProcess(self):
  self._TestSkippedArguments(process_count=3, thread_count=1)

@unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
def testSkippedArgumentsMultiThreadMultiProcess(self):
  self._TestSkippedArguments(process_count=3, thread_count=3)
def _TestSkippedArguments(self, process_count, thread_count):
  """Tests that arguments rejected by the arg checker are never processed.

  Args:
    process_count: Number of processes to use.
    thread_count: Number of threads to use.
  """
  # Skip a proper subset of the arguments: of 1..n, only the odd values pass
  # _SkipEvenNumbersArgChecker.
  n = 2 * process_count * thread_count
  args = range(1, n + 1)
  results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                           arg_checker=_SkipEvenNumbersArgChecker)
  # n is even by construction, so n // 2 is exact. Floor division keeps the
  # expected value an int instead of relying on Python 2's integer `/`.
  expected_count = n // 2
  self.assertEqual(expected_count, len(results))
  self.assertEqual(expected_count, sum(results))

  # Skip all arguments: doubling every value makes them all even.
  args = [2 * x for x in args]
  results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                           arg_checker=_SkipEvenNumbersArgChecker)
  self.assertEqual(0, len(results))
# Inherits every test from TestParallelismFramework and reruns it with
# command_class set to FakeCommandWithoutMultiprocessingModule, whose
# instances set multiprocessing_is_available to False.
# NOTE(review): the docstring's closing quotes are not visible in this view.
563 class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework
):
564 """Tests parallelism framework works with multiprocessing module unavailable.
566 Notably, this test has no way to override previous calls
567 to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
568 all of the global variables in command.py, so this still behaves slightly
569 differently than the behavior one would see on a machine where the
570 multiprocessing functionality is actually not available (in particular, it
571 will not catch the case where a global variable that is not available for
572 the sequential path is referenced before initialization).
574 command_class
= FakeCommandWithoutMultiprocessingModule