Bump version to 1.0.
[python/dscho.git] / Lib / asyncore.py
blobeaeb2cd8f8ecb3e57d7f42b6d026cbe0047c6ab4
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
7 #
8 # All Rights Reserved
9 #
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import string
53 import sys
55 import os
# Error numbers compared against socket.error codes further down.  On
# every platform except 'nt' they come straight from the errno module;
# on 'nt' the numeric values are spelled out by hand.
if os.name != 'nt':
    from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, ENOTCONN, ESHUTDOWN
else:
    EWOULDBLOCK = 10035
    EINPROGRESS = 10036
    EALREADY = 10037
    ECONNRESET = 10054
    ENOTCONN = 10057
    ESHUTDOWN = 10058

# Default registry mapping file descriptor -> channel object.  An already
# bound socket_map is left alone (presumably so that re-executing this
# module body does not drop live channels); otherwise start empty.
try:
    socket_map
except NameError:
    socket_map = {}
class ExitNow(Exception):
    """Raised by a channel handler to break out of the polling functions.

    poll() and poll2() let this exception propagate to their caller
    instead of routing it to the channel's handle_error().
    """
# When true, poll() prints the descriptor lists returned by select().
DEBUG = 0


def _poll_dispatch(map, fds, handler_name):
    """Invoke the named event handler on every channel in *fds*.

    A descriptor that is no longer present in *map* (an earlier handler in
    the same pass closed it) is skipped.  ExitNow, KeyboardInterrupt and
    SystemExit propagate to the caller; any other exception raised by the
    handler is passed to the channel's handle_error().
    """
    for fd in fds:
        # Only the lookup is guarded, so that a KeyError raised inside a
        # handler or inside handle_error() is not silently discarded.
        try:
            obj = map[fd]
        except KeyError:
            continue
        try:
            getattr(obj, handler_name)()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # bare raise keeps the original exception object and traceback
            raise
        except:
            # deliberate catch-all: one misbehaving channel must not take
            # the whole polling loop down
            obj.handle_error()


def poll(timeout=0.0, map=None):
    """Run a single select()-based pass over the channels in *map*.

    timeout -- seconds to wait for activity, handed to select.select().
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    Channels whose readable()/writable() predicate is true are watched for
    reading/writing; for every descriptor select() reports, the channel's
    handle_read_event() / handle_write_event() is called.  Returns None.
    """
    if map is None:
        map = socket_map
    if not map:
        return

    r = []
    w = []
    for fd, obj in list(map.items()):
        if obj.readable():
            r.append(fd)
        if obj.writable():
            w.append(fd)

    if not (r or w) and timeout is not None:
        # Nothing to watch.  select() with three empty lists is an error
        # on Windows, so simply wait out the timeout instead.
        import time
        time.sleep(timeout)
        return

    r, w, e = select.select(r, w, [], timeout)

    if DEBUG:
        print('%s %s %s' % (r, w, e))

    _poll_dispatch(map, r, 'handle_read_event')
    _poll_dispatch(map, w, 'handle_write_event')
def poll2(timeout=0.0, map=None):
    """Run a single poll()-based pass over the channels in *map*.

    timeout -- seconds to wait for activity (converted to milliseconds
               for the underlying poll call).
    map     -- dictionary of file descriptor -> channel; defaults to the
               module-level socket_map.

    The legacy stand-alone ``poll`` extension module is used when it can be
    imported; otherwise the select.poll() object interface is used.  For
    every descriptor reported ready, the channel's handle_read_event()
    and/or handle_write_event() is called.  Returns None.
    """
    if map is None:
        map = socket_map
    # both poll interfaces take the timeout in milliseconds
    timeout = int(timeout * 1000)
    if not map:
        return

    try:
        import poll as backend      # legacy extension: poll.poll(list, ms)
        legacy = 1
    except ImportError:
        backend = select            # select.poll() object interface
        legacy = 0

    interest = []
    for fd, obj in list(map.items()):
        flags = 0
        if obj.readable():
            flags = backend.POLLIN
        if obj.writable():
            flags = flags | backend.POLLOUT
        if flags:
            interest.append((fd, flags))

    if legacy:
        ready = backend.poll(interest, timeout)
    else:
        poller = backend.poll()
        for fd, flags in interest:
            poller.register(fd, flags)
        ready = poller.poll(timeout)

    for fd, flags in ready:
        # Only the lookup is guarded, so that a KeyError raised inside a
        # handler or inside handle_error() is not silently discarded.
        try:
            obj = map[fd]
        except KeyError:
            continue
        try:
            if flags & backend.POLLIN:
                obj.handle_read_event()
            if flags & backend.POLLOUT:
                obj.handle_write_event()
        except (ExitNow, KeyboardInterrupt, SystemExit):
            # bare raise keeps the original exception object and traceback
            raise
        except:
            # deliberate catch-all: one misbehaving channel must not take
            # the whole polling loop down
            obj.handle_error()
