1 """Async sockets, built on top of Sam Rushing's excellent async library"""
5 from socket
import AF_INET
, SOCK_STREAM
19 class Server(asyncore
.dispatcher
):
21 """Generic asynchronous server class"""
23 def __init__(self
, port
, handler_class
, backlog
=1, host
=""):
25 - port: the port to listen to
26 - handler_class: class to handle requests
27 - backlog: backlog queue size, i.e. how many pending connections may wait to be accepted (optional; see the socket docs for listen())
28 - host: host name (optional: can be empty to use default host name)
31 print "Starting", self
.__class
__.__name
__
32 self
.handler_class
= handler_class
33 asyncore
.dispatcher
.__init
__(self
)
34 self
.create_socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
35 self
.bind((host
, port
))
38 def handle_accept(self
):
39 conn
, addr
= self
.accept()
41 print 'Incoming Connection from %s:%d' % addr
42 self
.handler_class(conn
)
45 class ProxyServer(Server
):
47 """Generic proxy server class"""
49 def __init__(self
, port
, handler_class
, proxyaddr
=None, closepartners
=0):
51 - port: the port to listen to
52 - handler_class: proxy class to handle requests
53 - proxyaddr: a tuple containing the address and
54 port of a remote host to connect to (optional)
55 - closepartners: boolean, specifies whether we should close
56 all proxy connections or not (optional). http seems to *not*
57 want this, but telnet does...
59 Server
.__init
__(self
, port
, handler_class
, 1, "")
60 self
.proxyaddr
= proxyaddr
61 self
.closepartners
= closepartners
63 def handle_accept(self
):
64 conn
, addr
= self
.accept()
66 print 'Incoming Connection from %s:%d' % addr
67 self
.handler_class(conn
, self
.proxyaddr
, closepartner
=self
.closepartners
)
70 class Connection(asyncore
.dispatcher
):
72 """Generic connection class"""
74 def __init__(self
, sock_or_address
=None, readfunc
=None, terminator
=None):
76 - sock_or_address: either a socket, or a tuple containing the name
77 and port number of a remote host
78 - readfunc: callback function (optional). Will be called whenever
79 there is some data available, or when an appropriate terminator
81 - terminator: string which specifies when a read is complete (optional)"""
84 self
.readfunc
= readfunc
85 self
.terminator
= terminator
86 asyncore
.dispatcher
.__init
__(self
)
87 if hasattr(sock_or_address
, "fileno"):
88 self
.set_socket(sock_or_address
)
90 sock
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
94 self
.connect(sock_or_address
)
98 self
._out
_buffer
= self
._out
_buffer
+ data
100 def recv(self
, bytes
=-1):
102 bytes
= len(self
._in
_buffer
)
103 data
= self
._in
_buffer
[:bytes
]
104 self
._in
_buffer
= self
._in
_buffer
[bytes
:]
def set_terminator(self, terminator):
    """Replace the terminator stored on this connection.

    - terminator: the new value assigned to self.terminator
    """
    self.terminator = terminator
110 # override this if you want to control the incoming data stream
111 def handle_incoming_data(self
, data
):
114 self
._in
_buffer
= self
._in
_buffer
+ data
115 pos
= string
.find(self
._in
_buffer
, self
.terminator
)
118 data
= self
._in
_buffer
[:pos
]
119 self
._in
_buffer
= self
._in
_buffer
[pos
+ len(self
.terminator
):]
122 self
.readfunc(self
._in
_buffer
+ data
)
125 self
._in
_buffer
= self
._in
_buffer
+ data
128 def handle_read(self
):
129 data
= asyncore
.dispatcher
.recv(self
, BUFSIZE
)
132 print "incoming ->", "%x" % id(self
), `data`
133 self
.handle_incoming_data(data
)
135 def handle_write(self
):
137 sent
= self
.socket
.send(self
._out
_buffer
[:BUFSIZE
])
139 print "outgoing ->", "%x" % id(self
), `self
._out
_buffer
[:sent
]`
140 self
._out
_buffer
= self
._out
_buffer
[sent
:]
143 if self
.readfunc
and self
._in
_buffer
:
144 self
.readfunc(self
._in
_buffer
)
146 #elif VERBOSE > 1 and self._in_buffer:
147 # print "--- there is unread data:", `self._in_buffer`
148 asyncore
.dispatcher
.close(self
)
150 def handle_close(self
):
153 def handle_connect(self
):
159 """Glue to let a connection tell things to the UI in a standardized way.
161 The protocol defines four functions, which the connection will call:
163 def settotal(int total): gets called when the connection knows the data size
164 def setcurrent(int current): gets called when some new data has arrived
165 def done(): gets called when the transaction is complete
166 def error(type, value, tb): gets called whenever an error occurred
def __init__(self, settotal_func, setcurrent_func, done_func, error_func):
    """Store the four UI callbacks on the instance.

    - settotal_func: kept as self.settotal
    - setcurrent_func: kept as self.setcurrent
    - done_func: kept as self.done
    - error_func: kept as self.error
    """
    self.settotal, self.setcurrent = settotal_func, setcurrent_func
    self.done, self.error = done_func, error_func
class HTTPError(socket.error):
    """HTTP-level error; a plain subclass of socket.error that adds nothing."""
