XDRRPC fix
[systematiki.git] / Systematiki / Networking / XDRRPC.py
blob104db231f1a228a048b13c832b825dff3febef64
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 """
4 Systematiki xdrlib-based RPC.
5 """
7 # Copyright (C) 2007 Felix Rabe <public@felixrabe.textdriven.com>
9 # This library is free software; you can redistribute it and/or
10 # modify it under the terms of the GNU Lesser General Public
11 # License as published by the Free Software Foundation; either
12 # version 2.1 of the License, or (at your option) any later version.
14 # This library is distributed in the hope that it will be useful,
15 # but WITHOUT ANY WARRANTY; without even the implied warranty of
16 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 # Lesser General Public License for more details.
19 # You should have received a copy of the GNU Lesser General Public License
20 # along with this library; if not, write to the Free Software Foundation,
21 # Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA
23 # Recommended line length or text width: 75 characters.
25 import random
26 import time
27 import xdrlib
29 from Systematiki.DeferredReturn import DeferredReturn
def _pack_args(typestr, args):
    """
    Pack RPC arguments with XDR and extend the type string to match.

    typestr -- message kind prefix; the first byte is "C" for Call or
               "R" for Return.  One type character is appended for
               every argument packed.
    args    -- iterable of arguments.  Byte strings are packed as "S",
               ints as "i", and text strings as "U" (sent UTF-8
               encoded).

    Returns the tuple (typestr, arg_packer), where arg_packer is the
    xdrlib.Packer holding the packed arguments.

    Raises TypeError for an argument of any other type.
    """
    # Resolve the two string types by name so that byte strings and text
    # strings are told apart on Python 2 (str / unicode) as well as on
    # Python 3 (bytes / str).
    try:
        text_type, binary_type = unicode, str
    except NameError:
        text_type, binary_type = str, bytes
    arg_packer = xdrlib.Packer()
    for arg in args:
        if isinstance(arg, binary_type):
            typestr += "S"
            arg_packer.pack_string(arg)
        elif isinstance(arg, int):
            typestr += "i"
            arg_packer.pack_int(arg)
        elif isinstance(arg, text_type):
            typestr += "U"
            arg_packer.pack_string(arg.encode("utf-8"))
        else:
            raise TypeError("'%s' type not supported by XDRRPC"
                            % type(arg).__name__)
    return (typestr, arg_packer)
class _XDRRPCMethod(object):
    """
    Callable stand-in for one named method of the peer of an XDRRPC
    object.
    """

    def __init__(self, xdrrpc, name):
        """
        xdrrpc -- the XDRRPC object the call is sent through.
        name   -- name of the method to call.
        """
        self._name = name
        self._rpc = xdrrpc

    def __call__(self, *args):
        """
        Send a Call message for this method and return the
        DeferredReturn object registered for its answer.

        Raises whatever _pack_args() raises for unsupported arguments;
        in that case nothing is registered or sent.
        """
        typestr, packed_args = _pack_args("C", args)

        # Timestamp, random number and method name together form the
        # key under which the pending return is registered.
        stamp = time.time()
        nonce = random.random()

        header = xdrlib.Packer()
        header.pack_double(stamp)
        header.pack_double(nonce)
        for field in (self._name, typestr):
            header.pack_string(field)
        message = header.get_buffer() + packed_args.get_buffer()

        # Register before sending so the entry exists when the peer
        # answers.
        pending = DeferredReturn()
        self._rpc._wait_for(stamp, nonce, self._name, pending)
        self._rpc._string_protocol.send(message)
        return pending
class XDRRPC(object):
    """
    This RPC class can be used for both clients and servers.

    Reading an attribute whose name does not start with "_" returns a
    callable that sends a Call message for that name to the peer.  Call
    messages received from the peer are dispatched to the served object.
    """

    def __init__(self, connected_string_protocol, served_object):
        """
        connected_string_protocol -- transport object; its send(msg) and
            recv(callback) methods are used.
        served_object -- object whose methods are invoked by incoming
            Call messages.
        """
        self._string_protocol = connected_string_protocol
        self._wait_queue = {}
        self._served_object = served_object
        # Register for incoming messages only after all state is set, so
        # that a message delivered from within recv() already finds a
        # fully initialised object.
        self._string_protocol.recv(self._recv_cb)

    def __getattr__(self, name):
        """
        Return a proxy for the remote method called name.

        Python only calls this after normal attribute lookup has failed.
        Names starting with "_" are never treated as remote methods, so
        for those AttributeError is raised.
        """
        if name.startswith("_"):
            raise AttributeError("'%s' object has no attribute '%s'"
                                 % (type(self).__name__, name))
        return _XDRRPCMethod(self, name)

    def _deferred_cb(self, t, r, name, args):
        """
        Send a Return message carrying args for the call identified by
        (t, r, name).
        """
        typestr, arg_packer = _pack_args("R", args)
        header_packer = xdrlib.Packer()
        header_packer.pack_double(t)
        header_packer.pack_double(r)
        header_packer.pack_string(name)
        header_packer.pack_string(typestr)
        msg = header_packer.get_buffer() + arg_packer.get_buffer()
        self._string_protocol.send(msg)

    def _recv_cb(self, proto, msg):
        """
        Decode one received message and dispatch it.

        A Call ("C") invokes the named method of the served object with
        the unpacked arguments; the value that method returns is then
        called with a callback that sends the Return message.  A Return
        ("R") removes the matching entry from the wait queue and calls
        its callback() with the unpacked arguments.

        proto -- passed in by the string protocol; not used here.
        msg   -- the received message.

        Raises Exception for a message that is neither a Call nor a
        Return, for a Call naming an attribute that starts with "_",
        and for an unknown argument type character.  Raises KeyError for
        a Return that matches no pending call.
        """
        unpacker = xdrlib.Unpacker(msg)
        t = unpacker.unpack_double()
        r = unpacker.unpack_double()
        name = unpacker.unpack_string()
        typestr = unpacker.unpack_string()
        args = []
        for typechar in typestr[1:]:
            if typechar == "S":
                args.append(unpacker.unpack_string())
            elif typechar == "i":
                args.append(unpacker.unpack_int())
            elif typechar == "U":
                args.append(unpacker.unpack_string().decode("utf-8"))
            else:
                # Skipping an unknown type would leave the unpacker
                # misaligned and the argument list short.
                raise Exception("unsupported argument type %r received"
                                % (typechar,))
        # Slice instead of index: an empty type string then ends up in
        # the "invalid" branch rather than raising IndexError.
        kind = typestr[:1]
        if kind == "C":  # Call
            # The proxy side never sends names starting with "_"; refuse
            # them so that a peer cannot reach private or special
            # attributes of the served object.
            if name.startswith("_"):
                raise Exception("invalid method call received")
            cb = lambda *a: self._deferred_cb(t, r, name, a)
            getattr(self._served_object, name)(*args)(cb)
        elif kind == "R":  # Return
            deferred_return = self._wait_queue.pop((t, r, name))
            deferred_return.callback(*args)
        else:
            raise Exception("invalid method call received")

    def _wait_for(self, t, r, name, deferred_return):
        """
        Register deferred_return as waiting for the Return message
        identified by (t, r, name).
        """
        self._wait_queue[(t, r, name)] = deferred_return