1 """Thread module emulating a subset of Java's threading model."""
9 # Rename some stuff so "from threading import *" is safe
18 _start_new_thread
= thread
.start_new_thread
19 _allocate_lock
= thread
.allocate_lock
20 _get_ident
= thread
.get_ident
21 ThreadError
= thread
.error
24 _print_exc
= traceback
.print_exc
27 _StringIO
= StringIO
.StringIO
31 # Debug support (adapted from ihooks.py)
33 _VERBOSE
= 0 # XXX Bool or int?
39 def __init__(self
, verbose
=None):
42 self
.__verbose
= verbose
44 def _note(self
, format
, *args
):
46 format
= format
% args
47 format
= "%s: %s\n" % (
48 currentThread().getName(), format
)
49 _sys
.stderr
.write(format
)
52 # Disable this when using "python -O"
54 def __init__(self
, verbose
=None):
56 def _note(self
, *args
):
60 # Synchronization classes
64 def RLock(*args
, **kwargs
):
65 return apply(_RLock
, args
, kwargs
)
67 class _RLock(_Verbose
):
69 def __init__(self
, verbose
=None):
70 _Verbose
.__init
__(self
, verbose
)
71 self
.__block
= _allocate_lock()
76 return "<%s(%s, %d)>" % (
77 self
.__class
__.__name
__,
78 self
.__owner
and self
.__owner
.getName(),
81 def acquire(self
, blocking
=1):
83 if self
.__owner
is me
:
84 self
.__count
= self
.__count
+ 1
86 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
88 rc
= self
.__block
.acquire(blocking
)
93 self
._note
("%s.acquire(%s): initial succes", self
, blocking
)
96 self
._note
("%s.acquire(%s): failure", self
, blocking
)
101 assert self
.__owner
is me
, "release() of un-acquire()d lock"
102 self
.__count
= count
= self
.__count
- 1
105 self
.__block
.release()
107 self
._note
("%s.release(): final release", self
)
110 self
._note
("%s.release(): non-final release", self
)
112 # Internal methods used by condition variables
114 def _acquire_restore(self
, (count
, owner
)):
115 self
.__block
.acquire()
119 self
._note
("%s._acquire_restore()", self
)
121 def _release_save(self
):
123 self
._note
("%s._release_save()", self
)
128 self
.__block
.release()
129 return (count
, owner
)
132 return self
.__owner
is currentThread()
135 def Condition(*args
, **kwargs
):
136 return apply(_Condition
, args
, kwargs
)
138 class _Condition(_Verbose
):
140 def __init__(self
, lock
=None, verbose
=None):
141 _Verbose
.__init
__(self
, verbose
)
145 # Export the lock's acquire() and release() methods
146 self
.acquire
= lock
.acquire
147 self
.release
= lock
.release
148 # If the lock defines _release_save() and/or _acquire_restore(),
149 # these override the default implementations (which just call
150 # release() and acquire() on the lock). Ditto for _is_owned().
152 self
._release
_save
= lock
._release
_save
153 except AttributeError:
156 self
._acquire
_restore
= lock
._acquire
_restore
157 except AttributeError:
160 self
._is
_owned
= lock
._is
_owned
161 except AttributeError:
166 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
168 def _release_save(self
):
169 self
.__lock
.release() # No state to save
171 def _acquire_restore(self
, x
):
172 self
.__lock
.acquire() # Ignore saved state
175 # Return True if lock is owned by currentThread.
176 # This method is called only if __lock doesn't have _is_owned().
177 if self
.__lock
.acquire(0):
178 self
.__lock
.release()
183 def wait(self
, timeout
=None):
184 currentThread() # for side-effect
185 assert self
._is
_owned
(), "wait() of un-acquire()d lock"
186 waiter
= _allocate_lock()
188 self
.__waiters
.append(waiter
)
189 saved_state
= self
._release
_save
()
190 try: # restore state no matter what (e.g., KeyboardInterrupt)
194 self
._note
("%s.wait(): got it", self
)
196 # Balancing act: We can't afford a pure busy loop, so we
197 # have to sleep; but if we sleep the whole timeout time,
198 # we'll be unresponsive. The scheme here sleeps very
199 # little at first, longer as time goes on, but never longer
200 # than 20 times per second (or the timeout time remaining).
201 endtime
= _time() + timeout
202 delay
= 0.0005 # 500 us -> initial delay of 1 ms
204 gotit
= waiter
.acquire(0)
207 remaining
= endtime
- _time()
210 delay
= min(delay
* 2, remaining
, .05)
214 self
._note
("%s.wait(%s): timed out", self
, timeout
)
216 self
.__waiters
.remove(waiter
)
221 self
._note
("%s.wait(%s): got it", self
, timeout
)
223 self
._acquire
_restore
(saved_state
)
225 def notify(self
, n
=1):
226 currentThread() # for side-effect
227 assert self
._is
_owned
(), "notify() of un-acquire()d lock"
228 __waiters
= self
.__waiters
229 waiters
= __waiters
[:n
]
232 self
._note
("%s.notify(): no waiters", self
)
234 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
236 for waiter
in waiters
:
239 __waiters
.remove(waiter
)
244 self
.notify(len(self
.__waiters
))
247 def Semaphore(*args
, **kwargs
):
248 return apply(_Semaphore
, args
, kwargs
)
250 class _Semaphore(_Verbose
):
252 # After Tim Peters' semaphore class, but not quite the same (no maximum)
254 def __init__(self
, value
=1, verbose
=None):
255 assert value
>= 0, "Semaphore initial value must be >= 0"
256 _Verbose
.__init
__(self
, verbose
)
257 self
.__cond
= Condition(Lock())
260 def acquire(self
, blocking
=1):
262 self
.__cond
.acquire()
263 while self
.__value
== 0:
267 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
268 self
, blocking
, self
.__value
)
271 self
.__value
= self
.__value
- 1
273 self
._note
("%s.acquire: success, value=%s",
276 self
.__cond
.release()
280 self
.__cond
.acquire()
281 self
.__value
= self
.__value
+ 1
283 self
._note
("%s.release: success, value=%s",
286 self
.__cond
.release()
289 def BoundedSemaphore(*args
, **kwargs
):
290 return apply(_BoundedSemaphore
, args
, kwargs
)
292 class _BoundedSemaphore(_Semaphore
):
293 """Semaphore that checks that # releases is <= # acquires"""
294 def __init__(self
, value
=1, verbose
=None):
295 _Semaphore
.__init
__(self
, value
, verbose
)
296 self
._initial
_value
= value
299 if self
._Semaphore
__value
>= self
._initial
_value
:
300 raise ValueError, "Semaphore released too many times"
301 return _Semaphore
.release(self
)
304 def Event(*args
, **kwargs
):
305 return apply(_Event
, args
, kwargs
)
307 class _Event(_Verbose
):
309 # After Tim Peters' event class (without is_posted())
311 def __init__(self
, verbose
=None):
312 _Verbose
.__init
__(self
, verbose
)
313 self
.__cond
= Condition(Lock())
320 self
.__cond
.acquire()
322 self
.__cond
.notifyAll()
323 self
.__cond
.release()
326 self
.__cond
.acquire()
328 self
.__cond
.release()
330 def wait(self
, timeout
=None):
331 self
.__cond
.acquire()
333 self
.__cond
.wait(timeout
)
334 self
.__cond
.release()
336 # Helper to generate new thread names
338 def _newname(template
="Thread-%d"):
340 _counter
= _counter
+ 1
341 return template
% _counter
343 # Active thread administration
344 _active_limbo_lock
= _allocate_lock()
349 # Main class for threads
351 class Thread(_Verbose
):
353 __initialized
= False
355 def __init__(self
, group
=None, target
=None, name
=None,
356 args
=(), kwargs
={}, verbose
=None):
357 assert group
is None, "group argument must be None for now"
358 _Verbose
.__init
__(self
, verbose
)
359 self
.__target
= target
360 self
.__name
= str(name
or _newname())
362 self
.__kwargs
= kwargs
363 self
.__daemonic
= self
._set
_daemon
()
364 self
.__started
= False
365 self
.__stopped
= False
366 self
.__block
= Condition(Lock())
367 self
.__initialized
= True
369 def _set_daemon(self
):
370 # Overridden in _MainThread and _DummyThread
371 return currentThread().isDaemon()
374 assert self
.__initialized
, "Thread.__init__() was not called"
381 status
= status
+ " daemon"
382 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
385 assert self
.__initialized
, "Thread.__init__() not called"
386 assert not self
.__started
, "thread already started"
388 self
._note
("%s.start(): starting thread", self
)
389 _active_limbo_lock
.acquire()
391 _active_limbo_lock
.release()
392 _start_new_thread(self
.__bootstrap
, ())
393 self
.__started
= True
394 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
398 apply(self
.__target
, self
.__args
, self
.__kwargs
)
400 def __bootstrap(self
):
402 self
.__started
= True
403 _active_limbo_lock
.acquire()
404 _active
[_get_ident()] = self
406 _active_limbo_lock
.release()
408 self
._note
("%s.__bootstrap(): thread started", self
)
413 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
416 self
._note
("%s.__bootstrap(): unhandled exception", self
)
419 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
420 (self
.getName(), s
.getvalue()))
423 self
._note
("%s.__bootstrap(): normal return", self
)
432 self
.__block
.acquire()
433 self
.__stopped
= True
434 self
.__block
.notifyAll()
435 self
.__block
.release()
438 _active_limbo_lock
.acquire()
439 del _active
[_get_ident()]
440 _active_limbo_lock
.release()
442 def join(self
, timeout
=None):
443 assert self
.__initialized
, "Thread.__init__() not called"
444 assert self
.__started
, "cannot join thread before it is started"
445 assert self
is not currentThread(), "cannot join current thread"
447 if not self
.__stopped
:
448 self
._note
("%s.join(): waiting until thread stops", self
)
449 self
.__block
.acquire()
451 while not self
.__stopped
:
454 self
._note
("%s.join(): thread stopped", self
)
456 deadline
= _time() + timeout
457 while not self
.__stopped
:
458 delay
= deadline
- _time()
461 self
._note
("%s.join(): timed out", self
)
463 self
.__block
.wait(delay
)
466 self
._note
("%s.join(): thread stopped", self
)
467 self
.__block
.release()
470 assert self
.__initialized
, "Thread.__init__() not called"
473 def setName(self
, name
):
474 assert self
.__initialized
, "Thread.__init__() not called"
475 self
.__name
= str(name
)
478 assert self
.__initialized
, "Thread.__init__() not called"
479 return self
.__started
and not self
.__stopped
482 assert self
.__initialized
, "Thread.__init__() not called"
483 return self
.__daemonic
485 def setDaemon(self
, daemonic
):
486 assert self
.__initialized
, "Thread.__init__() not called"
487 assert not self
.__started
, "cannot set daemon status of active thread"
488 self
.__daemonic
= daemonic
490 # The timer class was contributed by Itamar Shtull-Trauring
492 def Timer(*args
, **kwargs
):
493 return _Timer(*args
, **kwargs
)
495 class _Timer(Thread
):
496 """Call a function after a specified number of seconds:
498 t = Timer(30.0, f, args=[], kwargs={})
500 t.cancel() # stop the timer's action if it's still waiting
503 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
504 Thread
.__init
__(self
)
505 self
.interval
= interval
506 self
.function
= function
509 self
.finished
= Event()
512 """Stop the timer if it hasn't finished yet"""
516 self
.finished
.wait(self
.interval
)
517 if not self
.finished
.isSet():
518 self
.function(*self
.args
, **self
.kwargs
)
521 # Special thread class to represent the main thread
522 # This is garbage collected through an exit handler
524 class _MainThread(Thread
):
527 Thread
.__init
__(self
, name
="MainThread")
528 self
._Thread
__started
= True
529 _active_limbo_lock
.acquire()
530 _active
[_get_ident()] = self
531 _active_limbo_lock
.release()
533 atexit
.register(self
.__exitfunc
)
535 def _set_daemon(self
):
538 def __exitfunc(self
):
540 t
= _pickSomeNonDaemonThread()
543 self
._note
("%s: waiting for other threads", self
)
546 t
= _pickSomeNonDaemonThread()
548 self
._note
("%s: exiting", self
)
549 self
._Thread
__delete
()
551 def _pickSomeNonDaemonThread():
552 for t
in enumerate():
553 if not t
.isDaemon() and t
.isAlive():
558 # Dummy thread class to represent threads not started here.
559 # These aren't garbage collected when they die,
560 # nor can they be waited for.
561 # Their purpose is to return *something* from currentThread().
562 # They are marked as daemon threads so we won't wait for them
563 # when we exit (conform previous semantics).
565 class _DummyThread(Thread
):
568 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
569 self
._Thread
__started
= True
570 _active_limbo_lock
.acquire()
571 _active
[_get_ident()] = self
572 _active_limbo_lock
.release()
574 def _set_daemon(self
):
577 def join(self
, timeout
=None):
578 assert False, "cannot join a dummy thread"
581 # Global API functions
585 return _active
[_get_ident()]
587 ##print "currentThread(): no current thread for", _get_ident()
588 return _DummyThread()
591 _active_limbo_lock
.acquire()
592 count
= len(_active
) + len(_limbo
)
593 _active_limbo_lock
.release()
597 _active_limbo_lock
.acquire()
598 active
= _active
.values() + _limbo
.values()
599 _active_limbo_lock
.release()
603 # Create the main thread object
612 class BoundedQueue(_Verbose
):
614 def __init__(self
, limit
):
615 _Verbose
.__init
__(self
)
617 self
.rc
= Condition(self
.mon
)
618 self
.wc
= Condition(self
.mon
)
624 while len(self
.queue
) >= self
.limit
:
625 self
._note
("put(%s): queue full", item
)
627 self
.queue
.append(item
)
628 self
._note
("put(%s): appended, length now %d",
629 item
, len(self
.queue
))
635 while not self
.queue
:
636 self
._note
("get(): queue empty")
638 item
= self
.queue
.pop(0)
639 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
644 class ProducerThread(Thread
):
646 def __init__(self
, queue
, quota
):
647 Thread
.__init
__(self
, name
="Producer")
652 from random
import random
654 while counter
< self
.quota
:
655 counter
= counter
+ 1
656 self
.queue
.put("%s.%d" % (self
.getName(), counter
))
657 _sleep(random() * 0.00001)
660 class ConsumerThread(Thread
):
662 def __init__(self
, queue
, count
):
663 Thread
.__init
__(self
, name
="Consumer")
668 while self
.count
> 0:
669 item
= self
.queue
.get()
671 self
.count
= self
.count
- 1
680 t
= ProducerThread(Q
, NI
)
681 t
.setName("Producer-%d" % (i
+1))
683 C
= ConsumerThread(Q
, NI
*NP
)
692 if __name__
== '__main__':