def loop(timeout=30.0, use_poll=0, map=None):
    """Keep polling until the channel map is empty.

    timeout  -- passed unchanged to the polling function on every pass.
    use_poll -- when true, poll2() does the polling; otherwise poll().
    map      -- dictionary of channels; defaults to the module-level
                socket_map.
    """
    if map is None:
        map = socket_map

    poll_fun = poll
    if use_poll:
        poll_fun = poll2

    while map:
        poll_fun(timeout, map)
class dispatcher:
    """Wrap a non-blocking socket as an event-driven channel.

    A dispatcher registers itself, keyed by file descriptor, in a channel
    map (socket_map by default).  The polling functions consult
    readable()/writable() and then call handle_read_event() /
    handle_write_event(), which translate low-level readiness into the
    overridable handle_connect / handle_accept / handle_read /
    handle_write / handle_close hooks.
    """

    debug = 0
    connected = 0
    accepting = 0
    closing = 0
    addr = None

    def __init__(self, sock=None, map=None):
        """Optionally adopt the already-connected socket *sock*.

        When *sock* is given it is registered in *map*, switched to
        non-blocking mode and the channel is marked connected.
        """
        if sock:
            self.set_socket(sock, map)
            # I think it should inherit this anyway
            self.socket.setblocking(0)
            self.connected = 1

    def __repr__(self):
        """Return '<ClassName status at id>'; never raises."""
        try:
            status = []
            if self.accepting and self.addr:
                status.append('listening')
            elif self.connected:
                status.append('connected')
            if self.addr:
                status.append('%s:%d' % self.addr)
            return '<%s %s at %x>' % (
                self.__class__.__name__,
                ' '.join(status),
                id(self)
                )
        except Exception:
            try:
                ar = repr(self.addr)
            except Exception:
                ar = 'no self.addr!'

            return '<__repr__ (self) failed for object at %x (addr=%s)>' % (id(self), ar)

    def add_channel(self, map=None):
        """Register this channel in *map* (default socket_map)."""
        #self.log_info ('adding channel %s' % self)
        if map is None:
            map = socket_map
        map[self._fileno] = self

    def del_channel(self, map=None):
        """Remove this channel from *map* (default socket_map) if present."""
        fd = self._fileno
        if map is None:
            map = socket_map
        if fd in map:
            #self.log_info ('closing channel %d:%s' % (fd, self))
            del map[fd]

    def create_socket(self, family, type):
        """Create a new non-blocking socket and register the channel."""
        self.family_and_type = family, type
        self.socket = socket.socket(family, type)
        self.socket.setblocking(0)
        self._fileno = self.socket.fileno()
        self.add_channel()

    def set_socket(self, sock, map=None):
        """Adopt *sock* as the underlying socket and register in *map*."""
        self.__dict__['socket'] = sock
        self._fileno = sock.fileno()
        self.add_channel(map)

    def set_reuse_addr(self):
        """Best-effort attempt to turn on SO_REUSEADDR for a server port."""
        # try to re-use a server port if possible
        try:
            self.socket.setsockopt(
                socket.SOL_SOCKET, socket.SO_REUSEADDR,
                self.socket.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) | 1
                )
        except socket.error:
            # not fatal: the option is only a convenience
            pass

    # ==================================================
    # predicates for select()
    # these are used as filters for the lists of sockets
    # to pass to select().
    # ==================================================

    def readable(self):
        """Return true if the channel wants to be watched for reading."""
        return 1

    if os.name == 'mac':
        # The macintosh will select a listening socket for
        # write if you let it.  What might this mean?
        def writable(self):
            """Return true if the channel wants to be watched for writing."""
            return not self.accepting
    else:
        def writable(self):
            """Return true if the channel wants to be watched for writing."""
            return 1

    # ==================================================
    # socket object methods.
    # ==================================================

    def listen(self, num):
        """Mark the channel as accepting and listen with backlog *num*.

        On 'nt' a backlog larger than 5 is clamped to 5.
        """
        self.accepting = 1
        if os.name == 'nt' and num > 5:
            num = 5
        return self.socket.listen(num)

    def bind(self, addr):
        """Remember *addr* and bind the underlying socket to it."""
        self.addr = addr
        return self.socket.bind(addr)

    def connect(self, address):
        """Start connecting to *address*.

        If the connection completes immediately the channel is marked
        connected and handle_connect() is called.  EINPROGRESS, EALREADY
        and EWOULDBLOCK mean the connection is still pending and are
        swallowed; any other socket.error propagates.
        """
        self.connected = 0
        try:
            self.socket.connect(address)
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] in (EINPROGRESS, EALREADY, EWOULDBLOCK):
                return
            raise
        self.connected = 1
        self.handle_connect()

    def accept(self):
        """Return (conn, addr), or None when accept() would block."""
        try:
            conn, addr = self.socket.accept()
            return conn, addr
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return None
            raise

    def send(self, data):
        """Send *data*; return the number of bytes sent (0 if it would block)."""
        try:
            return self.socket.send(data)
        except socket.error:
            why = sys.exc_info()[1]
            if why.args[0] == EWOULDBLOCK:
                return 0
            raise

    def recv(self, buffer_size):
        """Read up to *buffer_size* bytes.

        An empty read, or ECONNRESET / ENOTCONN / ESHUTDOWN, is treated as
        the peer closing: handle_close() is called and '' is returned.
        """
        try:
            data = self.socket.recv(buffer_size)
            if not data:
                # a closed connection is indicated by signaling
                # a read condition, and having recv() return 0.
                self.handle_close()
                return ''
            else:
                return data
        except socket.error:
            why = sys.exc_info()[1]
            # winsock sometimes throws ENOTCONN
            if why.args[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
                self.handle_close()
                return ''
            raise

    def close(self):
        """Unregister the channel from socket_map and close the socket."""
        self.del_channel()
        self.socket.close()

    # cheap inheritance, used to pass all other attribute
    # references to the underlying socket object.
    def __getattr__(self, attr):
        # Without this guard a channel that has no socket yet would look
        # up self.socket, re-enter __getattr__ and recurse without bound.
        if attr == 'socket':
            raise AttributeError(attr)
        return getattr(self.socket, attr)

    # log and log_info may be overridden to provide more sophisticated
    # logging and warning methods. In general, log is for 'hit' logging
    # and 'log_info' is for informational, warning and error logging.

    def log(self, message):
        """Write a 'log: ...' line to sys.stderr."""
        sys.stderr.write('log: %s\n' % str(message))

    def log_info(self, message, type='info'):
        """Print '<type>: <message>'; 'info' is shown only under __debug__."""
        if __debug__ or type != 'info':
            print('%s: %s' % (type, message))

    def handle_read_event(self):
        """Translate a read-ready notification into the matching hook."""
        if self.accepting:
            # for an accepting socket, getting a read implies
            # that we are connected
            if not self.connected:
                self.connected = 1
            self.handle_accept()
        elif not self.connected:
            self.handle_connect()
            self.connected = 1
            self.handle_read()
        else:
            self.handle_read()

    def handle_write_event(self):
        """Translate a write-ready notification into the matching hook."""
        # getting a write implies that we are connected
        if not self.connected:
            self.handle_connect()
            self.connected = 1
        self.handle_write()

    def handle_expt_event(self):
        """Forward an exceptional-condition notification to handle_expt()."""
        self.handle_expt()

    def handle_error(self):
        """Log the exception currently being handled and close the channel."""
        (file, fun, line), t, v, tbinfo = compact_traceback()

        # sometimes a user repr method will crash.
        try:
            self_repr = repr(self)
        except Exception:
            self_repr = '<__repr__ (self) failed for object at %0x>' % id(self)

        self.log_info(
            'uncaptured python exception, closing channel %s (%s:%s %s)' % (
                self_repr,
                t,
                v,
                tbinfo
                ),
            'error'
            )
        self.close()

    def handle_expt(self):
        """Default hook: log a warning."""
        self.log_info('unhandled exception', 'warning')

    def handle_read(self):
        """Default hook: log a warning."""
        self.log_info('unhandled read event', 'warning')

    def handle_write(self):
        """Default hook: log a warning."""
        self.log_info('unhandled write event', 'warning')

    def handle_connect(self):
        """Default hook: log a warning."""
        self.log_info('unhandled connect event', 'warning')

    def handle_accept(self):
        """Default hook: log a warning."""
        self.log_info('unhandled accept event', 'warning')

    def handle_close(self):
        """Default hook: log a warning and close the channel."""
        self.log_info('unhandled close event', 'warning')
        self.close()
399 # ---------------------------------------------------------------------------
400 # adds simple buffered output capability, useful for simple clients.
401 # [for more sophisticated usage use asynchat.async_chat]
402 # ---------------------------------------------------------------------------
class dispatcher_with_send(dispatcher):
    """dispatcher with a simple outgoing buffer.

    send() appends to out_buffer and tries to flush it; whatever the
    socket does not accept stays buffered and is retried whenever the
    channel is reported writable.
    """

    def __init__(self, sock=None, map=None):
        """Initialise the base dispatcher and start with an empty buffer."""
        dispatcher.__init__(self, sock, map)
        self.out_buffer = ''

    def initiate_send(self):
        """Send up to 512 buffered bytes and drop what was accepted."""
        num_sent = dispatcher.send(self, self.out_buffer[:512])
        self.out_buffer = self.out_buffer[num_sent:]

    def handle_write(self):
        """Flush more of the buffer when the socket becomes writable."""
        self.initiate_send()

    def writable(self):
        """True while still connecting or while data remains buffered."""
        return (not self.connected) or len(self.out_buffer)

    def send(self, data):
        """Queue *data* for sending and attempt to send immediately."""
        if self.debug:
            self.log_info('sending %s' % repr(data))
        self.out_buffer = self.out_buffer + data
        self.initiate_send()
426 # ---------------------------------------------------------------------------
427 # used for debugging.
428 # ---------------------------------------------------------------------------
def compact_traceback():
    """Summarise the exception currently being handled.

    Must be called while an exception is active (inside an except block).

    Returns ((file, function, line), type, value, info) where the first
    tuple describes the innermost traceback entry, type/value come from
    sys.exc_info(), and info is a one-line string of the form
    '[file|function|line] [file|function|line] ...' listing every entry
    from outermost to innermost.
    """
    t, v, tb = sys.exc_info()
    tbinfo = []
    while 1:
        code = tb.tb_frame.f_code
        tbinfo.append((
            code.co_filename,
            code.co_name,
            str(tb.tb_lineno)
            ))
        tb = tb.tb_next
        if not tb:
            break

    # just to be safe
    del tb

    file, function, line = tbinfo[-1]
    info = '[' + '] ['.join(['|'.join(entry) for entry in tbinfo]) + ']'
    return (file, function, line), t, v, info
def close_all(map=None):
    """Close the socket of every channel in *map*, then empty the map.

    map -- dictionary of channels; defaults to the module-level socket_map.
    """
    if map is None:
        map = socket_map
    for channel in map.values():
        channel.socket.close()
    map.clear()
463 # Asynchronous File I/O:
465 # After a little research (reading man pages on various unixen, and
466 # digging through the linux kernel), I've determined that select()
467 # isn't meant for doing asynchronous file i/o.
468 # Heartening, though - reading linux/mm/filemap.c shows that linux
469 # supports asynchronous read-ahead. So _MOST_ of the time, the data
470 # will be sitting in memory for us already when we go to read it.
472 # What other OS's (besides NT) support async file i/o? [VMS?]
474 # Regardless, this is useful for pipes, and stdin/stdout...
476 import os
if os.name == 'posix':
    import fcntl
    # The F_* / O_NONBLOCK constants used to live in the separate FCNTL
    # module.  Use it when it is available, otherwise take the same
    # constants from fcntl and os.
    try:
        import FCNTL
        _F_GETFL = FCNTL.F_GETFL
        _F_SETFL = FCNTL.F_SETFL
        _O_NONBLOCK = FCNTL.O_NONBLOCK
    except ImportError:
        _F_GETFL = fcntl.F_GETFL
        _F_SETFL = fcntl.F_SETFL
        _O_NONBLOCK = os.O_NONBLOCK

    class file_wrapper:
        """Give a raw file descriptor the few socket methods asyncore uses."""
        # here we override just enough to make a file
        # look like a socket for the purposes of asyncore.

        def __init__(self, fd):
            self.fd = fd

        def recv(self, *args):
            """Read from the descriptor via os.read(fd, *args)."""
            return os.read(self.fd, *args)

        def send(self, *args):
            """Write to the descriptor via os.write(fd, *args)."""
            return os.write(self.fd, *args)

        read = recv
        write = send

        def close(self):
            """Close the descriptor."""
            return os.close(self.fd)

        def fileno(self):
            """Return the wrapped descriptor."""
            return self.fd

    class file_dispatcher(dispatcher):
        """dispatcher for a plain file descriptor (pipes, stdin/stdout...)."""

        def __init__(self, fd):
            """Put *fd* into non-blocking mode and register the channel."""
            dispatcher.__init__(self)
            self.connected = 1
            # set it to non-blocking mode
            flags = fcntl.fcntl(fd, _F_GETFL, 0)
            flags = flags | _O_NONBLOCK
            fcntl.fcntl(fd, _F_SETFL, flags)
            self.set_file(fd)

        def set_file(self, fd):
            """Wrap *fd* in a file_wrapper and add the channel to socket_map."""
            self._fileno = fd
            self.socket = file_wrapper(fd)
            self.add_channel()