Oops -- Lib/Test should be Lib/test, of course!
[python/dscho.git] / Lib / Queue.py
blob843b9dca197ef6abe915ec5ec9e0430f738ed685
1 # A multi-producer, multi-consumer queue.
# Prefer a class-based exception, as supported since Python 1.5.  If
# defining the class fails with TypeError (string-based exceptions are
# in effect, e.g. when the -X option is used), fall back to a string.
try:
    class Empty(Exception):
        """Exception raised by get_nowait()."""
except TypeError:
    # String-based exceptions.
    Empty = 'Queue.Empty'
class Queue:
    """A multi-producer, multi-consumer FIFO queue.

    Three low-level locks coordinate the threads:

    mutex -- held while the queue representation is inspected or changed
    esema -- locked while the queue is empty (or a consumer is taking
             an item); get() blocks on it
    fsema -- locked while the queue is full (or a producer is adding
             an item); put() blocks on it

    Subclasses may override the _init/_qsize/_empty/_full/_put/_get
    hooks to implement other queue organizations.
    """

    def __init__(self, maxsize=0):
        """Initialize a queue object with a given maximum size.

        If maxsize is <= 0 (the default), the queue size is infinite.
        """
        try:
            import thread
        except ImportError:
            # The low-level module is called _thread on Python 3.
            import _thread
            thread = _thread
        self._init(maxsize)
        self.mutex = thread.allocate_lock()
        self.esema = thread.allocate_lock()
        # The queue starts out empty, so consumers must block.
        self.esema.acquire()
        self.fsema = thread.allocate_lock()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._qsize()
        finally:
            self.mutex.release()

    def empty(self):
        """Return true if the queue is empty, false otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._empty()
        finally:
            self.mutex.release()

    def full(self):
        """Return true if the queue is full, false otherwise (not reliable!)."""
        self.mutex.acquire()
        try:
            return self._full()
        finally:
            self.mutex.release()

    def put(self, item):
        """Put an item into the queue.

        If the queue is full, block until a free slot is available.
        """
        self.fsema.acquire()
        self.mutex.acquire()
        # Until the item has been stored the queue cannot have become
        # full, so fsema must be handed back if anything below fails.
        release_fsema = 1
        try:
            was_empty = self._empty()
            self._put(item)
            if was_empty:
                # Empty -> non-empty: let a consumer through.
                self.esema.release()
            release_fsema = not self._full()
        finally:
            if release_fsema:
                self.fsema.release()
            self.mutex.release()

    def get(self):
        """Get and return an item from the queue.

        This method blocks if necessary until an item is available.
        """
        self.esema.acquire()
        self.mutex.acquire()
        # Until an item has been removed the queue cannot have become
        # empty, so esema must be handed back if anything below fails.
        release_esema = 1
        try:
            was_full = self._full()
            item = self._get()
            if was_full:
                # Full -> not full: let a producer through.
                self.fsema.release()
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    # Get an item from the queue if one is immediately available,
    # raise Empty if the queue is empty or temporarily unavailable
    def get_nowait(self):
        """Get and return an item from the queue without blocking on esema.

        Only gets an item if one is immediately available.  Otherwise
        this raises the Empty exception if the queue is empty or
        temporarily unavailable.
        """
        holds_esema = self.esema.acquire(0)
        self.mutex.acquire()
        # Give esema back on failure only if we actually obtained it.
        release_esema = holds_esema
        try:
            if self._empty():
                # The queue is empty -- we can't have esema, and it
                # must stay locked so that get() keeps blocking.
                release_esema = 0
                raise Empty
            if not holds_esema:
                if not self.esema.acquire(0):
                    # Somebody else has esema
                    # but we have mutex --
                    # go out of their way
                    raise Empty
                release_esema = 1
            was_full = self._full()
            item = self._get()
            if was_full:
                self.fsema.release()
            release_esema = not self._empty()
        finally:
            if release_esema:
                self.esema.release()
            self.mutex.release()
        return item

    # XXX Need to define put_nowait() as well.

    # Override these methods to implement other queue organizations
    # (e.g. stack or priority queue).
    # These will only be called with appropriate locks held

    # Initialize the queue representation
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = []

    # Return the number of items currently stored
    def _qsize(self):
        return len(self.queue)

    # Check whether the queue is empty
    def _empty(self):
        return not self.queue

    # Check whether the queue is full
    def _full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # Put a new item in the queue
    def _put(self, item):
        self.queue.append(item)

    # Get an item from the queue
    def _get(self):
        return self.queue.pop(0)