Added / chg tests
[systematiki.git] / Test / gtk-net-test.py
blob147cac741cfc8d8679e8419781861b2b2acea548
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
4 import gobject
5 import socket
7 BUFLEN = 8192
9 def deco(f):
10 def g(*a, **kw):
11 print "Running %s %r %r ..." % (f.__name__, a, kw)
12 return f(*a, **kw)
13 return g
15 @deco
16 def setup_socket():
17 sock = socket.socket()
18 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
19 address = ("localhost", 5000)
20 sock.bind(address)
21 sock.listen(5)
22 gobject.io_add_watch(sock, gobject.IO_IN, accept_ready)
24 @deco
25 def accept_ready(sock, cond):
26 sock2, addr = sock.accept()
27 add_in_watch(sock2)
28 return True
30 @deco
31 def add_in_watch(sock2):
32 gobject.io_add_watch(sock2, gobject.IO_IN, data_ready)
34 @deco
35 def data_ready(sock2, cond):
36 data = sock2.recv(BUFLEN)
37 print "%r" % data
38 if data == "":
39 sock2.close()
40 return False
41 if data == "quit\n":
42 sock2.close()
43 global mainloop
44 mainloop.quit()
45 return False
46 add_in_watch(sock2)
47 return False
49 @deco
50 def setup_mainloop():
51 global mainloop
52 mainloop = gobject.MainLoop()
53 mainloop.run()
55 @deco
56 def main(argv):
57 setup_socket()
58 setup_mainloop()
59 return 0
61 if __name__ == "__main__":
62 import sys
63 sys.exit(main(sys.argv))