Bump version number to 2.4.2 to pick up the latest minor bug fixes.
[python/dscho.git] / Lib / idlelib / rpc.py
blobeeb1b4efd44b00ad9a54880c6dc7f9995e9e2bc5
1 """RPC Implementation, originally written for the Python Idle IDE
3 For security reasons, GvR requested that Idle's Python execution server process
4 connect to the Idle process, which listens for the connection. Since Idle
5 has only one client per server, this was not a limitation.
7 +---------------------------------+ +-------------+
8 | SocketServer.BaseRequestHandler | | SocketIO |
9 +---------------------------------+ +-------------+
10 ^ | register() |
11 | | unregister()|
12 | +-------------+
13 | ^ ^
14 | | |
15 | + -------------------+ |
16 | | |
17 +-------------------------+ +-----------------+
18 | RPCHandler | | RPCClient |
19 | [attribute of RPCServer]| | |
20 +-------------------------+ +-----------------+
22 The RPCServer handler class is expected to provide register/unregister methods.
23 RPCHandler inherits the mix-in class SocketIO, which provides these methods.
25 See the Idle run.main() docstring for further information on how this was
26 accomplished in Idle.
28 """
30 import sys
31 import socket
32 import select
33 import SocketServer
34 import struct
35 import cPickle as pickle
36 import threading
37 import traceback
38 import copy_reg
39 import types
40 import marshal
def unpickle_code(ms):
    """Rebuild a code object from its marshalled form.

    `ms` is the string produced by marshal.dumps() on a code object.  The
    result is checked (by assertion) to really be a code object before it is
    handed back.
    """
    code = marshal.loads(ms)
    assert isinstance(code, types.CodeType)
    return code
def pickle_code(co):
    """Reduce a code object for pickling.

    Returns the (callable, args) pair expected from a pickle reduction
    function: calling unpickle_code on the marshalled string recreates `co`.
    """
    assert isinstance(co, types.CodeType)
    return unpickle_code, (marshal.dumps(co),)
# XXX KBK 24Aug02 function pickling capability not used in Idle
#  def unpickle_function(ms):
#      return ms

#  def pickle_function(fn):
#      assert isinstance(fn, type.FunctionType)
#      return `fn`

# Register pickle_code as the reduction function for code objects so that
# they can be pickled; unpickle_code is passed as the matching constructor.
copy_reg.pickle(types.CodeType, pickle_code, unpickle_code)
# copy_reg.pickle(types.FunctionType, pickle_function, unpickle_function)

# Number of bytes requested from the socket per recv() call in
# SocketIO.pollpacket().
BUFSIZE = 8*1024
class RPCServer(SocketServer.TCPServer):
    """TCPServer variant that connects out to its peer instead of listening.

    The bind/listen phases of TCPServer are replaced: nothing is bound, the
    socket is connected to `server_address`, and that single connected socket
    is what get_request() hands to the handler.
    """

    def __init__(self, addr, handlerclass=None):
        """Set up the server for `addr`; the handler defaults to RPCHandler."""
        if handlerclass is not None:
            handler = handlerclass
        else:
            handler = RPCHandler
        SocketServer.TCPServer.__init__(self, addr, handler)

    def server_bind(self):
        """Override TCPServer method, no bind() phase for connecting entity."""

    def server_activate(self):
        """Override TCPServer method, connect() instead of listen().

        Due to the reversed connection, self.server_address is actually the
        address of the Idle Client to which we are connecting.

        """
        peer = self.server_address
        self.socket.connect(peer)

    def get_request(self):
        """Override TCPServer method, return already connected socket."""
        connected = self.socket
        return connected, self.server_address
