1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """Unit tests for the contents of parallelizer.py."""
7 # pylint: disable=W0212
8 # pylint: disable=W0613
15 from devil
.utils
import parallelizer
18 class ParallelizerTestObject(object):
19 """Class used to test parallelizer.Parallelizer."""
21 parallel
= parallelizer
.Parallelizer
23 def __init__(self
, thing
, completion_file_name
=None):
25 self
._completion
_file
_name
= completion_file_name
26 self
.helper
= ParallelizerTestObjectHelper(thing
)
33 def doRaise(cls
, what
):
36 def doSetTheThing(self
, new_thing
):
37 self
._thing
= new_thing
39 def doReturnTheThing(self
):
42 def doRaiseTheThing(self
):
45 def doRaiseIfExceptionElseSleepFor(self
, sleep_duration
):
46 if isinstance(self
._thing
, Exception):
48 time
.sleep(sleep_duration
)
49 self
._write
_completion
_file
()
52 def _write_completion_file(self
):
53 if self
._completion
_file
_name
and len(self
._completion
_file
_name
):
54 with
open(self
._completion
_file
_name
, 'w+b') as completion_file
:
55 completion_file
.write('complete')
57 def __getitem__(self
, index
):
58 return self
._thing
[index
]
61 return type(self
).__name
__
64 class ParallelizerTestObjectHelper(object):
66 def __init__(self
, thing
):
69 def doReturnStringThing(self
):
70 return str(self
._thing
)
73 class ParallelizerTest(unittest
.TestCase
):
75 def testInitWithNone(self
):
76 with self
.assertRaises(AssertionError):
77 parallelizer
.Parallelizer(None)
79 def testInitEmptyList(self
):
80 with self
.assertRaises(AssertionError):
81 parallelizer
.Parallelizer([])
83 def testMethodCall(self
):
84 test_data
= ['abc_foo', 'def_foo', 'ghi_foo']
85 expected
= ['abc_bar', 'def_bar', 'ghi_bar']
86 r
= parallelizer
.Parallelizer(test_data
).replace('_foo', '_bar').pGet(0.1)
87 self
.assertEquals(expected
, r
)
90 devices
= [ParallelizerTestObject(True) for _
in xrange(0, 10)]
91 self
.assertTrue(all(d
.doReturnTheThing() for d
in devices
))
92 ParallelizerTestObject
.parallel(devices
).doSetTheThing(False).pFinish(1)
93 self
.assertTrue(not any(d
.doReturnTheThing() for d
in devices
))
95 def testAllReturn(self
):
96 devices
= [ParallelizerTestObject(True) for _
in xrange(0, 10)]
97 results
= ParallelizerTestObject
.parallel(
98 devices
).doReturnTheThing().pGet(1)
99 self
.assertTrue(isinstance(results
, list))
100 self
.assertEquals(10, len(results
))
101 self
.assertTrue(all(results
))
103 def testAllRaise(self
):
104 devices
= [ParallelizerTestObject(Exception('thing %d' % i
))
105 for i
in xrange(0, 10)]
106 p
= ParallelizerTestObject
.parallel(devices
).doRaiseTheThing()
107 with self
.assertRaises(Exception):
110 def testOneFailOthersComplete(self
):
111 parallel_device_count
= 10
113 exception_msg
= 'thing %d' % exception_index
116 completion_files
= [tempfile
.NamedTemporaryFile(delete
=False)
117 for _
in xrange(0, parallel_device_count
)]
119 ParallelizerTestObject(
120 i
if i
!= exception_index
else Exception(exception_msg
),
121 completion_files
[i
].name
)
122 for i
in xrange(0, parallel_device_count
)]
123 for f
in completion_files
:
125 p
= ParallelizerTestObject
.parallel(devices
)
126 with self
.assertRaises(Exception) as e
:
127 p
.doRaiseIfExceptionElseSleepFor(2).pGet(3)
128 self
.assertTrue(exception_msg
in str(e
.exception
))
129 for i
in xrange(0, parallel_device_count
):
130 with
open(completion_files
[i
].name
) as f
:
131 if i
== exception_index
:
132 self
.assertEquals('', f
.read())
134 self
.assertEquals('complete', f
.read())
136 for f
in completion_files
:
139 def testReusable(self
):
140 devices
= [ParallelizerTestObject(True) for _
in xrange(0, 10)]
141 p
= ParallelizerTestObject
.parallel(devices
)
142 results
= p
.doReturn(True).pGet(1)
143 self
.assertTrue(all(results
))
144 results
= p
.doReturn(True).pGet(1)
145 self
.assertTrue(all(results
))
146 with self
.assertRaises(Exception):
147 results
= p
.doRaise(Exception('reusableTest')).pGet(1)
149 def testContained(self
):
150 devices
= [ParallelizerTestObject(i
) for i
in xrange(0, 10)]
151 results
= (ParallelizerTestObject
.parallel(devices
).helper
152 .doReturnStringThing().pGet(1))
153 self
.assertTrue(isinstance(results
, list))
154 self
.assertEquals(10, len(results
))
155 for i
in xrange(0, 10):
156 self
.assertEquals(str(i
), results
[i
])
158 def testGetItem(self
):
159 devices
= [ParallelizerTestObject(range(i
, i
+10)) for i
in xrange(0, 10)]
160 results
= ParallelizerTestObject
.parallel(devices
)[9].pGet(1)
161 self
.assertEquals(range(9, 19), results
)
164 if __name__
== '__main__':
165 unittest
.main(verbosity
=2)