# -*- coding: UTF-8 -*-
"""Easy to use object-oriented thread pool framework.

A thread pool is an object that maintains a pool of worker threads to perform
time consuming operations in parallel. It assigns jobs to the threads
by putting them in a work request queue, where they are picked up by the
next available thread. This then performs the requested operation in the
background and puts the results in another queue.

The thread pool object can then collect the results from all threads from
this queue as soon as they become available or after all threads have
finished their work. It is also possible to define callbacks to handle
each result as it comes in.

The basic concept and some code was taken from the book "Python in a Nutshell,
2nd edition" by Alex Martelli, O'Reilly 2006, ISBN 0-596-10046-9, from section
14.5 "Threaded Program Architecture". I wrapped the main program logic in the
ThreadPool class, added the WorkRequest class and the callback system and
tweaked the code here and there. Kudos also to Florent Aide for the exception
handling mechanism.

Basic usage::

    >>> pool = ThreadPool(poolsize)
    >>> requests = makeRequests(some_callable, list_of_args, callback)
    >>> [pool.putRequest(req) for req in requests]
    >>> pool.wait()

See the end of the module code for a brief, annotated usage example.

Website : http://chrisarndt.de/projects/threadpool/

"""
# Module metadata. ``__docformat__`` tells documentation tools that the
# docstrings in this module are written in reStructuredText.
__docformat__ = "restructuredtext en"

__author__ = "Christopher Arndt"
__revision__ = "$Revision: 416 $"
__date__ = "$Date: 2009-10-07 05:41:27 +0200 (Wed, 07 Oct 2009) $"
__license__ = "MIT license"
52 # standard library modules
class NoResultsPending(Exception):
    """All work requests have been processed."""
class NoWorkersAvailable(Exception):
    """No worker threads available to process remaining requests."""
69 # internal module helper functions
def _handle_thread_exception(request, exc_info):
    """Default exception handler callback function.

    This just prints the exception info via ``traceback.print_exception``.

    ``request`` is the work request whose callable raised; it is accepted
    only so the function matches the two-argument callback signature and is
    otherwise unused.

    ``exc_info`` is a ``(type, value, traceback)`` tuple as returned by
    ``sys.exc_info()``.

    """
    traceback.print_exception(*exc_info)
def makeRequests(callable_, args_list, callback=None,
        exc_callback=_handle_thread_exception):
    """Create several work requests for same callable with different arguments.

    Convenience function for creating several work requests for the same
    callable where each invocation of the callable receives different values
    for its arguments.

    ``args_list`` contains the parameters for each invocation of callable.
    Each item in ``args_list`` should be either a 2-item tuple of the list of
    positional arguments and a dictionary of keyword arguments or a single,
    non-tuple argument, which is then passed as the only positional argument.

    See docstring for ``WorkRequest`` for info on ``callback`` and
    ``exc_callback``.

    Returns a list with one ``WorkRequest`` per item of ``args_list``, in the
    same order.

    """
    requests = []
    for item in args_list:
        if isinstance(item, tuple):
            # (positional_args, keyword_args) form
            positional, keywords = item[0], item[1]
        else:
            # bare value: becomes the single positional argument
            positional, keywords = [item], None
        requests.append(
            WorkRequest(callable_, positional, keywords, callback=callback,
                exc_callback=exc_callback)
        )
    return requests
class WorkerThread(threading.Thread):
    """Background thread connected to the requests/results queues.

    A worker thread sits in the background and picks up work requests from
    one queue and puts the results in another until it is dismissed.

    """

    def __init__(self, requests_queue, results_queue, poll_timeout=5, **kwds):
        """Set up thread in daemonic mode and start it immediately.

        ``requests_queue`` and ``results_queue`` are instances of
        ``Queue.Queue`` passed by the ``ThreadPool`` class when it creates a
        new worker thread.

        ``poll_timeout`` is the number of seconds the thread waits for a new
        request before it re-checks whether it has been dismissed.

        Any additional keyword arguments are passed on to
        ``threading.Thread.__init__``.

        """
        threading.Thread.__init__(self, **kwds)
        # must be set before start(); the docstring promises daemonic mode
        self.daemon = True
        self._requests_queue = requests_queue
        self._results_queue = results_queue
        self._poll_timeout = poll_timeout
        self._dismissed = threading.Event()
        self.start()

    def run(self):
        """Repeatedly process the job queue until told to exit."""
        while True:
            if self._dismissed.is_set():
                # we are dismissed, break out of loop
                break
            # get next work request. If we don't get a new request from the
            # queue after self._poll_timeout seconds, we jump to the start of
            # the while loop again, to give the thread a chance to exit.
            try:
                request = self._requests_queue.get(True, self._poll_timeout)
            except Queue.Empty:
                continue
            if self._dismissed.is_set():
                # we are dismissed, put back request in queue and exit loop
                self._requests_queue.put(request)
                break
            try:
                result = request.callable(*request.args, **request.kwds)
                self._results_queue.put((request, result))
            except:
                # Deliberately catch everything: whatever the callable raises
                # must be reported through the results queue, otherwise the
                # request would never be marked as finished.
                request.exception = True
                self._results_queue.put((request, sys.exc_info()))

    def dismiss(self):
        """Sets a flag to tell the thread to exit when done with current job."""
        self._dismissed.set()
