This commit was manufactured by cvs2svn to create tag 'r212'.
[python/dscho.git] / Lib / asyncore.py
blob7340731e0794050080f354999db6fdd881ed3ea0
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR
# Module-wide table mapping file descriptors to channel objects.  Keep an
# already-defined socket_map if this namespace has one; otherwise start
# with an empty map.
socket_map = globals().get('socket_map', {})
class ExitNow(Exception):
    """Exception the poll functions re-raise instead of passing it to a
    channel's handle_error()."""
# When true, poll() prints the descriptor lists returned by select().
DEBUG = 0


def poll (timeout=0.0, map=None):
    """Run one select()-based pass over the channels in *map*.

    timeout -- seconds to wait in select.select().
    map     -- dictionary of file descriptor -> channel object; defaults
               to the module-level socket_map.

    Each channel is asked readable()/writable() to build the select
    lists, then handle_read_event()/handle_write_event() is called on the
    channels select() reports.  Any exception raised by a handler is
    routed to that channel's handle_error(), except ExitNow, which
    propagates to the caller.  select.error is re-raised unless its error
    code is EINTR.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []; w = []; e = []
    for fd, obj in map.items():
        if obj.readable():
            r.append (fd)
        if obj.writable():
            w.append (fd)

    try:
        r, w, e = select.select (r, w, e, timeout)
    except select.error:
        err = sys.exc_info()[1]
        if err.args[0] != EINTR:
            raise
        # Interrupted by a signal: r and w still hold the descriptors we
        # *asked* about, not the ready ones.  Dispatching now would fire
        # events on channels that are not ready, so give up this pass.
        return

    if DEBUG:
        print ('%s %s %s' % (r, w, e))

    for fd in r:
        try:
            # the channel may have been removed by an earlier handler
            obj = map[fd]
            try:
                obj.handle_read_event()
            except ExitNow:
                # bare raise keeps the original instance and traceback
                raise
            except:
                obj.handle_error()
        except KeyError:
            pass

    for fd in w:
        try:
            obj = map[fd]
            try:
                obj.handle_write_event()
            except ExitNow:
                raise
            except:
                obj.handle_error()
        except KeyError:
            pass
def poll2 (timeout=0.0, map=None):
    """Run one polling pass over *map* using the separate 'poll' module.

    timeout -- seconds; converted to whole milliseconds for poll.poll().
    map     -- dictionary of file descriptor -> channel object; defaults
               to the module-level socket_map.

    Channels reporting readable()/writable() are polled for POLLIN /
    POLLOUT and the matching handle_*_event() methods are called.  Handler
    exceptions go to the channel's handle_error(); ExitNow is re-raised.
    """
    import poll
    if map is None:
        map = socket_map
    # poll.poll() takes its timeout in milliseconds
    millis = int (timeout * 1000)
    if not map:
        return

    wanted = []
    for fd, channel in map.items():
        mask = 0
        if channel.readable():
            mask = poll.POLLIN
        if channel.writable():
            mask = mask | poll.POLLOUT
        if mask:
            wanted.append ((fd, mask))

    for fd, mask in poll.poll (wanted, millis):
        try:
            # the channel may have been removed by an earlier handler
            channel = map[fd]
            try:
                if mask & poll.POLLIN:
                    channel.handle_read_event()
                if mask & poll.POLLOUT:
                    channel.handle_write_event()
            except ExitNow:
                raise ExitNow
            except:
                channel.handle_error()
        except KeyError:
            pass
def poll3 (timeout=0.0, map=None):
    """Run one polling pass over *map* using select.poll().

    timeout -- seconds; converted to whole milliseconds for the pollster.
    map     -- dictionary of file descriptor -> channel object; defaults
               to the module-level socket_map.

    Channels reporting readable()/writable() are registered for POLLIN /
    POLLOUT and the matching handle_*_event() methods are called for the
    reported descriptors.  Handler exceptions go to the channel's
    handle_error(); ExitNow is re-raised.  An EINTR from the poll call is
    treated as "nothing ready"; other select.error values propagate.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    # the pollster takes its timeout in milliseconds
    millis = int (timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    for fd, channel in map.items():
        mask = 0
        if channel.readable():
            mask = select.POLLIN
        if channel.writable():
            mask = mask | select.POLLOUT
        if mask:
            pollster.register (fd, mask)

    try:
        ready = pollster.poll (millis)
    except select.error:
        failure = sys.exc_info()[1]
        if failure.args[0] != EINTR:
            raise
        ready = []

    for fd, mask in ready:
        try:
            # the channel may have been removed by an earlier handler
            channel = map[fd]
            try:
                if mask & select.POLLIN:
                    channel.handle_read_event()
                if mask & select.POLLOUT:
                    channel.handle_write_event()
            except ExitNow:
                raise ExitNow
            except:
                channel.handle_error()
        except KeyError:
            pass
def loop (timeout=30.0, use_poll=0, map=None):
    """Keep polling until *map* is empty.

    timeout  -- seconds handed to the polling function on every pass.
    use_poll -- if true, use poll3 when the select module provides
                poll(), otherwise poll2; if false, use the select()-based
                poll function.
    map      -- dictionary of file descriptor -> channel object; defaults
                to the module-level socket_map.
    """
    if map is None:
        map = socket_map

    if not use_poll:
        poll_fun = poll
    elif hasattr (select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    while map:
        poll_fun (timeout, map)
class dispatcher:
    """Wrapper around a non-blocking socket that takes part in the poll loop.

    A dispatcher registers itself in a channel map (file descriptor ->
    dispatcher).  The polling functions consult readable()/writable() and
    call handle_read_event()/handle_write_event(), which in turn invoke
    the handle_* hooks that subclasses are expected to override.
    Attributes not found on the dispatcher are looked up on the wrapped
    socket object.
    """

    debug = 0
    connected = 0
    accepting = 0
    closing = 0
    addr = None

    def __init__ (self, sock=None, map=None):
        """Wrap *sock* (if given) and register it in *map*.

        Without a socket, self.socket is set to None so that attribute
        delegation in __getattr__ has something to look at.
        """
        if sock:
            self.set_socket (sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking (0)
            self.connected = 1
        else:
            # Without this, any lookup of a missing attribute would reach
            # __getattr__, which reads self.socket, which is itself
            # missing -> unbounded recursion.
            self.socket = None

    def __repr__ (self):
        """Return '<ClassName status at id>'; never raises."""
        try:
            status = []
            if self.accepting and self.addr:
                status.append ('listening')
            elif self.connected:
                status.append ('connected')
            if self.addr:
                status.append ('%s:%d' % self.addr)
            return '<%s %s at %x>' % (
                self.__class__.__name__,
                ' '.join (status),
                id(self)
                )
        except:
            # deliberately broad: a repr must not fail
            try:
                ar = repr(self.addr)
            except:
                ar = 'no self.addr!'

            return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self),ar)

    def add_channel (self, map=None):
        """Register this channel in *map* (default: socket_map)."""
        #self.log_info ('adding channel %s' % self)
        if map is None:
            map=socket_map
        map [self._fileno] = self

    def del_channel (self, map=None):
        """Remove this channel from *map* (default: socket_map), if present."""
        fd = self._fileno
        if map is None:
            map=socket_map
        try:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map [fd]
        except KeyError:
            # not registered (any more); nothing to do
            pass

    def create_socket (self, family, type):
        """Create a new non-blocking socket and register it in socket_map."""
        self.family_and_type = family, type
        self.socket = socket.socket (family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket (self, sock, map=None):
        """Adopt an existing socket object and register it in *map*."""
        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel (map)

    def set_reuse_addr (self):
        """Turn on SO_REUSEADDR; socket errors are ignored (best effort)."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt (
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt (socket.SOL_SOCKET,
                                        socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable (self):
        """Return true if the poll loop should watch this channel for reads."""
        return 1

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable (self):
            """Return true if the poll loop should watch this channel for writes."""
            return not self.accepting
    else:
        def writable (self):
            """Return true if the poll loop should watch this channel for writes."""
            return 1

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen (self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On 'nt' a backlog above 5 is clamped to 5.
        """
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            # clamp to the limit being tested for, not down to 1
            num = 5
        return self.socket.listen (num)

    def bind (self, addr):
        """Remember *addr* and bind the socket to it."""
        self.addr = addr
        return self.socket.bind (addr)

    def connect (self, address):
        """Start connecting to *address*.

        If the connect completes immediately, handle_connect() is called.
        EINPROGRESS, EALREADY and EWOULDBLOCK mean the connect is still
        pending and are swallowed; other socket errors propagate.
        """
        self.connected = 0
        try:
            self.socket.connect (address)
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                return
            # bare raise keeps the original traceback
            raise
        self.connected = 1
        self.handle_connect()

    def accept (self):
        """Return (conn, addr), or None if accept() reports EWOULDBLOCK."""
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] != EWOULDBLOCK:
                raise
            return None

    def send (self, data):
        """Send *data*; return the count sent, 0 on EWOULDBLOCK."""
        try:
            return self.socket.send (data)
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv (self, buffer_size):
        """Receive up to *buffer_size* bytes.

        An empty read, or ECONNRESET / ENOTCONN / ESHUTDOWN, calls
        handle_close() and returns ''.  Other socket errors propagate.
        """
        try:
            data = self.socket.recv (buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            why = sys.exc_info()[1]
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            raise

    def close (self):
        """Remove the channel from socket_map and close the socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__ (self, attr):
        return getattr (self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log (self, message):
        """Write 'log: <message>' to stderr."""
        sys.stderr.write ('log: %s\n' % str(message))

    def log_info (self, message, type='info'):
        """Print '<type>: <message>'; 'info' lines only when __debug__ is set."""
        if __debug__ or type != 'info':
            print ('%s: %s' % (type, message))

    def handle_read_event (self):
        """Translate a read-ready notification into the matching hook call."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event (self):
        """Translate a write-ready notification into the matching hook call."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event (self):
        """Forward an exceptional-condition notification to handle_expt()."""
        self.handle_expt()

    def handle_error (self):
        """Log the exception currently being handled and close the channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr (self)
        except:
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info (
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    # Default hooks: each one only logs a warning.  Subclasses override
    # the ones they care about.

    def handle_expt (self):
        self.log_info ('unhandled exception', 'warning')

    def handle_read (self):
        self.log_info ('unhandled read event', 'warning')

    def handle_write (self):
        self.log_info ('unhandled write event', 'warning')

    def handle_connect (self):
        self.log_info ('unhandled connect event', 'warning')

    def handle_accept (self):
        self.log_info ('unhandled accept event', 'warning')

    def handle_close (self):
        self.log_info ('unhandled close event', 'warning')
        self.close()
437 # ---------------------------------------------------------------------------
438 # adds simple buffered output capability, useful for simple clients.
439 # [for more sophisticated usage use asynchat.async_chat]
440 # ---------------------------------------------------------------------------
class dispatcher_with_send (dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to out_buffer and tries to push data immediately;
    whatever is left is sent from handle_write() on later passes.
    """

    def __init__ (self, sock=None, map=None):
        """Wrap *sock*; *map* is passed on to dispatcher.__init__."""
        dispatcher.__init__ (self, sock, map)
        self.out_buffer = ''

    def initiate_send (self):
        """Send up to 512 bytes of the buffer and drop what was sent."""
        num_sent = dispatcher.send (self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write (self):
        self.initiate_send()

    def writable (self):
        """True while not yet connected or while buffered data remains."""
        return (not self.connected) or len(self.out_buffer)

    def send (self, data):
        """Queue *data* for sending and attempt to send right away."""
        if self.debug:
            self.log_info ('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
464 # ---------------------------------------------------------------------------
465 # used for debugging.
466 # ---------------------------------------------------------------------------
def compact_traceback ():
    """Summarise the exception currently being handled.

    Returns ((file, function, line), type, value, info) where the first
    item describes the innermost traceback entry and info is a string of
    the form '[file|function|line] [file|function|line] ...' with one
    bracketed entry per traceback level, outermost first.
    """
    exc_type, exc_value, tb = sys.exc_info()
    entries = []
    while 1:
        code = tb.tb_frame.f_code
        entries.append ((code.co_filename, code.co_name, str(tb.tb_lineno)))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    filename, function, line = entries[-1]
    joined = ['|'.join (entry) for entry in entries]
    info = '[' + '] ['.join (joined) + ']'
    return (filename, function, line), exc_type, exc_value, info
def close_all (map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- dictionary of file descriptor -> channel object; defaults to
           the module-level socket_map.
    """
    if map is None:
        map = socket_map
    for channel in map.values():
        channel.socket.close()
    map.clear()
495 # Asynchronous File I/O:
497 # After a little research (reading man pages on various unixen, and
498 # digging through the linux kernel), I've determined that select()
499 # isn't meant for doing asynchronous file i/o.
500 # Heartening, though - reading linux/mm/filemap.c shows that linux
501 # supports asynchronous read-ahead. So _MOST_ of the time, the data
502 # will be sitting in memory for us already when we go to read it.
504 # What other OS's (besides NT) support async file i/o? [VMS?]
506 # Regardless, this is useful for pipes, and stdin/stdout...
508 import os
if os.name == 'posix':
    import fcntl
    import FCNTL

    class file_wrapper:
        """Give a raw file descriptor the few socket methods asyncore uses.

        recv/send (also available as read/write) map to os.read/os.write
        on the descriptor.
        """
        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.

        def __init__ (self, fd):
            self.fd = fd

        def recv (self, *args):
            return os.read (self.fd, *args)

        def send (self, *args):
            return os.write (self.fd, *args)

        read = recv
        write = send

        def close (self):
            return os.close (self.fd)

        def fileno (self):
            return self.fd

    class file_dispatcher (dispatcher):
        """dispatcher for a plain file descriptor (pipes, stdin/stdout...)."""

        def __init__ (self, fd):
            dispatcher.__init__ (self)
            self.connected = 1
            # set it to non-blocking mode: read the current status flags
            # and write them back with O_NONBLOCK added
            current = fcntl.fcntl (fd, FCNTL.F_GETFL, 0)
            fcntl.fcntl (fd, FCNTL.F_SETFL, current | FCNTL.O_NONBLOCK)
            self.set_file (fd)

        def set_file (self, fd):
            """Wrap *fd* in a file_wrapper and register it in socket_map."""
            self._fileno = fd
            self.socket = file_wrapper (fd)
            self.add_channel()