"""RPC Implementation, originally written for the Python Idle IDE
For security reasons, GvR requested that Idle's Python execution server process
connect to the Idle process, which listens for the connection. Since Idle has
only one client per server, this was not a limitation.
   +---------------------------------+ +-------------+
   | SocketServer.BaseRequestHandler | | SocketIO    |
   +---------------------------------+ +-------------+
                   ^                      ^  ^
                   |                      |  |
                   | + -------------------+  |
                   | |                       |
   +-------------------------+        +-----------------+
   | RPCHandler              |        | RPCClient       |
   | [attribute of RPCServer]|        |                 |
   +-------------------------+        +-----------------+
22 The RPCServer handler class is expected to provide register/unregister methods.
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
25 See the Idle run.main() docstring for further information on how this was
35 import cPickle
as pickle
42 def unpickle_code(ms
):
43 co
= marshal
.loads(ms
)
44 assert isinstance(co
, types
.CodeType
)
48 assert isinstance(co
, types
.CodeType
)
49 ms
= marshal
.dumps(co
)
50 return unpickle_code
, (ms
,)
52 # XXX KBK 24Aug02 function pickling capability not used in Idle
53 # def unpickle_function(ms):
56 # def pickle_function(fn):
57 # assert isinstance(fn, type.FunctionType)
60 copy_reg
.pickle(types
.CodeType
, pickle_code
, unpickle_code
)
61 # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
65 class RPCServer(SocketServer
.TCPServer
):
def __init__(self, addr, handlerclass=None):
    """Initialise the underlying TCPServer with a request handler class.

    addr is forwarded unchanged to SocketServer.TCPServer.__init__.
    handlerclass is the request handler class to install; when it is
    None, RPCHandler is used.
    """
    if handlerclass is not None:
        chosen = handlerclass
    else:
        chosen = RPCHandler
    SocketServer.TCPServer.__init__(self, addr, chosen)
72 def server_bind(self
):
73 "Override TCPServer method, no bind() phase for connecting entity"
def server_activate(self):
    """Override TCPServer method: connect() instead of listen().

    The connection is reversed, so self.server_address is really the
    address of the Idle client that this end connects to.
    """
    connect = self.socket.connect
    connect(self.server_address)
def get_request(self):
    """Override TCPServer method: hand back the already connected socket.

    Returns the pair (self.socket, self.server_address).
    """
    connection = self.socket
    peer = self.server_address
    return connection, peer
89 def handle_error(self
, request
, client_address
):
90 """Override TCPServer method
92 Error message goes to __stderr__. No error message if exiting
93 normally or socket raised EOF. Other exceptions not handled in
94 server code will cause os._exit.
105 print>>erf
, '\n' + '-'*40
106 print>>erf
, 'Unhandled server exception!'
107 print>>erf
, 'Thread: %s' % threading
.currentThread().getName()
108 print>>erf
, 'Client Address: ', client_address
109 print>>erf
, 'Request: ', repr(request
)
110 traceback
.print_exc(file=erf
)
111 print>>erf
, '\n*** Unrecoverable, server exiting!'
123 def __init__(self
, sock
, objtable
=None, debugging
=None):
124 self
.mainthread
= threading
.currentThread()
125 if debugging
is not None:
126 self
.debugging
= debugging
129 objtable
= objecttable
130 self
.objtable
= objtable
131 self
.cvar
= threading
.Condition()
134 self
.interrupted
= False
142 def debug(self
, *args
):
143 if not self
.debugging
:
145 s
= self
.location
+ " " + str(threading
.currentThread().getName())
148 print>>sys
.__stderr
__, s
def register(self, oid, object):
    """Store object in this connection's object table under the key oid.

    An entry already present under the same oid is overwritten.
    """
    table = self.objtable
    table[oid] = object