class WorkRequest(object):
    """A request to execute a callable for putting in the request queue later.

    See the module function ``makeRequests`` for the common case
    where you want to build several ``WorkRequest`` objects for the same
    callable but with different arguments for each call.

    """

    def __init__(self, callable_, args=None, kwds=None, requestID=None,
            callback=None, exc_callback=_handle_thread_exception):
        """Create a work request for a callable and attach callbacks.

        A work request consists of a callable to be executed by a
        worker thread, a list of positional arguments and a dictionary
        of keyword arguments.

        A ``callback`` function can be specified, that is called when the
        results of the request are picked up from the result queue. It must
        accept two anonymous arguments, the ``WorkRequest`` object and the
        results of the callable, in that order. If you want to pass additional
        information to the callback, just stick it on the request object.

        You can also give a custom callback for when an exception occurs with
        the ``exc_callback`` keyword parameter. It should also accept two
        anonymous arguments, the ``WorkRequest`` and a tuple with the exception
        details as returned by ``sys.exc_info()``. The default implementation
        of this callback just prints the exception info via
        ``traceback.print_exception``. If you want no exception handler
        callback, just pass in ``None``.

        ``requestID``, if given, must be hashable since it is used by the
        ``ThreadPool`` object to store the results of that work request in a
        dictionary. It defaults to the return value of ``id(self)``.

        Raises ``TypeError`` if ``requestID`` is not hashable.

        """
        if requestID is None:
            self.requestID = id(self)
        else:
            try:
                # the hash value, not the object itself, is stored as the ID
                self.requestID = hash(requestID)
            except TypeError:
                raise TypeError("requestID must be hashable.")
        # set to True by the worker thread if the callable raises
        self.exception = False
        self.callback = callback
        self.exc_callback = exc_callback
        self.callable = callable_
        self.args = args or []
        self.kwds = kwds or {}

    def __str__(self):
        """Return a short description with ID, arguments and exception flag."""
        return "<WorkRequest id=%s args=%r kwargs=%r exception=%s>" % (
            self.requestID, self.args, self.kwds, self.exception)
class ThreadPool(object):
    """A thread pool, distributing work requests and collecting results.

    See the module docstring for more information.

    """

    def __init__(self, num_workers, q_size=0, resq_size=0, poll_timeout=5):
        """Set up the thread pool and start num_workers worker threads.

        ``num_workers`` is the number of worker threads to start initially.

        If ``q_size > 0`` the size of the work *request queue* is limited and
        the thread pool blocks when the queue is full and it tries to put
        more work requests in it (see ``putRequest`` method), unless you also
        use a positive ``timeout`` value for ``putRequest``.

        If ``resq_size > 0`` the size of the *results queue* is limited and the
        worker threads will block when the queue is full and they try to put
        new results in it.

        ``poll_timeout`` is passed on to ``createWorkers``.

        .. warning:
            If you set both ``q_size`` and ``resq_size`` to ``!= 0`` there is
            the possibility of a deadlock, when the results queue is not pulled
            regularly and too many jobs are put in the work requests queue.
            To prevent this, always set ``timeout > 0`` when calling
            ``ThreadPool.putRequest()`` and catch ``Queue.Full`` exceptions.

        """
        self._requests_queue = Queue.Queue(q_size)
        self._results_queue = Queue.Queue(resq_size)
        self.workers = []
        self.dismissedWorkers = []
        # pending requests, keyed by their requestID
        self.workRequests = {}
        self.createWorkers(num_workers, poll_timeout)

    def createWorkers(self, num_workers, poll_timeout=5):
        """Add num_workers worker threads to the pool.

        ``poll_timeout`` sets the interval in seconds (int or float) for how
        often threads should check whether they are dismissed, while waiting
        for requests.

        """
        for _ in range(num_workers):
            self.workers.append(WorkerThread(self._requests_queue,
                self._results_queue, poll_timeout=poll_timeout))

    def dismissWorkers(self, num_workers, do_join=False):
        """Tell num_workers worker threads to quit after their current task.

        If ``do_join`` is true, the dismissed threads are joined right away;
        otherwise they are remembered in ``dismissedWorkers`` so that they can
        be joined later with ``joinAllDismissedWorkers``.

        """
        dismiss_list = []
        for _ in range(min(num_workers, len(self.workers))):
            worker = self.workers.pop()
            worker.dismiss()
            dismiss_list.append(worker)

        if do_join:
            for worker in dismiss_list:
                worker.join()
        else:
            self.dismissedWorkers.extend(dismiss_list)

    def joinAllDismissedWorkers(self):
        """Perform Thread.join() on all worker threads that have been dismissed.
        """
        for worker in self.dismissedWorkers:
            worker.join()
        self.dismissedWorkers = []

    def putRequest(self, request, block=True, timeout=None):
        """Put work request into work queue and save its id for later."""
        assert isinstance(request, WorkRequest)
        # don't reuse old work requests
        assert not getattr(request, 'exception', None)
        self._requests_queue.put(request, block, timeout)
        self.workRequests[request.requestID] = request

    def poll(self, block=False):
        """Process any new results in the queue.

        Raises ``NoResultsPending`` when no work requests are outstanding and
        ``NoWorkersAvailable`` when ``block`` is true but the pool has no
        worker threads left. Returns once the results queue is empty.

        """
        while True:
            # still results pending?
            if not self.workRequests:
                raise NoResultsPending
            # are there still workers to process remaining requests?
            elif block and not self.workers:
                raise NoWorkersAvailable
            # get back next results; only the queue read is guarded, so a
            # Queue.Empty raised inside a callback is not swallowed
            try:
                request, result = self._results_queue.get(block=block)
            except Queue.Empty:
                break
            # has an exception occurred?
            if request.exception and request.exc_callback:
                request.exc_callback(request, result)
            # hand results to callback, if any
            if request.callback and not \
                    (request.exception and request.exc_callback):
                request.callback(request, result)
            del self.workRequests[request.requestID]

    def wait(self):
        """Wait for results, blocking until all have arrived."""
        while True:
            try:
                self.poll(True)
            except NoResultsPending:
                break
