Allow "" as a valid value in a OptionsBox menu (reported by Guido Schimmels).
[rox-lib.git] / python / rox / proxy.py
blob75925e916b4ebf0d5b24cb3e5c8e35cd5d7eb64a
1 """Given a pair of pipes with a python process at each end, this module
2 allows one end to make calls on the other. This is used by the su module
3 to allow control of a subprocess running as another user, but it may also
4 be useful in other situations.
5 """
7 from __future__ import generators
8 # Note: do not import rox or gtk. Needs to work without DISPLAY.
9 import os, sys, pwd
10 import traceback
11 import fcntl
12 from select import select
13 import cPickle as pickle
15 class _EndOfResponses:
16 """Internal. Indicates that no more responses to this method will
17 follow."""
18 EndOfResponses = _EndOfResponses()
20 class MasterObject(object):
21 """Invoking a method on a MasterObject invokes the corresponding
22 method on the slave object. The return value is a Queue from
23 which the responses can be read."""
24 _serial = 0
26 def __init__(self, master):
27 self._master = master
29 def __getattr__(self, name):
30 def method(*args):
31 self._serial += 1
32 queue = self._master._add_queue(self._serial)
33 self._master.write_object((self._serial, name, args))
34 return queue
35 return method
37 class Proxy:
38 def __init__(self, to_peer, from_peer, slave_object = None):
39 self.to_peer = to_peer
40 self.from_peer = from_peer
41 self.out_buffer = ""
42 self.in_buffer = ""
44 self.enable_read_watch()
46 def enable_read_watch(self):
47 from rox import g
48 g.input_add(self.from_peer, g.gdk.INPUT_READ,
49 lambda src, cond: self.read_ready())
51 def enable_write_watch(self):
52 from rox import g
53 g.input_add(self.to_peer, g.gdk.INPUT_WRITE,
54 lambda src, cond: self.write_ready())
56 def write_object(self, object):
57 if self.to_peer == -1:
58 raise Exception('Peer is defunct')
59 if not self.out_buffer:
60 self.enable_write_watch()
62 s = pickle.dumps(object)
63 s = str(len(s)) + ":" + s
64 self.out_buffer += s
66 def write_ready(self):
67 """Returns True if the buffer is not empty on exit."""
68 while self.out_buffer:
69 w = select([], [self.to_peer], [], 0)[1]
70 if not w:
71 print "Not ready for writing"
72 return True
73 n = os.write(self.to_peer, self.out_buffer)
74 self.out_buffer = self.out_buffer[n:]
75 return False
77 def read_ready(self):
78 new = os.read(self.from_peer, 1000)
79 if not new:
80 self.finish()
81 raise Exception("Lost connection to slave!")
82 self.in_buffer += new
83 while ':' in self.in_buffer:
84 l, rest = self.in_buffer.split(':', 1)
85 l = int(l)
86 if len(rest) < l:
87 return True # Haven't got everything yet
88 s = rest[:l]
89 self.in_buffer = rest[l:]
90 value = pickle.loads(s)
91 self._dispatch(value)
92 return True
94 def finish(self):
95 self.to_slave = -1
96 self.from_slave = -1
98 class Queue:
99 """A queue of responses to some method call.
100 Queue.blocker is triggered when the response queue becomes non-empty,
101 so yield that before trying to read from the queue, if using the
102 tasks module.
104 For simple use (exactly one response), use:
105 data = Queue.dequeue_last()
107 For sequences, read the next result with:
108 data = Queue.dequeue()
109 Will return EndOfResponses on the last call.
111 master = None
112 serial = None
113 blocker = None
114 queue = None
115 _at_end = False
117 def __init__(self, master, serial):
118 from rox import tasks # Don't require tasks for slaves
119 self.master = master
120 self.serial = serial
121 self.queue = []
122 self.blocker = tasks.Blocker()
124 def add(self, data):
125 """Add an item to the queue and trigger our current blocker."""
126 self.queue.append(data)
127 if self._at_end:
128 # Auto-dequeue EndOfResponses item
129 self.dequeue()
130 else:
131 self.blocker.trigger()
133 def dequeue(self):
134 """Returns the first item in the queue for this serial number.
135 Queue.blocker may change to a new blocker (if the queue is now
136 empty) or None (if no more responses will arrive), so be sure
137 to reread it after this."""
138 assert self.blocker.happened
140 data = self.queue.pop(0)
141 if isinstance(data, _EndOfResponses):
142 assert not self.queue
143 self.master._remove_queue(self.serial)
144 self.queue = None
145 self.blocker = None
146 return EndOfResponses
147 assert not self._at_end
148 if not self.queue:
149 # Queue is empty; create a new blocker
150 from rox import tasks
151 self.blocker = tasks.Blocker()
152 if isinstance(data, Exception):
153 raise data
154 return data
156 def dequeue_last(self):
157 """Calls dequeue, and also sets a flag to indicate that
158 the next item will be EndOfResponses, which will be handled
159 automatically."""
160 try:
161 data = self.dequeue()
162 return data
163 finally:
164 self._at_end = True
165 if self.queue:
166 self.dequeue() # Force cleanup now
168 class MasterProxy(Proxy):
169 """Invoking operations on MasterProxy.root will invoke the same
170 operation on the SlaveProxy's slave_object."""
172 def __init__(self, to_slave, from_slave):
173 Proxy.__init__(self, to_slave, from_slave)
174 self.root = MasterObject(self)
175 self._queue = {} # Serial -> Queue
177 def _dispatch(self, value):
178 serial, data = value
179 self._queue[serial].add(data)
181 def _add_queue(self, serial):
182 assert serial not in self._queue
183 queue = Queue(self, serial)
184 self._queue[serial] = queue
185 return queue
187 def _remove_queue(self, serial):
188 del self._queue[serial]
190 def finish(self):
191 Proxy.finish(self)
192 assert not self._queue
194 class Request(object):
195 """Call Request.send() to send replies. When destroyed, sends a
196 stop message to the master."""
197 def __init__(self, send):
198 self.send = send
200 def __del__(self):
201 self.send(EndOfResponses)
203 class SlaveProxy(Proxy):
204 """Methods invoked on MasterProxy.root will be invoked on
205 slave_object with a callback function as the first argument.
206 This may be called any number of times to send replies."""
207 def __init__(self, to_master, from_master, slave_object):
208 Proxy.__init__(self, to_master, from_master)
209 self.slave_object = slave_object
211 def _dispatch(self, value):
212 serial, method, args = value
213 def send(value):
214 self.write_object((serial, value))
215 request = Request(send)
216 try:
217 getattr(self.slave_object, method)(request, *args)
218 except Exception, e:
219 send(e)