1 """Thread module emulating a subset of Java's threading model."""
8 del _sys
.modules
[__name__
]
13 from time
import time
as _time
, sleep
as _sleep
14 from traceback
import format_exc
as _format_exc
15 from collections
import deque
17 # Note regarding PEP 8 compliant aliases
18 # This threading model was originally inspired by Java, and inherited
19 # the convention of camelCase function and method names from that
20 # language. While those names are not in any imminent danger of being
21 # deprecated, starting with Python 2.6, the module now provides a
22 # PEP 8 compliant alias for any such method name.
23 # Using the new PEP 8 compliant names also facilitates substitution
24 # with the multiprocessing module, which doesn't provide the old
25 # Java inspired names.
28 # Rename some stuff so "from threading import *" is safe
# Names exported by a star-import of this module (camelCase names and
# their PEP 8 aliases side by side).
__all__ = [
    'activeCount',
    'active_count',
    'Condition',
    'currentThread',
    'current_thread',
    'enumerate',
    'Event',
    'Lock',
    'RLock',
    'Semaphore',
    'BoundedSemaphore',
    'Thread',
    'Timer',
    'setprofile',
    'settrace',
    'local',
    'stack_size',
]
34 _start_new_thread
= thread
.start_new_thread
35 _allocate_lock
= thread
.allocate_lock
36 _get_ident
= thread
.get_ident
37 ThreadError
= thread
.error
41 # sys.exc_clear is used to work around the fact that except blocks
42 # don't fully clear the exception until 3.0.
43 warnings
.filterwarnings('ignore', category
=DeprecationWarning,
44 module
='threading', message
='sys.exc_clear')
46 # Debug support (adapted from ihooks.py).
47 # All the major classes here derive from _Verbose. We force that to
48 # be a new-style class so that all the major classes here are new-style.
49 # This helps debugging (type(instance) is more revealing for instances
50 # of new-style classes).
56 class _Verbose(object):
58 def __init__(self
, verbose
=None):
61 self
.__verbose
= verbose
63 def _note(self
, format
, *args
):
65 format
= format
% args
66 format
= "%s: %s\n" % (
67 current_thread().name
, format
)
68 _sys
.stderr
.write(format
)
71 # Disable this when using "python -O"
72 class _Verbose(object):
73 def __init__(self
, verbose
=None):
75 def _note(self
, *args
):
78 # Support for profile and trace hooks
91 # Synchronization classes
def RLock(*args, **kwargs):
    """Factory function: create and return a new _RLock instance.

    Every positional and keyword argument is forwarded unchanged to the
    _RLock constructor.
    """
    lock = _RLock(*args, **kwargs)
    return lock
98 class _RLock(_Verbose
):
100 def __init__(self
, verbose
=None):
101 _Verbose
.__init
__(self
, verbose
)
102 self
.__block
= _allocate_lock()
109 owner
= _active
[owner
].name
112 return "<%s owner=%r count=%d>" % (
113 self
.__class
__.__name
__, owner
, self
.__count
)
115 def acquire(self
, blocking
=1):
117 if self
.__owner
== me
:
118 self
.__count
= self
.__count
+ 1
120 self
._note
("%s.acquire(%s): recursive success", self
, blocking
)
122 rc
= self
.__block
.acquire(blocking
)
127 self
._note
("%s.acquire(%s): initial success", self
, blocking
)
130 self
._note
("%s.acquire(%s): failure", self
, blocking
)
136 if self
.__owner
!= _get_ident():
137 raise RuntimeError("cannot release un-acquired lock")
138 self
.__count
= count
= self
.__count
- 1
141 self
.__block
.release()
143 self
._note
("%s.release(): final release", self
)
146 self
._note
("%s.release(): non-final release", self
)
148 def __exit__(self
, t
, v
, tb
):
151 # Internal methods used by condition variables
153 def _acquire_restore(self
, count_owner
):
154 count
, owner
= count_owner
155 self
.__block
.acquire()
159 self
._note
("%s._acquire_restore()", self
)
161 def _release_save(self
):
163 self
._note
("%s._release_save()", self
)
168 self
.__block
.release()
169 return (count
, owner
)
172 return self
.__owner
== _get_ident()
def Condition(*args, **kwargs):
    """Factory function: create and return a new _Condition instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Condition constructor.
    """
    condition = _Condition(*args, **kwargs)
    return condition
