This commit was manufactured by cvs2svn to create tag
[python/dscho.git] / Lib / asyncore.py
blobe79593a7c6f05489630b922c4ad000dde084cf95
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN
# Registry of active channels, keyed by file descriptor.  It is created
# only when the name is not already bound, so a registry that already
# exists in this namespace is kept rather than replaced.
socket_map = globals().get('socket_map', {})
class ExitNow(Exception):
    """Raised by a handler to propagate out of the poll functions.

    The poll functions re-raise this exception instead of routing it to
    the channel's handle_error() method.
    """
# When true, poll() prints the ready lists returned by select().
DEBUG = 0
def poll(timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    Each channel is asked via readable()/writable() whether it wants to
    be watched.  After select() returns, handle_read_event() and
    handle_write_event() are called on the channels reported ready.

    timeout -- passed straight to select.select().
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    An exception raised by a handler is passed to that channel's
    handle_error(), except ExitNow, which is re-raised to the caller.
    A select.error other than EINTR is propagated.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []
    w = []
    e = []
    for fd, obj in map.items():
        if obj.readable():
            r.append(fd)
        if obj.writable():
            w.append(fd)

    try:
        r, w, e = select.select(r, w, e, timeout)
    except select.error:
        # sys.exc_info() is used so the block parses on old and new
        # interpreters alike.
        if sys.exc_info()[1].args[0] != EINTR:
            raise
        # Interrupted by a signal: r and w still hold the *requested*
        # descriptors, not the ready ones.  Dispatching on them would
        # call handlers on sockets that are not ready, so give up on
        # this pass and let the caller poll again.
        return

    if DEBUG:
        sys.stdout.write('%s %s %s\n' % (r, w, e))

    for fd in r:
        try:
            obj = map[fd]
        except KeyError:
            # the channel was removed by an earlier handler in this pass
            continue

        try:
            obj.handle_read_event()
        except ExitNow:
            # bare raise keeps the original exception and traceback
            raise
        except:
            obj.handle_error()

    for fd in w:
        try:
            obj = map[fd]
        except KeyError:
            continue

        try:
            obj.handle_write_event()
        except ExitNow:
            raise
        except:
            obj.handle_error()
def poll2(timeout=0.0, map=None):
    """Run one polling pass using the separate ``poll`` module.

    timeout -- seconds, or None; converted to whole milliseconds before
               being handed to poll.poll().
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    Handler exceptions go to the channel's handle_error(); ExitNow is
    raised to the caller.
    """
    import poll

    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    if not map:
        return

    # Build the (fd, event mask) list for every channel that wants events.
    interest = []
    for fd, channel in map.items():
        mask = 0
        if channel.readable():
            mask = poll.POLLIN
        if channel.writable():
            mask = mask | poll.POLLOUT
        if mask:
            interest.append((fd, mask))

    ready = poll.poll(interest, timeout)
    for fd, mask in ready:
        try:
            channel = map[fd]
        except KeyError:
            # channel disappeared while earlier events were handled
            continue

        try:
            if mask & poll.POLLIN:
                channel.handle_read_event()
            if mask & poll.POLLOUT:
                channel.handle_write_event()
        except ExitNow:
            raise ExitNow
        except:
            channel.handle_error()
def poll3(timeout=0.0, map=None):
    """Run one polling pass using select.poll().

    timeout -- seconds, or None; converted to whole milliseconds before
               being handed to the poll object.
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    A poll interrupted with EINTR is treated as "nothing ready".  Handler
    exceptions go to the channel's handle_error(); ExitNow is raised to
    the caller.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    for fd, channel in map.items():
        mask = 0
        if channel.readable():
            mask = select.POLLIN
        if channel.writable():
            mask = mask | select.POLLOUT
        if mask:
            pollster.register(fd, mask)

    try:
        ready = pollster.poll(timeout)
    except select.error:
        if sys.exc_info()[1].args[0] != EINTR:
            raise
        ready = []

    for fd, mask in ready:
        try:
            channel = map[fd]
        except KeyError:
            # channel disappeared while earlier events were handled
            continue

        try:
            if mask & select.POLLIN:
                channel.handle_read_event()
            if mask & select.POLLOUT:
                channel.handle_write_event()
        except ExitNow:
            raise ExitNow
        except:
            channel.handle_error()
def loop(timeout=30.0, use_poll=0, map=None):
    """Keep polling until the channel map is empty.

    timeout  -- passed to the polling function on every pass.
    use_poll -- when true, use poll3() if the select module provides
                poll(), otherwise poll2(); when false, use poll().
    map      -- dictionary of file descriptor -> channel; defaults to the
                module-level socket_map.
    """
    if map is None:
        map = socket_map

    if not use_poll:
        poll_fun = poll
    elif hasattr(select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    while map:
        poll_fun(timeout, map)
class dispatcher:
    """Wrap a non-blocking socket and route poll events to handle_*() methods.

    An instance registers itself in a channel map (file descriptor ->
    dispatcher).  The polling code asks readable()/writable() whether the
    channel wants events and delivers them through handle_read_event()
    and handle_write_event(), which in turn call the overridable
    handle_accept(), handle_connect(), handle_read() and handle_write().

    Attribute lookups that fail on the dispatcher are forwarded to the
    wrapped socket object.
    """

    debug = 0
    # true once the connection is established (or a socket was passed in)
    connected = 0
    # true after listen() has been called
    accepting = 0
    closing = 0
    # address given to bind()/connect(), or the peer name of a passed socket
    addr = None

    def __init__(self, sock=None, map=None):
        """Optionally adopt *sock* and register it in *map*.

        With a socket, the channel is registered, switched to
        non-blocking mode and marked connected.  Without one,
        self.socket is None until create_socket() or set_socket() is
        called.
        """
        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = 1
            # XXX Does the constructor require that the socket passed
            # be connected?
            try:
                self.addr = sock.getpeername()
            except socket.error:
                # The addr isn't crucial
                pass
        else:
            self.socket = None

    def __repr__(self):
        status = [self.__class__.__module__ + "." + self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def add_channel(self, map=None):
        """Register this channel in *map* (default: socket_map)."""
        #self.log_info ('adding channel %s' % self)
        if map is None:
            map = socket_map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default: socket_map), if present."""
        fd = self._fileno
        if map is None:
            map = socket_map
        try:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map[fd]
        except KeyError:
            # not registered: nothing to do
            pass

    def create_socket(self, family, type):
        """Create a non-blocking socket and register it in socket_map."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt an existing socket and register this channel in *map*."""
        self.socket = sock
        ## self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Turn on SO_REUSEADDR; a socket.error from doing so is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        return 1

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable(self):
            return not self.accepting
    else:
        def writable(self):
            return 1

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and start listening."""
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            # Clamp to 5 on NT.  The previous code dropped the backlog
            # all the way to 1, which threw away most of the queue the
            # caller asked for.
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start a non-blocking connect to *address*.

        Returns immediately while the connect is still in progress.
        If it completes at once, handle_connect() is called.  Any other
        error code from connect_ex() is raised as socket.error.
        """
        self.connected = 0
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.connected = 1
            self.handle_connect()
        else:
            raise socket.error(err)

    def accept(self):
        """Return (conn, addr), or None if accept() would block."""
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            if sys.exc_info()[1].args[0] != EWOULDBLOCK:
                # bare raise keeps the original traceback
                raise

    def send(self, data):
        """Send *data*; return the byte count, or 0 if it would block."""
        try:
            return self.socket.send(data)
        except socket.error:
            if sys.exc_info()[1].args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv(self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or ECONNRESET/ENOTCONN/ESHUTDOWN, is treated as a
        closed connection: handle_close() is called and '' is returned.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            # winsock sometimes throws ENOTCONN
            if sys.exc_info()[1].args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            raise

    def close(self):
        """Remove the channel from socket_map and close the socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        if attr == 'socket':
            # 'socket' itself is missing (e.g. __init__ never ran).
            # Looking up self.socket below would re-enter __getattr__
            # without end, so fail cleanly instead.
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        if __debug__ or type != 'info':
            sys.stdout.write('%s: %s\n' % (type, message))

    def handle_read_event(self):
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event(self):
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            # deliberate catch-all: reporting must not fail because of
            # a broken __repr__
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        self.log_info('unhandled close event', 'warning')
        self.close()
442 # ---------------------------------------------------------------------------
443 # adds simple buffered output capability, useful for simple clients.
444 # [for more sophisticated usage use asynchat.async_chat]
445 # ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple output buffer.

    send() appends to out_buffer and tries to transmit immediately; data
    that could not be sent is flushed from handle_write().
    """

    def __init__(self, sock=None, map=None):
        """Initialise the channel.

        sock -- optional existing socket to adopt.
        map  -- optional channel map, forwarded to dispatcher.__init__
                so a buffered channel can live in a private map too.
        """
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Try to send the first 512 bytes of the buffer."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        # want write events while connecting or while data is pending
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to send right away."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
469 # ---------------------------------------------------------------------------
470 # used for debugging.
471 # ---------------------------------------------------------------------------
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info).  The first item
    describes the innermost traceback entry; *info* is a single string
    of '[file|function|line]' groups, one per traceback entry, separated
    by spaces.
    """
    exc_type, exc_value, tb = sys.exc_info()
    entries = []
    while 1:
        code = tb.tb_frame.f_code
        entries.append((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    filename, function, line = entries[-1]
    groups = ['|'.join(entry) for entry in entries]
    info = '[' + '] ['.join(groups) + ']'
    return (filename, function, line), exc_type, exc_value, info
def close_all(map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- dictionary of file descriptor -> channel; defaults to the
           module-level socket_map.
    """
    if map is None:
        map = socket_map
    for channel in map.values():
        channel.socket.close()
    map.clear()
500 # Asynchronous File I/O:
502 # After a little research (reading man pages on various unixen, and
503 # digging through the linux kernel), I've determined that select()
504 # isn't meant for doing asynchronous file i/o.
505 # Heartening, though - reading linux/mm/filemap.c shows that linux
506 # supports asynchronous read-ahead. So _MOST_ of the time, the data
507 # will be sitting in memory for us already when we go to read it.
509 # What other OS's (besides NT) support async file i/o? [VMS?]
511 # Regardless, this is useful for pipes, and stdin/stdout...
513 import os
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a raw file descriptor the few socket methods asyncore uses."""
        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.

        def __init__(self, fd):
            self.fd = fd

        def recv(self, *args):
            return os.read(self.fd, *args)

        def send(self, *args):
            return os.write(self.fd, *args)

        read = recv
        write = send

        def close(self):
            return os.close(self.fd)

        def fileno(self):
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher for a file descriptor instead of a socket."""

        def __init__(self, fd):
            dispatcher.__init__(self)
            self.connected = 1
            # set it to non-blocking mode
            current_flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)
            self.set_file(fd)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and register it in socket_map."""
            self._fileno = fd
            self.socket = file_wrapper(fd)
            self.add_channel()