# -*- coding: utf-8 -*-
# Copyright 2013 Google Inc. All Rights Reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
"""Unit tests for gsutil parallelism framework."""

from __future__ import absolute_import

import functools
import signal

from boto.storage_uri import BucketStorageUri
from gslib import cs_api_map
from gslib.command import Command
from gslib.command import CreateGsutilLogger
from gslib.command import DummyArgChecker
import gslib.tests.testcase as testcase
from gslib.tests.testcase.base import RequiresIsolation
from gslib.tests.util import unittest
from gslib.util import IS_WINDOWS
from gslib.util import MultiprocessingIsAvailable


# Amount of time for an individual test to run before timing out. We need a
# reasonably high value since if many tests are running in parallel, an
# individual test may take a while to complete.
_TEST_TIMEOUT_SECONDS = 120


def Timeout(func):
  """Decorator used to provide a timeout for functions."""
  @functools.wraps(func)
  def Wrapper(*args, **kwargs):
    if not IS_WINDOWS:
      signal.signal(signal.SIGALRM, _HandleAlarm)
      signal.alarm(_TEST_TIMEOUT_SECONDS)
    try:
      func(*args, **kwargs)
    finally:
      if not IS_WINDOWS:
        signal.alarm(0)  # Cancel the alarm.
  return Wrapper


# pylint: disable=unused-argument
def _HandleAlarm(signal_num, cur_stack_frame):
  raise Exception('Test timed out.')
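

# Illustrative usage (a sketch, not an additional test): Timeout wraps the
# per-configuration helpers defined below, e.g.
#
#   @Timeout
#   def _TestBasicApply(self, process_count, thread_count):
#     ...
#
# On non-Windows platforms the wrapped call is interrupted via SIGALRM after
# _TEST_TIMEOUT_SECONDS, causing _HandleAlarm to raise; on Windows no alarm is
# set, so the call runs without a timeout.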


class CustomException(Exception):

  def __init__(self, exception_str):
    super(CustomException, self).__init__(exception_str)


def _ReturnOneValue(cls, args, thread_state=None):
  return 1


def _FailureFunc(cls, args, thread_state=None):
  raise CustomException('Failing on purpose.')


def _FailingExceptionHandler(cls, e):
  cls.failure_count += 1
  raise CustomException('Exception handler failing on purpose.')


def _ExceptionHandler(cls, e):
  cls.logger.exception(e)
  cls.failure_count += 1


def _IncrementByLength(cls, args, thread_state=None):
  cls.arg_length_sum += len(args)


def _AdjustProcessCountIfWindows(process_count):
  if IS_WINDOWS:
    return 1
  else:
    return process_count


def _ReApplyWithReplicatedArguments(cls, args, thread_state=None):
  """Calls Apply with arguments repeated seven times.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the recursive calls.

  Args:
    cls: The Command class to call Apply on.
    args: Arguments to pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    Number of values returned by the two calls to Apply.
  """
  new_args = [args] * 7
  process_count = _AdjustProcessCountIfWindows(args[0])
  thread_count = args[1]
  return_values = cls.Apply(_PerformNRecursiveCalls, new_args,
                            _ExceptionHandler, arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)
  ret = sum(return_values)

  return_values = cls.Apply(_ReturnOneValue, new_args,
                            _ExceptionHandler, arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)
  return len(return_values) + ret


def _PerformNRecursiveCalls(cls, args, thread_state=None):
  """Calls Apply to perform N recursive calls.

  The first two elements of args should be the process and thread counts,
  respectively, to be used for the recursive calls, while N is the third
  element (the number of recursive calls to make).

  Args:
    cls: The Command class to call Apply on.
    args: Arguments to pass to Apply.
    thread_state: Unused, required by function signature.

  Returns:
    Number of values returned by the call to Apply.
  """
  process_count = _AdjustProcessCountIfWindows(args[0])
  thread_count = args[1]
  return_values = cls.Apply(_ReturnOneValue, [()] * args[2], _ExceptionHandler,
                            arg_checker=DummyArgChecker,
                            process_count=process_count,
                            thread_count=thread_count,
                            should_return_results=True)
  return len(return_values)


def _SkipEvenNumbersArgChecker(cls, arg):
  return arg % 2 != 0
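

# For example (as exercised by _TestSkippedArguments below): given
# args = range(1, n + 1) with n even, only the n / 2 odd values are passed
# through to the worker function.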


class FailingIterator(object):

  def __init__(self, size, failure_indices):
    self.size = size
    self.failure_indices = failure_indices
    self.current_index = 0

  def __iter__(self):
    return self

  def next(self):
    if self.current_index == self.size:
      raise StopIteration('')
    elif self.current_index in self.failure_indices:
      self.current_index += 1
      raise CustomException(
          'Iterator failing on purpose at index %d.' % self.current_index)
    else:
      self.current_index += 1
      return self.current_index - 1
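

# Behavior sketch (illustrative): FailingIterator(3, [1]) yields 0, raises
# CustomException when asked for the element at index 1, then yields 2 and
# finally raises StopIteration. The _TestIteratorFailure cases below verify
# that the parallelism framework keeps consuming the iterator after such a
# failure rather than aborting.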


class FakeCommand(Command):
  """Fake command class for overriding command instance state."""
  command_spec = Command.CreateCommandSpec(
      'fake',
      command_name_aliases=[],
  )
  # Help specification. See help_provider.py for documentation.
  help_spec = Command.HelpSpec(
      help_name='fake',
      help_name_aliases=[],
      help_type='command_help',
      help_one_line_summary='Something to take up space.',
      help_text='Something else to take up space.',
      subcommand_help_text={},
  )

  def __init__(self, do_parallel):
    self.bucket_storage_uri_class = BucketStorageUri
    support_map = {
        'gs': ['JSON'],
        's3': ['XML']
    }
    default_map = {
        'gs': 'JSON',
        's3': 'XML'
    }
    self.gsutil_api_map = cs_api_map.GsutilApiMapFactory.GetApiMap(
        cs_api_map.GsutilApiClassMapFactory, support_map, default_map)
    self.logger = CreateGsutilLogger('FakeCommand')
    self.parallel_operations = do_parallel
    self.failure_count = 0
    self.multiprocessing_is_available = MultiprocessingIsAvailable()[0]
    self.debug = 0
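

# Usage note (a sketch, grounded in the tests below): the test cases construct
# FakeCommand(True) so that parallel_operations is enabled, then exercise its
# inherited Apply method through TestParallelismFramework._RunApply.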


class FakeCommandWithoutMultiprocessingModule(FakeCommand):

  def __init__(self, do_parallel):
    super(FakeCommandWithoutMultiprocessingModule, self).__init__(do_parallel)
    self.multiprocessing_is_available = False


# TODO: Figure out a good way to test that ctrl+C really stops execution,
# and also that ctrl+C works when there are still tasks enqueued.
class TestParallelismFramework(testcase.GsUtilUnitTestCase):
  """gsutil parallelism framework test suite."""

  command_class = FakeCommand

  def _RunApply(self, func, args_iterator, process_count, thread_count,
                command_inst=None, shared_attrs=None, fail_on_error=False,
                thr_exc_handler=None, arg_checker=DummyArgChecker):
    command_inst = command_inst or self.command_class(True)
    exception_handler = thr_exc_handler or _ExceptionHandler

    return command_inst.Apply(func, args_iterator, exception_handler,
                              thread_count=thread_count,
                              process_count=process_count,
                              arg_checker=arg_checker,
                              should_return_results=True,
                              shared_attrs=shared_attrs,
                              fail_on_error=fail_on_error)

  @RequiresIsolation
  def testBasicApplySingleProcessSingleThread(self):
    self._TestBasicApply(1, 1)

  @RequiresIsolation
  def testBasicApplySingleProcessMultiThread(self):
    self._TestBasicApply(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessSingleThread(self):
    self._TestBasicApply(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testBasicApplyMultiProcessMultiThread(self):
    self._TestBasicApply(3, 3)

  @Timeout
  def _TestBasicApply(self, process_count, thread_count):
    args = [()] * (17 * process_count * thread_count + 1)

    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(len(args), len(results))

  @RequiresIsolation
  def testIteratorFailureSingleProcessSingleThread(self):
    self._TestIteratorFailure(1, 1)

  @RequiresIsolation
  def testIteratorFailureSingleProcessMultiThread(self):
    self._TestIteratorFailure(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessSingleThread(self):
    self._TestIteratorFailure(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testIteratorFailureMultiProcessMultiThread(self):
    self._TestIteratorFailure(3, 3)

  @Timeout
  def _TestIteratorFailure(self, process_count, thread_count):
    """Tests apply with a failing iterator."""
    # Tests for fail_on_error == False.

    args = FailingIterator(10, [0])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    args = FailingIterator(10, [5])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    args = FailingIterator(10, [9])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(9, len(results))

    if process_count * thread_count > 1:
      # In this case, we should ignore the fail_on_error flag.
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, process_count,
                               thread_count, fail_on_error=True)
      self.assertEqual(9, len(results))

    args = FailingIterator(10, range(10))
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

    args = FailingIterator(0, [])
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count)
    self.assertEqual(0, len(results))

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessSingleThread(self):
    self._TestSharedAttrsWork(1, 1)

  @RequiresIsolation
  def testTestSharedAttrsWorkSingleProcessMultiThread(self):
    self._TestSharedAttrsWork(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessSingleThread(self):
    self._TestSharedAttrsWork(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testTestSharedAttrsWorkMultiProcessMultiThread(self):
    self._TestSharedAttrsWork(3, 3)

  @Timeout
  def _TestSharedAttrsWork(self, process_count, thread_count):
    """Tests that Apply successfully uses shared_attrs."""
    command_inst = self.command_class(True)
    command_inst.arg_length_sum = 19
    args = ['foo', ['bar', 'baz'], [], ['x', 'y'], [], 'abcd']
    self._RunApply(_IncrementByLength, args, process_count,
                   thread_count, command_inst=command_inst,
                   shared_attrs=['arg_length_sum'])
    expected_sum = 19
    for arg in args:
      expected_sum += len(arg)
    self.assertEqual(expected_sum, command_inst.arg_length_sum)

    # Test that shared variables work when the iterator fails.
    command_inst = self.command_class(True)
    args = FailingIterator(10, [1, 3, 5])
    self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'])
    self.assertEqual(3, command_inst.failure_count)
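    # Why 3: the three iterator failures (indices 1, 3 and 5) each increment
    # failure_count (the default _ExceptionHandler is the only thing that
    # touches it in this path), and shared_attrs=['failure_count'] propagates
    # those per-worker increments back onto command_inst.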

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInFuncSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInFuncMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInFunc(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInFunc(self, process_count, thread_count):
    command_inst = self.command_class(True)
    args = ([()] * 5)
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_FailingExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 1)

  @RequiresIsolation
  def testThreadsSurviveExceptionsInHandlerSingleProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessSingleThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testThreadsSurviveExceptionsInHandlerMultiProcessMultiThread(self):
    self._TestThreadsSurviveExceptionsInHandler(3, 3)

  @Timeout
  def _TestThreadsSurviveExceptionsInHandler(self, process_count, thread_count):
    command_inst = self.command_class(True)
    args = ([()] * 5)
    self._RunApply(_FailureFunc, args, process_count, thread_count,
                   command_inst=command_inst, shared_attrs=['failure_count'],
                   thr_exc_handler=_FailingExceptionHandler)
    self.assertEqual(len(args), command_inst.failure_count)

  @RequiresIsolation
  @Timeout
  def testFailOnErrorFlag(self):
    """Tests that fail_on_error produces the correct exception on failure."""
    def _ExpectCustomException(test_func):
      try:
        test_func()
        self.fail(
            'Setting fail_on_error should raise any exception encountered.')
      except CustomException, e:
        pass
      except Exception, e:
        self.fail('Got unexpected error: ' + str(e))

    def _RunFailureFunc():
      command_inst = self.command_class(True)
      args = ([()] * 5)
      self._RunApply(_FailureFunc, args, 1, 1, command_inst=command_inst,
                     shared_attrs=['failure_count'], fail_on_error=True)
    _ExpectCustomException(_RunFailureFunc)

    def _RunFailingIteratorFirstPosition():
      args = FailingIterator(10, [0])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(0, len(results))
    _ExpectCustomException(_RunFailingIteratorFirstPosition)

    def _RunFailingIteratorPositionMiddlePosition():
      args = FailingIterator(10, [5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(5, len(results))
    _ExpectCustomException(_RunFailingIteratorPositionMiddlePosition)

    def _RunFailingIteratorLastPosition():
      args = FailingIterator(10, [9])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(9, len(results))
    _ExpectCustomException(_RunFailingIteratorLastPosition)

    def _RunFailingIteratorMultiplePositions():
      args = FailingIterator(10, [1, 3, 5])
      results = self._RunApply(_ReturnOneValue, args, 1, 1, fail_on_error=True)
      self.assertEqual(1, len(results))
    _ExpectCustomException(_RunFailingIteratorMultiplePositions)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 1)

  @RequiresIsolation
  def testRecursiveDepthThreeDifferentFunctionsSingleProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessSingleThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testRecursiveDepthThreeDifferentFunctionsMultiProcessMultiThread(self):
    self._TestRecursiveDepthThreeDifferentFunctions(3, 3)

  @Timeout
  def _TestRecursiveDepthThreeDifferentFunctions(self, process_count,
                                                 thread_count):
    """Tests recursive application of Apply.

    Calls Apply(A), where A calls Apply(B) followed by Apply(C) and B calls
    Apply(C).

    Args:
      process_count: Number of processes to use.
      thread_count: Number of threads to use.
    """
    base_args = [3, 1, 4, 1, 5]
    args = [[process_count, thread_count, count] for count in base_args]

    results = self._RunApply(_ReApplyWithReplicatedArguments, args,
                             process_count, thread_count)
    self.assertEqual(7 * (sum(base_args) + len(base_args)), sum(results))
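    # Why this total: each top-level argument [process_count, thread_count,
    # count] is replicated seven times inside _ReApplyWithReplicatedArguments,
    # whose first Apply returns count from each replica and whose second
    # returns one value per replica, i.e. 7 * (count + 1) per argument.
    # Summed over base_args that is 7 * (sum(base_args) + len(base_args)).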

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 1)

  @RequiresIsolation
  def testExceptionInProducerRaisesAndTerminatesSingleProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessSingleThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testExceptionInProducerRaisesAndTerminatesMultiProcessMultiThread(self):
    self._TestExceptionInProducerRaisesAndTerminates(3, 3)

  @Timeout
  def _TestExceptionInProducerRaisesAndTerminates(self, process_count,
                                                  thread_count):
    args = self  # The ProducerThread will try and fail to iterate over this.
    try:
      self._RunApply(_ReturnOneValue, args, process_count, thread_count)
      self.fail('Did not raise expected exception.')
    except TypeError:
      pass

  @RequiresIsolation
  def testSkippedArgumentsSingleThreadSingleProcess(self):
    self._TestSkippedArguments(1, 1)

  @RequiresIsolation
  def testSkippedArgumentsMultiThreadSingleProcess(self):
    self._TestSkippedArguments(1, 3)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsSingleThreadMultiProcess(self):
    self._TestSkippedArguments(3, 1)

  @RequiresIsolation
  @unittest.skipIf(IS_WINDOWS, 'Multiprocessing is not supported on Windows')
  def testSkippedArgumentsMultiThreadMultiProcess(self):
    self._TestSkippedArguments(3, 3)

  @Timeout
  def _TestSkippedArguments(self, process_count, thread_count):

    # Skip a proper subset of the arguments.
    n = 2 * process_count * thread_count
    args = range(1, n + 1)
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    self.assertEqual(n / 2, len(results))  # We know n is even.
    self.assertEqual(n / 2, sum(results))

    # Skip all arguments.
    args = [2 * x for x in args]
    results = self._RunApply(_ReturnOneValue, args, process_count, thread_count,
                             arg_checker=_SkipEvenNumbersArgChecker)
    self.assertEqual(0, len(results))


class TestParallelismFrameworkWithoutMultiprocessing(TestParallelismFramework):
  """Tests parallelism framework works with multiprocessing module unavailable.

  Notably, this test has no way to override previous calls
  to gslib.util.MultiprocessingIsAvailable to prevent the initialization of
  all of the global variables in command.py, so this still behaves slightly
  differently than the behavior one would see on a machine where the
  multiprocessing functionality is actually not available (in particular, it
  will not catch the case where a global variable that is not available for
  the sequential path is referenced before initialization).
  """
  command_class = FakeCommandWithoutMultiprocessingModule