153 def unregister(self
, oid
):
155 del self
.objtable
[oid
]
159 def localcall(self
, request
):
160 self
.debug("localcall:", request
)
162 how
, (oid
, methodname
, args
, kwargs
) = request
164 return ("ERROR", "Bad request format")
166 if not self
.objtable
.has_key(oid
):
167 return ("ERROR", "Unknown object id: %s" % `oid`
)
168 obj
= self
.objtable
[oid
]
169 if methodname
== "__methods__":
171 _getmethods(obj
, methods
)
172 return ("OK", methods
)
173 if methodname
== "__attributes__":
175 _getattributes(obj
, attributes
)
176 return ("OK", attributes
)
177 if not hasattr(obj
, methodname
):
178 return ("ERROR", "Unsupported method name: %s" % `methodname`
)
179 method
= getattr(obj
, methodname
)
181 ret
= method(*args
, **kwargs
)
182 if isinstance(ret
, RemoteObject
):
190 self
.debug("localcall:EXCEPTION")
191 traceback
.print_exc(file=sys
.__stderr
__)
192 return ("EXCEPTION", None)
194 def remotecall(self
, oid
, methodname
, args
, kwargs
):
195 self
.debug("remotecall:asynccall: ", oid
, methodname
)
196 # XXX KBK 06Feb03 self.interrupted logic may not be necessary if
197 # subprocess is threaded.
199 self
.interrupted
= False
200 raise KeyboardInterrupt
201 seq
= self
.asynccall(oid
, methodname
, args
, kwargs
)
202 return self
.asyncreturn(seq
)
204 def asynccall(self
, oid
, methodname
, args
, kwargs
):
205 request
= ("call", (oid
, methodname
, args
, kwargs
))
207 self
.debug(("asynccall:%d:" % seq
), oid
, methodname
, args
, kwargs
)
208 self
.putmessage((seq
, request
))
def asyncreturn(self, seq):
    """Collect and decode the response belonging to sequence number seq.

    Calls self.getresponse(seq, wait=None), logs the raw response through
    self.debug, and returns whatever self.decoderesponse makes of it.
    """
    self.debug("asyncreturn:%d:call getresponse(): " % seq)
    # NOTE(review): wait=None presumably means "block without timeout";
    # confirm against ioready()/select.
    reply = self.getresponse(seq, wait=None)
    self.debug(("asyncreturn:%d:response: " % seq), reply)
    decoded = self.decoderesponse(reply)
    return decoded