################
# USAGE EXAMPLE
################

# NOTE(review): every statement below that is tagged "reconstructed" was
# inferred from the surrounding comments and calls; confirm against the
# upstream module before relying on the exact values.
if __name__ == '__main__':
    import random
    import time

    # the work the threads will have to do (rather trivial in our example)
    def do_something(data):
        time.sleep(random.randint(1, 5))
        result = round(random.random() * data, 5)
        # just to show off, we throw an exception once in a while
        if result > 5:  # reconstructed threshold - TODO confirm
            raise RuntimeError("Something extraordinary happened!")
        return result

    # this will be called each time a result is available
    def print_result(request, result):
        print("**** Result from request #%s: %r" % (request.requestID, result))

    # this will be called when an exception occurs within a thread
    # this example exception handler does little more than the default handler
    def handle_exception(request, exc_info):
        if not isinstance(exc_info, tuple):
            # Something is seriously wrong...
            # reconstructed: dump what we got and give up
            print(request)
            print(exc_info)
            raise SystemExit
        print("**** Exception occured in request #%s: %s" %
              (request.requestID, exc_info))

    # assemble the arguments for each job to a list...
    data = [random.randint(1, 10) for i in range(20)]
    # ... and build a WorkRequest object for each item in data
    requests = makeRequests(do_something, data, print_result, handle_exception)
    # to use the default exception handler, uncomment next line and comment out
    # the preceding one.
    #requests = makeRequests(do_something, data, print_result)

    # or the other form of args_lists accepted by makeRequests: ((,), {})
    data = [((random.randint(1, 10),), {}) for i in range(20)]
    requests.extend(
        makeRequests(do_something, data, print_result, handle_exception)
        #makeRequests(do_something, data, print_result)
        # to use the default exception handler, uncomment next line and comment
        # out the preceding one.
    )

    # we create a pool of 3 worker threads
    print("Creating thread pool with 3 worker threads.")
    main = ThreadPool(3)

    # then we put the work requests in the queue...
    for req in requests:
        main.putRequest(req)
        print("Work request #%s added." % req.requestID)
    # or shorter:
    # [main.putRequest(req) for req in requests]

    # ...and wait for the results to arrive in the result queue
    # by using ThreadPool.wait(). This would block until results for
    # all work requests have arrived:
    # main.wait()

    # instead we can poll for results while doing something else:
    i = 0
    while True:
        try:
            time.sleep(0.5)  # reconstructed interval - TODO confirm
            main.poll()
            print("Main thread working... "
                  "(active worker threads: %i)" % (threading.active_count() - 1,))
            if i == 10:  # reconstructed trigger - TODO confirm
                print("**** Adding 3 more worker threads...")
                main.createWorkers(3)
            if i == 20:  # reconstructed trigger - TODO confirm
                print("**** Dismissing 2 worker threads...")
                main.dismissWorkers(2)
            i += 1
        except KeyboardInterrupt:
            print("**** Interrupted!")
            break
        except NoResultsPending:
            print("**** No pending results.")
            break
    if main.dismissedWorkers:
        print("Joining all dismissed worker threads...")
        main.joinAllDismissedWorkers()