2 # Id: asyncore.py,v 2.51 2000/09/07 22:29:26 rushing Exp
3 # Author: Sam Rushing <rushing@nightmare.com>
5 # ======================================================================
6 # Copyright 1996 by Sam Rushing
10 # Permission to use, copy, modify, and distribute this software and
11 # its documentation for any purpose and without fee is hereby
12 # granted, provided that the above copyright notice appear in all
13 # copies and that both that copyright notice and this permission
14 # notice appear in supporting documentation, and that the name of Sam
15 # Rushing not be used in advertising or publicity pertaining to
16 # distribution of the software without specific, written prior
19 # SAM RUSHING DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE,
20 # INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS, IN
21 # NO EVENT SHALL SAM RUSHING BE LIABLE FOR ANY SPECIAL, INDIRECT OR
22 # CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM LOSS
23 # OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT,
24 # NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
25 # CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
26 # ======================================================================
28 """Basic infrastructure for asynchronous socket service clients and servers.
30 There are only two ways to have a program on a single processor do "more
31 than one thing at a time". Multi-threaded programming is the simplest and
32 most popular way to do it, but there is another very different technique,
33 that lets you have nearly all the advantages of multi-threading, without
34 actually using multiple threads. It's really only practical if your program
35 is largely I/O bound. If your program is CPU bound, then pre-emptive
36 scheduled threads are probably what you really need. Network servers are
37 rarely CPU-bound, however.
39 If your operating system supports the select() system call in its I/O
40 library (and nearly all do), then you can use it to juggle multiple
41 communication channels at once; doing other work while your I/O is taking
42 place in the "background." Although this strategy can seem strange and
43 complex, especially at first, it is in many ways easier to understand and
44 control than multi-threaded programming. The module documented here solves
45 many of the difficult problems for you, making the task of building
46 sophisticated high-performance network servers and clients a snap.
56 from errno
import EALREADY
, EINPROGRESS
, EWOULDBLOCK
, ECONNRESET
, \
57 ENOTCONN
, ESHUTDOWN
, EINTR
, EISCONN
, EBADF
, ECONNABORTED
, errorcode
66 return os
.strerror(err
)
67 except (ValueError, OverflowError, NameError):
70 return "Unknown error %s" %err
72 class ExitNow(Exception):
75 _reraised_exceptions
= (ExitNow
, KeyboardInterrupt, SystemExit)
79 obj
.handle_read_event()
80 except _reraised_exceptions
:
87 obj
.handle_write_event()
88 except _reraised_exceptions
:
95 obj
.handle_expt_event()
96 except _reraised_exceptions
:
101 def readwrite(obj
, flags
):
103 if flags
& select
.POLLIN
:
104 obj
.handle_read_event()
105 if flags
& select
.POLLOUT
:
106 obj
.handle_write_event()
107 if flags
& select
.POLLPRI
:
108 obj
.handle_expt_event()
109 if flags
& (select
.POLLHUP | select
.POLLERR | select
.POLLNVAL
):
111 except socket
.error
, e
:
112 if e
.args
[0] not in (EBADF
, ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
116 except _reraised_exceptions
:
121 def poll(timeout
=0.0, map=None):
125 r
= []; w
= []; e
= []
126 for fd
, obj
in map.items():
127 is_r
= obj
.readable()
128 is_w
= obj
.writable()
135 if [] == r
== w
== e
:
140 r
, w
, e
= select
.select(r
, w
, e
, timeout
)
141 except select
.error
, err
:
142 if err
.args
[0] != EINTR
:
165 def poll2(timeout
=0.0, map=None):
166 # Use the poll() support added to the select module in Python 2.0
169 if timeout
is not None:
170 # timeout is in milliseconds
171 timeout
= int(timeout
*1000)
172 pollster
= select
.poll()
174 for fd
, obj
in map.items():
177 flags |
= select
.POLLIN | select
.POLLPRI
179 flags |
= select
.POLLOUT
181 # Only check for exceptions if object was either readable
183 flags |
= select
.POLLERR | select
.POLLHUP | select
.POLLNVAL
184 pollster
.register(fd
, flags
)
186 r
= pollster
.poll(timeout
)
187 except select
.error
, err
:
188 if err
.args
[0] != EINTR
:
195 readwrite(obj
, flags
)
197 poll3
= poll2
# Alias for backward compatibility
199 def loop(timeout
=30.0, use_poll
=False, map=None, count
=None):
203 if use_poll
and hasattr(select
, 'poll'):
210 poll_fun(timeout
, map)
213 while map and count
> 0:
214 poll_fun(timeout
, map)
224 ignore_log_types
= frozenset(['warning'])
226 def __init__(self
, sock
=None, map=None):
228 self
._map
= socket_map
235 # Set to nonblocking just to make sure for cases where we
236 # get a socket from a blocking source.
238 self
.set_socket(sock
, map)
239 self
.connected
= True
240 # The constructor no longer requires that the socket
241 # passed be connected.
243 self
.addr
= sock
.getpeername()
244 except socket
.error
, err
:
245 if err
.args
[0] == ENOTCONN
:
246 # To handle the case where we got an unconnected
248 self
.connected
= False
250 # The socket is broken in some unknown way, alert
251 # the user and remove it from the map (to prevent
252 # polling of broken sockets).
253 self
.del_channel(map)
259 status
= [self
.__class
__.__module
__+"."+self
.__class
__.__name
__]
260 if self
.accepting
and self
.addr
:
261 status
.append('listening')
263 status
.append('connected')
264 if self
.addr
is not None:
266 status
.append('%s:%d' % self
.addr
)
268 status
.append(repr(self
.addr
))
269 return '<%s at %#x>' % (' '.join(status
), id(self
))
273 def add_channel(self
, map=None):
274 #self.log_info('adding channel %s' % self)
277 map[self
._fileno
] = self
279 def del_channel(self
, map=None):
284 #self.log_info('closing channel %d:%s' % (fd, self))
288 def create_socket(self
, family
, type):
289 self
.family_and_type
= family
, type
290 sock
= socket
.socket(family
, type)
292 self
.set_socket(sock
)
294 def set_socket(self
, sock
, map=None):
296 ## self.__dict__['socket'] = sock
297 self
._fileno
= sock
.fileno()
298 self
.add_channel(map)
300 def set_reuse_addr(self
):
301 # try to re-use a server port if possible
303 self
.socket
.setsockopt(
304 socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
,
305 self
.socket
.getsockopt(socket
.SOL_SOCKET
,
306 socket
.SO_REUSEADDR
) |
1
311 # ==================================================
312 # predicates for select()
313 # these are used as filters for the lists of sockets
314 # to pass to select().
315 # ==================================================
323 # ==================================================
324 # socket object methods.
325 # ==================================================
327 def listen(self
, num
):
328 self
.accepting
= True
329 if os
.name
== 'nt' and num
> 5:
331 return self
.socket
.listen(num
)
333 def bind(self
, addr
):
335 return self
.socket
.bind(addr
)
337 def connect(self
, address
):
338 self
.connected
= False
339 err
= self
.socket
.connect_ex(address
)
340 # XXX Should interpret Winsock return values
341 if err
in (EINPROGRESS
, EALREADY
, EWOULDBLOCK
):
343 if err
in (0, EISCONN
):
345 self
.handle_connect_event()
347 raise socket
.error(err
, errorcode
[err
])
350 # XXX can return either an address pair or None
352 conn
, addr
= self
.socket
.accept()
354 except socket
.error
, why
:
355 if why
.args
[0] == EWOULDBLOCK
:
360 def send(self
, data
):
362 result
= self
.socket
.send(data
)
364 except socket
.error
, why
:
365 if why
.args
[0] == EWOULDBLOCK
:
367 elif why
.args
[0] in (ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
):
373 def recv(self
, buffer_size
):
375 data
= self
.socket
.recv(buffer_size
)
377 # a closed connection is indicated by signaling
378 # a read condition, and having recv() return 0.
383 except socket
.error
, why
:
384 # winsock sometimes throws ENOTCONN
385 if why
.args
[0] in [ECONNRESET
, ENOTCONN
, ESHUTDOWN
, ECONNABORTED
]:
392 self
.connected
= False
393 self
.accepting
= False
397 except socket
.error
, why
:
398 if why
.args
[0] not in (ENOTCONN
, EBADF
):
401 # cheap inheritance, used to pass all other attribute
402 # references to the underlying socket object.
403 def __getattr__(self
, attr
):
405 retattr
= getattr(self
.socket
, attr
)
406 except AttributeError:
407 raise AttributeError("%s instance has no attribute '%s'"
408 %(self
.__class
__.__name
__, attr
))
410 msg
= "%(me)s.%(attr)s is deprecated. Use %(me)s.socket.%(attr)s " \
411 "instead." % {'me': self
.__class
__.__name
__, 'attr':attr
}
412 warnings
.warn(msg
, DeprecationWarning, stacklevel
=2)
415 # log and log_info may be overridden to provide more sophisticated
416 # logging and warning methods. In general, log is for 'hit' logging
417 # and 'log_info' is for informational, warning and error logging.
def log(self, message):
    """Write ``message`` to ``sys.stderr`` as one ``log: ...`` line.

    The message is passed through ``str()`` before formatting.  This is
    the 'hit' logging hook and may be overridden to provide more
    sophisticated logging.
    """
    line = 'log: %s\n' % str(message)
    sys.stderr.write(line)
def log_info(self, message, type='info'):
    """Print ``'<type>: <message>'`` to stdout unless ``type`` is ignored.

    message -- text (or any object) to report.
    type    -- category label; nothing is printed when it is a member of
               ``self.ignore_log_types``.  The parameter name shadows the
               ``type`` builtin but is kept because callers may pass it
               by keyword.

    This is the informational/warning/error logging hook and may be
    overridden to provide more sophisticated logging.
    """
    if type in self.ignore_log_types:
        return
    # A single parenthesised argument prints the same text under the
    # Python 2 print statement and the Python 3 print() function, so
    # this line no longer breaks compilation on Python 3.
    print('%s: %s' % (type, message))
426 def handle_read_event(self
):
428 # accepting sockets are never connected, they "spawn" new
429 # sockets that are connected
431 elif not self
.connected
:
432 self
.handle_connect_event()
def handle_connect_event(self):
    """Flag the channel as connected, then fire ``handle_connect()``.

    ``self.connected`` is set *before* the callback runs, so the
    ``handle_connect()`` implementation already observes a connected
    channel.
    """
    self.connected = True
    notify = self.handle_connect
    notify()
441 def handle_write_event(self
):
443 # Accepting sockets shouldn't get a write event.
444 # We will pretend it didn't happen.
447 if not self
.connected
:
449 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
451 raise socket
.error(err
, _strerror(err
))
453 self
.handle_connect_event()
456 def handle_expt_event(self
):
457 # handle_expt_event() is called if there might be an error on the
458 # socket, or if there is OOB data
459 # check for the error condition first
460 err
= self
.socket
.getsockopt(socket
.SOL_SOCKET
, socket
.SO_ERROR
)
462 # we can get here when select.select() says that there is an
463 # exceptional condition on the socket
464 # since there is an error, we'll go ahead and close the socket
465 # like we would in a subclassed handle_read() that received no
471 def handle_error(self
):
472 nil
, t
, v
, tbinfo
= compact_traceback()
474 # sometimes a user repr method will crash.
476 self_repr
= repr(self
)
478 self_repr
= '<__repr__(self) failed for object at %0x>' % id(self
)
481 'uncaptured python exception, closing channel %s (%s:%s %s)' % (
def handle_expt(self):
    """Default priority-event hook: report it through ``log_info``.

    Logs 'unhandled incoming priority event' with type 'warning'.
    """
    message = 'unhandled incoming priority event'
    self.log_info(message, 'warning')
def handle_read(self):
    """Default read hook: report it through ``log_info``.

    Logs 'unhandled read event' with type 'warning'.
    """
    message = 'unhandled read event'
    self.log_info(message, 'warning')
def handle_write(self):
    """Default write hook: report it through ``log_info``.

    Logs 'unhandled write event' with type 'warning'.
    """
    message = 'unhandled write event'
    self.log_info(message, 'warning')
def handle_connect(self):
    """Default connect hook: report it through ``log_info``.

    Logs 'unhandled connect event' with type 'warning'.
    """
    message = 'unhandled connect event'
    self.log_info(message, 'warning')
def handle_accept(self):
    """Default accept hook: report it through ``log_info``.

    Logs 'unhandled accept event' with type 'warning'.
    """
    message = 'unhandled accept event'
    self.log_info(message, 'warning')
def handle_close(self):
    """Default close hook: report it through ``log_info``.

    Logs 'unhandled close event' with type 'warning'.
    """
    message = 'unhandled close event'
    self.log_info(message, 'warning')
510 # ---------------------------------------------------------------------------
511 # adds simple buffered output capability, useful for simple clients.
512 # [for more sophisticated usage use asynchat.async_chat]
513 # ---------------------------------------------------------------------------
515 class dispatcher_with_send(dispatcher
):
517 def __init__(self
, sock
=None, map=None):
518 dispatcher
.__init
__(self
, sock
, map)
521 def initiate_send(self
):
523 num_sent
= dispatcher
.send(self
, self
.out_buffer
[:512])
524 self
.out_buffer
= self
.out_buffer
[num_sent
:]
526 def handle_write(self
):
530 return (not self
.connected
) or len(self
.out_buffer
)
532 def send(self
, data
):
534 self
.log_info('sending %s' % repr(data
))
535 self
.out_buffer
= self
.out_buffer
+ data
538 # ---------------------------------------------------------------------------
539 # used for debugging.
540 # ---------------------------------------------------------------------------
542 def compact_traceback():
543 t
, v
, tb
= sys
.exc_info()
545 if not tb
: # Must have a traceback
546 raise AssertionError("traceback does not exist")
549 tb
.tb_frame
.f_code
.co_filename
,
550 tb
.tb_frame
.f_code
.co_name
,
558 file, function
, line
= tbinfo
[-1]
559 info
= ' '.join(['[%s|%s|%s]' % x
for x
in tbinfo
])
560 return (file, function
, line
), t
, v
, info
562 def close_all(map=None, ignore_all
=False):
565 for x
in map.values():
569 if x
.args
[0] == EBADF
:
573 except _reraised_exceptions
:
580 # Asynchronous File I/O:
582 # After a little research (reading man pages on various unixen, and
583 # digging through the linux kernel), I've determined that select()
584 # isn't meant for doing asynchronous file i/o.
585 # Heartening, though - reading linux/mm/filemap.c shows that linux
586 # supports asynchronous read-ahead. So _MOST_ of the time, the data
587 # will be sitting in memory for us already when we go to read it.
589 # What other OS's (besides NT) support async file i/o? [VMS?]
591 # Regardless, this is useful for pipes, and stdin/stdout...
593 if os
.name
== 'posix':
597 # Here we override just enough to make a file
598 # look like a socket for the purposes of asyncore.
599 # The passed fd is automatically os.dup()'d
601 def __init__(self
, fd
):
def recv(self, *args):
    """Socket-style ``recv`` for a file descriptor.

    Forwards ``*args`` to ``os.read`` on ``self.fd`` and returns
    whatever ``os.read`` returns.
    """
    descriptor = self.fd
    return os.read(descriptor, *args)
def send(self, *args):
    """Socket-style ``send`` for a file descriptor.

    Forwards ``*args`` to ``os.write`` on ``self.fd`` and returns
    whatever ``os.write`` returns.
    """
    descriptor = self.fd
    return os.write(descriptor, *args)
619 class file_dispatcher(dispatcher
):
621 def __init__(self
, fd
, map=None):
622 dispatcher
.__init
__(self
, None, map)
623 self
.connected
= True
626 except AttributeError:
629 # set it to non-blocking mode
630 flags
= fcntl
.fcntl(fd
, fcntl
.F_GETFL
, 0)
631 flags
= flags | os
.O_NONBLOCK
632 fcntl
.fcntl(fd
, fcntl
.F_SETFL
, flags
)
634 def set_file(self
, fd
):
635 self
.socket
= file_wrapper(fd
)
636 self
._fileno
= self
.socket
.fileno()