217 def decoderesponse(self
, response
):
221 if how
== "EXCEPTION":
222 self
.debug("decoderesponse: EXCEPTION")
225 self
.debug("decoderesponse: Internal ERROR:", what
)
226 raise RuntimeError, what
227 raise SystemError, (how
, what
)
230 """Listen on socket until I/O not ready or EOF
232 Main thread pollresponse() will loop looking for seq number None, which
233 never comes, and exit on EOFError.
237 self
.getresponse(myseq
=None, wait
=None)
241 def getresponse(self
, myseq
, wait
):
242 response
= self
._getresponse
(myseq
, wait
)
243 if response
is not None:
246 response
= how
, self
._proxify
(what
)
249 def _proxify(self
, obj
):
250 if isinstance(obj
, RemoteProxy
):
251 return RPCProxy(self
, obj
.oid
)
252 if isinstance(obj
, types
.ListType
):
253 return map(self
._proxify
, obj
)
254 # XXX Check for other types -- not currently needed
257 def _getresponse(self
, myseq
, wait
):
258 self
.debug("_getresponse:myseq:", myseq
)
259 if threading
.currentThread() is self
.mainthread
:
260 # Main thread: does all reading of requests or responses
261 # Loop here, blocking each time until socket is ready.
263 response
= self
.pollresponse(myseq
, wait
)
264 if response
is not None:
267 # Auxiliary thread: wait for notification from main thread
269 self
.cvars
[myseq
] = self
.cvar
270 while not self
.responses
.has_key(myseq
):
272 response
= self
.responses
[myseq
]
273 del self
.responses
[myseq
]
274 del self
.cvars
[myseq
]
279 self
.nextseq
= seq
= self
.nextseq
+ 2
282 def putmessage(self
, message
):
283 self
.debug("putmessage:%d:" % message
[0])
285 s
= pickle
.dumps(message
)
287 print >>sys
.__stderr
__, "Cannot pickle:", `message`
289 s
= struct
.pack("<i", len(s
)) + s
292 n
= self
.sock
.send(s
)
293 except AttributeError:
299 def ioready(self
, wait
=0.0):
300 r
, w
, x
= select
.select([self
.sock
.fileno()], [], [], wait
)
305 bufstate
= 0 # meaning: 0 => reading count; 1 => reading data
307 def pollpacket(self
, wait
=0.0):
309 if len(self
.buffer) < self
.bufneed
:
310 if not self
.ioready(wait
):
313 s
= self
.sock
.recv(BUFSIZE
)
320 return self
._stage
1()
323 if self
.bufstate
== 0 and len(self
.buffer) >= 4:
325 self
.buffer = self
.buffer[4:]
326 self
.bufneed
= struct
.unpack("<i", s
)[0]
330 if self
.bufstate
== 1 and len(self
.buffer) >= self
.bufneed
:
331 packet
= self
.buffer[:self
.bufneed
]
332 self
.buffer = self
.buffer[self
.bufneed
:]
337 def pollmessage(self
, wait
=0.0):
338 packet
= self
.pollpacket(wait
)
342 message
= pickle
.loads(packet
)
344 print >>sys
.__stderr
__, "-----------------------"
345 print >>sys
.__stderr
__, "cannot unpickle packet:", `packet`
346 traceback
.print_stack(file=sys
.__stderr
__)
347 print >>sys
.__stderr
__, "-----------------------"
351 def pollresponse(self
, myseq
, wait
=0.0):
352 """Handle messages received on the socket.
354 Some messages received may be asynchronous 'call' commands, and
355 some may be responses intended for other threads.
357 Loop until message with myseq sequence number is received. Save others
358 in self.responses and notify the owning thread, except that 'call'
359 commands are handed off to localcall() and the response sent back
360 across the link with the appropriate sequence number.
364 message
= self
.pollmessage(wait
)
365 if message
is None: # socket not ready
367 #wait = 0.0 # poll on subsequent passes instead of blocking
369 self
.debug("pollresponse:%d:myseq:%s" % (seq
, myseq
))
370 if resq
[0] == "call":
371 self
.debug("pollresponse:%d:localcall:call:" % seq
)
372 response
= self
.localcall(resq
)
373 self
.debug("pollresponse:%d:localcall:response:%s"
375 self
.putmessage((seq
, response
))
381 cv
= self
.cvars
.get(seq
)
382 # response involving unknown sequence number is discarded,
383 # probably intended for prior incarnation
385 self
.responses
[seq
] = resq
390 #----------------- end class SocketIO --------------------
398 objecttable
[oid
] = obj
399 return RemoteProxy(oid
)
403 def __init__(self
, oid
):
406 class RPCHandler(SocketServer
.BaseRequestHandler
, SocketIO
):
409 location
= "#S" # Server
def __init__(self, sock, addr, svr):
    """Set up the handler as both a SocketIO endpoint and a request handler.

    sock, addr and svr are the arguments handed to a SocketServer request
    handler; sock is also given to SocketIO.__init__.
    """
    # Publish this handler on the server object before either base class
    # initialiser runs (testServer's greet_this_guy reads it back through
    # self.server.current_handler).
    svr.current_handler = self
    # The order of the two base initialisers is kept exactly as written.
    SocketIO.__init__(self, sock)
    SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
417 "handle() method required by SocketServer"
def get_remote_proxy(self, oid):
    """Return an RPCProxy for object id oid that talks over this handler."""
    proxy = RPCProxy(self, oid)
    return proxy
423 class RPCClient(SocketIO
):
426 location
= "#C" # Client
428 nextseq
= 1 # Requests coming from the client are odd numbered
def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
    """Create the listening socket for this client and start listening.

    address is the address to bind to; family and type are passed to
    socket.socket().  The socket is stored as self.listening_sock, has
    SO_REUSEADDR set, and listens with a backlog of 1.  No connection is
    accepted here.
    """
    # Stored on self first, so the attribute exists even if a later call
    # raises.
    listener = self.listening_sock = socket.socket(family, type)
    listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listener.bind(address)
    listener.listen(1)
438 working_sock
, address
= self
.listening_sock
.accept()
440 print>>sys
.__stderr
__, "****** Connection request from ", address
441 if address
[0] == '127.0.0.1':
442 SocketIO
.__init
__(self
, working_sock
)
444 print>>sys
.__stderr
__, "** Invalid host: ", address
def get_remote_proxy(self, oid):
    """Return an RPCProxy for object id oid that talks over this client."""
    proxy = RPCProxy(self, oid)
    return proxy
455 def __init__(self
, sockio
, oid
):
459 def __getattr__(self
, name
):
460 if self
.__methods
is None:
462 if self
.__methods
.get(name
):
463 return MethodProxy(self
.sockio
, self
.oid
, name
)
464 if self
.__attributes
is None:
465 self
.__getattributes
()
466 if not self
.__attributes
.has_key(name
):
467 raise AttributeError, name
468 __getattr__
.DebuggerStepThrough
=1
def __getattributes(self):
    """Ask the remote side for "__attributes__" and cache the reply.

    The result of the remote call is stored in self.__attributes.
    """
    link = self.sockio
    self.__attributes = link.remotecall(self.oid, "__attributes__", (), {})
def __getmethods(self):
    """Ask the remote side for "__methods__" and cache the reply.

    The result of the remote call is stored in self.__methods.
    """
    link = self.sockio
    self.__methods = link.remotecall(self.oid, "__methods__", (), {})
478 def _getmethods(obj
, methods
):
479 # Helper to get a list of methods from an object
480 # Adds names to dictionary argument 'methods'
481 for name
in dir(obj
):
482 attr
= getattr(obj
, name
)
485 if type(obj
) == types
.InstanceType
:
486 _getmethods(obj
.__class
__, methods
)
487 if type(obj
) == types
.ClassType
:
488 for super in obj
.__bases
__:
489 _getmethods(super, methods
)
491 def _getattributes(obj
, attributes
):
492 for name
in dir(obj
):
493 attr
= getattr(obj
, name
)
494 if not callable(attr
):
499 def __init__(self
, sockio
, oid
, name
):
504 def __call__(self
, *args
, **kwargs
):
505 value
= self
.sockio
.remotecall(self
.oid
, self
.name
, args
, kwargs
)
512 def testServer(addr
):
513 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
515 def __init__(self
,name
):
517 def greet(self
, name
):
518 print "(someone called greet)"
519 print "Hello %s, I am %s." % (name
, self
.name
)
522 print "(someone called getName)"
525 def greet_this_guy(self
, name
):
526 print "(someone called greet_this_guy)"
527 print "About to greet %s ..." % name
528 remote_guy
= self
.server
.current_handler
.get_remote_proxy(name
)
529 remote_guy
.greet("Thomas Edison")
533 person
= RemotePerson("Thomas Edison")
534 svr
= RPCServer(addr
)
535 svr
.register('thomas', person
)
536 person
.server
= svr
# only required if callbacks are used
538 # svr.serve_forever()
539 svr
.handle_request() # process once only
541 def testClient(addr
):
542 "demonstrates RPC Client"
543 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
546 thomas
= clt
.get_remote_proxy("thomas")
547 print "The remote person's name is ..."
548 print thomas
.getName()
549 # print clt.remotecall("thomas", "getName", (), {})
552 print "Getting remote thomas to say hi..."
553 thomas
.greet("Alexander Bell")
554 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
558 # demonstrates remote server calling local instance
560 def __init__(self
,name
):
562 def greet(self
, name
):
563 print "You've greeted me!"
566 person
= LocalPerson("Alexander Bell")
567 clt
.register("alexander",person
)
568 thomas
.greet_this_guy("alexander")
569 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
572 addr
=("localhost",8833)
573 if len(sys
.argv
) == 2:
574 if sys
.argv
[1]=='-server':
579 if __name__
== '__main__':