Merged release21-maint changes.
[python/dscho.git] / Lib / asyncore.py
blobec769dbe7e23fa6a31467aaa291f1e8bb155fd84
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
53 import types
55 import os
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
57 ENOTCONN, ESHUTDOWN
# Module-wide registry mapping file descriptor -> channel object.
# If the name is already bound in this module's namespace the existing
# dictionary is kept; otherwise a fresh, empty one is created.
socket_map = globals().get ('socket_map', {})
class ExitNow (Exception):
    """Raised by a channel handler to break out of the polling functions.

    The poll functions in this module re-raise it instead of routing it
    to the channel's handle_error().
    """
    # Derives from the builtin Exception directly; this is the same class
    # the 'exceptions' module exposes, without needing that module.
    pass

# When true, poll() prints the lists returned by select().
DEBUG = 0
def poll (timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    timeout -- seconds handed to select.select() (default 0.0).
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    Each channel is asked readable() / writable() to build the select
    lists; ready descriptors then get handle_read_event() /
    handle_write_event().  ExitNow raised by a handler propagates to the
    caller unchanged; any other exception is passed to the channel's
    handle_error().  Returns None.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []; w = []; e = []
    for fd, obj in map.items():
        if obj.readable():
            r.append (fd)
        if obj.writable():
            w.append (fd)

    if not (r or w) and timeout is not None:
        # Nothing to wait on.  select() with three empty lists is not
        # accepted on every platform (it fails on Windows), so just wait
        # out the timeout instead.
        import time
        time.sleep (timeout)
        return

    r, w, e = select.select (r, w, e, timeout)

    if DEBUG:
        sys.stdout.write ('%s %s %s\n' % (r, w, e))

    for fd in r:
        try:
            obj = map[fd]
        except KeyError:
            # channel was removed from the map earlier in this pass
            continue
        try:
            obj.handle_read_event()
        except ExitNow:
            # bare raise keeps the original instance and traceback
            raise
        except:
            obj.handle_error()

    for fd in w:
        try:
            obj = map[fd]
        except KeyError:
            continue
        try:
            obj.handle_write_event()
        except ExitNow:
            raise
        except:
            obj.handle_error()
def poll2 (timeout=0.0, map=None):
    """Run one polling pass using the separate 'poll' module.

    timeout -- seconds; converted to whole milliseconds for poll.poll().
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    NOTE(review): 'poll' is imported from outside this file; presumably
    it is the stand-alone poll extension that predates select.poll().

    ExitNow raised by a handler propagates unchanged; any other exception
    is passed to the channel's handle_error().  Returns None.
    """
    import poll
    if map is None:
        map = socket_map
    # timeout is in milliseconds
    timeout = int (timeout*1000)
    if map:
        l = []
        for fd, obj in map.items():
            flags = 0
            if obj.readable():
                flags = poll.POLLIN
            if obj.writable():
                flags = flags | poll.POLLOUT
            if flags:
                l.append ((fd, flags))
        r = poll.poll (l, timeout)
        for fd, flags in r:
            try:
                obj = map[fd]
            except KeyError:
                # channel was removed from the map earlier in this pass
                continue
            try:
                if (flags & poll.POLLIN):
                    obj.handle_read_event()
                if (flags & poll.POLLOUT):
                    obj.handle_write_event()
            except ExitNow:
                # bare raise keeps the original instance and traceback
                raise
            except:
                obj.handle_error()
def poll3 (timeout=0.0, map=None):
    """Run one polling pass using select.poll().

    Uses the poll() support added to the select module in Python 2.0.

    timeout -- seconds, converted to whole milliseconds; None is passed
               through unchanged so the poll object blocks until an event.
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    Channels are registered for POLLIN / POLLOUT according to their
    readable() / writable() answers.  ExitNow raised by a handler
    propagates unchanged; any other exception is passed to the channel's
    handle_error().  Returns None.
    """
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int (timeout*1000)
    if map:
        pollster = select.poll()
        for fd, obj in map.items():
            flags = 0
            if obj.readable():
                flags = select.POLLIN
            if obj.writable():
                flags = flags | select.POLLOUT
            if flags:
                pollster.register (fd, flags)
        r = pollster.poll (timeout)
        for fd, flags in r:
            try:
                obj = map[fd]
            except KeyError:
                # channel was removed from the map earlier in this pass
                continue
            try:
                if (flags & select.POLLIN):
                    obj.handle_read_event()
                if (flags & select.POLLOUT):
                    obj.handle_write_event()
            except ExitNow:
                # bare raise keeps the original instance and traceback
                raise
            except:
                obj.handle_error()
def loop (timeout=30.0, use_poll=0, map=None):
    """Keep polling until *map* is empty.

    timeout  -- seconds passed to every polling call (default 30.0).
    use_poll -- when true, use poll3 if the select module has poll(),
                otherwise poll2; when false, use the select()-based poll.
    map      -- dictionary of file descriptor -> channel; defaults to the
                module-level socket_map.
    """
    if map is None:
        map = socket_map

    # pick the polling implementation once, up front
    if not use_poll:
        poll_fun = poll
    elif hasattr (select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    while map:
        poll_fun (timeout, map)
class dispatcher:
    """Thin wrapper around a non-blocking socket, driven by the poll functions.

    A dispatcher registers itself in a channel map (file descriptor ->
    dispatcher).  The polling functions call readable() / writable() to
    decide what to wait for and then handle_read_event() /
    handle_write_event(), which in turn call the overridable handle_*()
    hooks.  Attributes not found on the dispatcher are looked up on the
    wrapped socket.
    """

    debug = 0
    connected = 0
    accepting = 0
    closing = 0
    addr = None

    def __init__ (self, sock=None, map=None):
        """Optionally adopt the already-created socket *sock*.

        When *sock* is given it is registered in *map* (the module-level
        socket_map if None), made non-blocking and marked as connected.
        """
        if sock:
            self.set_socket (sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking (0)
            self.connected = 1

    def __repr__ (self):
        """Return '<class status addr at id>', or a fallback string on failure."""
        try:
            status = []
            if self.accepting and self.addr:
                status.append ('listening')
            elif self.connected:
                status.append ('connected')
            if self.addr:
                # test the type of the address, not the address itself
                if isinstance (self.addr, type (())):
                    status.append ('%s:%d' % self.addr)
                else:
                    status.append (self.addr)
            return '<%s %s at %x>' % (self.__class__.__name__,
                                      ' '.join (status), id (self))
        except:
            pass

        try:
            ar = repr (self.addr)
        except AttributeError:
            ar = 'no self.addr!'

        return '<__repr__() failed for %s instance at %x (addr=%s)>' % \
               (self.__class__.__name__, id (self), ar)

    def add_channel (self, map=None):
        """Register this channel in *map* under its file descriptor."""
        #self.log_info ('adding channel %s' % self)
        if map is None:
            map = socket_map
        map [self._fileno] = self

    def del_channel (self, map=None):
        """Remove this channel from *map*; a missing entry is not an error."""
        fd = self._fileno
        if map is None:
            map = socket_map
        try:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map [fd]
        except KeyError:
            pass

    def create_socket (self, family, type):
        """Create a non-blocking socket and register it in socket_map."""
        self.family_and_type = family, type
        self.socket = socket.socket (family, type)
        self.socket.setblocking (0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket (self, sock, map=None):
        """Adopt *sock* as the wrapped socket and register it in *map*."""
        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel (map)

    def set_reuse_addr (self):
        """Turn on SO_REUSEADDR; a socket.error while doing so is ignored."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt (
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt (socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable (self):
        return 1

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable (self):
            return not self.accepting
    else:
        def writable (self):
            return 1

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen (self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On 'nt' a backlog above 5 is reduced to 5.
        """
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            # cap the request at 5 rather than collapsing it to 1
            num = 5
        return self.socket.listen (num)

    def bind (self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind (addr)

    def connect (self, address):
        """Start connecting to *address*.

        EINPROGRESS, EALREADY and EWOULDBLOCK mean the connect is still
        pending, so the method just returns; other socket errors are
        re-raised.  On immediate success handle_connect() is called.
        """
        self.connected = 0
        # XXX why not use connect_ex?
        try:
            self.socket.connect (address)
        except socket.error:
            # sys.exc_info() fetches the instance without version-specific
            # except syntax
            why = sys.exc_info()[1]
            if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                return
            raise
        self.connected = 1
        self.handle_connect()

    def accept (self):
        """Return (conn, addr), or None when accept would block."""
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] != EWOULDBLOCK:
                raise

    def send (self, data):
        """Send *data*; return the count sent, or 0 if it would block."""
        try:
            result = self.socket.send (data)
            return result
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv (self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or ECONNRESET / ENOTCONN / ESHUTDOWN, triggers
        handle_close() and returns ''.  Other socket errors are re-raised.
        """
        try:
            data = self.socket.recv (buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            why = sys.exc_info()[1]
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            raise

    def close (self):
        """Unregister from socket_map and close the wrapped socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__ (self, attr):
        if attr == 'socket':
            # no socket has been set yet; looking it up through
            # self.socket would call __getattr__ again without end
            raise AttributeError (attr)
        return getattr (self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log (self, message):
        sys.stderr.write ('log: %s\n' % str(message))

    def log_info (self, message, type='info'):
        """Write 'type: message' to stdout.

        'info' messages are only written when __debug__ is true.
        """
        if __debug__ or type != 'info':
            sys.stdout.write ('%s: %s\n' % (type, message))

    def handle_read_event (self):
        """Turn a read-ready notification into accept/connect/read hooks."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event (self):
        """Turn a write-ready notification into connect/write hooks."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event (self):
        self.handle_expt()

    def handle_error (self):
        """Log the exception currently being handled, then close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr (self)
        except:
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info (
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt (self):
        self.log_info ('unhandled exception', 'warning')

    def handle_read (self):
        self.log_info ('unhandled read event', 'warning')

    def handle_write (self):
        self.log_info ('unhandled write event', 'warning')

    def handle_connect (self):
        self.log_info ('unhandled connect event', 'warning')

    def handle_accept (self):
        self.log_info ('unhandled accept event', 'warning')

    def handle_close (self):
        self.log_info ('unhandled close event', 'warning')
        self.close()
433 # ---------------------------------------------------------------------------
434 # adds simple buffered output capability, useful for simple clients.
435 # [for more sophisticated usage use asynchat.async_chat]
436 # ---------------------------------------------------------------------------
class dispatcher_with_send (dispatcher):
    """dispatcher with a simple string output buffer.

    send() appends to out_buffer and tries to push it out immediately;
    whatever is left is sent from handle_write(), at most 512 characters
    per call.
    """

    def __init__ (self, sock=None, map=None):
        # *map* is forwarded so the channel can live in a private map,
        # exactly as dispatcher.__init__ allows.
        dispatcher.__init__ (self, sock, map)
        self.out_buffer = ''

    def initiate_send (self):
        """Send the next slice of out_buffer and drop what was accepted."""
        num_sent = dispatcher.send (self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write (self):
        self.initiate_send()

    def writable (self):
        # true while still connecting, or while output is pending
        return (not self.connected) or len(self.out_buffer)

    def send (self, data):
        """Queue *data* for sending and attempt to send right away."""
        if self.debug:
            self.log_info ('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
460 # ---------------------------------------------------------------------------
461 # used for debugging.
462 # ---------------------------------------------------------------------------
def compact_traceback ():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info), where the first
    tuple describes the innermost traceback entry and info is a string of
    '[file|function|line]' items, one per traceback entry, outermost
    first.  Must be called while an exception is being handled.
    """
    exc_type, exc_value, tb = sys.exc_info()
    entries = []
    while 1:
        code = tb.tb_frame.f_code
        entries.append ((code.co_filename, code.co_name, str (tb.tb_lineno)))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    innermost = entries[-1]
    file, function, line = innermost
    pieces = ['|'.join (entry) for entry in entries]
    info = '[' + '] ['.join (pieces) + ']'
    return (file, function, line), exc_type, exc_value, info
def close_all (map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- dictionary of file descriptor -> channel; defaults to the
           module-level socket_map.
    """
    channels = map
    if channels is None:
        channels = socket_map
    for channel in channels.values():
        # closes the wrapped socket directly, not via channel.close()
        channel.socket.close()
    channels.clear()
491 # Asynchronous File I/O:
493 # After a little research (reading man pages on various unixen, and
494 # digging through the linux kernel), I've determined that select()
495 # isn't meant for doing asynchronous file i/o.
496 # Heartening, though - reading linux/mm/filemap.c shows that linux
497 # supports asynchronous read-ahead. So _MOST_ of the time, the data
498 # will be sitting in memory for us already when we go to read it.
500 # What other OS's (besides NT) support async file i/o? [VMS?]
502 # Regardless, this is useful for pipes, and stdin/stdout...
504 import os
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a raw file descriptor the few socket methods asyncore uses."""
        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        def __init__ (self, fd):
            self.fd = fd

        def recv (self, *args):
            # extra positional arguments are passed straight to os.read
            return os.read (self.fd, *args)

        def send (self, *args):
            # extra positional arguments are passed straight to os.write
            return os.write (self.fd, *args)

        read = recv
        write = send

        def close (self):
            return os.close (self.fd)

        def fileno (self):
            return self.fd

    class file_dispatcher (dispatcher):
        """dispatcher over a plain file descriptor (e.g. a pipe or stdin/stdout)."""

        def __init__ (self, fd):
            dispatcher.__init__ (self)
            self.connected = 1
            # set it to non-blocking mode
            flags = fcntl.fcntl (fd, fcntl.F_GETFL, 0)
            flags = flags | os.O_NONBLOCK
            fcntl.fcntl (fd, fcntl.F_SETFL, flags)
            self.set_file (fd)

        def set_file (self, fd):
            """Wrap *fd* in a file_wrapper and register it in socket_map."""
            self._fileno = fd
            self.socket = file_wrapper (fd)
            self.add_channel()
542 self.add_channel()