1 # Defines classes that provide synchronization objects. Note that use of
2 # this module requires that your Python support threads.
4 # condition(lock=None) # a POSIX-like condition-variable object
5 # barrier(n) # an n-thread barrier
6 # event() # an event object
7 # semaphore(n=1) # a semaphore object, with initial count n
8 # mrsw() # a multiple-reader single-writer lock
12 # A condition object is created via
14 # your_condition_object = this_module.condition(lock=None)
16 # As explained below, a condition object has a lock associated with it,
17 # used in the protocol to protect condition data. You can specify a
18 # lock to use in the constructor, else the constructor will allocate
19 # an anonymous lock for you. Specifying a lock explicitly can be useful
20 # when more than one condition keys off the same set of shared data.
24 # acquire the lock associated with the condition
26 # release the lock associated with the condition
28 # block the thread until such time as some other thread does a
29 # .signal or .broadcast on the same condition, and release the
30 # lock associated with the condition. The lock associated with
31 # the condition MUST be in the acquired state at the time
34 # wake up exactly one thread (if any) that previously did a .wait
35 # on the condition; that thread will awaken with the lock associated
36 # with the condition in the acquired state. If no threads are
37 # .wait'ing, this is a nop. If more than one thread is .wait'ing on
38 # the condition, any of them may be awakened.
40 # wake up all threads (if any) that are .wait'ing on the condition;
41 # the threads are woken up serially, each with the lock in the
42 # acquired state, so should .release() as soon as possible. If no
43 # threads are .wait'ing, this is a nop.
45 # Note that if a thread does a .wait *while* a signal/broadcast is
46 # in progress, it's guaranteed to block until a subsequent
49 # Secret feature: `broadcast' actually takes an integer argument,
50 # and will wake up exactly that many waiting threads (or the total
51 # number waiting, if that's less). Use of this is dubious, though,
52 # and probably won't be supported if this form of condition is
55 # DIFFERENCES FROM POSIX
57 # + A separate mutex is not needed to guard condition data. Instead, a
58 # condition object can (must) be .acquire'ed and .release'ed directly.
59 # This eliminates a common error in using POSIX conditions.
61 # + Because of implementation difficulties, a POSIX `signal' wakes up
62 # _at least_ one .wait'ing thread. Race conditions make it difficult
63 # to stop that. This implementation guarantees to wake up only one,
64 # but you probably shouldn't rely on that.
68 # Condition objects are used to block threads until "some condition" is
69 # true. E.g., a thread may wish to wait until a producer pumps out data
70 # for it to consume, or a server may wish to wait until someone requests
71 # its services, or perhaps a whole bunch of threads want to wait until a
72 # preceding pass over the data is complete. Early models for conditions
73 # relied on some other thread figuring out when a blocked thread's
74 # condition was true, and made the other thread responsible both for
75 # waking up the blocked thread and guaranteeing that it woke up with all
76 # data in a correct state. This proved to be very delicate in practice,
77 # and gave conditions a bad name in some circles.
79 # The POSIX model addresses these problems by making a thread responsible
80 # for ensuring that its own state is correct when it wakes, and relies
81 # on a rigid protocol to make this easy; so long as you stick to the
82 # protocol, POSIX conditions are easy to "get right":
84 # A) The thread that's waiting for some arbitrarily-complex condition
85 # (ACC) to become true does:
88 # while not (code to evaluate the ACC):
90 # # That blocks the thread, *and* releases the lock. When a
91 # # condition.signal() happens, it will wake up some thread that
92 # # did a .wait, *and* acquire the lock again before .wait
95 # # Because the lock is acquired at this point, the state used
96 # # in evaluating the ACC is frozen, so it's safe to go back &
97 # # reevaluate the ACC.
99 # # At this point, ACC is true, and the thread has the condition
101 # # So code here can safely muck with the shared state that
102 # # went into evaluating the ACC -- if it wants to.
103 # # When done mucking with the shared state, do
104 # condition.release()
106 # B) Threads that are mucking with shared state that may affect the
109 # condition.acquire()
110 # # muck with shared state
111 # condition.release()
112 # if it's possible that ACC is true now:
113 # condition.signal() # or .broadcast()
115 # Note: You may prefer to put the "if" clause before the release().
116 # That's fine, but do note that anyone waiting on the signal will
117 # stay blocked until the release() is done (since acquiring the
118 # condition is part of what .wait() does before it returns).
122 # With simpler forms of conditions, it can be impossible to know when
123 # a thread that's supposed to do a .wait has actually done it. But
124 # because this form of condition releases a lock as _part_ of doing a
125 # wait, the state of that lock can be used to guarantee it.
127 # E.g., suppose thread A spawns thread B and later wants to wait for B to
132 # B_done = condition() ... do work ...
133 # B_done.acquire() B_done.acquire(); B_done.release()
134 # spawn B B_done.signal()
135 # ... some time later ... ... and B exits ...
138 # Because B_done was in the acquire'd state at the time B was spawned,
139 # B's attempt to acquire B_done can't succeed until A has done its
140 # B_done.wait() (which releases B_done). So B's B_done.signal() is
141 # guaranteed to be seen by the .wait(). Without the lock trick, B
142 # may signal before A .waits, and then A would wait forever.
146 # A barrier object is created via
148 # your_barrier = this_module.barrier(num_threads)
152 # the thread blocks until num_threads threads in all have done
153 # .enter(). Then the num_threads threads that .enter'ed resume,
154 # and the barrier resets to capture the next num_threads threads
159 # An event object is created via
161 # your_event = this_module.event()
163 # An event has two states, `posted' and `cleared'. An event is
164 # created in the cleared state.
169 # Put the event in the posted state, and resume all threads
170 # .wait'ing on the event (if any).
173 # Put the event in the cleared state.
176 # Returns 0 if the event is in the cleared state, or 1 if the event
177 # is in the posted state.
180 # If the event is in the posted state, returns immediately.
181 # If the event is in the cleared state, blocks the calling thread
182 # until the event is .post'ed by another thread.
184 # Note that an event, once posted, remains posted until explicitly
185 # cleared. Relative to conditions, this is both the strength & weakness
186 # of events. It's a strength because the .post'ing thread doesn't have to
187 # worry about whether the threads it's trying to communicate with have
188 # already done a .wait (a condition .signal is seen only by threads that
189 # do a .wait _prior_ to the .signal; a .signal does not persist). But
190 # it's a weakness because .clear'ing an event is error-prone: it's easy
191 # to mistakenly .clear an event before all the threads you intended to
192 # see the event get around to .wait'ing on it. But so long as you don't
193 # need to .clear an event, events are easy to use safely.
197 # A semaphore object is created via
199 # your_semaphore = this_module.semaphore(count=1)
201 # A semaphore has an integer count associated with it. The initial value
202 # of the count is specified by the optional argument (which defaults to
203 # 1) passed to the semaphore constructor.
208 # If the semaphore's count is greater than 0, decrements the count
210 # Else if the semaphore's count is 0, blocks the calling thread
211 # until a subsequent .v() increases the count. When that happens,
212 # the count will be decremented by 1 and the calling thread resumed.
215 # Increments the semaphore's count by 1, and wakes up a thread (if
216 # any) blocked by a .p(). It's a (detected) error for a .v() to
217 # increase the semaphore's count to a value larger than the initial
220 # MULTIPLE-READER SINGLE-WRITER LOCKS
222 # A mrsw lock is created via
224 # your_mrsw_lock = this_module.mrsw()
226 # This kind of lock is often useful with complex shared data structures.
227 # The object lets any number of "readers" proceed, so long as no thread
228 # wishes to "write". When one (or more) threads declare an intention
229 # to "write" (e.g., to update a shared structure), all current readers
230 # are allowed to finish, and then a writer gets exclusive access; all
231 # other readers & writers are blocked until the current writer completes.
232 # Finally, if some thread is waiting to write and another is waiting to
233 # read, the writer takes precedence.
238 # If no thread is writing or waiting to write, returns immediately.
239 # Else blocks until no thread is writing or waiting to write. So
240 # long as some thread has completed a .read_in but not a .read_out,
241 # writers are blocked.
244 # Use sometime after a .read_in to declare that the thread is done
245 # reading. When all threads complete reading, a writer can proceed.
248 # If no thread is writing (has completed a .write_in, but hasn't yet
249 # done a .write_out) or reading (similarly), returns immediately.
250 # Else blocks the calling thread, and threads waiting to read, until
251 # the current writer completes writing or all the current readers
252 # complete reading; if then more than one thread is waiting to
253 # write, one of them is allowed to proceed, but which one is not
257 # Use sometime after a .write_in to declare that the thread is done
258 # writing. Then if some other thread is waiting to write, it's
259 # allowed to proceed. Else all threads (if any) waiting to read are
260 # allowed to proceed.
263 # Use instead of a .write_in to declare that the thread is done
264 # writing but wants to continue reading without other writers
265 # intervening. If there are other threads waiting to write, they
266 # are allowed to proceed only if the current thread calls
267 # .read_out; threads waiting to read are only allowed to proceed
268 # if there are no threads waiting to write. (This is a
269 # weakness of the interface!)
274 def __init__(self
, lock
=None):
275 # the lock actually used by .acquire() and .release()
277 self
.mutex
= thread
.allocate_lock()
279 if hasattr(lock
, 'acquire') and \
280 hasattr(lock
, 'release'):
283 raise TypeError, 'condition constructor requires ' \
286 # lock used to block threads until a signal
287 self
.checkout
= thread
.allocate_lock()
288 self
.checkout
.acquire()
290 # internal critical-section lock, & the data it protects
291 self
.idlock
= thread
.allocate_lock()
293 self
.waiting
= 0 # num waiters subject to current release
294 self
.pending
= 0 # num waiters awaiting next signal
295 self
.torelease
= 0 # num waiters to release
296 self
.releasing
= 0 # 1 iff release is in progress
305 mutex
, checkout
, idlock
= self
.mutex
, self
.checkout
, self
.idlock
306 if not mutex
.locked():
308 "condition must be .acquire'd when .wait() invoked"
312 self
.pending
= self
.pending
+ 1
318 checkout
.acquire(); idlock
.acquire()
321 checkout
.release(); idlock
.release()
323 self
.waiting
= self
.waiting
- 1
324 self
.torelease
= self
.torelease
- 1
329 if self
.waiting
== self
.pending
== 0:
337 def broadcast(self
, num
= -1):
339 raise ValueError, '.broadcast called with num ' + `num`
342 self
.idlock
.acquire()
344 self
.waiting
= self
.waiting
+ self
.pending
346 self
.id = self
.id + 1
348 self
.torelease
= self
.waiting
350 self
.torelease
= min( self
.waiting
,
351 self
.torelease
+ num
)
352 if self
.torelease
and not self
.releasing
:
354 self
.checkout
.release()
355 self
.idlock
.release()
358 def __init__(self
, n
):
361 self
.full
= condition()
366 self
.togo
= self
.togo
- 1
377 self
.posted
= condition()
380 self
.posted
.acquire()
382 self
.posted
.broadcast()
383 self
.posted
.release()
386 self
.posted
.acquire()
388 self
.posted
.release()
391 self
.posted
.acquire()
393 self
.posted
.release()
397 self
.posted
.acquire()
400 self
.posted
.release()
403 def __init__(self
, count
=1):
405 raise ValueError, 'semaphore count %d; must be >= 1' % count
407 self
.maxcount
= count
408 self
.nonzero
= condition()
411 self
.nonzero
.acquire()
412 while self
.count
== 0:
414 self
.count
= self
.count
- 1
415 self
.nonzero
.release()
418 self
.nonzero
.acquire()
419 if self
.count
== self
.maxcount
:
420 raise ValueError, '.v() tried to raise semaphore count above ' \
421 'initial value ' + `maxcount`
422 self
.count
= self
.count
+ 1
423 self
.nonzero
.signal()
424 self
.nonzero
.release()
428 # critical-section lock & the data it protects
429 self
.rwOK
= thread
.allocate_lock()
430 self
.nr
= 0 # number readers actively reading (not just waiting)
431 self
.nw
= 0 # number writers either waiting to write or writing
432 self
.writing
= 0 # 1 iff some thread is writing
435 self
.readOK
= condition(self
.rwOK
) # OK to unblock readers
436 self
.writeOK
= condition(self
.rwOK
) # OK to unblock writers
442 self
.nr
= self
.nr
+ 1
449 '.read_out() invoked without an active reader'
450 self
.nr
= self
.nr
- 1
452 self
.writeOK
.signal()
457 self
.nw
= self
.nw
+ 1
458 while self
.writing
or self
.nr
:
467 '.write_out() invoked without an active writer'
469 self
.nw
= self
.nw
- 1
471 self
.writeOK
.signal()
473 self
.readOK
.broadcast()
476 def write_to_read(self
):
480 '.write_to_read() invoked without an active writer'
482 self
.nw
= self
.nw
- 1
483 self
.nr
= self
.nr
+ 1
485 self
.readOK
.broadcast()
488 # The rest of the file is a test case, that runs a number of parallelized
489 # quicksorts in parallel. If it works, you'll get about 600 lines of
490 # tracing output, with a line like
491 # test passed! 209 threads created in all
492 # as the last line. The content and order of preceding lines will
495 def _new_thread(func
, *args
):
497 tid
.acquire(); id = TID
= TID
+1; tid
.release()
498 io
.acquire(); alive
.append(id); \
499 print 'starting thread', id, '--', len(alive
), 'alive'; \
501 thread
.start_new_thread( func
, (id,) + args
)
503 def _qsort(tid
, a
, l
, r
, finished
):
504 # sort a[l:r]; post finished when done
505 io
.acquire(); print 'thread', tid
, 'qsort', l
, r
; io
.release()
508 j
= l
+1 # make a[l:j] <= pivot, and a[j:r] > pivot
509 for i
in range(j
, r
):
511 a
[j
], a
[i
] = a
[i
], a
[j
]
513 a
[l
], a
[j
-1] = a
[j
-1], pivot
515 l_subarray_sorted
= event()
516 r_subarray_sorted
= event()
517 _new_thread(_qsort
, a
, l
, j
-1, l_subarray_sorted
)
518 _new_thread(_qsort
, a
, j
, r
, r_subarray_sorted
)
519 l_subarray_sorted
.wait()
520 r_subarray_sorted
.wait()
522 io
.acquire(); print 'thread', tid
, 'qsort done'; \
523 alive
.remove(tid
); io
.release()
526 def _randarray(tid
, a
, finished
):
527 io
.acquire(); print 'thread', tid
, 'randomizing array'; \
529 for i
in range(1, len(a
)):
530 wh
.acquire(); j
= randint(0,i
); wh
.release()
531 a
[i
], a
[j
] = a
[j
], a
[i
]
532 io
.acquire(); print 'thread', tid
, 'randomizing done'; \
533 alive
.remove(tid
); io
.release()
537 if a
!= range(len(a
)):
538 raise ValueError, ('a not sorted', a
)
540 def _run_one_sort(tid
, a
, bar
, done
):
541 # randomize a, and quicksort it
542 # for variety, all the threads running this enter a barrier
543 # at the end, and post `done' after the barrier exits
544 io
.acquire(); print 'thread', tid
, 'randomizing', a
; \
547 _new_thread(_randarray
, a
, finished
)
550 io
.acquire(); print 'thread', tid
, 'sorting', a
; io
.release()
552 _new_thread(_qsort
, a
, 0, len(a
), finished
)
556 io
.acquire(); print 'thread', tid
, 'entering barrier'; \
559 io
.acquire(); print 'thread', tid
, 'leaving barrier'; \
561 io
.acquire(); alive
.remove(tid
); io
.release()
562 bar
.enter() # make sure they've all removed themselves from alive
563 ## before 'done' is posted
564 bar
.enter() # just to be cruel
568 global TID
, tid
, io
, wh
, randint
, alive
570 randint
= random
.randint
572 TID
= 0 # thread ID (1, 2, ...)
573 tid
= thread
.allocate_lock() # for changing TID
574 io
= thread
.allocate_lock() # for printing, and 'alive'
575 wh
= thread
.allocate_lock() # for calls to random
576 alive
= [] # IDs of active threads
580 for i
in range(NSORTS
):
581 arrays
.append( range( (i
+1)*10 ) )
583 bar
= barrier(NSORTS
)
585 for i
in range(NSORTS
):
586 _new_thread(_run_one_sort
, arrays
[i
], bar
, finished
)
589 print 'all threads done, and checking results ...'
591 raise ValueError, ('threads still alive at end', alive
)
592 for i
in range(NSORTS
):
594 if len(a
) != (i
+1)*10:
595 raise ValueError, ('length of array', i
, 'screwed up')
598 print 'test passed!', TID
, 'threads created in all'
600 if __name__
== '__main__':