1 # Some simple Queue module tests, plus some failure conditions
2 # to ensure the Queue locks remain stable
8 from test_support
import verify
, TestFailed
, verbose
12 # Execute a function that blocks, and in a seperate thread, a function that
13 # triggers the release. Returns the result of the blocking function.
14 class _TriggerThread(threading
.Thread
):
15 def __init__(self
, fn
, args
):
18 self
.startedEvent
= threading
.Event()
19 threading
.Thread
.__init
__(self
)
22 self
.startedEvent
.set()
25 def _doBlockingTest( block_func
, block_args
, trigger_func
, trigger_args
):
26 t
= _TriggerThread(trigger_func
, trigger_args
)
29 return block_func(*block_args
)
31 # If we unblocked before our thread made the call, we failed!
32 if not t
.startedEvent
.isSet():
33 raise TestFailed("blocking function '%r' appeared not to block" % (block_func
,))
34 t
.join(1) # make sure the thread terminates
36 raise TestFailed("trigger function '%r' appeared to not return" % (trigger_func
,))
38 # A Queue subclass that can provoke failure at a moment's notice :)
39 class FailingQueueException(Exception):
42 class FailingQueue(Queue
.Queue
):
43 def __init__(self
, *args
):
44 self
.fail_next_put
= False
45 self
.fail_next_get
= False
46 Queue
.Queue
.__init
__(self
, *args
)
48 if self
.fail_next_put
:
49 self
.fail_next_put
= False
50 raise FailingQueueException
, "You Lose"
51 return Queue
.Queue
._put
(self
, item
)
53 if self
.fail_next_get
:
54 self
.fail_next_get
= False
55 raise FailingQueueException
, "You Lose"
56 return Queue
.Queue
._get
(self
)
58 def FailingQueueTest(q
):
60 raise RuntimeError, "Call this function with an empty queue"
61 for i
in range(queue_size
-1):
63 q
.fail_next_put
= True
64 # Test a failing non-blocking put.
66 q
.put("oops", block
=0)
67 raise TestFailed("The queue didn't fail when it should have")
68 except FailingQueueException
:
71 verify(q
.full(), "Queue should be full")
72 q
.fail_next_put
= True
73 # Test a failing blocking put
75 _doBlockingTest( q
.put
, ("full",), q
.get
, ())
76 raise TestFailed("The queue didn't fail when it should have")
77 except FailingQueueException
:
79 # Check the Queue isn't damaged.
80 # put failed, but get succeeded - re-add
82 verify(q
.full(), "Queue should be full")
84 verify(not q
.full(), "Queue should not be full")
86 verify(q
.full(), "Queue should be full")
88 _doBlockingTest( q
.put
, ("full",), q
.get
, ())
90 for i
in range(queue_size
):
92 verify(q
.empty(), "Queue should be empty")
94 q
.fail_next_get
= True
97 raise TestFailed("The queue didn't fail when it should have")
98 except FailingQueueException
:
100 verify(not q
.empty(), "Queue should not be empty")
102 verify(q
.empty(), "Queue should be empty")
103 q
.fail_next_get
= True
105 _doBlockingTest( q
.get
, (), q
.put
, ('empty',))
106 raise TestFailed("The queue didn't fail when it should have")
107 except FailingQueueException
:
109 # put succeeded, but get failed.
110 verify(not q
.empty(), "Queue should not be empty")
112 verify(q
.empty(), "Queue should be empty")
114 def SimpleQueueTest(q
):
116 raise RuntimeError, "Call this function with an empty queue"
117 # I guess we better check things actually queue correctly a little :)
120 verify(q
.get()==111 and q
.get()==222, "Didn't seem to queue the correct data!")
121 for i
in range(queue_size
-1):
123 verify(not q
.full(), "Queue should not be full")
125 verify(q
.full(), "Queue should be full")
127 q
.put("full", block
=0)
128 raise TestFailed("Didn't appear to block with a full queue")
131 # Test a blocking put
132 _doBlockingTest( q
.put
, ("full",), q
.get
, ())
134 for i
in range(queue_size
):
136 verify(q
.empty(), "Queue should be empty")
139 raise TestFailed("Didn't appear to block with an empty queue")
142 # Test a blocking get
143 _doBlockingTest( q
.get
, (), q
.put
, ('empty',))
146 q
=Queue
.Queue(queue_size
)
147 # Do it a couple of times on the same queue
151 print "Simple Queue tests seemed to work"
152 q
= FailingQueue(queue_size
)
156 print "Failing Queue tests seemed to work"