1 # Test case for the select.poll() function
3 import sys
, os
, select
, random
4 from test_support
import verify
, verbose
, TestSkipped
, TESTFN
9 raise TestSkipped
, "select.poll not defined -- skipping test_poll"
12 def find_ready_matching(ready
, flag
):
14 for fd
, mode
in ready
:
20 """Basic functional test of poll object
22 Create a bunch of pipes and test that poll works with them.
24 print 'Running poll test 1'
28 MSG
= " This is a test."
35 for i
in range(NUM_PIPES
):
37 p
.register(rd
, select
.POLLIN
)
38 p
.register(wr
, select
.POLLOUT
)
46 ready_writers
= find_ready_matching(ready
, select
.POLLOUT
)
48 raise RuntimeError, "no pipes ready for writing"
49 wr
= random
.choice(ready_writers
)
53 ready_readers
= find_ready_matching(ready
, select
.POLLIN
)
55 raise RuntimeError, "no pipes ready for reading"
56 rd
= random
.choice(ready_readers
)
57 buf
= os
.read(rd
, MSG_LEN
)
58 verify(len(buf
) == MSG_LEN
)
60 os
.close(r2w
[rd
]) ; os
.close( rd
)
61 p
.unregister( r2w
[rd
] )
63 writers
.remove(r2w
[rd
])
66 print 'Poll test 1 complete'
68 def poll_unit_tests():
69 # returns POLLNVAL for invalid file descriptor
78 verify(r
[0] == (FD
, select
.POLLNVAL
))
88 verify(r
[0] == (fd
, select
.POLLNVAL
))
91 # type error for invalid arguments
98 print "Bogus register call did not raise TypeError"
104 print "Bogus unregister call did not raise TypeError"
106 # can't unregister non-existent object
113 print "Bogus unregister call did not raise KeyError"
116 pollster
= select
.poll()
125 pollster
.register( Nope(), 0 )
126 except TypeError: pass
127 else: print 'expected TypeError exception, not raised'
130 pollster
.register( Almost(), 0 )
131 except TypeError: pass
132 else: print 'expected TypeError exception, not raised'
135 # Another test case for poll(). This is copied from the test case for
136 # select(), modified to use poll() instead.
139 print 'Running poll test 2'
140 cmd
= 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
141 p
= os
.popen(cmd
, 'r')
142 pollster
= select
.poll()
143 pollster
.register( p
, select
.POLLIN
)
144 for tout
in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
146 print 'timeout =', tout
147 fdlist
= pollster
.poll(tout
)
150 fd
, flags
= fdlist
[0]
151 if flags
& select
.POLLHUP
:
154 print 'error: pipe seems to be closed, but still returns data'
157 elif flags
& select
.POLLIN
:
167 print 'Unexpected return value from select.poll:', fdlist
169 print 'Poll test 2 complete'