1 """RPC Implementation, originally written for the Python Idle IDE
3 For security reasons, GvR requested that Idle's Python execution server process
4 connect to the Idle process, which listens for the connection. Since Idle has
5 only one client per server, this was not a limitation.
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
15 | + -------------------+ |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
22 The RPCServer handler class is expected to provide register/unregister methods.
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
25 See the Idle run.main() docstring for further information on how this was
35 import cPickle
as pickle
42 def unpickle_code(ms
):
43 co
= marshal
.loads(ms
)
44 assert isinstance(co
, types
.CodeType
)
48 assert isinstance(co
, types
.CodeType
)
49 ms
= marshal
.dumps(co
)
50 return unpickle_code
, (ms
,)
52 # XXX KBK 24Aug02 function pickling capability not used in Idle
53 # def unpickle_function(ms):
56 # def pickle_function(fn):
57 # assert isinstance(fn, type.FunctionType)
# Register the marshal-based reducer/constructor pair so that pickle can
# serialize code objects.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
61 # copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)
65 class RPCServer(SocketServer
.TCPServer
):
def __init__(self, addr, handlerclass=None):
    """Initialize the server, defaulting the handler class to RPCHandler.

    addr         -- address passed straight through to TCPServer.__init__
    handlerclass -- request handler class; RPCHandler is used when None
    """
    if handlerclass is None:
        # Resolved at call time rather than as a default argument value.
        handlerclass = RPCHandler
    SocketServer.TCPServer.__init__(self, addr, handlerclass)
72 def server_bind(self
):
73 "Override TCPServer method, no bind() phase for connecting entity"
76 def server_activate(self
):
77 """Override TCPServer method, connect() instead of listen()
79 Due to the reversed connection, self.server_address is actually the
80 address of the Idle Client to which we are connecting.
83 self
.socket
.connect(self
.server_address
)
def get_request(self):
    """Override TCPServer method, return already connected socket.

    Returns the pair (self.socket, self.server_address); no accept() call
    is made here.
    """
    return self.socket, self.server_address
95 def __init__(self
, sock
, objtable
=None, debugging
=None):
96 self
.mainthread
= threading
.currentThread()
97 if debugging
is not None:
98 self
.debugging
= debugging
101 objtable
= objecttable
102 self
.objtable
= objtable
103 self
.statelock
= threading
.Lock()
113 def debug(self
, *args
):
114 if not self
.debugging
:
116 s
= str(threading
.currentThread().getName())
120 sys
.__stderr
__.write(s
)
def register(self, oid, object):
    """Store `object` in self.objtable under the key `oid`.

    An existing entry for the same oid is overwritten.
    """
    # The parameter name shadows the builtin, but it is part of the public
    # signature and is kept for keyword callers.
    self.objtable[oid] = object
125 def unregister(self
, oid
):
127 del self
.objtable
[oid
]
131 def localcall(self
, request
):
132 self
.debug("localcall:", request
)
134 how
, (oid
, methodname
, args
, kwargs
) = request
136 return ("ERROR", "Bad request format")
138 if not self
.objtable
.has_key(oid
):
139 return ("ERROR", "Unknown object id: %s" % `oid`
)
140 obj
= self
.objtable
[oid
]
141 if methodname
== "__methods__":
143 _getmethods(obj
, methods
)
144 return ("OK", methods
)
145 if methodname
== "__attributes__":
147 _getattributes(obj
, attributes
)
148 return ("OK", attributes
)
149 if not hasattr(obj
, methodname
):
150 return ("ERROR", "Unsupported method name: %s" % `methodname`
)
151 method
= getattr(obj
, methodname
)
153 ret
= method(*args
, **kwargs
)
154 if isinstance(ret
, RemoteObject
):
158 ##traceback.print_exc(file=sys.__stderr__)
159 typ
, val
, tb
= info
= sys
.exc_info()
160 sys
.last_type
, sys
.last_value
, sys
.last_traceback
= info
161 if isinstance(typ
, type(Exception)):
165 if issubclass(typ
, Exception):
174 tb
= traceback
.extract_tb(tb
)
175 return ("EXCEPTION", (mod
, name
, args
, tb
))
177 def remotecall(self
, oid
, methodname
, args
, kwargs
):
178 self
.debug("remotecall:", oid
, methodname
, args
, kwargs
)
179 seq
= self
.asynccall(oid
, methodname
, args
, kwargs
)
180 ret
= self
.asyncreturn(seq
)
181 self
.debug("return:", ret
)
184 def asynccall(self
, oid
, methodname
, args
, kwargs
):
185 self
.debug("asyncall:", oid
, methodname
, args
, kwargs
)
186 request
= ("call", (oid
, methodname
, args
, kwargs
))
187 seq
= self
.putrequest(request
)
def asyncreturn(self, seq):
    """Fetch the response for sequence number `seq` and decode it.

    The value obtained from self.getresponse(seq) is passed to
    self.decoderesponse(), whose result (or raised exception) is the
    result of this call.
    """
    response = self.getresponse(seq)
    return self.decoderesponse(response)
