This commit was manufactured by cvs2svn to create tag 'r23b1-mac'.
[python/dscho.git] / Lib / asyncore.py
bloba38c9113f7b88d62d2b51b707f222b8deaf2addb
1 # -*- Mode: Python -*-
2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
8 # All Rights Reserved
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
17 # permission.
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. it's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
47 """
49 import exceptions
50 import select
51 import socket
52 import sys
53 import time
55 import os
56 from errno import EALREADY, EINPROGRESS, EWOULDBLOCK, ECONNRESET, \
57 ENOTCONN, ESHUTDOWN, EINTR, EISCONN
59 try:
60 socket_map
61 except NameError:
62 socket_map = {}
64 class ExitNow(exceptions.Exception):
65 pass
67 def read(obj):
68 try:
69 obj.handle_read_event()
70 except ExitNow:
71 raise
72 except:
73 obj.handle_error()
75 def write(obj):
76 try:
77 obj.handle_write_event()
78 except ExitNow:
79 raise
80 except:
81 obj.handle_error()
83 def readwrite(obj, flags):
84 try:
85 if flags & select.POLLIN:
86 obj.handle_read_event()
87 if flags & select.POLLOUT:
88 obj.handle_write_event()
89 except ExitNow:
90 raise
91 except:
92 obj.handle_error()
94 def poll(timeout=0.0, map=None):
95 if map is None:
96 map = socket_map
97 if map:
98 r = []; w = []; e = []
99 for fd, obj in map.items():
100 if obj.readable():
101 r.append(fd)
102 if obj.writable():
103 w.append(fd)
104 if [] == r == w == e:
105 time.sleep(timeout)
106 else:
107 try:
108 r, w, e = select.select(r, w, e, timeout)
109 except select.error, err:
110 if err[0] != EINTR:
111 raise
112 else:
113 return
115 for fd in r:
116 obj = map.get(fd)
117 if obj is None:
118 continue
119 read(obj)
121 for fd in w:
122 obj = map.get(fd)
123 if obj is None:
124 continue
125 write(obj)
127 def poll2(timeout=0.0, map=None):
128 import poll
129 if map is None:
130 map = socket_map
131 if timeout is not None:
132 # timeout is in milliseconds
133 timeout = int(timeout*1000)
134 if map:
135 l = []
136 for fd, obj in map.items():
137 flags = 0
138 if obj.readable():
139 flags = poll.POLLIN
140 if obj.writable():
141 flags = flags | poll.POLLOUT
142 if flags:
143 l.append((fd, flags))
144 r = poll.poll(l, timeout)
145 for fd, flags in r:
146 obj = map.get(fd)
147 if obj is None:
148 continue
149 readwrite(obj, flags)
151 def poll3(timeout=0.0, map=None):
152 # Use the poll() support added to the select module in Python 2.0
153 if map is None:
154 map = socket_map
155 if timeout is not None:
156 # timeout is in milliseconds
157 timeout = int(timeout*1000)
158 pollster = select.poll()
159 if map:
160 for fd, obj in map.items():
161 flags = 0
162 if obj.readable():
163 flags = select.POLLIN
164 if obj.writable():
165 flags = flags | select.POLLOUT
166 if flags:
167 pollster.register(fd, flags)
168 try:
169 r = pollster.poll(timeout)
170 except select.error, err:
171 if err[0] != EINTR:
172 raise
173 r = []
174 for fd, flags in r:
175 obj = map.get(fd)
176 if obj is None:
177 continue
178 readwrite(obj, flags)
180 def loop(timeout=30.0, use_poll=0, map=None):
181 if map is None:
182 map = socket_map
184 if use_poll:
185 if hasattr(select, 'poll'):
186 poll_fun = poll3
187 else:
188 poll_fun = poll2
189 else:
190 poll_fun = poll
192 while map:
193 poll_fun(timeout, map)
195 class dispatcher:
197 debug = 0
198 connected = 0
199 accepting = 0
200 closing = 0
201 addr = None
203 def __init__(self, sock=None, map=None):
204 if sock:
205 self.set_socket(sock, map)
206 # I think it should inherit this anyway
207 self.socket.setblocking(0)
208 self.connected = 1
209 # XXX Does the constructor require that the socket passed
210 # be connected?
211 try:
212 self.addr = sock.getpeername()
213 except socket.error:
214 # The addr isn't crucial
215 pass
216 else:
217 self.socket = None
219 def __repr__(self):
220 status = [self.__class__.__module__+"."+self.__class__.__name__]
221 if self.accepting and self.addr:
222 status.append('listening')
223 elif self.connected:
224 status.append('connected')
225 if self.addr is not None:
226 try:
227 status.append('%s:%d' % self.addr)
228 except TypeError:
229 status.append(repr(self.addr))
230 return '<%s at %#x>' % (' '.join(status), id(self))
232 def add_channel(self, map=None):
233 #self.log_info('adding channel %s' % self)
234 if map is None:
235 map = socket_map
236 map[self._fileno] = self
238 def del_channel(self, map=None):
239 fd = self._fileno
240 if map is None:
241 map = socket_map
242 if map.has_key(fd):
243 #self.log_info('closing channel %d:%s' % (fd, self))
244 del map[fd]
246 def create_socket(self, family, type):
247 self.family_and_type = family, type
248 self.socket = socket.socket(family, type)
249 self.socket.setblocking(0)
250 self._fileno = self.socket.fileno()
251 self.add_channel()
253 def set_socket(self, sock, map=None):
254 self.socket = sock
255 ## self.__dict__['socket'] = sock
256 self._fileno = sock.fileno()
257 self.add_channel(map)
259 def set_reuse_addr(self):
260 # try to re-use a server port if possible
261 try:
262 self.socket.setsockopt(
263 socket.SOL_SOCKET, socket.SO_REUSEADDR,
264 self.socket.getsockopt(socket.SOL_SOCKET,
265 socket.SO_REUSEADDR) | 1
267 except socket.error:
268 pass
270 # ==================================================
271 # predicates for select()
272 # these are used as filters for the lists of sockets
273 # to pass to select().
274 # ==================================================
276 def readable(self):
277 return True
279 if os.name == 'mac':
280 # The macintosh will select a listening socket for
281 # write if you let it. What might this mean?
282 def writable(self):
283 return not self.accepting
284 else:
285 def writable(self):
286 return True
288 # ==================================================
289 # socket object methods.
290 # ==================================================
292 def listen(self, num):
293 self.accepting = 1
294 if os.name == 'nt' and num > 5:
295 num = 1
296 return self.socket.listen(num)
298 def bind(self, addr):
299 self.addr = addr
300 return self.socket.bind(addr)
302 def connect(self, address):
303 self.connected = 0
304 err = self.socket.connect_ex(address)
305 # XXX Should interpret Winsock return values
306 if err in (EINPROGRESS, EALREADY, EWOULDBLOCK):
307 return
308 if err in (0, EISCONN):
309 self.addr = address
310 self.connected = 1
311 self.handle_connect()
312 else:
313 raise socket.error, err
315 def accept(self):
316 # XXX can return either an address pair or None
317 try:
318 conn, addr = self.socket.accept()
319 return conn, addr
320 except socket.error, why:
321 if why[0] == EWOULDBLOCK:
322 pass
323 else:
324 raise socket.error, why
326 def send(self, data):
327 try:
328 result = self.socket.send(data)
329 return result
330 except socket.error, why:
331 if why[0] == EWOULDBLOCK:
332 return 0
333 else:
334 raise socket.error, why
335 return 0
337 def recv(self, buffer_size):
338 try:
339 data = self.socket.recv(buffer_size)
340 if not data:
341 # a closed connection is indicated by signaling
342 # a read condition, and having recv() return 0.
343 self.handle_close()
344 return ''
345 else:
346 return data
347 except socket.error, why:
348 # winsock sometimes throws ENOTCONN
349 if why[0] in [ECONNRESET, ENOTCONN, ESHUTDOWN]:
350 self.handle_close()
351 return ''
352 else:
353 raise socket.error, why
355 def close(self):
356 self.del_channel()
357 self.socket.close()
359 # cheap inheritance, used to pass all other attribute
360 # references to the underlying socket object.
361 def __getattr__(self, attr):
362 return getattr(self.socket, attr)
364 # log and log_info may be overridden to provide more sophisticated
365 # logging and warning methods. In general, log is for 'hit' logging
366 # and 'log_info' is for informational, warning and error logging.
368 def log(self, message):
369 sys.stderr.write('log: %s\n' % str(message))
371 def log_info(self, message, type='info'):
372 if __debug__ or type != 'info':
373 print '%s: %s' % (type, message)
375 def handle_read_event(self):
376 if self.accepting:
377 # for an accepting socket, getting a read implies
378 # that we are connected
379 if not self.connected:
380 self.connected = 1
381 self.handle_accept()
382 elif not self.connected:
383 self.handle_connect()
384 self.connected = 1
385 self.handle_read()
386 else:
387 self.handle_read()
389 def handle_write_event(self):
390 # getting a write implies that we are connected
391 if not self.connected:
392 self.handle_connect()
393 self.connected = 1
394 self.handle_write()
396 def handle_expt_event(self):
397 self.handle_expt()
399 def handle_error(self):
400 nil, t, v, tbinfo = compact_traceback()
402 # sometimes a user repr method will crash.
403 try:
404 self_repr = repr(self)
405 except:
406 self_repr = '<__repr__(self) failed for object at %0x>' % id(self)
408 self.log_info(
409 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
410 self_repr,
413 tbinfo
415 'error'
417 self.close()
419 def handle_expt(self):
420 self.log_info('unhandled exception', 'warning')
422 def handle_read(self):
423 self.log_info('unhandled read event', 'warning')
425 def handle_write(self):
426 self.log_info('unhandled write event', 'warning')
428 def handle_connect(self):
429 self.log_info('unhandled connect event', 'warning')
431 def handle_accept(self):
432 self.log_info('unhandled accept event', 'warning')
434 def handle_close(self):
435 self.log_info('unhandled close event', 'warning')
436 self.close()
438 # ---------------------------------------------------------------------------
439 # adds simple buffered output capability, useful for simple clients.
440 # [for more sophisticated usage use asynchat.async_chat]
441 # ---------------------------------------------------------------------------
443 class dispatcher_with_send(dispatcher):
445 def __init__(self, sock=None):
446 dispatcher.__init__(self, sock)
447 self.out_buffer = ''
449 def initiate_send(self):
450 num_sent = 0
451 num_sent = dispatcher.send(self, self.out_buffer[:512])
452 self.out_buffer = self.out_buffer[num_sent:]
454 def handle_write(self):
455 self.initiate_send()
457 def writable(self):
458 return (not self.connected) or len(self.out_buffer)
460 def send(self, data):
461 if self.debug:
462 self.log_info('sending %s' % repr(data))
463 self.out_buffer = self.out_buffer + data
464 self.initiate_send()
466 # ---------------------------------------------------------------------------
467 # used for debugging.
468 # ---------------------------------------------------------------------------
470 def compact_traceback():
471 t, v, tb = sys.exc_info()
472 tbinfo = []
473 assert tb # Must have a traceback
474 while tb:
475 tbinfo.append((
476 tb.tb_frame.f_code.co_filename,
477 tb.tb_frame.f_code.co_name,
478 str(tb.tb_lineno)
480 tb = tb.tb_next
482 # just to be safe
483 del tb
485 file, function, line = tbinfo[-1]
486 info = ' '.join(['[%s|%s|%s]' % x for x in tbinfo])
487 return (file, function, line), t, v, info
489 def close_all(map=None):
490 if map is None:
491 map = socket_map
492 for x in map.values():
493 x.socket.close()
494 map.clear()
496 # Asynchronous File I/O:
498 # After a little research (reading man pages on various unixen, and
499 # digging through the linux kernel), I've determined that select()
500 # isn't meant for doing doing asynchronous file i/o.
501 # Heartening, though - reading linux/mm/filemap.c shows that linux
502 # supports asynchronous read-ahead. So _MOST_ of the time, the data
503 # will be sitting in memory for us already when we go to read it.
505 # What other OS's (besides NT) support async file i/o? [VMS?]
507 # Regardless, this is useful for pipes, and stdin/stdout...
509 if os.name == 'posix':
510 import fcntl
512 class file_wrapper:
513 # here we override just enough to make a file
514 # look like a socket for the purposes of asyncore.
516 def __init__(self, fd):
517 self.fd = fd
519 def recv(self, *args):
520 return os.read(self.fd, *args)
522 def send(self, *args):
523 return os.write(self.fd, *args)
525 read = recv
526 write = send
528 def close(self):
529 return os.close(self.fd)
531 def fileno(self):
532 return self.fd
534 class file_dispatcher(dispatcher):
536 def __init__(self, fd):
537 dispatcher.__init__(self)
538 self.connected = 1
539 # set it to non-blocking mode
540 flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
541 flags = flags | os.O_NONBLOCK
542 fcntl.fcntl(fd, fcntl.F_SETFL, flags)
543 self.set_file(fd)
545 def set_file(self, fd):
546 self._fileno = fd
547 self.socket = file_wrapper(fd)
548 self.add_channel()