# Module-wide table mapping object ids to the objects exposed for remote
# calls.  SocketIO uses it when no table is passed to its constructor, and
# remoteref() stores objects in it.
objecttable = {}
class SocketIO:
    """Mix-in implementing the pickled request/response protocol on a socket.

    Every message is a pickled ``(seq, payload)`` tuple preceded by a 4-byte
    little-endian length.  A payload whose first item is "call" is a request
    and is executed locally against `objtable`; any other payload is taken to
    be the response to request number `seq`.

    The thread that constructs the instance ("main thread") performs all
    reads from the socket.  Other threads park on a condition variable in
    _getresponse() until the main thread files their response.
    """

    debugging = 0

    def __init__(self, sock, objtable=None, debugging=None):
        """Attach to the connected socket `sock`.

        `objtable` maps object ids to callable targets and defaults to the
        module-level `objecttable`.  `debugging`, when given, overrides the
        class-level flag for this instance.
        """
        self.mainthread = threading.currentThread()
        if debugging is not None:
            self.debugging = debugging
        self.sock = sock
        if objtable is None:
            objtable = objecttable
        self.objtable = objtable
        self.statelock = threading.Lock()
        self.responses = {}     # seq -> response awaiting an auxiliary thread
        self.cvars = {}         # seq -> Condition of the waiting thread

    def close(self):
        """Close the socket, if any, and forget it (safe to call twice)."""
        sock = self.sock
        self.sock = None
        if sock is not None:
            sock.close()

    def debug(self, *args):
        """Write the thread name and `args` to the real stderr if debugging."""
        if not self.debugging:
            return
        parts = [str(threading.currentThread().getName())]
        parts.extend([str(a) for a in args])
        sys.__stderr__.write(" ".join(parts) + "\n")

    def register(self, oid, object):
        """Expose `object` to the remote side under the id `oid`."""
        self.objtable[oid] = object

    def unregister(self, oid):
        """Stop exposing `oid`; unknown ids are ignored."""
        try:
            del self.objtable[oid]
        except KeyError:
            pass

    def localcall(self, request):
        """Execute an incoming ("call", (oid, methodname, args, kwargs)).

        Returns one of
          ("OK", result)
          ("ERROR", message)         -- malformed request, unknown oid/method
          ("EXCEPTION", (module, name, args, extracted_traceback))
        The pseudo-methods "__methods__" and "__attributes__" return the
        dictionaries filled by _getmethods() / _getattributes().
        """
        self.debug("localcall:", request)
        try:
            how, (oid, methodname, args, kwargs) = request
        except (TypeError, ValueError):
            # TypeError: not a sequence; ValueError: wrong number of items.
            return ("ERROR", "Bad request format")
        assert how == "call"
        if oid not in self.objtable:
            return ("ERROR", "Unknown object id: %s" % repr(oid))
        obj = self.objtable[oid]
        if methodname == "__methods__":
            methods = {}
            _getmethods(obj, methods)
            return ("OK", methods)
        if methodname == "__attributes__":
            attributes = {}
            _getattributes(obj, attributes)
            return ("OK", attributes)
        if not hasattr(obj, methodname):
            return ("ERROR", "Unsupported method name: %s" % repr(methodname))
        method = getattr(obj, methodname)
        try:
            ret = method(*args, **kwargs)
            if isinstance(ret, RemoteObject):
                ret = remoteref(ret)
            return ("OK", ret)
        except:
            # Deliberately catch everything: whatever the call raised is
            # described in the reply so the remote caller can re-raise it.
            ##traceback.print_exc(file=sys.__stderr__)
            typ, val, tb = info = sys.exc_info()
            sys.last_type, sys.last_value, sys.last_traceback = info
            if isinstance(typ, type(Exception)):
                # Class exceptions
                mod = typ.__module__
                name = typ.__name__
                if issubclass(typ, Exception):
                    args = val.args
                else:
                    args = (str(val),)
            else:
                # String exceptions
                mod = None
                name = typ
                args = (str(val),)
            tb = traceback.extract_tb(tb)
            return ("EXCEPTION", (mod, name, args, tb))

    def remotecall(self, oid, methodname, args, kwargs):
        """Call `methodname` on remote object `oid` and wait for the result."""
        self.debug("remotecall:", oid, methodname, args, kwargs)
        seq = self.asynccall(oid, methodname, args, kwargs)
        ret = self.asyncreturn(seq)
        self.debug("return:", ret)
        return ret

    def asynccall(self, oid, methodname, args, kwargs):
        """Send a call request and return its sequence number at once."""
        self.debug("asyncall:", oid, methodname, args, kwargs)
        request = ("call", (oid, methodname, args, kwargs))
        return self.putrequest(request)

    def asyncreturn(self, seq):
        """Wait for the response to request `seq` and decode it."""
        response = self.getresponse(seq)
        return self.decoderesponse(response)

    def decoderesponse(self, response):
        """Turn a (how, what) response into a return value or an exception.

        "OK" returns `what`.  "EXCEPTION" stores the remote traceback on
        self.traceback and re-raises the remote exception class when it can
        be imported here; otherwise RuntimeError(name, args) is raised.
        "ERROR" raises RuntimeError(what); anything else raises SystemError.
        """
        how, what = response
        if how == "OK":
            return what
        if how == "EXCEPTION":
            mod, name, args, tb = what
            self.traceback = tb
            if mod: # not string exception
                try:
                    __import__(mod)
                    # __import__ returns the top-level package for dotted
                    # names, so fetch the real module from sys.modules.
                    module = sys.modules[mod]
                except ImportError:
                    pass
                else:
                    try:
                        cls = getattr(module, name)
                    except AttributeError:
                        pass
                    else:
                        # instantiate the exception class and raise it
                        raise cls(*args)
                name = mod + "." + name
            # do the best we can: the original class is unavailable here
            raise RuntimeError(name, args)
        if how == "ERROR":
            raise RuntimeError(what)
        raise SystemError(how, what)

    def mainloop(self):
        """Serve incoming requests until the connection is closed."""
        try:
            self.getresponse(None)
        except EOFError:
            pass

    def getresponse(self, myseq):
        """Return the response for `myseq`, proxifying an "OK" payload."""
        response = self._getresponse(myseq)
        if response is not None:
            how, what = response
            if how == "OK":
                response = how, self._proxify(what)
        return response

    def _proxify(self, obj):
        """Replace RemoteProxy markers (also inside lists) by RPCProxy."""
        if isinstance(obj, RemoteProxy):
            return RPCProxy(self, obj.oid)
        if isinstance(obj, list):
            return [self._proxify(item) for item in obj]
        # XXX Check for other types -- not currently needed
        return obj

    def _getresponse(self, myseq):
        """Block until the response for `myseq` is available and return it."""
        if threading.currentThread() is self.mainthread:
            # Main thread: does all reading of requests and responses
            while 1:
                response = self.pollresponse(myseq, None)
                if response is not None:
                    return response
        else:
            # Auxiliary thread: wait for notification from main thread
            cvar = threading.Condition(self.statelock)
            self.statelock.acquire()
            try:
                self.cvars[myseq] = cvar
                while myseq not in self.responses:
                    cvar.wait()
                response = self.responses[myseq]
                del self.responses[myseq]
                del self.cvars[myseq]
            finally:
                # Never leave the shared lock held if the wait is interrupted.
                self.statelock.release()
            return response

    def putrequest(self, request):
        """Send `request` under a fresh sequence number and return it."""
        seq = self.newseq()
        self.putmessage((seq, request))
        return seq

    nextseq = 0

    def newseq(self):
        """Return the next sequence number (advances in steps of 2)."""
        self.nextseq = seq = self.nextseq + 2
        return seq

    def putmessage(self, message):
        """Pickle `message` and write it, length-prefixed, to the socket."""
        try:
            s = pickle.dumps(message)
        except:
            sys.__stderr__.write("Cannot pickle: %s\n" % repr(message))
            raise
        s = struct.pack("<i", len(s)) + s
        # send() may accept only part of the data; loop until all is written.
        while len(s) > 0:
            n = self.sock.send(s)
            s = s[n:]

    def ioready(self, wait=0.0):
        """Return true if the socket is readable within `wait` seconds."""
        r, w, x = select.select([self.sock.fileno()], [], [], wait)
        return len(r)

    buffer = ""
    bufneed = 4
    bufstate = 0 # meaning: 0 => reading count; 1 => reading data

    def pollpacket(self, wait=0.0):
        """Return the next complete packet, or None if none is ready yet.

        Performs at most one recv().  Raises EOFError when the peer closed
        the connection or the socket reports an error.
        """
        self._stage0()
        if len(self.buffer) < self.bufneed:
            if not self.ioready(wait):
                return None
            try:
                s = self.sock.recv(BUFSIZE)
            except socket.error:
                raise EOFError
            if len(s) == 0:
                raise EOFError
            self.buffer += s
            self._stage0()
        return self._stage1()

    def _stage0(self):
        """Consume the 4-byte length header once it is fully buffered."""
        if self.bufstate == 0 and len(self.buffer) >= 4:
            s = self.buffer[:4]
            self.buffer = self.buffer[4:]
            self.bufneed = struct.unpack("<i", s)[0]
            self.bufstate = 1

    def _stage1(self):
        """Return the packet body once fully buffered, else None."""
        if self.bufstate == 1 and len(self.buffer) >= self.bufneed:
            packet = self.buffer[:self.bufneed]
            self.buffer = self.buffer[self.bufneed:]
            self.bufneed = 4
            self.bufstate = 0
            return packet
        return None

    def pollmessage(self, wait=0.0):
        """Return the next unpickled message, or None if none is ready."""
        packet = self.pollpacket(wait)
        if packet is None:
            return None
        try:
            # NOTE(review): unpickling executes code chosen by the sender;
            # this is only acceptable because the peer is trusted.
            message = pickle.loads(packet)
        except:
            err = sys.__stderr__
            err.write("-----------------------\n")
            err.write("cannot unpickle packet: %s\n" % repr(packet))
            traceback.print_stack(file=err)
            err.write("-----------------------\n")
            raise
        return message

    def pollresponse(self, myseq, wait=0.0):
        """Process buffered messages; return the response for `myseq`.

        Incoming calls are executed and answered on the spot.  Responses for
        other sequence numbers are filed in self.responses and their waiting
        thread, if any, is notified.  Returns None as soon as no further
        message is available.  Only the first read honours `wait`; later
        iterations poll without blocking.
        """
        while 1:
            message = self.pollmessage(wait)
            if message is None:
                return None
            wait = 0.0
            seq, resq = message
            if resq[0] == "call":
                response = self.localcall(resq)
                self.putmessage((seq, response))
                continue
            elif seq == myseq:
                return resq
            else:
                self.statelock.acquire()
                try:
                    self.responses[seq] = resq
                    cv = self.cvars.get(seq)
                    if cv is not None:
                        cv.notify()
                finally:
                    self.statelock.release()
                continue

