1 """Thread module emulating a subset of Java's threading model."""
8 del _sys
.modules
[__name__
]
11 from StringIO
import StringIO
as _StringIO
12 from time
import time
as _time
, sleep
as _sleep
13 from traceback
import print_exc
as _print_exc
15 # Rename some stuff so "from threading import *" is safe
16 __all__
= ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
17 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', 'Timer']
19 _start_new_thread
= thread
.start_new_thread
20 _allocate_lock
= thread
.allocate_lock
21 _get_ident
= thread
.get_ident
22 ThreadError
= thread
.error
26 # Debug support (adapted from ihooks.py)
28 _VERBOSE
= 0 # XXX Bool or int?
34 def __init__(self
, verbose
=None):
37 self
.__verbose
= verbose
39 def _note(self
, format
, *args
):
41 format
= format
% args
42 format
= "%s: %s\n" % (
43 currentThread().getName(), format
)
44 _sys
.stderr
.write(format
)
47 # Disable this when using "python -O"
49 def __init__(self
, verbose
=None):
51 def _note(self
, *args
):
55 # Synchronization classes
def RLock(*args, **kwargs):
    """Factory function: create and return a new reentrant lock.

    All positional and keyword arguments are forwarded unchanged to the
    ``_RLock`` constructor.
    """
    # Extended call syntax replaces the deprecated apply() builtin; the
    # result is the same call with the same arguments.
    return _RLock(*args, **kwargs)