178 class _Condition(_Verbose
):
180 def __init__(self
, lock
=None, verbose
=None):
181 _Verbose
.__init
__(self
, verbose
)
185 # Export the lock's acquire() and release() methods
186 self
.acquire
= lock
.acquire
187 self
.release
= lock
.release
188 # If the lock defines _release_save() and/or _acquire_restore(),
189 # these override the default implementations (which just call
190 # release() and acquire() on the lock). Ditto for _is_owned().
192 self
._release
_save
= lock
._release
_save
193 except AttributeError:
196 self
._acquire
_restore
= lock
._acquire
_restore
197 except AttributeError:
200 self
._is
_owned
= lock
._is
_owned
201 except AttributeError:
206 return self
.__lock
.__enter
__()
def __exit__(self, *args):
    """Leave a ``with`` block by delegating to the underlying lock.

    The exception triple (or whatever arguments were supplied) is passed
    straight through to the lock's own __exit__(), and its result is
    returned to the caller.
    """
    underlying = self.__lock
    return underlying.__exit__(*args)
212 return "<Condition(%s, %d)>" % (self
.__lock
, len(self
.__waiters
))
def _release_save(self):
    """Default release hook: just release the underlying lock.

    There is no state to save, so nothing is returned.
    """
    underlying = self.__lock
    underlying.release()
def _acquire_restore(self, x):
    """Default restore hook: just re-acquire the underlying lock.

    The saved state ``x`` is accepted for interface compatibility and
    ignored.
    """
    underlying = self.__lock
    underlying.acquire()
