1 # Copyright 2014 The Chromium Authors. All rights reserved.
2 # Use of this source code is governed by a BSD-style license that can be
3 # found in the LICENSE file.
5 """ Wrapper that allows method execution in parallel.
7 This class wraps a list of objects of the same type, emulates their
8 interface, and executes any functions called on the objects in parallel
11 This means that, given a list of objects:
17 def bar(self, my_param):
20 list_of_foos = [Foo(1), Foo(2), Foo(3)]
22 we can take a sequential operation on that list of objects:
24 for f in list_of_foos:
27 and run it in parallel across all of the objects:
29 Parallelizer(list_of_foos).bar('Hello')
31 It can also handle (non-method) attributes of objects, so that this:
33 for f in list_of_foos:
36 can be run in parallel with:
38 Parallelizer(list_of_foos).baz.myBazMethod()
40 Because it emulates the interface of the wrapped objects, a Parallelizer
41 can be passed to a method or function that takes objects of that type:
43 def DoesSomethingWithFoo(the_foo):
46 the_foo.baz.myBazMethod
48 DoesSomethingWithFoo(Parallelizer(list_of_foos))
50 Note that this class spins up a thread for each object. Using this class
51 to parallelize operations that are already fast will incur a net performance
55 # pylint: disable=protected-access
57 from devil
.utils
import reraiser_thread
58 from devil
.utils
import watchdog_timer
64 class Parallelizer(object):
65 """Allows parallel execution of method calls across a group of objects."""
67 def __init__(self
, objs
):
68 assert (objs
is not None and len(objs
) > 0), (
69 "Passed empty list to 'Parallelizer'")
70 self
._orig
_objs
= objs
73 def __getattr__(self
, name
):
74 """Emulate getting the |name| attribute of |self|.
77 name: The name of the attribute to retrieve.
79 A Parallelizer emulating the |name| attribute of |self|.
83 r
= type(self
)(self
._orig
_objs
)
84 r
._objs
= [getattr(o
, name
) for o
in self
._objs
]
87 def __getitem__(self
, index
):
88 """Emulate getting the value of |self| at |index|.
91 A Parallelizer emulating the value of |self| at |index|.
95 r
= type(self
)(self
._orig
_objs
)
96 r
._objs
= [o
[index
] for o
in self
._objs
]
99 def __call__(self
, *args
, **kwargs
):
100 """Emulate calling |self| with |args| and |kwargs|.
102 Note that this call is asynchronous. Call pFinish on the return value to
103 block until the call finishes.
106 A Parallelizer wrapping the ReraiserThreadGroup running the call in
109 AttributeError if the wrapped objects aren't callable.
114 raise AttributeError('Nothing to call.')
117 raise AttributeError("'%s' is not callable" % o
.__name
__)
119 r
= type(self
)(self
._orig
_objs
)
120 r
._objs
= reraiser_thread
.ReraiserThreadGroup(
121 [reraiser_thread
.ReraiserThread(
122 o
, args
=args
, kwargs
=kwargs
,
123 name
='%s.%s' % (str(d
), o
.__name
__))
124 for d
, o
in zip(self
._orig
_objs
, self
._objs
)])
125 r
._objs
.StartAll() # pylint: disable=W0212
128 def pFinish(self
, timeout
):
129 """Finish any outstanding asynchronous operations.
132 timeout: The maximum number of seconds to wait for an individual
133 result to return, or None to wait forever.
135 self, now emulating the return values.
137 self
._assertNoShadow
('pFinish')
138 if isinstance(self
._objs
, reraiser_thread
.ReraiserThreadGroup
):
140 self
._objs
= self
._objs
.GetAllReturnValues(
141 watchdog_timer
.WatchdogTimer(timeout
))
144 def pGet(self
, timeout
):
145 """Get the current wrapped objects.
148 timeout: Same as |pFinish|.
150 A list of the results, in order of the provided devices.
152 Any exception raised by any of the called functions.
154 self
._assertNoShadow
('pGet')
155 self
.pFinish(timeout
)
158 def pMap(self
, f
, *args
, **kwargs
):
159 """Map a function across the current wrapped objects in parallel.
161 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
163 Note that this call is asynchronous. Call pFinish on the return value to
164 block until the call finishes.
167 f: The function to call.
168 args: The positional args to pass to f.
169 kwargs: The keyword args to pass to f.
171 A Parallelizer wrapping the ReraiserThreadGroup running the map in
174 self
._assertNoShadow
('pMap')
175 r
= type(self
)(self
._orig
_objs
)
176 r
._objs
= reraiser_thread
.ReraiserThreadGroup(
177 [reraiser_thread
.ReraiserThread(
178 f
, args
=tuple([o
] + list(args
)), kwargs
=kwargs
,
179 name
='%s(%s)' % (f
.__name
__, d
))
180 for d
, o
in zip(self
._orig
_objs
, self
._objs
)])
181 r
._objs
.StartAll() # pylint: disable=W0212
184 def _assertNoShadow(self
, attr_name
):
185 """Ensures that |attr_name| isn't shadowing part of the wrapped obejcts.
187 If the wrapped objects _do_ have an |attr_name| attribute, it will be
188 inaccessible to clients.
191 attr_name: The attribute to check.
193 AssertionError if the wrapped objects have an attribute named 'attr_name'
194 or '_assertNoShadow'.
196 if isinstance(self
._objs
, reraiser_thread
.ReraiserThreadGroup
):
197 assert not hasattr(self
._objs
, '_assertNoShadow')
198 assert not hasattr(self
._objs
, attr_name
)
200 assert not any(hasattr(o
, '_assertNoShadow') for o
in self
._objs
)
201 assert not any(hasattr(o
, attr_name
) for o
in self
._objs
)
204 class SyncParallelizer(Parallelizer
):
205 """A Parallelizer that blocks on function calls."""
208 def __call__(self
, *args
, **kwargs
):
209 """Emulate calling |self| with |args| and |kwargs|.
211 Note that this call is synchronous.
214 A Parallelizer emulating the value returned from calling |self| with
217 AttributeError if the wrapped objects aren't callable.
219 r
= super(SyncParallelizer
, self
).__call
__(*args
, **kwargs
)
224 def pMap(self
, f
, *args
, **kwargs
):
225 """Map a function across the current wrapped objects in parallel.
227 This calls f(o, *args, **kwargs) for each o in the set of wrapped objects.
229 Note that this call is synchronous.
232 f: The function to call.
233 args: The positional args to pass to f.
234 kwargs: The keyword args to pass to f.
236 A Parallelizer wrapping the ReraiserThreadGroup running the map in
239 r
= super(SyncParallelizer
, self
).pMap(f
, *args
, **kwargs
)