1 """Proposed new threading module, emulating a subset of Java's threading model."""
9 # Rename some stuff so "from threading import *" is safe
18 _start_new_thread
= thread
.start_new_thread
19 _allocate_lock
= thread
.allocate_lock
20 _get_ident
= thread
.get_ident
23 _print_exc
= traceback
.print_exc
26 _StringIO
= StringIO
.StringIO
30 # Debug support (adapted from ihooks.py)
38 def __init__(self
, verbose
=None):
41 self
.__verbose
= verbose
43 def _note(self
, format
, *args
):
45 format
= format
% args
46 format
= "%s: %s\n" % (
47 currentThread().getName(), format
)
48 _sys
.stderr
.write(format
)
51 # Disable this when using "python -O"
53 def __init__(self
, verbose
=None):
55 def _note(self
, *args
):
59 # Synchronization classes
63 def RLock(*args
, **kwargs
):
64 return apply(_RLock
, args
, kwargs
)
66 class _RLock(_Verbose
):
68 def __init__(self
, verbose
=None):
69 _Verbose
.__init
__(self
, verbose
)
70 self
.__block
= _allocate_lock()
75 return "<%s(%s, %d)>" % (
76 self
.__class
__.__name
__,
77 self
.__owner
and self
.__owner
.getName(),
80 def acquire(self
, blocking
=1):
82 if self
.__owner
is me
:
83 self
.__count
= self
.__count
+ 1
85 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
87 rc
= self
.__block
.acquire(blocking
)
92 self
._note
("%s.acquire(%s): initial succes", self
, blocking
)
95 self
._note
("%s.acquire(%s): failure", self
, blocking
)
100 assert self
.__owner
is me
, "release() of un-acquire()d lock"
101 self
.__count
= count
= self
.__count
- 1
104 self
.__block
.release()
106 self
._note
("%s.release(): final release", self
)
109 self
._note
("%s.release(): non-final release", self
)
111 # Internal methods used by condition variables
113 def _acquire_restore(self
, (count
, owner
)):
114 self
.__block
.acquire()
118 self
._note
("%s._acquire_restore()", self
)
120 def _release_save(self
):
122 self
._note
("%s._release_save()", self
)
127 self
.__block
.release()
128 return (count
, owner
)
131 return self
.__owner
is currentThread()
134 def Condition(*args
, **kwargs
):
135 return apply(_Condition
, args
, kwargs
)
137 class _Condition(_Verbose
):
139 def __init__(self
, lock
=None, verbose
=None):
140 _Verbose
.__init
__(self
, verbose
)
144 # Export the lock's acquire() and release() methods
145 self
.acquire
= lock
.acquire
146 self
.release
= lock
.release
147 # If the lock defines _release_save() and/or _acquire_restore(),
148 # these override the default implementations (which just call
149 # release() and acquire() on the lock). Ditto for _is_owned().
151 self
._release
_save
= lock
._release
_save
152 except AttributeError:
155 self
._acquire
_restore
= lock
._acquire
_restore
156 except AttributeError:
159 self
._is
_owned
= lock
._is
_owned
160 except AttributeError:
165 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
167 def _release_save(self
):
168 self
.__lock
.release() # No state to save
170 def _acquire_restore(self
, x
):
171 self
.__lock
.acquire() # Ignore saved state
174 if self
.__lock
.acquire(0):
175 self
.__lock
.release()
180 def wait(self
, timeout
=None):
182 assert self
._is
_owned
(), "wait() of un-acquire()d lock"
183 waiter
= _allocate_lock()
185 self
.__waiters
.append(waiter
)
186 saved_state
= self
._release
_save
()
190 self
._note
("%s.wait(): got it", self
)
192 endtime
= _time() + timeout
193 delay
= 0.000001 # 1 usec
195 gotit
= waiter
.acquire(0)
196 if gotit
or _time() >= endtime
:
203 self
._note
("%s.wait(%s): timed out", self
, timeout
)
205 self
.__waiters
.remove(waiter
)
210 self
._note
("%s.wait(%s): got it", self
, timeout
)
211 self
._acquire
_restore
(saved_state
)
213 def notify(self
, n
=1):
215 assert self
._is
_owned
(), "notify() of un-acquire()d lock"
216 __waiters
= self
.__waiters
217 waiters
= __waiters
[:n
]
220 self
._note
("%s.notify(): no waiters", self
)
222 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
224 for waiter
in waiters
:
227 __waiters
.remove(waiter
)
232 self
.notify(len(self
.__waiters
))
235 def Semaphore(*args
, **kwargs
):
236 return apply(_Semaphore
, args
, kwargs
)
238 class _Semaphore(_Verbose
):
240 # After Tim Peters' semaphore class, but bnot quite the same (no maximum)
242 def __init__(self
, value
=1, verbose
=None):
243 assert value
>= 0, "Semaphore initial value must be >= 0"
244 _Verbose
.__init
__(self
, verbose
)
245 self
.__cond
= Condition(Lock())
248 def acquire(self
, blocking
=1):
250 self
.__cond
.acquire()
251 while self
.__value
== 0:
256 self
.__value
= self
.__value
- 1
258 self
.__cond
.release()
262 self
.__cond
.acquire()
263 self
.__value
= self
.__value
+ 1
265 self
.__cond
.release()
268 def Event(*args
, **kwargs
):
269 return apply(_Event
, args
, kwargs
)
271 class _Event(_Verbose
):
273 # After Tim Peters' event class (without is_posted())
275 def __init__(self
, verbose
=None):
276 _Verbose
.__init
__(self
, verbose
)
277 self
.__cond
= Condition(Lock())
284 self
.__cond
.acquire()
286 self
.__cond
.notifyAll()
287 self
.__cond
.release()
290 self
.__cond
.acquire()
292 self
.__cond
.release()
294 def wait(self
, timeout
=None):
295 self
.__cond
.acquire()
297 self
.__cond
.wait(timeout
)
298 self
.__cond
.release()
301 # Helper to generate new thread names
303 def _newname(template
="Thread-%d"):
305 _counter
= _counter
+ 1
306 return template
% _counter
308 # Active thread administration
309 _active_limbo_lock
= _allocate_lock()
314 # Main class for threads
316 class Thread(_Verbose
):
320 def __init__(self
, group
=None, target
=None, name
=None,
321 args
=(), kwargs
={}, verbose
=None):
322 assert group
is None, "group argument must be None for now"
323 _Verbose
.__init
__(self
, verbose
)
324 self
.__target
= target
325 self
.__name
= str(name
or _newname())
327 self
.__kwargs
= kwargs
328 self
.__daemonic
= self
._set
_daemon
()
331 self
.__block
= Condition(Lock())
332 self
.__initialized
= 1
334 def _set_daemon(self
):
335 # Overridden in _MainThread and _DummyThread
336 return currentThread().isDaemon()
339 assert self
.__initialized
, "Thread.__init__() was not called"
346 status
= status
+ " daemon"
347 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
350 assert self
.__initialized
, "Thread.__init__() not called"
351 assert not self
.__started
, "thread already started"
353 self
._note
("%s.start(): starting thread", self
)
354 _active_limbo_lock
.acquire()
356 _active_limbo_lock
.release()
357 _start_new_thread(self
.__bootstrap
, ())
359 _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
363 apply(self
.__target
, self
.__args
, self
.__kwargs
)
365 def __bootstrap(self
):
368 _active_limbo_lock
.acquire()
369 _active
[_get_ident()] = self
371 _active_limbo_lock
.release()
373 self
._note
("%s.__bootstrap(): thread started", self
)
378 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
381 self
._note
("%s.__bootstrap(): unhandled exception", self
)
384 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
385 (self
.getName(), s
.getvalue()))
388 self
._note
("%s.__bootstrap(): normal return", self
)
394 self
.__block
.acquire()
396 self
.__block
.notifyAll()
397 self
.__block
.release()
400 _active_limbo_lock
.acquire()
401 del _active
[_get_ident()]
402 _active_limbo_lock
.release()
404 def join(self
, timeout
=None):
405 assert self
.__initialized
, "Thread.__init__() not called"
406 assert self
.__started
, "cannot join thread before it is started"
407 assert self
is not currentThread(), "cannot join current thread"
409 if not self
.__stopped
:
410 self
._note
("%s.join(): waiting until thread stops", self
)
411 self
.__block
.acquire()
413 while not self
.__stopped
:
416 self
._note
("%s.join(): thread stopped", self
)
418 deadline
= _time() + timeout
419 while not self
.__stopped
:
420 delay
= deadline
- _time()
423 self
._note
("%s.join(): timed out", self
)
425 self
.__block
.wait(delay
)
428 self
._note
("%s.join(): thread stopped", self
)
429 self
.__block
.release()
432 assert self
.__initialized
, "Thread.__init__() not called"
435 def setName(self
, name
):
436 assert self
.__initialized
, "Thread.__init__() not called"
437 self
.__name
= str(name
)
440 assert self
.__initialized
, "Thread.__init__() not called"
441 return self
.__started
and not self
.__stopped
444 assert self
.__initialized
, "Thread.__init__() not called"
445 return self
.__daemonic
447 def setDaemon(self
, daemonic
):
448 assert self
.__initialized
, "Thread.__init__() not called"
449 assert not self
.__started
, "cannot set daemon status of active thread"
450 self
.__daemonic
= daemonic
453 # Special thread class to represent the main thread
454 # This is garbage collected through an exit handler
456 class _MainThread(Thread
):
459 Thread
.__init
__(self
, name
="MainThread")
460 self
._Thread
__started
= 1
461 _active_limbo_lock
.acquire()
462 _active
[_get_ident()] = self
463 _active_limbo_lock
.release()
465 self
.__oldexitfunc
= _sys
.exitfunc
466 except AttributeError:
467 self
.__oldexitfunc
= None
468 _sys
.exitfunc
= self
.__exitfunc
470 def _set_daemon(self
):
473 def __exitfunc(self
):
475 t
= _pickSomeNonDaemonThread()
478 self
._note
("%s: waiting for other threads", self
)
481 t
= _pickSomeNonDaemonThread()
482 if self
.__oldexitfunc
:
484 self
._note
("%s: calling exit handler", self
)
487 self
._note
("%s: exiting", self
)
488 self
._Thread
__delete
()
490 def _pickSomeNonDaemonThread():
491 for t
in enumerate():
492 if not t
.isDaemon() and t
.isAlive():
497 # Dummy thread class to represent threads not started here.
498 # These aren't garbage collected when they die,
499 # nor can they be waited for.
500 # Their purpose is to return *something* from currentThread().
501 # They are marked as daemon threads so we won't wait for them
502 # when we exit (conform previous semantics).
504 class _DummyThread(Thread
):
507 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
508 self
._Thread
__started
= 1
509 _active_limbo_lock
.acquire()
510 _active
[_get_ident()] = self
511 _active_limbo_lock
.release()
513 def _set_daemon(self
):
517 assert 0, "cannot join a dummy thread"
520 # Global API functions
524 return _active
[_get_ident()]
526 print "currentThread(): no current thread for", _get_ident()
527 return _DummyThread()
530 _active_limbo_lock
.acquire()
531 count
= len(_active
) + len(_limbo
)
532 _active_limbo_lock
.release()
536 _active_limbo_lock
.acquire()
537 active
= _active
.values() + _limbo
.values()
538 _active_limbo_lock
.release()
542 # Create the main thread object
553 class BoundedQueue(_Verbose
):
555 def __init__(self
, limit
):
556 _Verbose
.__init
__(self
)
558 self
.rc
= Condition(self
.mon
)
559 self
.wc
= Condition(self
.mon
)
565 while len(self
.queue
) >= self
.limit
:
566 self
._note
("put(%s): queue full", item
)
568 self
.queue
.append(item
)
569 self
._note
("put(%s): appended, length now %d",
570 item
, len(self
.queue
))
576 while not self
.queue
:
577 self
._note
("get(): queue empty")
581 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
586 class ProducerThread(Thread
):
588 def __init__(self
, queue
, quota
):
589 Thread
.__init
__(self
, name
="Producer")
594 from random
import random
596 while counter
< self
.quota
:
597 counter
= counter
+ 1
598 self
.queue
.put("%s.%d" % (self
.getName(), counter
))
599 _sleep(random() * 0.00001)
602 class ConsumerThread(Thread
):
604 def __init__(self
, queue
, count
):
605 Thread
.__init
__(self
, name
="Consumer")
610 while self
.count
> 0:
611 item
= self
.queue
.get()
613 self
.count
= self
.count
- 1
624 t
= ProducerThread(Q
, NI
)
625 t
.setName("Producer-%d" % (i
+1))
627 C
= ConsumerThread(Q
, NI
*NP
)
636 if __name__
== '__main__':