194 def decoderesponse(self
, response
):
198 if how
== "EXCEPTION":
199 mod
, name
, args
, tb
= what
201 if mod
: # not string exception
204 module
= sys
.modules
[mod
]
209 cls
= getattr(module
, name
)
210 except AttributeError:
213 # instantiate a built-in exception object and raise it
214 raise getattr(__import__(mod
), name
)(*args
)
215 name
= mod
+ "." + name
216 # do the best we can:
219 raise RuntimeError, what
220 raise SystemError, (how
, what
)
224 self
.getresponse(None)
228 def getresponse(self
, myseq
):
229 response
= self
._getresponse
(myseq
)
230 if response
is not None:
233 response
= how
, self
._proxify
(what
)
236 def _proxify(self
, obj
):
237 if isinstance(obj
, RemoteProxy
):
238 return RPCProxy(self
, obj
.oid
)
239 if isinstance(obj
, types
.ListType
):
240 return map(self
._proxify
, obj
)
241 # XXX Check for other types -- not currently needed
244 def _getresponse(self
, myseq
):
245 if threading
.currentThread() is self
.mainthread
:
246 # Main thread: does all reading of requests and responses
248 response
= self
.pollresponse(myseq
, None)
249 if response
is not None:
252 # Auxiliary thread: wait for notification from main thread
253 cvar
= threading
.Condition(self
.statelock
)
254 self
.statelock
.acquire()
255 self
.cvars
[myseq
] = cvar
256 while not self
.responses
.has_key(myseq
):
258 response
= self
.responses
[myseq
]
259 del self
.responses
[myseq
]
260 del self
.cvars
[myseq
]
261 self
.statelock
.release()
264 def putrequest(self
, request
):
266 self
.putmessage((seq
, request
))
272 self
.nextseq
= seq
= self
.nextseq
+ 2
275 def putmessage(self
, message
):
277 s
= pickle
.dumps(message
)
279 print >>sys
.__stderr
__, "Cannot pickle:", `message`
281 s
= struct
.pack("<i", len(s
)) + s
283 n
= self
.sock
.send(s
)
286 def ioready(self
, wait
=0.0):
287 r
, w
, x
= select
.select([self
.sock
.fileno()], [], [], wait
)
292 bufstate
= 0 # meaning: 0 => reading count; 1 => reading data
294 def pollpacket(self
, wait
=0.0):
296 if len(self
.buffer) < self
.bufneed
:
297 if not self
.ioready(wait
):
300 s
= self
.sock
.recv(BUFSIZE
)
307 return self
._stage
1()
310 if self
.bufstate
== 0 and len(self
.buffer) >= 4:
312 self
.buffer = self
.buffer[4:]
313 self
.bufneed
= struct
.unpack("<i", s
)[0]
317 if self
.bufstate
== 1 and len(self
.buffer) >= self
.bufneed
:
318 packet
= self
.buffer[:self
.bufneed
]
319 self
.buffer = self
.buffer[self
.bufneed
:]
324 def pollmessage(self
, wait
=0.0):
325 packet
= self
.pollpacket(wait
)
329 message
= pickle
.loads(packet
)
331 print >>sys
.__stderr
__, "-----------------------"
332 print >>sys
.__stderr
__, "cannot unpickle packet:", `packet`
333 traceback
.print_stack(file=sys
.__stderr
__)
334 print >>sys
.__stderr
__, "-----------------------"
338 def pollresponse(self
, myseq
, wait
=0.0):
339 # Loop while there's no more buffered input or until specific response
341 message
= self
.pollmessage(wait
)
346 if resq
[0] == "call":
347 response
= self
.localcall(resq
)
348 self
.putmessage((seq
, response
))
353 self
.statelock
.acquire()
354 self
.responses
[seq
] = resq
355 cv
= self
.cvars
.get(seq
)
358 self
.statelock
.release()
361 #----------------- end class SocketIO --------------------
369 objecttable
[oid
] = obj
370 return RemoteProxy(oid
)
374 def __init__(self
, oid
):
377 class RPCHandler(SocketServer
.BaseRequestHandler
, SocketIO
):
def __init__(self, sock, addr, svr):
    """Set up the handler on an already connected socket.

    sock -- connected socket, handed to both base-class initializers
    addr -- peer address, forwarded to BaseRequestHandler.__init__
    svr  -- owning server; its current_handler attribute is set to self
    """
    svr.current_handler = self  ## cgt xxx
    # SocketIO is initialized first: per the SocketServer documentation,
    # BaseRequestHandler.__init__ itself runs setup()/handle()/finish(),
    # so the SocketIO state has to exist before that call.
    SocketIO.__init__(self, sock)
    SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)
