append(): Fixing the test for convertability after consultation with
[python/dscho.git] / Lib / Queue.py
blobcd035da14cf419daa3376b1c03a0180ef8691b08
1 """A multi-producer, multi-consumer queue."""
3 class Empty(Exception):
4 "Exception raised by Queue.get(block=0)/get_nowait()."
5 pass
7 class Full(Exception):
8 "Exception raised by Queue.put(block=0)/put_nowait()."
9 pass
11 class Queue:
12 def __init__(self, maxsize=0):
13 """Initialize a queue object with a given maximum size.
15 If maxsize is <= 0, the queue size is infinite.
16 """
17 import thread
18 self._init(maxsize)
19 self.mutex = thread.allocate_lock()
20 self.esema = thread.allocate_lock()
21 self.esema.acquire()
22 self.fsema = thread.allocate_lock()
24 def qsize(self):
25 """Return the approximate size of the queue (not reliable!)."""
26 self.mutex.acquire()
27 n = self._qsize()
28 self.mutex.release()
29 return n
31 def empty(self):
32 """Return 1 if the queue is empty, 0 otherwise (not reliable!)."""
33 self.mutex.acquire()
34 n = self._empty()
35 self.mutex.release()
36 return n
38 def full(self):
39 """Return 1 if the queue is full, 0 otherwise (not reliable!)."""
40 self.mutex.acquire()
41 n = self._full()
42 self.mutex.release()
43 return n
45 def put(self, item, block=1):
46 """Put an item into the queue.
48 If optional arg 'block' is 1 (the default), block if
49 necessary until a free slot is available. Otherwise (block
50 is 0), put an item on the queue if a free slot is immediately
51 available, else raise the Full exception.
52 """
53 if block:
54 self.fsema.acquire()
55 elif not self.fsema.acquire(0):
56 raise Full
57 self.mutex.acquire()
58 release_fsema = True
59 try:
60 was_empty = self._empty()
61 self._put(item)
62 # If we fail before here, the empty state has
63 # not changed, so we can skip the release of esema
64 if was_empty:
65 self.esema.release()
66 # If we fail before here, the queue can not be full, so
67 # release_full_sema remains True
68 release_fsema = not self._full()
69 finally:
70 # Catching system level exceptions here (RecursionDepth,
71 # OutOfMemory, etc) - so do as little as possible in terms
72 # of Python calls.
73 if release_fsema:
74 self.fsema.release()
75 self.mutex.release()
77 def put_nowait(self, item):
78 """Put an item into the queue without blocking.
80 Only enqueue the item if a free slot is immediately available.
81 Otherwise raise the Full exception.
82 """
83 return self.put(item, 0)
85 def get(self, block=1):
86 """Remove and return an item from the queue.
88 If optional arg 'block' is 1 (the default), block if
89 necessary until an item is available. Otherwise (block is 0),
90 return an item if one is immediately available, else raise the
91 Empty exception.
92 """
93 if block:
94 self.esema.acquire()
95 elif not self.esema.acquire(0):
96 raise Empty
97 self.mutex.acquire()
98 release_esema = True
99 try:
100 was_full = self._full()
101 item = self._get()
102 # If we fail before here, the full state has
103 # not changed, so we can skip the release of fsema
104 if was_full:
105 self.fsema.release()
106 # Failure means empty state also unchanged - release_esema
107 # remains True.
108 release_esema = not self._empty()
109 finally:
110 if release_esema:
111 self.esema.release()
112 self.mutex.release()
113 return item
115 def get_nowait(self):
116 """Remove and return an item from the queue without blocking.
118 Only get an item if one is immediately available. Otherwise
119 raise the Empty exception.
121 return self.get(0)
123 # Override these methods to implement other queue organizations
124 # (e.g. stack or priority queue).
125 # These will only be called with appropriate locks held
127 # Initialize the queue representation
128 def _init(self, maxsize):
129 self.maxsize = maxsize
130 self.queue = []
132 def _qsize(self):
133 return len(self.queue)
135 # Check whether the queue is empty
136 def _empty(self):
137 return not self.queue
139 # Check whether the queue is full
140 def _full(self):
141 return self.maxsize > 0 and len(self.queue) == self.maxsize
143 # Put a new item in the queue
144 def _put(self, item):
145 self.queue.append(item)
147 # Get an item from the queue
148 def _get(self):
149 return self.queue.pop(0)