179 class HTTPClient(Connection
, httplib
.HTTP
):
181 """Asynchronous HTTP connection"""
183 def __init__(self
, (host
, port
), datahandler
, ui
=None):
184 Connection
.__init
__(self
, (host
, port
))
185 self
.datahandler
= datahandler
195 raise TypeError, "getreply() is not supported in async HTTP connection"
197 def handle_incoming_data(self
, data
):
199 if not self
.doneheaders
:
200 self
.buf
= self
.buf
+ data
201 pos
= string
.find(self
.buf
, "\r\n\r\n")
203 self
.handle_reply(self
.buf
[:pos
+4])
204 length
= self
.headers
.getheader("Content-Length")
205 if length
is not None:
206 self
.length
= int(length
)
207 if self
.ui
is not None:
208 self
.ui
.settotal(self
.length
)
212 self
.handle_data(self
.buf
[pos
+4:])
215 self
.handle_data(data
)
217 def handle_reply(self
, data
):
218 f
= cStringIO
.StringIO(data
)
219 ver
, code
, msg
= string
.split(f
.readline(), None, 2)
221 msg
= string
.strip(msg
)
223 # Hm, this is what *I* need, but probably not correct...
224 raise HTTPError
, (code
, msg
)
225 self
.headers
= mimetools
.Message(f
)
227 def handle_data(self
, data
):
228 self
.pos
= self
.pos
+ len(data
)
229 if self
.ui
is not None:
230 self
.ui
.setcurrent(self
.pos
)
231 self
.datahandler(data
)
232 if self
.pos
>= self
.length
:
235 if self
.ui
is not None:
238 def handle_error(self
, type, value
, tb
):
239 if self
.ui
is not None:
240 self
.ui
.error(type, value
, tb
)
242 Connection
.handle_error(self
, type, value
, tb
)
244 def log(self
, message
):
246 print 'LOG:', message
254 self
._checksum
= None
256 def feed(self
, data
):
257 self
._buf
= self
._buf
+ data
258 if self
._len
is None:
259 if len(self
._buf
) >= 8:
261 self
._len
, self
._checksum
= struct
.unpack("ll", self
._buf
[:8])
262 self
._buf
= self
._buf
[8:]
263 if self
._len
is not None:
264 if len(self
._buf
) >= self
._len
:
266 data
= self
._buf
[:self
._len
]
267 leftover
= self
._buf
[self
._len
:]
269 assert self
._checksum
== zlib
.adler32(data
), "corrupt data"
278 class PyConnection(Connection
):
def __init__(self, sock_or_address):
    """Initialise the base connection and start with a fresh PyMessage.

    - sock_or_address: handed unchanged to Connection.__init__
    """
    Connection.__init__(self, sock_or_address)
    # the message currently being assembled; starts out as a new PyMessage
    self.currentmessage = PyMessage()
284 def handle_incoming_data(self
, data
):
286 done
, data
= self
.currentmessage
.feed(data
)
289 self
.handle_object(cPickle
.loads(self
.currentmessage
.data
))
290 self
.currentmessage
= PyMessage()
292 def handle_object(self
, object):
293 print 'unhandled object:', `
object`
295 def send(self
, object):
296 import cPickle
, zlib
, struct
297 data
= cPickle
.dumps(object, 1)
299 checksum
= zlib
.adler32(data
)
300 data
= struct
.pack("ll", length
, checksum
) + data
301 Connection
.send(self
, data
)
304 class Echo(Connection
):
306 """Simple echoing connection: it sends everything back it receives."""
308 def handle_incoming_data(self
, data
):
312 class Proxy(Connection
):
314 """Generic proxy connection"""
316 def __init__(self
, sock_or_address
=None, proxyaddr
=None, closepartner
=0):
318 - sock_or_address is either a socket or a tuple containing the
319 name and port number of a remote host
320 - proxyaddr: a tuple containing a name and a port number of a
321 remote host (optional).
322 - closepartner: boolean, specifies whether we should close
323 the proxy connection (optional)"""
325 Connection
.__init
__(self
, sock_or_address
)
327 self
.proxyaddr
= proxyaddr
328 self
.closepartner
= closepartner
335 if self
.closepartner
:
337 Connection
.close(self
)
339 def handle_incoming_data(self
, data
):
341 # pass data for possible automatic remote host detection
343 data
= self
.connectproxy(data
)
344 self
.other
.send(data
)
346 def connectproxy(self
, data
):
347 other
= self
.__class
__(self
.proxyaddr
, closepartner
=self
.closepartner
)
353 class HTTPProxy(Proxy
):
355 """Simple, useless, http proxy. It figures out itself where to connect to."""
357 def connectproxy(self
, data
):
359 print "--- proxy request", `data`
360 addr
, data
= de_proxify(data
)
367 # helper for HTTPProxy
368 def de_proxify(data
):
370 req_pattern
= "GET http://([a-zA-Z0-9-_.]+)(:([0-9]+))?"
371 m
= re
.match(req_pattern
, data
)
372 host
, dummy
, port
= m
.groups()
377 # change "GET http://xx.xx.xx/yy" into "GET /yy"
378 data
= re
.sub(req_pattern
, "GET ", data
)
379 return (host
, port
), data
382 # if we're running "under W", let's register the socket poller to the event loop
388 W
.getapplication().addidlefunc(asyncore
.poll
)
392 #testserver = Server(10000, Connection)
393 #echoserver = Server(10007, Echo)
394 #httpproxyserver = Server(8088, HTTPProxy, 5)
395 #asyncore.close_all()