Moved into a sub-dir, so that a svn checkout has the same structure as
[rox-lib.git] / ROX-Lib2 / tests / python / testtasks.py
blob9e2d04480a96c0d643f173855f418340740f9b9a
1 #!/usr/bin/env python2.3
2 from __future__ import generators
3 import unittest
4 import sys
5 import os, time
6 from os.path import dirname, abspath, join
8 rox_lib = dirname(dirname(dirname(abspath(sys.argv[0]))))
9 sys.path.insert(0, join(rox_lib, 'python'))
11 from rox import tasks, g
13 class TestTasks(unittest.TestCase):
14 def testIdleBlocker(self):
15 def run():
16 yield None
17 g.main_quit()
18 tasks.Task(run())
19 g.main()
21 def testTimeoutBlocker(self):
22 def run():
23 start = time.time()
24 yield tasks.TimeoutBlocker(0.5)
25 end = time.time()
26 assert end > start + 0.5
27 g.main_quit()
28 tasks.Task(run())
29 g.main()
31 def testInputBlocker(self):
32 readable, writeable = os.pipe()
33 def run():
34 ib = tasks.InputBlocker(readable)
35 tb = tasks.TimeoutBlocker(0.2)
36 yield ib, tb
37 assert not ib.happened
38 assert tb.happened
39 os.write(writeable, "!")
41 tb = tasks.TimeoutBlocker(0.2)
42 yield ib, tb
43 assert ib.happened
44 assert not tb.happened
46 g.main_quit()
47 tasks.Task(run())
48 g.main()
50 def testOutputBlocker(self):
51 readable, writeable = os.pipe()
52 def run():
53 # Fill the input buffer...
54 sent = 0
55 while True:
56 ob = tasks.OutputBlocker(writeable)
57 tb = tasks.TimeoutBlocker(0.2)
58 yield ob, tb
59 if ob.happened:
60 sent += os.write(writeable, 'Hello\n')
61 else:
62 assert tb.happened
63 break
64 assert sent > 0
65 #print "send %d bytes" % sent
67 # Read it all back...
68 got = 0
69 while got < sent:
70 got += len(os.read(readable, sent - got))
72 ob = tasks.OutputBlocker(writeable)
73 tb = tasks.TimeoutBlocker(0.2)
74 yield ob, tb
75 assert ob.happened
76 assert not tb.happened
78 g.main_quit()
79 tasks.Task(run())
80 g.main()
# Module-level suite holding every test* method of TestTasks (presumably
# picked up by an external runner that imports this file -- not visible here).
# TestLoader.loadTestsFromTestCase is the non-deprecated equivalent of
# unittest.makeSuite.
suite = unittest.TestLoader().loadTestsFromTestCase(TestTasks)

if __name__ == '__main__':
    # Force verbose output when the file is executed directly.
    sys.argv.append('-v')
    unittest.main()