#----------------- end class SocketIO --------------------
class RemoteObject:
    """Token mix-in class.

    SocketIO.localcall() replaces a return value that is an instance of this
    class by a remote reference instead of sending the object itself.
    """
def remoteref(obj):
    """Publish `obj` in the module object table and return a marker for it.

    The object is stored under its id() and a RemoteProxy carrying that id is
    returned in its place.
    """
    key = id(obj)
    objecttable[key] = obj
    return RemoteProxy(key)
class RemoteProxy:
    """Picklable marker naming an object that lives in the sender's table."""

    def __init__(self, oid):
        # oid: key under which the real object is registered on the sender.
        self.oid = oid
class RPCHandler(SocketServer.BaseRequestHandler, SocketIO):
    """Request handler that speaks the SocketIO protocol on its connection."""

    debugging = 0

    def __init__(self, sock, addr, svr):
        # Make the handler reachable from the server object before the base
        # class constructor runs.
        svr.current_handler = self ## cgt xxx
        SocketIO.__init__(self, sock)
        SocketServer.BaseRequestHandler.__init__(self, sock, addr, svr)

    def handle(self):
        """handle() method required by SocketServer"""
        self.mainloop()

    def get_remote_proxy(self, oid):
        """Return a proxy for the peer's object registered as `oid`."""
        proxy = RPCProxy(self, oid)
        return proxy
