Whitespace normalization.
[python/dscho.git] / Lib / test / test_queue.py
blob958e2505336d54e64f63bc2337c08af8c408e1da
1 # Some simple Queue module tests, plus some failure conditions
2 # to ensure the Queue locks remain stable
3 import Queue
4 import sys
5 import threading
6 import time
8 from test.test_support import verify, TestFailed, verbose
10 queue_size = 5
12 # Execute a function that blocks, and in a seperate thread, a function that
13 # triggers the release. Returns the result of the blocking function.
14 class _TriggerThread(threading.Thread):
15 def __init__(self, fn, args):
16 self.fn = fn
17 self.args = args
18 self.startedEvent = threading.Event()
19 threading.Thread.__init__(self)
20 def run(self):
21 time.sleep(.1)
22 self.startedEvent.set()
23 self.fn(*self.args)
25 def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
26 t = _TriggerThread(trigger_func, trigger_args)
27 t.start()
28 try:
29 return block_func(*block_args)
30 finally:
31 # If we unblocked before our thread made the call, we failed!
32 if not t.startedEvent.isSet():
33 raise TestFailed("blocking function '%r' appeared not to block" %
34 block_func)
35 t.join(1) # make sure the thread terminates
36 if t.isAlive():
37 raise TestFailed("trigger function '%r' appeared to not return" %
38 trigger_func)
40 # A Queue subclass that can provoke failure at a moment's notice :)
41 class FailingQueueException(Exception):
42 pass
44 class FailingQueue(Queue.Queue):
45 def __init__(self, *args):
46 self.fail_next_put = False
47 self.fail_next_get = False
48 Queue.Queue.__init__(self, *args)
49 def _put(self, item):
50 if self.fail_next_put:
51 self.fail_next_put = False
52 raise FailingQueueException, "You Lose"
53 return Queue.Queue._put(self, item)
54 def _get(self):
55 if self.fail_next_get:
56 self.fail_next_get = False
57 raise FailingQueueException, "You Lose"
58 return Queue.Queue._get(self)
60 def FailingQueueTest(q):
61 if not q.empty():
62 raise RuntimeError, "Call this function with an empty queue"
63 for i in range(queue_size-1):
64 q.put(i)
65 # Test a failing non-blocking put.
66 q.fail_next_put = True
67 try:
68 q.put("oops", block=0)
69 raise TestFailed("The queue didn't fail when it should have")
70 except FailingQueueException:
71 pass
72 q.fail_next_put = True
73 try:
74 q.put("oops", timeout=0.1)
75 raise TestFailed("The queue didn't fail when it should have")
76 except FailingQueueException:
77 pass
78 q.put("last")
79 verify(q.full(), "Queue should be full")
80 # Test a failing blocking put
81 q.fail_next_put = True
82 try:
83 _doBlockingTest( q.put, ("full",), q.get, ())
84 raise TestFailed("The queue didn't fail when it should have")
85 except FailingQueueException:
86 pass
87 # Check the Queue isn't damaged.
88 # put failed, but get succeeded - re-add
89 q.put("last")
90 # Test a failing timeout put
91 q.fail_next_put = True
92 try:
93 _doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
94 raise TestFailed("The queue didn't fail when it should have")
95 except FailingQueueException:
96 pass
97 # Check the Queue isn't damaged.
98 # put failed, but get succeeded - re-add
99 q.put("last")
100 verify(q.full(), "Queue should be full")
101 q.get()
102 verify(not q.full(), "Queue should not be full")
103 q.put("last")
104 verify(q.full(), "Queue should be full")
105 # Test a blocking put
106 _doBlockingTest( q.put, ("full",), q.get, ())
107 # Empty it
108 for i in range(queue_size):
109 q.get()
110 verify(q.empty(), "Queue should be empty")
111 q.put("first")
112 q.fail_next_get = True
113 try:
114 q.get()
115 raise TestFailed("The queue didn't fail when it should have")
116 except FailingQueueException:
117 pass
118 verify(not q.empty(), "Queue should not be empty")
119 q.fail_next_get = True
120 try:
121 q.get(timeout=0.1)
122 raise TestFailed("The queue didn't fail when it should have")
123 except FailingQueueException:
124 pass
125 verify(not q.empty(), "Queue should not be empty")
126 q.get()
127 verify(q.empty(), "Queue should be empty")
128 q.fail_next_get = True
129 try:
130 _doBlockingTest( q.get, (), q.put, ('empty',))
131 raise TestFailed("The queue didn't fail when it should have")
132 except FailingQueueException:
133 pass
134 # put succeeded, but get failed.
135 verify(not q.empty(), "Queue should not be empty")
136 q.get()
137 verify(q.empty(), "Queue should be empty")
139 def SimpleQueueTest(q):
140 if not q.empty():
141 raise RuntimeError, "Call this function with an empty queue"
142 # I guess we better check things actually queue correctly a little :)
143 q.put(111)
144 q.put(222)
145 verify(q.get() == 111 and q.get() == 222,
146 "Didn't seem to queue the correct data!")
147 for i in range(queue_size-1):
148 q.put(i)
149 verify(not q.full(), "Queue should not be full")
150 q.put("last")
151 verify(q.full(), "Queue should be full")
152 try:
153 q.put("full", block=0)
154 raise TestFailed("Didn't appear to block with a full queue")
155 except Queue.Full:
156 pass
157 try:
158 q.put("full", timeout=0.1)
159 raise TestFailed("Didn't appear to time-out with a full queue")
160 except Queue.Full:
161 pass
162 # Test a blocking put
163 _doBlockingTest( q.put, ("full",), q.get, ())
164 _doBlockingTest( q.put, ("full", True, 0.2), q.get, ())
165 # Empty it
166 for i in range(queue_size):
167 q.get()
168 verify(q.empty(), "Queue should be empty")
169 try:
170 q.get(block=0)
171 raise TestFailed("Didn't appear to block with an empty queue")
172 except Queue.Empty:
173 pass
174 try:
175 q.get(timeout=0.1)
176 raise TestFailed("Didn't appear to time-out with an empty queue")
177 except Queue.Empty:
178 pass
179 # Test a blocking get
180 _doBlockingTest(q.get, (), q.put, ('empty',))
181 _doBlockingTest(q.get, (True, 0.2), q.put, ('empty',))
183 def test():
184 q=Queue.Queue(queue_size)
185 # Do it a couple of times on the same queue
186 SimpleQueueTest(q)
187 SimpleQueueTest(q)
188 if verbose:
189 print "Simple Queue tests seemed to work"
190 q = FailingQueue(queue_size)
191 FailingQueueTest(q)
192 FailingQueueTest(q)
193 if verbose:
194 print "Failing Queue tests seemed to work"
196 test()