2 # A higher level module for using sockets (or Windows named pipes)
4 # multiprocessing/connection.py
6 # Copyright (c) 2006-2008, R Oudkerk --- see COPYING.txt
9 __all__
= [ 'Client', 'Listener', 'Pipe' ]
19 import _multiprocessing
20 from multiprocessing
import current_process
, AuthenticationError
21 from multiprocessing
.util
import get_temp_dir
, Finalize
, sub_debug
, debug
22 from multiprocessing
.forking
import duplicate
, close
30 # A very generous timeout when it comes to local connections...
31 CONNECTION_TIMEOUT
= 20.
33 _mmap_counter
= itertools
.count()
35 default_family
= 'AF_INET'
36 families
= ['AF_INET']
38 if hasattr(socket
, 'AF_UNIX'):
39 default_family
= 'AF_UNIX'
40 families
+= ['AF_UNIX']
42 if sys
.platform
== 'win32':
43 default_family
= 'AF_PIPE'
44 families
+= ['AF_PIPE']
47 def _init_timeout(timeout
=CONNECTION_TIMEOUT
):
48 return time
.time() + timeout
50 def _check_timeout(t
):
51 return time
.time() > t
57 def arbitrary_address(family
):
59 Return an arbitrary free address for the given family
61 if family
== 'AF_INET':
62 return ('localhost', 0)
63 elif family
== 'AF_UNIX':
64 return tempfile
.mktemp(prefix
='listener-', dir=get_temp_dir())
65 elif family
== 'AF_PIPE':
66 return tempfile
.mktemp(prefix
=r
'\\.\pipe\pyc-%d-%d-' %
67 (os
.getpid(), _mmap_counter
.next()))
69 raise ValueError('unrecognized family')
72 def address_type(address
):
74 Return the types of the address
76 This can be 'AF_INET', 'AF_UNIX', or 'AF_PIPE'
78 if type(address
) == tuple:
80 elif type(address
) is str and address
.startswith('\\\\'):
82 elif type(address
) is str:
85 raise ValueError('address type of %r unrecognized' % address
)
91 class Listener(object):
93 Returns a listener object.
95 This is a wrapper for a bound socket which is 'listening' for
96 connections, or for a Windows named pipe.
98 def __init__(self
, address
=None, family
=None, backlog
=1, authkey
=None):
99 family
= family
or (address
and address_type(address
)) \
101 address
= address
or arbitrary_address(family
)
103 if family
== 'AF_PIPE':
104 self
._listener
= PipeListener(address
, backlog
)
106 self
._listener
= SocketListener(address
, family
, backlog
)
108 if authkey
is not None and not isinstance(authkey
, bytes
):
109 raise TypeError, 'authkey should be a byte string'
111 self
._authkey
= authkey
115 Accept a connection on the bound socket or named pipe of `self`.
117 Returns a `Connection` object.
119 c
= self
._listener
.accept()
121 deliver_challenge(c
, self
._authkey
)
122 answer_challenge(c
, self
._authkey
)
127 Close the bound socket or named pipe of `self`.
129 return self
._listener
.close()
131 address
= property(lambda self
: self
._listener
._address
)
132 last_accepted
= property(lambda self
: self
._listener
._last
_accepted
)
135 def Client(address
, family
=None, authkey
=None):
137 Returns a connection to the address of a `Listener`
139 family
= family
or address_type(address
)
140 if family
== 'AF_PIPE':
141 c
= PipeClient(address
)
143 c
= SocketClient(address
)
145 if authkey
is not None and not isinstance(authkey
, bytes
):
146 raise TypeError, 'authkey should be a byte string'
148 if authkey
is not None:
149 answer_challenge(c
, authkey
)
150 deliver_challenge(c
, authkey
)
155 if sys
.platform
!= 'win32':
157 def Pipe(duplex
=True):
159 Returns pair of connection objects at either end of a pipe
162 s1
, s2
= socket
.socketpair()
163 c1
= _multiprocessing
.Connection(os
.dup(s1
.fileno()))
164 c2
= _multiprocessing
.Connection(os
.dup(s2
.fileno()))
169 c1
= _multiprocessing
.Connection(fd1
, writable
=False)
170 c2
= _multiprocessing
.Connection(fd2
, readable
=False)
176 from _multiprocessing
import win32
178 def Pipe(duplex
=True):
180 Returns pair of connection objects at either end of a pipe
182 address
= arbitrary_address('AF_PIPE')
184 openmode
= win32
.PIPE_ACCESS_DUPLEX
185 access
= win32
.GENERIC_READ | win32
.GENERIC_WRITE
186 obsize
, ibsize
= BUFSIZE
, BUFSIZE
188 openmode
= win32
.PIPE_ACCESS_INBOUND
189 access
= win32
.GENERIC_WRITE
190 obsize
, ibsize
= 0, BUFSIZE
192 h1
= win32
.CreateNamedPipe(
194 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
196 1, obsize
, ibsize
, win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
198 h2
= win32
.CreateFile(
199 address
, access
, 0, win32
.NULL
, win32
.OPEN_EXISTING
, 0, win32
.NULL
201 win32
.SetNamedPipeHandleState(
202 h2
, win32
.PIPE_READMODE_MESSAGE
, None, None
206 win32
.ConnectNamedPipe(h1
, win32
.NULL
)
207 except WindowsError, e
:
208 if e
.args
[0] != win32
.ERROR_PIPE_CONNECTED
:
211 c1
= _multiprocessing
.PipeConnection(h1
, writable
=duplex
)
212 c2
= _multiprocessing
.PipeConnection(h2
, readable
=duplex
)
217 # Definitions for connections based on sockets
220 class SocketListener(object):
222 Representation of a socket which is bound to an address and listening
224 def __init__(self
, address
, family
, backlog
=1):
225 self
._socket
= socket
.socket(getattr(socket
, family
))
226 self
._socket
.setsockopt(socket
.SOL_SOCKET
, socket
.SO_REUSEADDR
, 1)
227 self
._socket
.bind(address
)
228 self
._socket
.listen(backlog
)
229 self
._address
= self
._socket
.getsockname()
230 self
._family
= family
231 self
._last
_accepted
= None
233 if family
== 'AF_UNIX':
234 self
._unlink
= Finalize(
235 self
, os
.unlink
, args
=(address
,), exitpriority
=0
241 s
, self
._last
_accepted
= self
._socket
.accept()
242 fd
= duplicate(s
.fileno())
243 conn
= _multiprocessing
.Connection(fd
)
249 if self
._unlink
is not None:
253 def SocketClient(address
):
255 Return a connection object connected to the socket given by `address`
257 family
= address_type(address
)
258 s
= socket
.socket( getattr(socket
, family
) )
264 except socket
.error
, e
:
265 if e
.args
[0] != errno
.ECONNREFUSED
or _check_timeout(t
):
266 debug('failed to connect to address %s', address
)
274 fd
= duplicate(s
.fileno())
275 conn
= _multiprocessing
.Connection(fd
)
280 # Definitions for connections based on named pipes
283 if sys
.platform
== 'win32':
285 class PipeListener(object):
287 Representation of a named pipe
289 def __init__(self
, address
, backlog
=None):
290 self
._address
= address
291 handle
= win32
.CreateNamedPipe(
292 address
, win32
.PIPE_ACCESS_DUPLEX
,
293 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
295 win32
.PIPE_UNLIMITED_INSTANCES
, BUFSIZE
, BUFSIZE
,
296 win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
298 self
._handle
_queue
= [handle
]
299 self
._last
_accepted
= None
301 sub_debug('listener created with address=%r', self
._address
)
303 self
.close
= Finalize(
304 self
, PipeListener
._finalize
_pipe
_listener
,
305 args
=(self
._handle
_queue
, self
._address
), exitpriority
=0
309 newhandle
= win32
.CreateNamedPipe(
310 self
._address
, win32
.PIPE_ACCESS_DUPLEX
,
311 win32
.PIPE_TYPE_MESSAGE | win32
.PIPE_READMODE_MESSAGE |
313 win32
.PIPE_UNLIMITED_INSTANCES
, BUFSIZE
, BUFSIZE
,
314 win32
.NMPWAIT_WAIT_FOREVER
, win32
.NULL
316 self
._handle
_queue
.append(newhandle
)
317 handle
= self
._handle
_queue
.pop(0)
319 win32
.ConnectNamedPipe(handle
, win32
.NULL
)
320 except WindowsError, e
:
321 if e
.args
[0] != win32
.ERROR_PIPE_CONNECTED
:
323 return _multiprocessing
.PipeConnection(handle
)
326 def _finalize_pipe_listener(queue
, address
):
327 sub_debug('closing listener with address=%r', address
)
331 def PipeClient(address
):
333 Return a connection object connected to the pipe given by `address`
338 win32
.WaitNamedPipe(address
, 1000)
339 h
= win32
.CreateFile(
340 address
, win32
.GENERIC_READ | win32
.GENERIC_WRITE
,
341 0, win32
.NULL
, win32
.OPEN_EXISTING
, 0, win32
.NULL
343 except WindowsError, e
:
344 if e
.args
[0] not in (win32
.ERROR_SEM_TIMEOUT
,
345 win32
.ERROR_PIPE_BUSY
) or _check_timeout(t
):
352 win32
.SetNamedPipeHandleState(
353 h
, win32
.PIPE_READMODE_MESSAGE
, None, None
355 return _multiprocessing
.PipeConnection(h
)
358 # Authentication stuff
363 CHALLENGE
= b
'#CHALLENGE#'
364 WELCOME
= b
'#WELCOME#'
365 FAILURE
= b
'#FAILURE#'
367 def deliver_challenge(connection
, authkey
):
369 assert isinstance(authkey
, bytes
)
370 message
= os
.urandom(MESSAGE_LENGTH
)
371 connection
.send_bytes(CHALLENGE
+ message
)
372 digest
= hmac
.new(authkey
, message
).digest()
373 response
= connection
.recv_bytes(256) # reject large message
374 if response
== digest
:
375 connection
.send_bytes(WELCOME
)
377 connection
.send_bytes(FAILURE
)
378 raise AuthenticationError('digest received was wrong')
380 def answer_challenge(connection
, authkey
):
382 assert isinstance(authkey
, bytes
)
383 message
= connection
.recv_bytes(256) # reject large message
384 assert message
[:len(CHALLENGE
)] == CHALLENGE
, 'message = %r' % message
385 message
= message
[len(CHALLENGE
):]
386 digest
= hmac
.new(authkey
, message
).digest()
387 connection
.send_bytes(digest
)
388 response
= connection
.recv_bytes(256) # reject large message
389 if response
!= WELCOME
:
390 raise AuthenticationError('digest sent was rejected')
393 # Support for using xmlrpclib for serialization
396 class ConnectionWrapper(object):
397 def __init__(self
, conn
, dumps
, loads
):
401 for attr
in ('fileno', 'close', 'poll', 'recv_bytes', 'send_bytes'):
402 obj
= getattr(conn
, attr
)
403 setattr(self
, attr
, obj
)
406 self
._conn
.send_bytes(s
)
408 s
= self
._conn
.recv_bytes()
409 return self
._loads
(s
)
412 return xmlrpclib
.dumps((obj
,), None, None, None, 1).encode('utf8')
415 (obj
,), method
= xmlrpclib
.loads(s
.decode('utf8'))
418 class XmlListener(Listener
):
422 obj
= Listener
.accept(self
)
423 return ConnectionWrapper(obj
, _xml_dumps
, _xml_loads
)
425 def XmlClient(*args
, **kwds
):
428 return ConnectionWrapper(Client(*args
, **kwds
), _xml_dumps
, _xml_loads
)