class RPCClient(SocketIO):
    """Listening end of the RPC connection.

    The constructor only creates the listening socket; the SocketIO state
    (including self.sock) is initialised by accept() once a peer connects.
    """

    nextseq = 1 # Requests coming from the client are odd numbered

    def __init__(self, address, family=socket.AF_INET, type=socket.SOCK_STREAM):
        """Bind a listening socket to `address` with a backlog of one."""
        self.listening_sock = socket.socket(family, type)
        try:
            self.listening_sock.setsockopt(socket.SOL_SOCKET,
                                           socket.SO_REUSEADDR, 1)
            self.listening_sock.bind(address)
            self.listening_sock.listen(1)
        except:
            # Do not leak the descriptor when the address is unusable.
            self.listening_sock.close()
            raise

    def accept(self):
        """Wait for the peer to connect and start using that connection.

        Only connections from 127.0.0.1 are accepted.  Any other peer is
        disconnected and socket.error is raised.
        """
        working_sock, address = self.listening_sock.accept()
        if address[0] == '127.0.0.1':
            sys.__stderr__.write("Idle accepted connection from  %s\n"
                                 % (address,))
            SocketIO.__init__(self, working_sock)
        else:
            sys.__stderr__.write("Invalid host:  %s\n" % (address,))
            # Close the rejected connection instead of leaking it.
            working_sock.close()
            raise socket.error("Invalid host: %s" % (address,))

    def get_remote_proxy(self, oid):
        """Return a proxy for the peer's object registered as `oid`."""
        return RPCProxy(self, oid)