62 class _RLock(_Verbose
):
64 def __init__(self
, verbose
=None):
65 _Verbose
.__init
__(self
, verbose
)
66 self
.__block
= _allocate_lock()
71 return "<%s(%s, %d)>" % (
72 self
.__class
__.__name
__,
73 self
.__owner
and self
.__owner
.getName(),
76 def acquire(self
, blocking
=1):
78 if self
.__owner
is me
:
79 self
.__count
= self
.__count
+ 1
81 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
83 rc
= self
.__block
.acquire(blocking
)
88 self
._note
("%s.acquire(%s): initial succes", self
, blocking
)
91 self
._note
("%s.acquire(%s): failure", self
, blocking
)
96 assert self
.__owner
is me
, "release() of un-acquire()d lock"
97 self
.__count
= count
= self
.__count
- 1
100 self
.__block
.release()
102 self
._note
("%s.release(): final release", self
)
105 self
._note
("%s.release(): non-final release", self
)
107 # Internal methods used by condition variables
109 def _acquire_restore(self
, (count
, owner
)):
110 self
.__block
.acquire()
114 self
._note
("%s._acquire_restore()", self
)
116 def _release_save(self
):
118 self
._note
("%s._release_save()", self
)
123 self
.__block
.release()
124 return (count
, owner
)
127 return self
.__owner
is currentThread()
def Condition(*args, **kwargs):
    """Factory function: create and return a new condition variable.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Condition`` constructor.
    """
    # Extended call syntax replaces the deprecated apply() builtin; the
    # result is the same call with the same arguments.
    return _Condition(*args, **kwargs)
133 class _Condition(_Verbose
):
135 def __init__(self
, lock
=None, verbose
=None):
136 _Verbose
.__init
__(self
, verbose
)
140 # Export the lock's acquire() and release() methods
141 self
.acquire
= lock
.acquire
142 self
.release
= lock
.release
143 # If the lock defines _release_save() and/or _acquire_restore(),
144 # these override the default implementations (which just call
145 # release() and acquire() on the lock). Ditto for _is_owned().
147 self
._release
_save
= lock
._release
_save
148 except AttributeError:
151 self
._acquire
_restore
= lock
._acquire
_restore
152 except AttributeError:
155 self
._is
_owned
= lock
._is
_owned
156 except AttributeError:
161 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
163 def _release_save(self
):
164 self
.__lock
.release() # No state to save
166 def _acquire_restore(self
, x
):
167 self
.__lock
.acquire() # Ignore saved state
170 # Return True if lock is owned by currentThread.
171 # This method is called only if __lock doesn't have _is_owned().
172 if self
.__lock
.acquire(0):
173 self
.__lock
.release()
178 def wait(self
, timeout
=None):
179 currentThread() # for side-effect
180 assert self
._is
_owned
(), "wait() of un-acquire()d lock"
181 waiter
= _allocate_lock()
183 self
.__waiters
.append(waiter
)
184 saved_state
= self
._release
_save
()
185 try: # restore state no matter what (e.g., KeyboardInterrupt)
189 self
._note
("%s.wait(): got it", self
)
191 # Balancing act: We can't afford a pure busy loop, so we
192 # have to sleep; but if we sleep the whole timeout time,
193 # we'll be unresponsive. The scheme here sleeps very
194 # little at first, longer as time goes on, but never longer
195 # than 1/20 of a second per sleep (or the timeout time remaining).
196 endtime
= _time() + timeout
197 delay
= 0.0005 # 500 us -> initial delay of 1 ms
199 gotit
= waiter
.acquire(0)
202 remaining
= endtime
- _time()
205 delay
= min(delay
* 2, remaining
, .05)
209 self
._note
("%s.wait(%s): timed out", self
, timeout
)
211 self
.__waiters
.remove(waiter
)
216 self
._note
("%s.wait(%s): got it", self
, timeout
)
218 self
._acquire
_restore
(saved_state
)
220 def notify(self
, n
=1):
221 currentThread() # for side-effect
222 assert self
._is
_owned
(), "notify() of un-acquire()d lock"
223 __waiters
= self
.__waiters
224 waiters
= __waiters
[:n
]
227 self
._note
("%s.notify(): no waiters", self
)
229 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
231 for waiter
in waiters
:
234 __waiters
.remove(waiter
)
239 self
.notify(len(self
.__waiters
))
def Semaphore(*args, **kwargs):
    """Factory function: create and return a new semaphore.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Semaphore`` constructor.
    """
    # Extended call syntax replaces the deprecated apply() builtin; the
    # result is the same call with the same arguments.
    return _Semaphore(*args, **kwargs)
245 class _Semaphore(_Verbose
):
247 # After Tim Peters' semaphore class, but not quite the same (no maximum)
249 def __init__(self
, value
=1, verbose
=None):
250 assert value
>= 0, "Semaphore initial value must be >= 0"
251 _Verbose
.__init
__(self
, verbose
)
252 self
.__cond
= Condition(Lock())
255 def acquire(self
, blocking
=1):
257 self
.__cond
.acquire()
258 while self
.__value
== 0:
262 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
263 self
, blocking
, self
.__value
)
266 self
.__value
= self
.__value
- 1
268 self
._note
("%s.acquire: success, value=%s",
271 self
.__cond
.release()
275 self
.__cond
.acquire()
276 self
.__value
= self
.__value
+ 1
278 self
._note
("%s.release: success, value=%s",
281 self
.__cond
.release()
def BoundedSemaphore(*args, **kwargs):
    """Factory function: create and return a new bounded semaphore.

    All positional and keyword arguments are forwarded unchanged to the
    ``_BoundedSemaphore`` constructor.
    """
    # Extended call syntax replaces the deprecated apply() builtin; the
    # result is the same call with the same arguments.
    return _BoundedSemaphore(*args, **kwargs)
287 class _BoundedSemaphore(_Semaphore
):
288 """Semaphore that checks that # releases is <= # acquires"""
289 def __init__(self
, value
=1, verbose
=None):
290 _Semaphore
.__init
__(self
, value
, verbose
)
291 self
._initial
_value
= value
294 if self
._Semaphore
__value
>= self
._initial
_value
:
295 raise ValueError, "Semaphore released too many times"
296 return _Semaphore
.release(self
)
def Event(*args, **kwargs):
    """Factory function: create and return a new event object.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Event`` constructor.
    """
    # Extended call syntax replaces the deprecated apply() builtin; the
    # result is the same call with the same arguments.
    return _Event(*args, **kwargs)
302 class _Event(_Verbose
):
304 # After Tim Peters' event class (without is_posted())
306 def __init__(self
, verbose
=None):
307 _Verbose
.__init
__(self
, verbose
)
308 self
.__cond
= Condition(Lock())
315 self
.__cond
.acquire()
318 self
.__cond
.notifyAll()
320 self
.__cond
.release()
323 self
.__cond
.acquire()
327 self
.__cond
.release()
329 def wait(self
, timeout
=None):
330 self
.__cond
.acquire()
333 self
.__cond
.wait(timeout
)
335 self
.__cond
.release()
337 # Helper to generate new thread names
339 def _newname(template
="Thread-%d"):
341 _counter
= _counter
+ 1
342 return template
% _counter
344 # Active thread administration
345 _active_limbo_lock
= _allocate_lock()
350 # Main class for threads
352 class Thread(_Verbose
):
354 __initialized
= False
356 def __init__(self
, group
=None, target
=None, name
=None,
357 args
=(), kwargs
={}, verbose
=None):
358 assert group
is None, "group argument must be None for now"
359 _Verbose
.__init
__(self
, verbose
)
360 self
.__target
= target
361 self
.__name
= str(name
or _newname())
363 self
.__kwargs
= kwargs
364 self
.__daemonic
= self
._set
_daemon
()
365 self
.__started
= False
366 self
.__stopped
= False
367 self
.__block
= Condition(Lock())
368 self
.__initialized
= True
def _set_daemon(self):
    """Return the initial daemon flag for this thread object.

    The flag is taken from the thread that is executing this call.
    Overridden in _MainThread and _DummyThread.
    """
    creator = currentThread()
    return creator.isDaemon()
375 assert self
.__initialized
, "Thread.__init__() was not called"
382 status
= status
+ " daemon"
383 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
386 assert self
.__initialized
, "Thread.__init__() not called"
387 assert not self
.__started
, "thread already started"
389 self
._note
("%s.start(): starting thread", self
)
390 _active_limbo_lock
.acquire()
392 _active_limbo_lock
.release()
393 _start_new_thread(self
.__bootstrap
, ())
394 self
.__started
= True
395 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
399 apply(self
.__target
, self
.__args
, self
.__kwargs
)
401 def __bootstrap(self
):
403 self
.__started
= True
404 _active_limbo_lock
.acquire()
405 _active
[_get_ident()] = self
407 _active_limbo_lock
.release()
409 self
._note
("%s.__bootstrap(): thread started", self
)
414 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
417 self
._note
("%s.__bootstrap(): unhandled exception", self
)
420 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
421 (self
.getName(), s
.getvalue()))
424 self
._note
("%s.__bootstrap(): normal return", self
)
433 self
.__block
.acquire()
434 self
.__stopped
= True
435 self
.__block
.notifyAll()
436 self
.__block
.release()
439 _active_limbo_lock
.acquire()
440 del _active
[_get_ident()]
441 _active_limbo_lock
.release()
443 def join(self
, timeout
=None):
444 assert self
.__initialized
, "Thread.__init__() not called"
445 assert self
.__started
, "cannot join thread before it is started"
446 assert self
is not currentThread(), "cannot join current thread"
448 if not self
.__stopped
:
449 self
._note
("%s.join(): waiting until thread stops", self
)
450 self
.__block
.acquire()
452 while not self
.__stopped
:
455 self
._note
("%s.join(): thread stopped", self
)
457 deadline
= _time() + timeout
458 while not self
.__stopped
:
459 delay
= deadline
- _time()
462 self
._note
("%s.join(): timed out", self
)
464 self
.__block
.wait(delay
)
467 self
._note
("%s.join(): thread stopped", self
)
468 self
.__block
.release()
471 assert self
.__initialized
, "Thread.__init__() not called"
def setName(self, name):
    """Rename this thread; the new name is coerced with str().

    Asserts that Thread.__init__() has already run on this object.
    """
    assert self.__initialized, "Thread.__init__() not called"
    new_name = str(name)
    self.__name = new_name
479 assert self
.__initialized
, "Thread.__init__() not called"
480 return self
.__started
and not self
.__stopped
483 assert self
.__initialized
, "Thread.__init__() not called"
484 return self
.__daemonic
def setDaemon(self, daemonic):
    """Store ``daemonic`` as this thread's daemon flag.

    Asserts that Thread.__init__() has run and that the thread has not
    been started yet.
    """
    assert self.__initialized, "Thread.__init__() not called"
    assert not self.__started, "cannot set daemon status of active thread"
    flag = daemonic
    self.__daemonic = flag
491 # The timer class was contributed by Itamar Shtull-Trauring
def Timer(*args, **kwargs):
    """Factory function: build and return a ``_Timer``.

    All positional and keyword arguments are forwarded unchanged to the
    ``_Timer`` constructor.
    """
    timer = _Timer(*args, **kwargs)
    return timer
496 class _Timer(Thread
):
497 """Call a function after a specified number of seconds:
499 t = Timer(30.0, f, args=[], kwargs={})
501 t.cancel() # stop the timer's action if it's still waiting
504 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
505 Thread
.__init
__(self
)
506 self
.interval
= interval
507 self
.function
= function
510 self
.finished
= Event()
513 """Stop the timer if it hasn't finished yet"""
517 self
.finished
.wait(self
.interval
)
518 if not self
.finished
.isSet():
519 self
.function(*self
.args
, **self
.kwargs
)
522 # Special thread class to represent the main thread
523 # This is garbage collected through an exit handler
525 class _MainThread(Thread
):
528 Thread
.__init
__(self
, name
="MainThread")
529 self
._Thread
__started
= True
530 _active_limbo_lock
.acquire()
531 _active
[_get_ident()] = self
532 _active_limbo_lock
.release()
534 atexit
.register(self
.__exitfunc
)
536 def _set_daemon(self
):
539 def __exitfunc(self
):
541 t
= _pickSomeNonDaemonThread()
544 self
._note
("%s: waiting for other threads", self
)
547 t
= _pickSomeNonDaemonThread()
549 self
._note
("%s: exiting", self
)
550 self
._Thread
__delete
()
552 def _pickSomeNonDaemonThread():
553 for t
in enumerate():
554 if not t
.isDaemon() and t
.isAlive():
559 # Dummy thread class to represent threads not started here.
560 # These aren't garbage collected when they die,
561 # nor can they be waited for.
562 # Their purpose is to return *something* from currentThread().
563 # They are marked as daemon threads so we won't wait for them
564 # when we exit (conforming to the previous semantics).
566 class _DummyThread(Thread
):
569 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
570 self
._Thread
__started
= True
571 _active_limbo_lock
.acquire()
572 _active
[_get_ident()] = self
573 _active_limbo_lock
.release()
575 def _set_daemon(self
):
def join(self, timeout=None):
    """Always fail: a dummy thread cannot be joined.

    ``timeout`` is accepted for signature compatibility and is not used.
    """
    assert False, "cannot join a dummy thread"
582 # Global API functions
586 return _active
[_get_ident()]
588 ##print "currentThread(): no current thread for", _get_ident()
589 return _DummyThread()
592 _active_limbo_lock
.acquire()
593 count
= len(_active
) + len(_limbo
)
594 _active_limbo_lock
.release()
598 _active_limbo_lock
.acquire()
599 active
= _active
.values() + _limbo
.values()
600 _active_limbo_lock
.release()
604 # Create the main thread object
613 class BoundedQueue(_Verbose
):
615 def __init__(self
, limit
):
616 _Verbose
.__init
__(self
)
618 self
.rc
= Condition(self
.mon
)
619 self
.wc
= Condition(self
.mon
)
625 while len(self
.queue
) >= self
.limit
:
626 self
._note
("put(%s): queue full", item
)
628 self
.queue
.append(item
)
629 self
._note
("put(%s): appended, length now %d",
630 item
, len(self
.queue
))
636 while not self
.queue
:
637 self
._note
("get(): queue empty")
639 item
= self
.queue
.pop(0)
640 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
645 class ProducerThread(Thread
):
647 def __init__(self
, queue
, quota
):
648 Thread
.__init
__(self
, name
="Producer")
653 from random
import random
655 while counter
< self
.quota
:
656 counter
= counter
+ 1
657 self
.queue
.put("%s.%d" % (self
.getName(), counter
))
658 _sleep(random() * 0.00001)
661 class ConsumerThread(Thread
):
663 def __init__(self
, queue
, count
):
664 Thread
.__init
__(self
, name
="Consumer")
669 while self
.count
> 0:
670 item
= self
.queue
.get()
672 self
.count
= self
.count
- 1
681 t
= ProducerThread(Q
, NI
)
682 t
.setName("Producer-%d" % (i
+1))
684 C
= ConsumerThread(Q
, NI
*NP
)
693 if __name__
== '__main__':