Updated for 2.1b2 distribution.
[python/dscho.git] / Lib / asyncore.py
blob145da58bdb8c457d90f1a2921b7f034199cc5cc0
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
54 import os
55 if os.name == 'nt':
56 EWOULDBLOCK = 10035
57 EINPROGRESS = 10036
58 EALREADY = 10037
59 ECONNRESET = 10054
60 ENOTCONN = 10057
61 ESHUTDOWN = 10058
62 else:
63 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
65 try:
66 socket_map
67 except NameError:
68 socket_map = {}
70 class ExitNow (exceptions.Exception):
71 pass
73 DEBUG = 0
75 def poll (timeout=0.0, map=None):
76 global DEBUG
77 if map is None:
78 map = socket_map
79 if map:
80 r = []; w = []; e = []
81 for fd, obj in map.items():
82 if obj.readable():
83 r.append (fd)
84 if obj.writable():
85 w.append (fd)
86 r,w,e = select.select (r,w,e, timeout)
88 if DEBUG:
89 print r,w,e
91 for fd in r:
92 try:
93 obj = map[fd]
94 try:
95 obj.handle_read_event()
96 except ExitNow:
97 raise ExitNow
98 except:
99 obj.handle_error()
100 except KeyError:
101 pass
103 for fd in w:
104 try:
105 obj = map[fd]
106 try:
107 obj.handle_write_event()
108 except ExitNow:
109 raise ExitNow
110 except:
111 obj.handle_error()
112 except KeyError:
113 pass
115 def poll2 (timeout=0.0, map=None):
116 import poll
117 if map is None:
118 map=socket_map
119 # timeout is in milliseconds
120 timeout = int(timeout*1000)
121 if map:
122 l = []
123 for fd, obj in map.items():
124 flags = 0
125 if obj.readable():
126 flags = poll.POLLIN
127 if obj.writable():
128 flags = flags | poll.POLLOUT
129 if flags:
130 l.append ((fd, flags))
131 r = poll.poll (l, timeout)
132 for fd, flags in r:
133 try:
134 obj = map[fd]
135 try:
136 if (flags & poll.POLLIN):
137 obj.handle_read_event()
138 if (flags & poll.POLLOUT):
139 obj.handle_write_event()
140 except ExitNow:
141 raise ExitNow
142 except:
143 obj.handle_error()
144 except KeyError:
145 pass
147 def poll3 (timeout=0.0, map=None):
148 # Use the poll() support added to the select module in Python 2.0
149 if map is None:
150 map=socket_map
151 # timeout is in milliseconds
152 timeout = int(timeout*1000)
153 pollster = select.poll()
154 if map:
155 l = []
156 for fd, obj in map.items():
157 flags = 0
158 if obj.readable():
159 flags = select.POLLIN
160 if obj.writable():
161 flags = flags | select.POLLOUT
162 if flags:
163 pollster.register(fd, flags)
164 r = pollster.poll (timeout)
165 for fd, flags in r:
166 try:
167 obj = map[fd]
168 try:
169 if (flags & select.POLLIN):
170 obj.handle_read_event()
171 if (flags & select.POLLOUT):
172 obj.handle_write_event()
173 except ExitNow:
174 raise ExitNow
175 except:
176 obj.handle_error()
177 except KeyError:
178 pass
180 def loop (timeout=30.0, use_poll=0, map=None):
182 if map is None:
183 map=socket_map
185 if use_poll:
186 if hasattr (select, 'poll'):
187 poll_fun = poll3
188 else:
189 poll_fun = poll2
190 else:
191 poll_fun = poll
193 while map:
194 poll_fun (timeout, map)
196 class dispatcher:
197 debug = 0
198 connected = 0
199 accepting = 0
200 closing = 0
201 addr = None
203 def __init__ (self, sock=None, map=None):
204 if sock:
205 self.set_socket (sock, map)
206 # I think it should inherit this anyway
207 self.socket.setblocking (0)
208 self.connected = 1
210 def __repr__ (self):
211 try:
212 status = []
213 if self.accepting and self.addr:
214 status.append ('listening')
215 elif self.connected:
216 status.append ('connected')
217 if self.addr:
218 status.append ('%s:%d' % self.addr)
219 return '<%s %s at %x>' % (
220 self.__class__.__name__,
221 ' '.join (status),
222 id(self)
224 except:
225 try:
226 ar = repr(self.addr)
227 except:
228 ar = 'no self.addr!'
230 return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)
232 def add_channel (self, map=None):
233 #self.log_info ('adding channel %s' % self)
234 if map is None:
235 map=socket_map
236 map [self._fileno] = self
238 def del_channel (self, map=None):
239 fd = self._fileno
240 if map is None:
241 map=socket_map
242 if map.has_key (fd):
243 #self.log_info ('closing channel %d:%s' % (fd, self))
244 del map [fd]
246 def create_socket (self, family, type):
247 self.family_and_type = family, type
248 self.socket = socket.socket (family, type)
249 self.socket.setblocking(0)
250 self._fileno = self.socket.fileno()
251 self.add_channel()
253 def set_socket (self, sock, map=None):
254 self.__dict__['socket'] = sock
255 self._fileno = sock.fileno()
256 self.add_channel (map)
258 def set_reuse_addr (self):
259 # try to re-use a server port if possible
260 try:
261 self.socket.setsockopt (
262 socket.SOL_SOCKET, socket.SO_REUSEADDR,
263 self.socket.getsockopt (socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
265 except:
266 pass
268 # ==================================================
269 # predicates for select()
270 # these are used as filters for the lists of sockets
271 # to pass to select().
272 # ==================================================
274 def readable (self):
275 return 1
277 if os.name == 'mac':
278 # The macintosh will select a listening socket for
279 # write if you let it. What might this mean?
280 def writable (self):
281 return not self.accepting
282 else:
283 def writable (self):
284 return 1
286 # ==================================================
287 # socket object methods.
288 # ==================================================
290 def listen (self, num):
291 self.accepting = 1
292 if os.name == 'nt' and num > 5:
293 num = 1
294 return self.socket.listen (num)
296 def bind (self, addr):
297 self.addr = addr
298 return self.socket.bind (addr)
300 def connect (self, address):
301 self.connected = 0
302 try:
303 self.socket.connect (address)
304 except socket.error, why:
305 if why[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
306 return
307 else:
308 raise socket.error, why
309 self.connected = 1
310 self.handle_connect()
312 def accept (self):
313 try:
314 conn, addr = self.socket.accept()
315 return conn, addr
316 except socket.error, why:
317 if why[0] == EWOULDBLOCK:
318 pass
319 else:
320 raise socket.error, why
322 def send (self, data):
323 try:
324 result = self.socket.send (data)
325 return result
326 except socket.error, why:
327 if why[0] == EWOULDBLOCK:
328 return 0
329 else:
330 raise socket.error, why
331 return 0
333 def recv (self, buffer_size):
334 try:
335 data = self.socket.recv (buffer_size)
336 if not data:
337 # a closed connection is indicated by signaling
338 # a read condition, and having recv() return 0.
339 self.handle_close()
340 return ''
341 else:
342 return data
343 except socket.error, why:
344 # winsock sometimes throws ENOTCONN
345 if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
346 self.handle_close()
347 return ''
348 else:
349 raise socket.error, why
351 def close (self):
352 self.del_channel()
353 self.socket.close()
355 # cheap inheritance, used to pass all other attribute
356 # references to the underlying socket object.
357 def __getattr__ (self, attr):
358 return getattr (self.socket, attr)
360 # log and log_info maybe overriden to provide more sophisitcated
361 # logging and warning methods. In general, log is for 'hit' logging
362 # and 'log_info' is for informational, warning and error logging.
364 def log (self, message):
365 sys.stderr.write ('log: %s\n' % str(message))
367 def log_info (self, message, type='info'):
368 if __debug__ or type != 'info':
369 print '%s: %s' % (type, message)
371 def handle_read_event (self):
372 if self.accepting:
373 # for an accepting socket, getting a read implies
374 # that we are connected
375 if not self.connected:
376 self.connected = 1
377 self.handle_accept()
378 elif not self.connected:
379 self.handle_connect()
380 self.connected = 1
381 self.handle_read()
382 else:
383 self.handle_read()
385 def handle_write_event (self):
386 # getting a write implies that we are connected
387 if not self.connected:
388 self.handle_connect()
389 self.connected = 1
390 self.handle_write()
392 def handle_expt_event (self):
393 self.handle_expt()
395 def handle_error (self):
396 (file,fun,line), t, v, tbinfo = compact_traceback()
398 # sometimes a user repr method will crash.
399 try:
400 self_repr = repr (self)
401 except:
402 self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)
404 self.log_info (
405 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
406 self_repr,
409 tbinfo
411 'error'
413 self.close()
415 def handle_expt (self):
416 self.log_info ('unhandled exception', 'warning')
418 def handle_read (self):
419 self.log_info ('unhandled read event', 'warning')
421 def handle_write (self):
422 self.log_info ('unhandled write event', 'warning')
424 def handle_connect (self):
425 self.log_info ('unhandled connect event', 'warning')
427 def handle_accept (self):
428 self.log_info ('unhandled accept event', 'warning')
430 def handle_close (self):
431 self.log_info ('unhandled close event', 'warning')
432 self.close()
434 # ---------------------------------------------------------------------------
435 # adds simple buffered output capability, useful for simple clients.
436 # [for more sophisticated usage use asynchat.async_chat]
437 # ---------------------------------------------------------------------------
439 class dispatcher_with_send (dispatcher):
440 def __init__ (self, sock=None):
441 dispatcher.__init__ (self, sock)
442 self.out_buffer = ''
444 def initiate_send (self):
445 num_sent = 0
446 num_sent = dispatcher.send (self, self.out_buffer[:512])
447 self.out_buffer = self.out_buffer[num_sent:]
449 def handle_write (self):
450 self.initiate_send()
452 def writable (self):
453 return (not self.connected) or len(self.out_buffer)
455 def send (self, data):
456 if self.debug:
457 self.log_info ('sending %s' % repr(data))
458 self.out_buffer = self.out_buffer + data
459 self.initiate_send()
461 # ---------------------------------------------------------------------------
462 # used for debugging.
463 # ---------------------------------------------------------------------------
465 def compact_traceback ():
466 t,v,tb = sys.exc_info()
467 tbinfo = []
468 while 1:
469 tbinfo.append ((
470 tb.tb_frame.f_code.co_filename,
471 tb.tb_frame.f_code.co_name,
472 str(tb.tb_lineno)
474 tb = tb.tb_next
475 if not tb:
476 break
478 # just to be safe
479 del tb
481 file, function, line = tbinfo[-1]
482 info = '[' + '] ['.join(map(lambda x: '|'.join(x), tbinfo)) + ']'
483 return (file, function, line), t, v, info
485 def close_all (map=None):
486 if map is None:
487 map=socket_map
488 for x in map.values():
489 x.socket.close()
490 map.clear()
492 # Asynchronous File I/O:
494 # After a little research (reading man pages on various unixen, and
495 # digging through the linux kernel), I've determined that select()
496 # isn't meant for doing doing asynchronous file i/o.
497 # Heartening, though - reading linux/mm/filemap.c shows that linux
498 # supports asynchronous read-ahead. So _MOST_ of the time, the data
499 # will be sitting in memory for us already when we go to read it.
501 # What other OS's (besides NT) support async file i/o? [VMS?]
503 # Regardless, this is useful for pipes, and stdin/stdout...
505 import os
506 if os.name == 'posix':
507 import fcntl
508 import FCNTL
510 class file_wrapper:
511 # here we override just enough to make a file
512 # look like a socket for the purposes of asyncore.
513 def __init__ (self, fd):
514 self.fd = fd
516 def recv (self, *args):
517 return apply (os.read, (self.fd,)+args)
519 def send (self, *args):
520 return apply (os.write, (self.fd,)+args)
522 read = recv
523 write = send
525 def close (self):
526 return os.close (self.fd)
528 def fileno (self):
529 return self.fd
531 class file_dispatcher (dispatcher):
532 def __init__ (self, fd):
533 dispatcher.__init__ (self)
534 self.connected = 1
535 # set it to non-blocking mode
536 flags = fcntl.fcntl (fd, FCNTL.F_GETFL, 0)
537 flags = flags | FCNTL.O_NONBLOCK
538 fcntl.fcntl (fd, FCNTL.F_SETFL, flags)
539 self.set_file (fd)
541 def set_file (self, fd):
542 self._fileno = fd
543 self.socket = file_wrapper (fd)
544 self.add_channel()