387 "handle() method required by SocketServer"
def get_remote_proxy(self, oid):
    """Return an RPCProxy bound to this connection for the object id `oid`."""
    return RPCProxy(self, oid)
393 class RPCClient(SocketIO
):
395 nextseq
= 1 # Requests coming from the client are odd numbered
397 def __init__(self
, address
, family
=socket
.AF_INET
, type=socket
.SOCK_STREAM
):
398 self
.listening_sock
= socket
.socket(family
, type)
399 self
.listening_sock
.setsockopt(socket
.SOL_SOCKET
,
400 socket
.SO_REUSEADDR
, 1)
401 self
.listening_sock
.bind(address
)
402 self
.listening_sock
.listen(1)
405 working_sock
, address
= self
.listening_sock
.accept()
406 if address
[0] == '127.0.0.1':
407 print>>sys
.__stderr
__, "Idle accepted connection from ", address
408 SocketIO
.__init
__(self
, working_sock
)
410 print>>sys
.__stderr
__, "Invalid host: ", address
def get_remote_proxy(self, oid):
    """Return an RPCProxy bound to this connection for the object id `oid`."""
    return RPCProxy(self, oid)
421 def __init__(self
, sockio
, oid
):
425 def __getattr__(self
, name
):
426 if self
.__methods
is None:
428 if self
.__methods
.get(name
):
429 return MethodProxy(self
.sockio
, self
.oid
, name
)
430 if self
.__attributes
is None:
431 self
.__getattributes
()
432 if not self
.__attributes
.has_key(name
):
433 raise AttributeError, name
434 __getattr__
.DebuggerStepThrough
=1
def __getattributes(self):
    """Fetch the remote object's attribute table and cache it on self.

    Issues the pseudo-method call "__attributes__" with no arguments
    through self.sockio.remotecall() and stores the result in the
    private __attributes attribute.
    """
    self.__attributes = self.sockio.remotecall(self.oid,
                                               "__attributes__", (), {})
def __getmethods(self):
    """Fetch the remote object's method table and cache it on self.

    Issues the pseudo-method call "__methods__" with no arguments through
    self.sockio.remotecall() and stores the result in the private
    __methods attribute.
    """
    self.__methods = self.sockio.remotecall(self.oid,
                                            "__methods__", (), {})
444 def _getmethods(obj
, methods
):
445 # Helper to get a list of methods from an object
446 # Adds names to dictionary argument 'methods'
447 for name
in dir(obj
):
448 attr
= getattr(obj
, name
)
451 if type(obj
) == types
.InstanceType
:
452 _getmethods(obj
.__class
__, methods
)
453 if type(obj
) == types
.ClassType
:
454 for super in obj
.__bases
__:
455 _getmethods(super, methods
)
457 def _getattributes(obj
, attributes
):
458 for name
in dir(obj
):
459 attr
= getattr(obj
, name
)
460 if not callable(attr
):
465 def __init__(self
, sockio
, oid
, name
):
470 def __call__(self
, *args
, **kwargs
):
471 value
= self
.sockio
.remotecall(self
.oid
, self
.name
, args
, kwargs
)
478 def testServer(addr
):
479 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
481 def __init__(self
,name
):
483 def greet(self
, name
):
484 print "(someone called greet)"
485 print "Hello %s, I am %s." % (name
, self
.name
)
488 print "(someone called getName)"
491 def greet_this_guy(self
, name
):
492 print "(someone called greet_this_guy)"
493 print "About to greet %s ..." % name
494 remote_guy
= self
.server
.current_handler
.get_remote_proxy(name
)
495 remote_guy
.greet("Thomas Edison")
499 person
= RemotePerson("Thomas Edison")
500 svr
= RPCServer(addr
)
501 svr
.register('thomas', person
)
502 person
.server
= svr
# only required if callbacks are used
504 # svr.serve_forever()
505 svr
.handle_request() # process once only
507 def testClient(addr
):
508 "demonstrates RPC Client"
509 # XXX 25 Jul 02 KBK needs update to use rpc.py register/unregister methods
512 thomas
= clt
.get_remote_proxy("thomas")
513 print "The remote person's name is ..."
514 print thomas
.getName()
515 # print clt.remotecall("thomas", "getName", (), {})
518 print "Getting remote thomas to say hi..."
519 thomas
.greet("Alexander Bell")
520 #clt.remotecall("thomas","greet",("Alexander Bell",), {})
524 # demonstrates remote server calling local instance
526 def __init__(self
,name
):
528 def greet(self
, name
):
529 print "You've greeted me!"
532 person
= LocalPerson("Alexander Bell")
533 clt
.register("alexander",person
)
534 thomas
.greet_this_guy("alexander")
535 # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
538 addr
=("localhost",8833)
539 if len(sys
.argv
) == 2:
540 if sys
.argv
[1]=='-server':
545 if __name__
== '__main__':