1 """protocol (David Scherer <dscherer@cmu.edu>)
3 This module implements a simple RPC or "distributed object" protocol.
4 I am probably the 100,000th person to write this in Python, but, hey,
9 connectionLost is an exception that will be thrown by functions in
10 the protocol module or calls to remote methods that fail because
11 the remote program has closed the socket or because no connection
12 could be established in the first place.
14 Server( port=None, connection_hook=None ) creates a server on a
15 well-known port, to which clients can connect. When a client
16 connects, a Connection is created for it. If connection_hook
17 is defined, then connection_hook( socket.getpeername() ) is called
18 before a Connection is created, and if it returns false then the
19 connection is refused. connection_hook must be prepared to be
20 called from any thread.
22 Client( ip='127.0.0.1', port=None ) returns a Connection to a Server
23 object at a well-known address and port.
25 Connection( socket ) creates an RPC connection on an arbitrary socket,
26 which must already be connected to another program. You do not
27 need to use this directly if you are using Client() or Server().
29 publish( name, connect_function ) provides an object with the
30 specified name to some or all Connections. When another program
31 calls Connection.getobject() with the specified name, the
32 specified connect_function is called with the arguments
34 connect_function( conn, addr )
36 where conn is the Connection object to the requesting client and
37 addr is the address returned by socket.getpeername(). If that
38 function returns an object, that object becomes accessible to
39 the caller. If it returns None, the caller's request fails.
43 .close() refuses additional RPC messages from the peer, and notifies
44 the peer that the connection has been closed. All pending remote
45 method calls in either program will fail with a connectionLost
46 exception. Further remote method calls on this connection will
47 also result in errors.
49 .getobject(name) returns a proxy for the remote object with the
50 specified name, if it exists and the peer permits us access.
51 Otherwise, it returns None. It may throw a connectionLost
52 exception. The returned proxy supports basic attribute access
53 and method calls, and its methods have an extra attribute,
54 .void, which is a function that has the same effect but always
55 returns None. This last capability is provided as a performance
56 hack: object.method.void(params) can return without waiting for
57 the remote process to respond, but object.method(params) needs
58 to wait for a return value or exception.
60 .rpc_loop(block=0) processes *incoming* messages for this connection.
61 If block=1, it continues processing until an exception or return
62 value is received, which is normally forever. Otherwise it
63 returns when all currently pending messages have been delivered.
64 It may throw a connectionLost exception.
66 .set_close_hook(f) specifies a function to be called when the remote
67 object closes the connection during a call to rpc_loop(). This
68 is a good way for servers to be notified when clients disconnect.
70 .set_shutdown_hook(f) specifies a function called *immediately* when
71 the receive loop detects that the connection has been lost. The
72 provided function must be prepared to run in any thread.
76 .rpc_loop() processes incoming messages on all connections, and
77 returns when all pending messages have been processed. It will
78 *not* throw connectionLost exceptions; the
79 Connection.set_close_hook() mechanism is much better for servers.
82 import sys
, os
, string
, types
84 from threading
import Thread
85 from Queue
import Queue
, Empty
86 from cPickle
import Pickler
, Unpickler
, PicklingError
89 def __init__(self
, what
=""): self
.what
= what
90 def __repr__(self
): return self
.what
91 def __str__(self
): return self
.what
94 "Returns a list of the names of the methods of a class."
96 for b
in cls
.__bases
__:
97 methods
= methods
+ getmethods(b
)
100 if type(d
[k
])==types
.FunctionType
:
105 "Proxy for a method of a remote object."
106 def __init__(self
, classp
, name
):
109 self
.client
= classp
.client
110 def __call__(self
, *args
, **keywords
):
111 return self
.client
.call( 'm', self
.classp
.name
, self
.name
, args
, keywords
)
113 def void(self
, *args
, **keywords
):
114 self
.client
.call_void( 'm', self
.classp
.name
,self
.name
,args
,keywords
)
117 "Proxy for a remote object."
118 def __init__(self
, client
, name
, methods
):
119 self
.__dict
__['client'] = client
120 self
.__dict
__['name'] = name
123 prox
= methodproxy( self
, m
)
124 self
.__dict
__[m
] = prox
126 def __getattr__(self
, attr
):
127 return self
.client
.call( 'g', self
.name
, attr
)
129 def __setattr__(self
, attr
, value
):
130 self
.client
.call_void( 's', self
.name
, attr
, value
)
133 def publish(name
, connect_function
):
134 local_connect
[name
]=connect_function
137 "File emulator based on a socket. Provides only blocking semantics for now."
139 def __init__(self
, socket
):
143 def _recv(self
,bytes
):
145 r
=self
.socket
.recv(bytes
)
147 raise connectionLost()
149 raise connectionLost()
152 def write(self
, string
):
154 self
.socket
.send( string
)
156 raise connectionLost()
158 def read(self
,bytes
):
159 x
= bytes
-len(self
.buffer)
161 self
.buffer=self
.buffer+self
._recv
(x
)
162 x
= bytes
-len(self
.buffer)
163 s
= self
.buffer[:bytes
]
164 self
.buffer=self
.buffer[bytes
:]
169 f
= string
.find(self
.buffer,'\n')
171 s
= self
.buffer[:f
+1]
172 self
.buffer=self
.buffer[f
+1:]
174 self
.buffer = self
.buffer + self
._recv
(1024)
177 class Connection (Thread
):
179 def __init__(self
, socket
):
180 self
.local_objects
= {}
182 self
.name
= socket
.getpeername()
183 self
.socketfile
= socketFile(socket
)
184 self
.queue
= Queue(-1)
185 self
.refuse_messages
= 0
186 self
.cmds
= { 'm': self
.r_meth
,
191 #'r' handled by rpc_loop
194 Thread
.__init
__(self
)
198 def getobject(self
, name
):
199 methods
= self
.call( 'o', name
)
200 if methods
is None: return None
201 return classproxy(self
, name
, methods
)
203 # close_hook is called from rpc_loop(), like a normal remote method
205 def set_close_hook(self
,hook
): self
.close_hook
= hook
207 # shutdown_hook is called directly from the run() thread, and needs
208 # to be "thread safe"
209 def set_shutdown_hook(self
,hook
): self
.shutdown_hook
= hook
216 self
.refuse_messages
= 1
218 def call(self
, c
, *args
):
219 self
.send( (c
, args
, 1 ) )
220 return self
.rpc_loop( block
= 1 )
222 def call_void(self
, c
, *args
):
224 self
.send( (c
, args
, 0 ) )
228 # the following methods handle individual RPC calls:
230 def r_geto(self
, obj
):
231 c
= local_connect
.get(obj
)
232 if not c
: return None
233 o
= c(self
, self
.name
)
234 if not o
: return None
235 self
.local_objects
[obj
] = o
236 return getmethods(o
.__class
__)
238 def r_meth(self
, obj
, name
, args
, keywords
):
239 return apply( getattr(self
.local_objects
[obj
],name
), args
, keywords
)
241 def r_get(self
, obj
, name
):
242 return getattr(self
.local_objects
[obj
],name
)
244 def r_set(self
, obj
, name
, value
):
245 setattr(self
.local_objects
[obj
],name
,value
)
247 def r_exc(self
, e
, v
):
250 def rpc_exec(self
, cmd
, arg
, ret
):
251 if self
.refuse_messages
: return
252 if self
.debug
: print cmd
,arg
,ret
255 r
=apply(self
.cmds
.get(cmd
), arg
)
256 self
.send( ('r', r
, 0) )
259 self
.send( ('e', sys
.exc_info()[:2], 0) )
260 except PicklingError
:
261 self
.send( ('e', (TypeError, 'Unpicklable exception.'), 0 ) )
263 # we cannot report exceptions to the caller, so
264 # we report them in this process.
265 r
=apply(self
.cmds
.get(cmd
), arg
)
267 # the following methods implement the RPC and message loops:
269 def rpc_loop(self
, block
=0):
270 if self
.refuse_messages
: raise connectionLost('(already closed)')
274 cmd
, arg
, ret
= self
.queue
.get( block
)
277 if cmd
=='r': return arg
278 self
.rpc_exec(cmd
,arg
,ret
)
279 except connectionLost
:
282 self
.close_hook
= None
289 self
.queue
.put( data
)
291 self
.queue
.put( ('e', sys
.exc_info()[:2], 0) )
293 # The following send raw pickled data to the peer
295 def send(self
, data
):
297 Pickler(self
.socketfile
,1).dump( data
)
298 except connectionLost
:
300 if self
.shutdown_hook
: self
.shutdown_hook()
305 return Unpickler(self
.socketfile
).load()
306 except connectionLost
:
308 if self
.shutdown_hook
: self
.shutdown_hook()
315 self
.socket
.shutdown(1)
321 class Server (Thread
):
322 default_port
= 0x1D1E # "IDlE"
324 def __init__(self
, port
=None, connection_hook
=None):
325 self
.connections
= []
326 self
.port
= port
or self
.default_port
327 self
.connection_hook
= connection_hook
330 self
.wellknown
= s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
331 s
.bind(('', self
.port
))
336 Thread
.__init
__(self
)
343 conn
, addr
= s
.accept()
344 if self
.connection_hook
and not self
.connection_hook(addr
):
350 self
.connections
.append( Connection(conn
) )
353 cns
= self
.connections
[:]
356 c
.rpc_loop(block
= 0)
357 except connectionLost
:
358 if c
in self
.connections
:
359 self
.connections
.remove(c
)
361 def Client(ip
='127.0.0.1', port
=None):
363 s
= socket
.socket(socket
.AF_INET
, socket
.SOCK_STREAM
)
364 s
.connect((ip
,port
or Server
.default_port
))
365 except socket
.error
, what
:
366 raise connectionLost(str(what
))
368 raise connectionLost()