221 # Return True if lock is owned by current_thread.
222 # This method is called only if __lock doesn't have _is_owned().
223 if self
.__lock
.acquire(0):
224 self
.__lock
.release()
229 def wait(self
, timeout
=None):
230 if not self
._is
_owned
():
231 raise RuntimeError("cannot wait on un-acquired lock")
232 waiter
= _allocate_lock()
234 self
.__waiters
.append(waiter
)
235 saved_state
= self
._release
_save
()
236 try: # restore state no matter what (e.g., KeyboardInterrupt)
240 self
._note
("%s.wait(): got it", self
)
242 # Balancing act: We can't afford a pure busy loop, so we
243 # have to sleep; but if we sleep the whole timeout time,
244 # we'll be unresponsive. The scheme here sleeps very
245 # little at first, longer as time goes on, but never longer
246 # than 20 times per second (or the timeout time remaining).
247 endtime
= _time() + timeout
248 delay
= 0.0005 # 500 us -> initial delay of 1 ms
250 gotit
= waiter
.acquire(0)
253 remaining
= endtime
- _time()
256 delay
= min(delay
* 2, remaining
, .05)
260 self
._note
("%s.wait(%s): timed out", self
, timeout
)
262 self
.__waiters
.remove(waiter
)
267 self
._note
("%s.wait(%s): got it", self
, timeout
)
269 self
._acquire
_restore
(saved_state
)
271 def notify(self
, n
=1):
272 if not self
._is
_owned
():
273 raise RuntimeError("cannot notify on un-acquired lock")
274 __waiters
= self
.__waiters
275 waiters
= __waiters
[:n
]
278 self
._note
("%s.notify(): no waiters", self
)
280 self
._note
("%s.notify(): notifying %d waiter%s", self
, n
,
282 for waiter
in waiters
:
285 __waiters
.remove(waiter
)
290 self
.notify(len(self
.__waiters
))
292 notify_all
= notifyAll
def Semaphore(*args, **kwargs):
    """Factory function: create and return a new _Semaphore instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Semaphore constructor.
    """
    semaphore = _Semaphore(*args, **kwargs)
    return semaphore
298 class _Semaphore(_Verbose
):
300 # After Tim Peters' semaphore class, but not quite the same (no maximum)
302 def __init__(self
, value
=1, verbose
=None):
304 raise ValueError("semaphore initial value must be >= 0")
305 _Verbose
.__init
__(self
, verbose
)
306 self
.__cond
= Condition(Lock())
309 def acquire(self
, blocking
=1):
311 self
.__cond
.acquire()
312 while self
.__value
== 0:
316 self
._note
("%s.acquire(%s): blocked waiting, value=%s",
317 self
, blocking
, self
.__value
)
320 self
.__value
= self
.__value
- 1
322 self
._note
("%s.acquire: success, value=%s",
325 self
.__cond
.release()
331 self
.__cond
.acquire()
332 self
.__value
= self
.__value
+ 1
334 self
._note
("%s.release: success, value=%s",
337 self
.__cond
.release()
339 def __exit__(self
, t
, v
, tb
):
def BoundedSemaphore(*args, **kwargs):
    """Factory function: create and return a new _BoundedSemaphore instance.

    Every positional and keyword argument is forwarded unchanged to the
    _BoundedSemaphore constructor.
    """
    semaphore = _BoundedSemaphore(*args, **kwargs)
    return semaphore
346 class _BoundedSemaphore(_Semaphore
):
347 """Semaphore that checks that # releases is <= # acquires"""
348 def __init__(self
, value
=1, verbose
=None):
349 _Semaphore
.__init
__(self
, value
, verbose
)
350 self
._initial
_value
= value
353 if self
._Semaphore
__value
>= self
._initial
_value
:
354 raise ValueError, "Semaphore released too many times"
355 return _Semaphore
.release(self
)
def Event(*args, **kwargs):
    """Factory function: create and return a new _Event instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Event constructor.
    """
    event = _Event(*args, **kwargs)
    return event
361 class _Event(_Verbose
):
363 # After Tim Peters' event class (without is_posted())
365 def __init__(self
, verbose
=None):
366 _Verbose
.__init
__(self
, verbose
)
367 self
.__cond
= Condition(Lock())
376 self
.__cond
.acquire()
379 self
.__cond
.notify_all()
381 self
.__cond
.release()
384 self
.__cond
.acquire()
388 self
.__cond
.release()
390 def wait(self
, timeout
=None):
391 self
.__cond
.acquire()
394 self
.__cond
.wait(timeout
)
397 self
.__cond
.release()
399 # Helper to generate new thread names
401 def _newname(template
="Thread-%d"):
403 _counter
= _counter
+ 1
404 return template
% _counter
406 # Active thread administration
407 _active_limbo_lock
= _allocate_lock()
408 _active
= {} # maps thread id to Thread object
412 # Main class for threads
414 class Thread(_Verbose
):
416 __initialized
= False
417 # Need to store a reference to sys.exc_info for printing
418 # out exceptions when a thread tries to use a global var. during interp.
419 # shutdown and thus raises an exception about trying to perform some
420 # operation on/with a NoneType
421 __exc_info
= _sys
.exc_info
422 # Keep sys.exc_clear too to clear the exception just before
423 # allowing .join() to return.
424 __exc_clear
= _sys
.exc_clear
426 def __init__(self
, group
=None, target
=None, name
=None,
427 args
=(), kwargs
=None, verbose
=None):
428 assert group
is None, "group argument must be None for now"
429 _Verbose
.__init
__(self
, verbose
)
432 self
.__target
= target
433 self
.__name
= str(name
or _newname())
435 self
.__kwargs
= kwargs
436 self
.__daemonic
= self
._set
_daemon
()
438 self
.__started
= Event()
439 self
.__stopped
= False
440 self
.__block
= Condition(Lock())
441 self
.__initialized
= True
442 # sys.stderr is not stored in the class like
443 # sys.exc_info since it can be changed between instances
444 self
.__stderr
= _sys
.stderr
def _set_daemon(self):
    """Return the initial daemon flag for a thread under construction.

    The flag is taken from the ``daemon`` attribute of the thread that is
    currently running. _MainThread and _DummyThread override this method.
    """
    creator = current_thread()
    return creator.daemon
451 assert self
.__initialized
, "Thread.__init__() was not called"
453 if self
.__started
.is_set():
459 if self
.__ident
is not None:
460 status
+= " %s" % self
.__ident
461 return "<%s(%s, %s)>" % (self
.__class
__.__name
__, self
.__name
, status
)
464 if not self
.__initialized
:
465 raise RuntimeError("thread.__init__() not called")
466 if self
.__started
.is_set():
467 raise RuntimeError("threads can only be started once")
469 self
._note
("%s.start(): starting thread", self
)
470 with _active_limbo_lock
:
473 _start_new_thread(self
.__bootstrap
, ())
475 with _active_limbo_lock
:
478 self
.__started
.wait()
483 self
.__target
(*self
.__args
, **self
.__kwargs
)
485 # Avoid a refcycle if the thread is running a function with
486 # an argument that has a member that points to the thread.
487 del self
.__target
, self
.__args
, self
.__kwargs
489 def __bootstrap(self
):
490 # Wrapper around the real bootstrap code that ignores
491 # exceptions during interpreter cleanup. Those typically
492 # happen when a daemon thread wakes up at an unfortunate
493 # moment, finds the world around it destroyed, and raises some
494 # random exception *** while trying to report the exception in
495 # __bootstrap_inner() below ***. Those random exceptions
496 # don't help anybody, and they confuse users, so we suppress
497 # them. We suppress them only when it appears that the world
498 # indeed has already been destroyed, so that exceptions in
499 # __bootstrap_inner() during normal business hours are properly
500 # reported. Also, we only suppress them for daemonic threads;
501 # if a non-daemonic encounters this, something else is wrong.
503 self
.__bootstrap
_inner
()
505 if self
.__daemonic
and _sys
is None:
def _set_ident(self):
    """Record the calling thread's identifier as this thread's ident."""
    ident = _get_ident()
    self.__ident = ident
512 def __bootstrap_inner(self
):
516 with _active_limbo_lock
:
517 _active
[self
.__ident
] = self
520 self
._note
("%s.__bootstrap(): thread started", self
)
523 self
._note
("%s.__bootstrap(): registering trace hook", self
)
524 _sys
.settrace(_trace_hook
)
526 self
._note
("%s.__bootstrap(): registering profile hook", self
)
527 _sys
.setprofile(_profile_hook
)
533 self
._note
("%s.__bootstrap(): raised SystemExit", self
)
536 self
._note
("%s.__bootstrap(): unhandled exception", self
)
537 # If sys.stderr is no more (most likely from interpreter
538 # shutdown) use self.__stderr. Otherwise still use sys (as in
539 # _sys) in case sys.stderr was redefined since the creation of
542 _sys
.stderr
.write("Exception in thread %s:\n%s\n" %
543 (self
.name
, _format_exc()))
545 # Do the best job possible w/o a huge amt. of code to
546 # approximate a traceback (code ideas from
548 exc_type
, exc_value
, exc_tb
= self
.__exc
_info
()
550 print>>self
.__stderr
, (
551 "Exception in thread " + self
.name
+
552 " (most likely raised during interpreter shutdown):")
553 print>>self
.__stderr
, (
554 "Traceback (most recent call last):")
556 print>>self
.__stderr
, (
557 ' File "%s", line %s, in %s' %
558 (exc_tb
.tb_frame
.f_code
.co_filename
,
560 exc_tb
.tb_frame
.f_code
.co_name
))
561 exc_tb
= exc_tb
.tb_next
562 print>>self
.__stderr
, ("%s: %s" % (exc_type
, exc_value
))
563 # Make sure that exc_tb gets deleted since it is a memory
564 # hog; deleting everything else is just for thoroughness
566 del exc_type
, exc_value
, exc_tb
569 self
._note
("%s.__bootstrap(): normal return", self
)
572 # test_threading.test_no_refcycle_through_target when
573 # the exception keeps the target alive past when we
574 # assert that it's dead.
577 with _active_limbo_lock
:
580 # We don't call self.__delete() because it also
581 # grabs _active_limbo_lock.
582 del _active
[_get_ident()]
587 self
.__block
.acquire()
588 self
.__stopped
= True
589 self
.__block
.notify_all()
590 self
.__block
.release()
593 "Remove current thread from the dict of currently running threads."
595 # Notes about running with dummy_thread:
597 # Must take care to not raise an exception if dummy_thread is being
598 # used (and thus this module is being used as an instance of
599 # dummy_threading). dummy_thread.get_ident() always returns -1 since
600 # there is only one thread if dummy_thread is being used. Thus
601 # len(_active) is always <= 1 here, and any Thread instance created
602 # overwrites the (if any) thread currently registered in _active.
604 # An instance of _MainThread is always created by 'threading'. This
605 # gets overwritten the instant an instance of Thread is created; both
606 # threads return -1 from dummy_thread.get_ident() and thus have the
607 # same key in the dict. So when the _MainThread instance created by
608 # 'threading' tries to clean itself up when atexit calls this method
609 # it gets a KeyError if another Thread instance was created.
611 # This all means that KeyError from trying to delete something from
612 # _active if dummy_threading is being used is a red herring. But
613 # since it isn't if dummy_threading is *not* being used then don't
614 # hide the exception.
617 with _active_limbo_lock
:
618 del _active
[_get_ident()]
619 # There must not be any python code between the previous line
620 # and after the lock is released. Otherwise a tracing function
621 # could try to acquire the lock again in the same thread, (in
622 # current_thread()), and would block.
624 if 'dummy_threading' not in _sys
.modules
:
627 def join(self
, timeout
=None):
628 if not self
.__initialized
:
629 raise RuntimeError("Thread.__init__() not called")
630 if not self
.__started
.is_set():
631 raise RuntimeError("cannot join thread before it is started")
632 if self
is current_thread():
633 raise RuntimeError("cannot join current thread")
636 if not self
.__stopped
:
637 self
._note
("%s.join(): waiting until thread stops", self
)
638 self
.__block
.acquire()
641 while not self
.__stopped
:
644 self
._note
("%s.join(): thread stopped", self
)
646 deadline
= _time() + timeout
647 while not self
.__stopped
:
648 delay
= deadline
- _time()
651 self
._note
("%s.join(): timed out", self
)
653 self
.__block
.wait(delay
)
656 self
._note
("%s.join(): thread stopped", self
)
658 self
.__block
.release()
662 assert self
.__initialized
, "Thread.__init__() not called"
def name(self, name):
    """Set the thread's name.

    ``name`` is converted with str() before being stored. The assertion
    guards against use before Thread.__init__() has run.
    """
    assert self.__initialized, "Thread.__init__() not called"
    text = str(name)
    self.__name = text
672 assert self
.__initialized
, "Thread.__init__() not called"
676 assert self
.__initialized
, "Thread.__init__() not called"
677 return self
.__started
.is_set() and not self
.__stopped
683 assert self
.__initialized
, "Thread.__init__() not called"
684 return self
.__daemonic
def daemon(self, daemonic):
    """Set the daemon flag of a thread that has not been started yet.

    ``daemonic`` is stored as given.

    Raises RuntimeError if Thread.__init__() has not been called, or if
    the thread's started event is already set.
    """
    initialized = self.__initialized
    if not initialized:
        raise RuntimeError("Thread.__init__() not called")
    already_started = self.__started.is_set()
    if already_started:
        raise RuntimeError("cannot set daemon status of active thread")
    self.__daemonic = daemonic
def setDaemon(self, daemonic):
    """camelCase spelling of assigning to the ``daemon`` attribute."""
    setattr(self, "daemon", daemonic)
703 def setName(self
, name
):
706 # The timer class was contributed by Itamar Shtull-Trauring
def Timer(*args, **kwargs):
    """Factory function: create and return a new _Timer instance.

    Every positional and keyword argument is forwarded unchanged to the
    _Timer constructor.
    """
    timer = _Timer(*args, **kwargs)
    return timer
711 class _Timer(Thread
):
712 """Call a function after a specified number of seconds:
714 t = Timer(30.0, f, args=[], kwargs={})
716 t.cancel() # stop the timer's action if it's still waiting
719 def __init__(self
, interval
, function
, args
=[], kwargs
={}):
720 Thread
.__init
__(self
)
721 self
.interval
= interval
722 self
.function
= function
725 self
.finished
= Event()
728 """Stop the timer if it hasn't finished yet"""
732 self
.finished
.wait(self
.interval
)
733 if not self
.finished
.is_set():
734 self
.function(*self
.args
, **self
.kwargs
)
737 # Special thread class to represent the main thread
738 # This is garbage collected through an exit handler
740 class _MainThread(Thread
):
743 Thread
.__init
__(self
, name
="MainThread")
744 self
._Thread
__started
.set()
746 with _active_limbo_lock
:
747 _active
[_get_ident()] = self
749 def _set_daemon(self
):
754 t
= _pickSomeNonDaemonThread()
757 self
._note
("%s: waiting for other threads", self
)
760 t
= _pickSomeNonDaemonThread()
762 self
._note
("%s: exiting", self
)
763 self
._Thread
__delete
()
765 def _pickSomeNonDaemonThread():
766 for t
in enumerate():
767 if not t
.daemon
and t
.is_alive():
772 # Dummy thread class to represent threads not started here.
773 # These aren't garbage collected when they die, nor can they be waited for.
774 # If they invoke anything in threading.py that calls current_thread(), they
775 # leave an entry in the _active dict forever after.
776 # Their purpose is to return *something* from current_thread().
777 # They are marked as daemon threads so we won't wait for them
# when we exit (conforming to previous semantics).
780 class _DummyThread(Thread
):
783 Thread
.__init
__(self
, name
=_newname("Dummy-%d"))
785 # Thread.__block consumes an OS-level locking primitive, which
786 # can never be used by a _DummyThread. Since a _DummyThread
787 # instance is immortal, that's bad, so release this resource.
788 del self
._Thread
__block
790 self
._Thread
__started
.set()
792 with _active_limbo_lock
:
793 _active
[_get_ident()] = self
795 def _set_daemon(self
):
def join(self, timeout=None):
    """Refuse to join a dummy thread.

    Dummy threads stand in for threads that were not started through
    this module and cannot be waited for, so joining one is always an
    error.

    timeout is accepted only for signature compatibility with
    Thread.join() and is ignored.

    Raises AssertionError unconditionally.
    """
    # Raise explicitly instead of using ``assert False``: assert
    # statements are stripped under "python -O", which would turn this
    # refusal into a silent no-op returning None.
    raise AssertionError("cannot join a dummy thread")
802 # Global API functions
806 return _active
[_get_ident()]
808 ##print "current_thread(): no current thread for", _get_ident()
809 return _DummyThread()
811 current_thread
= currentThread
814 with _active_limbo_lock
:
815 return len(_active
) + len(_limbo
)
817 active_count
= activeCount
820 # Same as enumerate(), but without the lock. Internal use only.
821 return _active
.values() + _limbo
.values()
824 with _active_limbo_lock
:
825 return _active
.values() + _limbo
.values()
827 from thread
import stack_size
829 # Create the main thread object,
830 # and make it available for the interpreter
831 # (Py_Main) as threading._shutdown.
833 _shutdown
= _MainThread()._exitfunc
835 # get thread-local implementation, either from the thread
836 # module, or from the python fallback
839 from thread
import _local
as local
841 from _threading_local
import local
845 # This function is called by Python/ceval.c:PyEval_ReInitThreads which
846 # is called from PyOS_AfterFork. Here we cleanup threading module state
847 # that should not exist after a fork.
849 # Reset _active_limbo_lock, in case we forked while the lock was held
850 # by another (non-forked) thread. http://bugs.python.org/issue874900
851 global _active_limbo_lock
852 _active_limbo_lock
= _allocate_lock()
854 # fork() only copied the current thread; clear references to others.
856 current
= current_thread()
857 with _active_limbo_lock
:
858 for thread
in _active
.itervalues():
859 if thread
is current
:
860 # There is only one active thread. We reset the ident to
861 # its new value since it can have changed.
863 thread
._Thread
__ident
= ident
864 new_active
[ident
] = thread
866 # All the others are already stopped.
867 # We don't call _Thread__stop() because it tries to acquire
868 # thread._Thread__block which could also have been held while
870 thread
._Thread
__stopped
= True
874 _active
.update(new_active
)
875 assert len(_active
) == 1
882 class BoundedQueue(_Verbose
):
884 def __init__(self
, limit
):
885 _Verbose
.__init
__(self
)
887 self
.rc
= Condition(self
.mon
)
888 self
.wc
= Condition(self
.mon
)
894 while len(self
.queue
) >= self
.limit
:
895 self
._note
("put(%s): queue full", item
)
897 self
.queue
.append(item
)
898 self
._note
("put(%s): appended, length now %d",
899 item
, len(self
.queue
))
905 while not self
.queue
:
906 self
._note
("get(): queue empty")
908 item
= self
.queue
.popleft()
909 self
._note
("get(): got %s, %d left", item
, len(self
.queue
))
914 class ProducerThread(Thread
):
916 def __init__(self
, queue
, quota
):
917 Thread
.__init
__(self
, name
="Producer")
922 from random
import random
924 while counter
< self
.quota
:
925 counter
= counter
+ 1
926 self
.queue
.put("%s.%d" % (self
.name
, counter
))
927 _sleep(random() * 0.00001)
930 class ConsumerThread(Thread
):
932 def __init__(self
, queue
, count
):
933 Thread
.__init
__(self
, name
="Consumer")
938 while self
.count
> 0:
939 item
= self
.queue
.get()
941 self
.count
= self
.count
- 1
950 t
= ProducerThread(Q
, NI
)
951 t
.name
= ("Producer-%d" % (i
+1))
953 C
= ConsumerThread(Q
, NI
*NP
)
962 if __name__
== '__main__':