- Got rid of newmodule.c
[python/dscho.git] / Lib / asyncore.py
blobc7a8f126847700774dc2d616da0c3d8f142dbdd6
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
54 import os
55 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
56 ENOTCONN, ESHUTDOWN, EINTR, EISCONN
# Module-wide registry of channels, keyed by file descriptor.  It is only
# created when the name is not bound yet, so running this code again in the
# same namespace keeps the existing map.
if 'socket_map' not in globals():
    socket_map = {}


class ExitNow(Exception):
    """Exception the polling functions let through instead of handing it
    to a channel's handle_error()."""


# When true, poll() prints the three fd lists returned by select().
DEBUG = 0
def poll(timeout=0.0, map=None):
    """Run one select()-based polling pass over the channels in `map`.

    timeout -- handed to select.select() unchanged.
    map     -- dictionary of file descriptor -> channel object; the
               module-level socket_map is used when this is None.

    Every channel is asked readable() / writable() to build the fd lists.
    Ready descriptors then get handle_read_event() / handle_write_event().
    An exception raised by a handler is routed to that channel's
    handle_error(), except ExitNow, which propagates to the caller
    unchanged.  A select() interrupted with EINTR is treated as "nothing
    ready"; any other select error is re-raised.  Nothing is done when the
    map is empty.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []; w = []; e = []
    for fd, obj in list(map.items()):
        if obj.readable():
            r.append(fd)
        if obj.writable():
            w.append(fd)
    try:
        r, w, e = select.select(r, w, e, timeout)
    except select.error as err:
        if err.args[0] != EINTR:
            raise
        r = []; w = []; e = []

    if DEBUG:
        sys.stdout.write('%s %s %s\n' % (r, w, e))

    # readers are serviced before writers, exactly as two separate loops
    for ready, event_name in ((r, 'handle_read_event'),
                              (w, 'handle_write_event')):
        for fd in ready:
            try:
                obj = map[fd]
            except KeyError:
                # fd is no longer in the map (e.g. removed by a handler
                # that ran earlier in this pass)
                continue

            try:
                getattr(obj, event_name)()
            except ExitNow:
                # bare raise keeps the original instance and traceback
                raise
            except:
                # deliberate catch-all: the channel decides what to do
                obj.handle_error()
def poll2(timeout=0.0, map=None):
    """Run one polling pass using the separate `poll` extension module.

    timeout -- seconds; converted to integer milliseconds before being
               passed to poll.poll().  None is passed through as is.
    map     -- dictionary of file descriptor -> channel object; the
               module-level socket_map is used when this is None.

    Channels that are neither readable() nor writable() are left out of
    the poll list.  Handler exceptions go to the channel's handle_error(),
    except ExitNow, which propagates to the caller unchanged.
    """
    # NOTE: inside this function the name `poll` is the imported module,
    # not the poll() function defined in this file.
    import poll
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    if not map:
        return

    interest = []
    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags = poll.POLLIN
        if obj.writable():
            flags = flags | poll.POLLOUT
        if flags:
            interest.append((fd, flags))

    for fd, flags in poll.poll(interest, timeout):
        try:
            obj = map[fd]
        except KeyError:
            # fd is no longer in the map
            continue

        try:
            if flags & poll.POLLIN:
                obj.handle_read_event()
            if flags & poll.POLLOUT:
                obj.handle_write_event()
        except ExitNow:
            # bare raise keeps the original instance and traceback
            raise
        except:
            # deliberate catch-all: the channel decides what to do
            obj.handle_error()
def poll3(timeout=0.0, map=None):
    """Run one polling pass using select.poll().

    timeout -- seconds; converted to integer milliseconds before being
               passed to the poll object.  None is passed through as is.
    map     -- dictionary of file descriptor -> channel object; the
               module-level socket_map is used when this is None.

    Channels are registered for POLLIN when readable() and POLLOUT when
    writable(); channels wanting neither are not registered.  A poll
    interrupted with EINTR is treated as "nothing ready"; other select
    errors are re-raised.  Handler exceptions go to the channel's
    handle_error(), except ExitNow, which propagates unchanged.
    """
    # Use the poll() support added to the select module in Python 2.0
    if map is None:
        map = socket_map
    if timeout is not None:
        # timeout is in milliseconds
        timeout = int(timeout * 1000)
    pollster = select.poll()
    if not map:
        return

    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags = select.POLLIN
        if obj.writable():
            flags = flags | select.POLLOUT
        if flags:
            pollster.register(fd, flags)
    try:
        ready = pollster.poll(timeout)
    except select.error as err:
        if err.args[0] != EINTR:
            raise
        ready = []

    for fd, flags in ready:
        try:
            obj = map[fd]
        except KeyError:
            # fd is no longer in the map
            continue

        try:
            if flags & select.POLLIN:
                obj.handle_read_event()
            if flags & select.POLLOUT:
                obj.handle_write_event()
        except ExitNow:
            # bare raise keeps the original instance and traceback
            raise
        except:
            # deliberate catch-all: the channel decides what to do
            obj.handle_error()
def loop(timeout=30.0, use_poll=0, map=None):
    """Poll repeatedly until the channel map is empty.

    timeout  -- passed to the polling function on every pass.
    use_poll -- when true, use poll3() if the select module has poll(),
                otherwise poll2(); when false, use the select()-based
                poll().
    map      -- dictionary of file descriptor -> channel object; the
                module-level socket_map is used when this is None.
    """
    if map is None:
        map = socket_map

    # the polling function is chosen once, before the loop starts
    if not use_poll:
        poll_fun = poll
    elif hasattr(select, 'poll'):
        poll_fun = poll3
    else:
        poll_fun = poll2

    while map:
        poll_fun(timeout, map)
class dispatcher:
    """Channel object that ties one socket-like object to a channel map.

    A channel registers itself in a map (file descriptor -> channel) so
    that the polling functions can find it.  They ask readable() and
    writable() whether the channel wants events and then call
    handle_read_event() / handle_write_event(), which in turn call the
    handle_* hooks that subclasses are expected to override.

    Attribute lookups that fail on the channel itself are forwarded to the
    underlying socket object.
    """

    debug = 0
    connected = 0
    accepting = 0
    closing = 0
    addr = None
    # Map this channel was told to live in; None means the module-level
    # socket_map.  Remembered so that close() unregisters the channel from
    # the same map it was registered in.
    _map = None

    def __init__(self, sock=None, map=None):
        """Optionally adopt an existing socket.

        sock -- socket to wrap; when given it is registered in `map`, put
                in non-blocking mode, and the channel is marked connected.
        map  -- channel map to use instead of the module-level socket_map.
        """
        self._map = map
        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = 1
            # XXX Does the constructor require that the socket passed
            # be connected?
            try:
                self.addr = sock.getpeername()
            except socket.error:
                # The addr isn't crucial
                pass
        else:
            self.socket = None

    def __repr__(self):
        status = [self.__class__.__module__ + "." + self.__class__.__name__]
        if self.accepting and self.addr:
            status.append('listening')
        elif self.connected:
            status.append('connected')
        if self.addr is not None:
            try:
                status.append('%s:%d' % self.addr)
            except TypeError:
                # addr is not a (host, port) pair
                status.append(repr(self.addr))
        return '<%s at %#x>' % (' '.join(status), id(self))

    def _channel_map(self, map=None):
        # An explicit argument wins, then the map remembered for this
        # channel, then the module-level default.
        if map is not None:
            return map
        if self._map is not None:
            return self._map
        return socket_map

    def add_channel(self, map=None):
        """Register this channel under its file descriptor."""
        #self.log_info ('adding channel %s' % self)
        self._channel_map(map)[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from the map if it is registered there."""
        fd = self._fileno
        map = self._channel_map(map)
        if fd in map:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map[fd]

    def create_socket(self, family, type):
        """Create a non-blocking socket and register the channel."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt `sock` and register the channel, in `map` when given."""
        self.socket = sock
        ## self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        if map is not None:
            # remember it so that close() cleans up the same map
            self._map = map
        self.add_channel(map)

    def set_reuse_addr(self):
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET,
                                       socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            # best effort only
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        return True

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable(self):
            return not self.accepting
    else:
        def writable(self):
            return True

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and start listening.

        On NT a backlog above 5 is clamped to 5.
        """
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            # clamp rather than collapse the backlog to a single slot
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start connecting to `address`.

        Returns quietly while the connection is still in progress.  When
        it completes immediately the channel is marked connected and
        handle_connect() is called.  Any other error code is raised as
        socket.error.
        """
        self.connected = 0
        err = self.socket.connect_ex(address)
        if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
            return
        if err in (0, EISCONN):
            self.addr = address
            self.connected = 1
            self.handle_connect()
        else:
            raise socket.error(err)

    def accept(self):
        # XXX can return either an address pair or None
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error as why:
            if why.args[0] != EWOULDBLOCK:
                raise
            # nothing to accept right now: fall through and return None

    def send(self, data):
        """Send `data`; return the number of bytes sent (0 if the socket
        would block)."""
        try:
            return self.socket.send(data)
        except socket.error as why:
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv(self, buffer_size):
        """Receive up to `buffer_size` bytes.

        Calls handle_close() and returns '' when the peer has closed the
        connection or the socket reports ECONNRESET, ENOTCONN or
        ESHUTDOWN.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error as why:
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            else:
                raise

    def close(self):
        """Unregister the channel and close the underlying socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        if attr == 'socket':
            # `socket` itself is missing; looking it up on self.socket
            # would recurse forever
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        if __debug__ or type != 'info':
            sys.stdout.write('%s: %s\n' % (type, message))

    def handle_read_event(self):
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event(self):
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the
        channel."""
        nil, t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except:
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    # Default hooks: each one just logs a warning.  Subclasses override
    # the ones they care about.

    def handle_expt(self):
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        self.log_info('unhandled close event', 'warning')
        self.close()
444 # ---------------------------------------------------------------------------
445 # adds simple buffered output capability, useful for simple clients.
446 # [for more sophisticated usage use asynchat.async_chat]
447 # ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to `out_buffer` and tries to flush it; whatever could
    not be sent is kept and retried from handle_write().
    """

    def __init__(self, sock=None, map=None):
        """Same arguments as dispatcher.__init__(); `map` is passed
        straight through to it."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Send at most 512 bytes from the front of the buffer and drop
        whatever was actually sent."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        self.initiate_send()

    def writable(self):
        # want write events while not yet connected, or while data is
        # still waiting in the buffer
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue `data` and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
471 # ---------------------------------------------------------------------------
472 # used for debugging.
473 # ---------------------------------------------------------------------------
def compact_traceback():
    """Summarise the exception currently being handled.

    Returns a 4-tuple ((file, function, line), type, value, info):
      - (file, function, line) describe the last entry of the traceback,
        with the line number as a string;
      - type and value come from sys.exc_info();
      - info is every traceback entry rendered as "[file|function|line]",
        joined by single spaces.

    Must be called while an exception is being handled: the traceback
    from sys.exc_info() is dereferenced unconditionally.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    while 1:
        code = tb.tb_frame.f_code
        tbinfo.append((
            code.co_filename,
            code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    filename, function, line = tbinfo[-1]
    info = '[' + '] ['.join(['|'.join(entry) for entry in tbinfo]) + ']'
    return (filename, function, line), t, v, info
def close_all(map=None):
    """Close the socket of every channel in the map, then empty the map.

    map -- dictionary of file descriptor -> channel object; the
           module-level socket_map is used when this is None.
    """
    registry = map
    if registry is None:
        registry = socket_map
    for channel in list(registry.values()):
        channel.socket.close()
    registry.clear()
502 # Asynchronous File I/O:
504 # After a little research (reading man pages on various unixen, and
505 # digging through the linux kernel), I've determined that select()
506 # isn't meant for doing asynchronous file i/o.
507 # Heartening, though - reading linux/mm/filemap.c shows that linux
508 # supports asynchronous read-ahead. So _MOST_ of the time, the data
509 # will be sitting in memory for us already when we go to read it.
511 # What other OS's (besides NT) support async file i/o? [VMS?]
513 # Regardless, this is useful for pipes, and stdin/stdout...
if os.name == 'posix':
    import fcntl

    class file_wrapper:
        """Give a raw file descriptor the few socket methods that
        asyncore uses (recv, send, close, fileno)."""

        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.
        def __init__(self, fd):
            self.fd = fd

        def recv(self, *args):
            return os.read(self.fd, *args)

        def send(self, *args):
            return os.write(self.fd, *args)

        # file-style aliases for the two methods above
        read = recv
        write = send

        def close(self):
            return os.close(self.fd)

        def fileno(self):
            return self.fd

    class file_dispatcher(dispatcher):
        """Channel for a plain file descriptor instead of a socket."""

        def __init__(self, fd):
            dispatcher.__init__(self)
            self.connected = 1
            # set it to non-blocking mode
            current = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
            fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)
            self.set_file(fd)

        def set_file(self, fd):
            """Wrap `fd` in a file_wrapper and register the channel."""
            self.socket = file_wrapper(fd)
            self._fileno = fd
            self.add_channel()