# Defines classes that provide synchronization objects.  Note that use of
# this module requires that your Python support threads.
#
#    condition()      # a POSIX-like condition-variable object
#    barrier(n)       # an n-thread barrier
#    event()          # an event object
#    semaphore(n=1)   # a semaphore object, with initial count n
# CONDITIONS
#
# A condition object is created via
#
#    your_condition_object = this_module.condition()
#
# Methods:
#
#    .acquire()
#       acquire the lock associated with the condition
#
#    .release()
#       release the lock associated with the condition
#
#    .wait()
#       block the thread until such time as some other thread does a
#       .signal or .broadcast on the same condition, and release the
#       lock associated with the condition.  The lock associated with
#       the condition MUST be in the acquired state at the time .wait()
#       is invoked.
#
#    .signal()
#       wake up exactly one thread (if any) that previously did a .wait
#       on the condition; that thread will awaken with the lock associated
#       with the condition in the acquired state.  If no threads are
#       .wait'ing, this is a nop.  If more than one thread is .wait'ing on
#       the condition, any of them may be awakened.
#
#    .broadcast()
#       wake up all threads (if any) that are .wait'ing on the condition;
#       the threads are woken up serially, each with the lock in the
#       acquired state, so should .release() as soon as possible.  If no
#       threads are .wait'ing, this is a nop.
#
#       Note that if a thread does a .wait *while* a signal/broadcast is
#       in progress, it's guaranteed to block until a subsequent
#       signal/broadcast.
#
#       Secret feature:  `broadcast' actually takes an integer argument,
#       and will wake up exactly that many waiting threads (or the total
#       number waiting, if that's less).  Use of this is dubious, though,
#       and probably won't be supported if this form of condition is
#       reimplemented.
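#
# As a quick sketch of the call sequence (the names `ready', `data' and
# `item' below are illustrative only, not part of this module):
#
#    ready = condition()
#    data = []
#
#    # consumer thread
#    ready.acquire()
#    while not data:
#        ready.wait()                # releases the lock while blocked
#    item = data[0]; del data[0]     # the lock is held again here
#    ready.release()
#
#    # producer thread
#    ready.acquire()
#    data.append(1)
#    ready.release()
#    ready.signal()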
#
# DIFFERENCES FROM POSIX
#
# + A separate mutex is not needed to guard condition data.  Instead, a
#   condition object can (must) be .acquire'ed and .release'ed directly.
#   This eliminates a common error in using POSIX conditions.
#
# + Because of implementation difficulties, a POSIX `signal' wakes up
#   _at least_ one .wait'ing thread.  Race conditions make it difficult
#   to stop that.  This implementation guarantees to wake up only one,
#   but you probably shouldn't rely on that.
#
# Condition objects are used to block threads until "some condition" is
# true.  E.g., a thread may wish to wait until a producer pumps out data
# for it to consume, or a server may wish to wait until someone requests
# its services, or perhaps a whole bunch of threads want to wait until a
# preceding pass over the data is complete.  Early models for conditions
# relied on some other thread figuring out when a blocked thread's
# condition was true, and made the other thread responsible both for
# waking up the blocked thread and guaranteeing that it woke up with all
# data in a correct state.  This proved to be very delicate in practice,
# and gave conditions a bad name in some circles.
#
# The POSIX model addresses these problems by making a thread responsible
# for ensuring that its own state is correct when it wakes, and relies
# on a rigid protocol to make this easy; so long as you stick to the
# protocol, POSIX conditions are easy to "get right":
#
# A) The thread that's waiting for some arbitrarily-complex condition
#    (ACC) to become true does:
#
#       condition.acquire()
#       while not (code to evaluate the ACC):
#           condition.wait()
#           # That blocks the thread, *and* releases the lock.  When a
#           # condition.signal() happens, it will wake up some thread that
#           # did a .wait, *and* acquire the lock again before .wait
#           # returns.
#           #
#           # Because the lock is acquired at this point, the state used
#           # in evaluating the ACC is frozen, so it's safe to go back &
#           # reevaluate the ACC.
#
#       # At this point, ACC is true, and the thread has the condition
#       # acquired.
#       # So code here can safely muck with the shared state that
#       # went into evaluating the ACC -- if it wants to.
#       # When done mucking with the shared state, do
#       condition.release()
#
# B) Threads that are mucking with shared state that may affect the
#    ACC do:
#
#       condition.acquire()
#       # muck with shared state
#       condition.release()
#       if it's possible that ACC is true now:
#           condition.signal() # or .broadcast()
#
# Note:  You may prefer to put the "if" clause before the release().
# That's fine, but do note that anyone waiting on the signal will
# stay blocked until the release() is done (since acquiring the
# condition is part of what .wait() does before it returns).
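#
# As a concrete (and purely illustrative) instance of the A/B protocol, a
# coordinator below waits for a counter to reach a target; the names
# `done', `ndone' and `NWORKERS' are made up for this sketch and aren't
# used elsewhere in this module:
#
#    done = condition()
#    ndone = 0
#    NWORKERS = 3
#
#    # A) the coordinating thread
#    done.acquire()
#    while not (ndone == NWORKERS):   # the ACC
#        done.wait()
#    # all workers have finished; their shared results are safe to read here
#    done.release()
#
#    # B) each worker thread, when it finishes its piece of the work
#    done.acquire()
#    ndone = ndone + 1
#    done.release()
#    done.signal()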
#
# With simpler forms of conditions, it can be impossible to know when
# a thread that's supposed to do a .wait has actually done it.  But
# because this form of condition releases a lock as _part_ of doing a
# wait, the state of that lock can be used to guarantee it.
#
# E.g., suppose thread A spawns thread B and later wants to wait for B to
# complete:
#
#    In A:                             In B:
#
#       B_done = condition()              ... do work ...
#       B_done.acquire()                  B_done.acquire(); B_done.release()
#       spawn B                           B_done.signal()
#       ... some time later ...           ... and B exits ...
#       B_done.wait()
#
# Because B_done was in the acquire'd state at the time B was spawned,
# B's attempt to acquire B_done can't succeed until A has done its
# B_done.wait() (which releases B_done).  So B's B_done.signal() is
# guaranteed to be seen by the .wait().  Without the lock trick, B
# may signal before A .waits, and then A would wait forever.
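#
# The same trick, written out linearly as a sketch; `B_work' is an
# illustrative worker function, not something defined in this module:
#
#    import thread
#
#    def B_work(B_done):
#        # ... do work ...
#        B_done.acquire(); B_done.release()  # can't pass until A .wait()s
#        B_done.signal()
#
#    B_done = condition()
#    B_done.acquire()
#    thread.start_new_thread(B_work, (B_done,))
#    # ... some time later ...
#    B_done.wait()       # releases B_done, letting B through, and returns
#    B_done.release()    # once B's .signal() has been seen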
#
# BARRIERS
#
# A barrier object is created via
#
#    your_barrier = this_module.barrier(num_threads)
#
# Methods:
#
#    .enter()
#       the thread blocks until num_threads threads in all have done
#       .enter().  Then the num_threads threads that .enter'ed resume,
#       and the barrier resets to capture the next num_threads threads
#       that .enter it.
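#
# For example (a sketch only; `phase1' and `phase2' stand for whatever
# per-thread work surrounds the barrier):
#
#    b = barrier(3)
#
#    def worker(id):
#        phase1(id)     # every thread finishes phase 1 ...
#        b.enter()      # ... before any thread starts phase 2
#        phase2(id)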
#
# EVENTS
#
# An event object is created via
#
#    your_event = this_module.event()
#
# An event has two states, `posted' and `cleared'.  An event is
# created in the cleared state.
#
# Methods:
#
#    .post()
#       Put the event in the posted state, and resume all threads
#       .wait'ing on the event (if any).
#
#    .clear()
#       Put the event in the cleared state.
#
#    .is_posted()
#       Returns 0 if the event is in the cleared state, or 1 if the event
#       is in the posted state.
#
#    .wait()
#       If the event is in the posted state, returns immediately.
#       If the event is in the cleared state, blocks the calling thread
#       until the event is .post'ed by another thread.
#
# Note that an event, once posted, remains posted until explicitly
# cleared.  Relative to conditions, this is both the strength & weakness
# of events.  It's a strength because the .post'ing thread doesn't have to
# worry about whether the threads it's trying to communicate with have
# already done a .wait (a condition .signal is seen only by threads that
# do a .wait _prior_ to the .signal; a .signal does not persist).  But
# it's a weakness because .clear'ing an event is error-prone:  it's easy
# to mistakenly .clear an event before all the threads you intended to
# see the event get around to .wait'ing on it.  But so long as you don't
# need to .clear an event, events are easy to use safely.
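#
# A minimal sketch of typical use (`worker' is an illustrative function,
# not part of this module):
#
#    import thread
#
#    done = event()
#
#    def worker():
#        # ... do the work ...
#        done.post()     # stays posted, so it doesn't matter whether the
#                        # main thread reached its .wait() first
#
#    thread.start_new_thread(worker, ())
#    done.wait()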
#
# SEMAPHORES
#
# A semaphore object is created via
#
#    your_semaphore = this_module.semaphore(count=1)
#
# A semaphore has an integer count associated with it.  The initial value
# of the count is specified by the optional argument (which defaults to
# 1) passed to the semaphore constructor.
#
# Methods:
#
#    .p()
#       If the semaphore's count is greater than 0, decrements the count
#       by 1 and returns.
#       Else if the semaphore's count is 0, blocks the calling thread
#       until a subsequent .v() increases the count.  When that happens,
#       the count will be decremented by 1 and the calling thread resumed.
#
#    .v()
#       Increments the semaphore's count by 1, and wakes up a thread (if
#       any) blocked by a .p().  It's a (detected) error for a .v() to
#       increase the semaphore's count to a value larger than the initial
#       count.
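#
# A sketch of typical use, bounding how many threads may be inside a
# limited resource at once (`use_limited_resource' is an illustrative
# name, not part of this module):
#
#    import thread
#
#    slots = semaphore(3)     # at most 3 threads past the .p() at a time
#
#    def worker(id):
#        slots.p()            # blocks if 3 workers are already inside
#        use_limited_resource(id)
#        slots.v()            # wakes one blocked .p(), if any
#
#    for i in range(10):
#        thread.start_new_thread(worker, (i,))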

import thread

class condition:
    def __init__(self):
        # the lock actually used by .acquire() and .release()
        self.mutex = thread.allocate_lock()

        # lock used to block threads until a signal
        self.checkout = thread.allocate_lock()
        self.checkout.acquire()

        # internal critical-section lock, & the data it protects
        self.idlock = thread.allocate_lock()
        self.id = 0         # release-generation counter
        self.waiting = 0    # num waiters subject to current release
        self.pending = 0    # num waiters awaiting next signal
        self.torelease = 0  # num waiters to release
        self.releasing = 0  # 1 iff release is in progress

    def acquire(self):
        self.mutex.acquire()

    def release(self):
        self.mutex.release()

    def wait(self):
        mutex, checkout, idlock = self.mutex, self.checkout, self.idlock
        if not mutex.locked():
            raise ValueError, \
                  "condition must be .acquire'd when .wait() invoked"

        # register as a pending waiter under the current generation id
        idlock.acquire()
        myid = self.id
        self.pending = self.pending + 1
        idlock.release()

        mutex.release()

        # block until a signal/broadcast aimed at our generation arrives
        while 1:
            checkout.acquire(); idlock.acquire()
            if myid < self.id:
                break
            checkout.release(); idlock.release()

        self.waiting = self.waiting - 1
        self.torelease = self.torelease - 1
        if self.torelease:
            # pass the checkout lock on to the next waiter to be released
            checkout.release()
        else:
            self.releasing = 0
            if self.waiting == self.pending == 0:
                self.id = 0
        idlock.release()
        mutex.acquire()

    def signal(self):
        self.broadcast(1)

    def broadcast(self, num = -1):
        if num < -1:
            raise ValueError, '.broadcast called with num ' + `num`
        if num == 0:
            return
        self.idlock.acquire()
        if self.pending:
            # move the pending waiters into the current release generation
            self.waiting = self.waiting + self.pending
            self.pending = 0
            self.id = self.id + 1
        if num == -1:
            self.torelease = self.waiting
        else:
            self.torelease = min( self.waiting,
                                  self.torelease + num )
        if self.torelease and not self.releasing:
            self.releasing = 1
            self.checkout.release()
        self.idlock.release()

class barrier:
    def __init__(self, n):
        self.n = n
        self.togo = n
        self.full = condition()

    def enter(self):
        full = self.full
        full.acquire()
        self.togo = self.togo - 1
        if self.togo:
            full.wait()
        else:
            # last thread in: reset for the next batch and release everyone
            self.togo = self.n
            full.broadcast()
        full.release()

class event:
    def __init__(self):
        self.state = 0          # 1 iff posted
        self.posted = condition()

    def post(self):
        self.posted.acquire()
        self.state = 1
        self.posted.broadcast()
        self.posted.release()

    def clear(self):
        self.posted.acquire()
        self.state = 0
        self.posted.release()

    def is_posted(self):
        self.posted.acquire()
        answer = self.state
        self.posted.release()
        return answer

    def wait(self):
        self.posted.acquire()
        while not self.state:
            self.posted.wait()
        self.posted.release()

class semaphore:
    def __init__(self, count=1):
        if count <= 0:
            raise ValueError, 'semaphore count %d; must be >= 1' % count
        self.count = count
        self.maxcount = count
        self.nonzero = condition()

    def p(self):
        self.nonzero.acquire()
        while self.count == 0:
            self.nonzero.wait()
        self.count = self.count - 1
        self.nonzero.release()

    def v(self):
        self.nonzero.acquire()
        if self.count == self.maxcount:
            raise ValueError, '.v() tried to raise semaphore count above ' \
                  'initial value ' + `self.maxcount`
        self.count = self.count + 1
        self.nonzero.signal()
        self.nonzero.release()

# The rest of the file is a test case that runs a number of parallelized
# quicksorts in parallel.  If it works, you'll get about 600 lines of
# tracing output, with a line like
#     test passed! 209 threads created in all
# as the last line.  The content and order of the preceding lines will
# vary across runs.

def _new_thread(func, *args):
    global TID
    tid.acquire(); id = TID = TID+1; tid.release()
    io.acquire(); alive.append(id); \
                  print 'starting thread', id, '--', len(alive), 'alive'; \
                  io.release()
    thread.start_new_thread( func, (id,) + args )

def _qsort(tid, a, l, r, finished):
    # sort a[l:r]; post finished when done
    io.acquire(); print 'thread', tid, 'qsort', l, r; io.release()
    if r-l > 1:
        pivot = a[l]
        j = l+1   # make a[l:j] <= pivot, and a[j:r] > pivot
        for i in range(j, r):
            if a[i] <= pivot:
                a[j], a[i] = a[i], a[j]
                j = j + 1
        a[l], a[j-1] = a[j-1], pivot

        l_subarray_sorted = event()
        r_subarray_sorted = event()
        _new_thread(_qsort, a, l, j-1, l_subarray_sorted)
        _new_thread(_qsort, a, j, r, r_subarray_sorted)
        l_subarray_sorted.wait()
        r_subarray_sorted.wait()

    io.acquire(); print 'thread', tid, 'qsort done'; \
                  alive.remove(tid); io.release()
    finished.post()

def _randarray(tid, a, finished):
    io.acquire(); print 'thread', tid, 'randomizing array'; \
                  io.release()
    for i in range(1, len(a)):
        wh.acquire(); j = randint(0,i); wh.release()
        a[i], a[j] = a[j], a[i]
    io.acquire(); print 'thread', tid, 'randomizing done'; \
                  alive.remove(tid); io.release()
    finished.post()

def _check_sort(a):
    if a != range(len(a)):
        raise ValueError, ('a not sorted', a)

def _run_one_sort(tid, a, bar, done):
    # randomize a, and quicksort it
    # for variety, all the threads running this enter a barrier
    # at the end, and post `done' after the barrier exits
    io.acquire(); print 'thread', tid, 'randomizing', a; \
                  io.release()
    finished = event()
    _new_thread(_randarray, a, finished)
    finished.wait()

    io.acquire(); print 'thread', tid, 'sorting', a; io.release()
    finished.clear()
    _new_thread(_qsort, a, 0, len(a), finished)
    finished.wait()
    _check_sort(a)

    io.acquire(); print 'thread', tid, 'entering barrier'; \
                  io.release()
    bar.enter()
    io.acquire(); print 'thread', tid, 'leaving barrier'; \
                  io.release()
    io.acquire(); alive.remove(tid); io.release()
    bar.enter() # make sure they've all removed themselves from alive
                ## before 'done' is posted
    bar.enter() # just to be cruel
    done.post()

def test():
    global TID, tid, io, wh, randint, alive
    import whrandom
    randint = whrandom.randint

    TID = 0                             # thread ID (1, 2, ...)
    tid = thread.allocate_lock()        # for changing TID
    io  = thread.allocate_lock()        # for printing, and 'alive'
    wh  = thread.allocate_lock()        # for calls to whrandom
    alive = []                          # IDs of active threads

    NSORTS = 5
    arrays = []
    for i in range(NSORTS):
        arrays.append( range( (i+1)*10 ) )

    bar = barrier(NSORTS)
    finished = event()
    for i in range(NSORTS):
        _new_thread(_run_one_sort, arrays[i], bar, finished)
    finished.wait()

    print 'all threads done, and checking results ...'
    if alive:
        raise ValueError, ('threads still alive at end', alive)
    for i in range(NSORTS):
        a = arrays[i]
        if len(a) != (i+1)*10:
            raise ValueError, ('length of array', i, 'screwed up')
        _check_sort(a)
    print 'test passed!', TID, 'threads created in all'

if __name__ == '__main__':
    test()