1 # Test case for the os.poll() function
3 import os
, select
, random
, unittest
4 from test
.test_support
import TESTFN
, run_unittest
9 raise unittest
.SkipTest
, "select.poll not defined -- skipping test_poll"
12 def find_ready_matching(ready
, flag
):
14 for fd
, mode
in ready
:
19 class PollTests(unittest
.TestCase
):
22 # Basic functional test of poll object
23 # Create a bunch of pipe and test that poll works with them.
28 MSG
= " This is a test."
35 for i
in range(NUM_PIPES
):
38 p
.modify(rd
, select
.POLLIN
)
39 p
.register(wr
, select
.POLLOUT
)
49 ready_writers
= find_ready_matching(ready
, select
.POLLOUT
)
51 raise RuntimeError, "no pipes ready for writing"
52 wr
= random
.choice(ready_writers
)
56 ready_readers
= find_ready_matching(ready
, select
.POLLIN
)
58 raise RuntimeError, "no pipes ready for reading"
59 rd
= random
.choice(ready_readers
)
60 buf
= os
.read(rd
, MSG_LEN
)
61 self
.assertEqual(len(buf
), MSG_LEN
)
63 os
.close(r2w
[rd
]) ; os
.close( rd
)
64 p
.unregister( r2w
[rd
] )
66 writers
.remove(r2w
[rd
])
68 self
.assertEqual(bufs
, [MSG
] * NUM_PIPES
)
70 def poll_unit_tests(self
):
71 # returns NVAL for invalid file descriptor
80 self
.assertEqual(r
[0], (FD
, select
.POLLNVAL
))
87 self
.assertEqual(r
[0][0], fd
)
90 self
.assertEqual(r
[0], (fd
, select
.POLLNVAL
))
93 # type error for invalid arguments
95 self
.assertRaises(TypeError, p
.register
, p
)
96 self
.assertRaises(TypeError, p
.unregister
, p
)
98 # can't unregister non-existent object
100 self
.assertRaises(KeyError, p
.unregister
, 3)
103 pollster
= select
.poll()
111 self
.assertRaises(TypeError, pollster
.register
, Nope(), 0)
112 self
.assertRaises(TypeError, pollster
.register
, Almost(), 0)
114 # Another test case for poll(). This is copied from the test case for
115 # select(), modified to use poll() instead.
117 def test_poll2(self
):
118 cmd
= 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
119 p
= os
.popen(cmd
, 'r')
120 pollster
= select
.poll()
121 pollster
.register( p
, select
.POLLIN
)
122 for tout
in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
123 fdlist
= pollster
.poll(tout
)
126 fd
, flags
= fdlist
[0]
127 if flags
& select
.POLLHUP
:
130 self
.fail('error: pipe seems to be closed, but still returns data')
133 elif flags
& select
.POLLIN
:
139 self
.fail('Unexpected return value from select.poll: %s' % fdlist
)
142 def test_poll3(self
):
144 pollster
= select
.poll()
147 self
.assertRaises(OverflowError, pollster
.poll
, 1L << 64)
151 self
.fail('Overflow must have occurred')
154 run_unittest(PollTests
)
156 if __name__
== '__main__':