class RPCProxy:
    """Local stand-in for an object registered on the other side.

    Attribute access is resolved lazily: the remote method and attribute
    name tables are fetched on first use and cached on the proxy.
    """

    __methods = None
    __attributes = None

    def __init__(self, sockio, oid):
        self.sockio = sockio    # connection used for the remote calls
        self.oid = oid          # id of the target object on the peer

    def __getattr__(self, name):
        """Resolve `name` against the remote object.

        Remote methods yield a MethodProxy.  Remote data attributes are
        fetched by asking the peer to call ``__getattribute__(name)`` on the
        target, so the target must provide that method.  Unknown names raise
        AttributeError.
        """
        if self.__methods is None:
            self.__getmethods()
        if self.__methods.get(name):
            return MethodProxy(self.sockio, self.oid, name)
        if self.__attributes is None:
            self.__getattributes()
        if name not in self.__attributes:
            raise AttributeError(name)
        # Known data attribute: return its current remote value rather than
        # silently falling off the end and returning None.
        return self.sockio.remotecall(self.oid, "__getattribute__",
                                      (name,), {})
    __getattr__.DebuggerStepThrough=1

    def __getattributes(self):
        """Fetch and cache the remote table of non-callable attribute names."""
        self.__attributes = self.sockio.remotecall(self.oid,
                                                   "__attributes__", (), {})

    def __getmethods(self):
        """Fetch and cache the remote table of callable attribute names."""
        self.__methods = self.sockio.remotecall(self.oid,
                                                "__methods__", (), {})
444 def _getmethods(obj, methods):
445 # Helper to get a list of methods from an object
446 # Adds names to dictionary argument 'methods'
447 for name in dir(obj):
448 attr = getattr(obj, name)
449 if callable(attr):
450 methods[name] = 1
451 if type(obj) == types.InstanceType:
452 _getmethods(obj.__class__, methods)
453 if type(obj) == types.ClassType:
454 for super in obj.__bases__:
455 _getmethods(super, methods)
457 def _getattributes(obj, attributes):
458 for name in dir(obj):
459 attr = getattr(obj, name)
460 if not callable(attr):
461 attributes[name] = 1
class MethodProxy:
    """Callable that forwards its arguments to one remote method."""

    def __init__(self, sockio, oid, name):
        self.sockio, self.oid, self.name = sockio, oid, name

    def __call__(self, *args, **kwargs):
        # Blocks until the remote side has answered.
        return self.sockio.remotecall(self.oid, self.name, args, kwargs)
475 # Self Test
def testServer(addr):
    """Self test, server side: connect to `addr` and serve one connection.

    A RemotePerson is published under the id 'thomas' for the client to
    call.  The peer (testClient) must already be listening on `addr`.
    """
    class RemotePerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("(someone called greet)")
            print("Hello %s, I am %s." % (name, self.name))
            print("")
        def getName(self):
            print("(someone called getName)")
            print("")
            return self.name
        def greet_this_guy(self, name):
            print("(someone called greet_this_guy)")
            print("About to greet %s ..." % name)
            remote_guy = self.server.current_handler.get_remote_proxy(name)
            remote_guy.greet("Thomas Edison")
            print("Done.")
            print("")

    person = RemotePerson("Thomas Edison")
    svr = RPCServer(addr)
    # RPCServer has no register() method; the handler it creates serves
    # objects from the module-level table, so publish the object there.
    objecttable['thomas'] = person
    person.server = svr # only required if callbacks are used

    # svr.serve_forever()
    svr.handle_request() # process once only
def testClient(addr):
    """Self test, client side: listen on `addr` and call the remote person.

    Blocks until the server side (testServer) has connected.
    """
    import time
    clt=RPCClient(addr)
    # The connection, and with it all SocketIO state, only exists after
    # accept() has returned.
    clt.accept()
    thomas = clt.get_remote_proxy("thomas")
    print("The remote person's name is ...")
    print(thomas.getName())
    # print clt.remotecall("thomas", "getName", (), {})
    print("")
    time.sleep(1)
    print("Getting remote thomas to say hi...")
    thomas.greet("Alexander Bell")
    #clt.remotecall("thomas","greet",("Alexander Bell",), {})
    print("Done.")
    print("")
    time.sleep(2)
    # demonstrates remote server calling local instance
    class LocalPerson:
        def __init__(self,name):
            self.name = name
        def greet(self, name):
            print("You've greeted me!")
        def getName(self):
            return self.name
    person = LocalPerson("Alexander Bell")
    clt.register("alexander",person)
    thomas.greet_this_guy("alexander")
    # clt.remotecall("thomas","greet_this_guy",("alexander",), {})
def test():
    """Run the self test: server side with '-server', else client side."""
    addr = ("localhost", 8833)
    run_server = len(sys.argv) == 2 and sys.argv[1] == '-server'
    if run_server:
        testServer(addr)
    else:
        testClient(addr)
# Run the self test when the module is executed as a script.
if __name__ == '__main__':
    test()