1 """Given a pair of pipes with a python process at each end, this module
2 allows one end to make calls on the other. This is used by the su module
3 to allow control of a subprocess running as another user, but it may also
4 be useful in other situations.
7 from __future__
import generators
8 # Note: do not import rox or gtk. Needs to work without DISPLAY.
12 from select
import select
13 import cPickle
as pickle
class _EndOfResponses:
    """Internal marker type. Its single module-level instance,
    EndOfResponses, indicates that no more responses to a method call
    will arrive."""
    pass


# Shared sentinel; receivers recognise it with isinstance(x, _EndOfResponses).
EndOfResponses = _EndOfResponses()
20 class MasterObject(object):
21 """Invoking a method on a MasterObject invokes the corresponding
22 method on the slave object. The return value is a Queue from
23 which the responses can be read."""
26 def __init__(self
, master
):
29 def __getattr__(self
, name
):
32 queue
= self
._master
._add
_queue
(self
._serial
)
33 self
._master
.write_object((self
._serial
, name
, args
))
38 def __init__(self
, to_peer
, from_peer
, slave_object
= None):
39 self
.to_peer
= to_peer
40 self
.from_peer
= from_peer
44 self
.enable_read_watch()
46 def enable_read_watch(self
):
48 g
.input_add(self
.from_peer
, g
.gdk
.INPUT_READ
,
49 lambda src
, cond
: self
.read_ready())
51 def enable_write_watch(self
):
53 g
.input_add(self
.to_peer
, g
.gdk
.INPUT_WRITE
,
54 lambda src
, cond
: self
.write_ready())
56 def write_object(self
, object):
57 if self
.to_peer
== -1:
58 raise Exception('Peer is defunct')
59 if not self
.out_buffer
:
60 self
.enable_write_watch()
62 s
= pickle
.dumps(object)
63 s
= str(len(s
)) + ":" + s
66 def write_ready(self
):
67 """Returns True if the buffer is not empty on exit."""
68 while self
.out_buffer
:
69 w
= select([], [self
.to_peer
], [], 0)[1]
71 print "Not ready for writing"
73 n
= os
.write(self
.to_peer
, self
.out_buffer
)
74 self
.out_buffer
= self
.out_buffer
[n
:]
78 new
= os
.read(self
.from_peer
, 1000)
81 raise Exception("Lost connection to slave!")
83 while ':' in self
.in_buffer
:
84 l
, rest
= self
.in_buffer
.split(':', 1)
87 return True # Haven't got everything yet
89 self
.in_buffer
= rest
[l
:]
90 value
= pickle
.loads(s
)
99 """A queue of responses to some method call.
100 Queue.blocker is triggered when the response queue becomes non-empty,
101 so yield that before trying to read from the queue, if using the
104 For simple use (exactly one response), use:
105 data = Queue.dequeue_last()
107 For sequences, read the next result with:
108 data = Queue.dequeue()
109 Will return EndOfResponses on the last call.
117 def __init__(self
, master
, serial
):
118 from rox
import tasks
# Don't require tasks for slaves
122 self
.blocker
= tasks
.Blocker()
125 """Add an item to the queue and trigger our current blocker."""
126 self
.queue
.append(data
)
128 # Auto-dequeue EndOfResponses item
131 self
.blocker
.trigger()
134 """Returns the first item in the queue for this serial number.
135 Queue.blocker may change to a new blocker (if the queue is now
136 empty) or None (if no more responses will arrive), so be sure
137 to reread it after this."""
138 assert self
.blocker
.happened
140 data
= self
.queue
.pop(0)
141 if isinstance(data
, _EndOfResponses
):
142 assert not self
.queue
143 self
.master
._remove
_queue
(self
.serial
)
146 return EndOfResponses
147 assert not self
._at
_end
149 # Queue is empty; create a new blocker
150 from rox
import tasks
151 self
.blocker
= tasks
.Blocker()
152 if isinstance(data
, Exception):
156 def dequeue_last(self
):
157 """Calls dequeue, and also sets a flag to indicate that
158 the next item will be EndOfResponses, which will be handled
161 data
= self
.dequeue()
166 self
.dequeue() # Force cleanup now
168 class MasterProxy(Proxy
):
169 """Invoking operations on MasterProxy.root will invoke the same
170 operation on the SlaveProxy's slave_object."""
def __init__(self, to_slave, from_slave):
    """Set up the master end of the link.

    to_slave and from_slave are handed unchanged to Proxy.__init__.
    After construction, self.root is the MasterObject bound to this
    proxy and self._queue is the (initially empty) table of response
    queues.
    """
    Proxy.__init__(self, to_slave, from_slave)
    # Table of outstanding calls, keyed by serial number.
    pending = {}  # Serial -> Queue
    self.root = MasterObject(self)
    self._queue = pending
177 def _dispatch(self
, value
):
179 self
._queue
[serial
].add(data
)
def _add_queue(self, serial):
    """Create and register the response Queue for a new call.

    serial must not already have a queue registered (checked with an
    assert). Returns the newly created Queue.
    """
    assert serial not in self._queue
    queue = Queue(self, serial)
    self._queue[serial] = queue
    # MasterObject.__getattr__ binds the result of this call to its own
    # `queue` variable, so the new Queue has to be handed back.
    return queue
187 def _remove_queue(self
, serial
):
188 del self
._queue
[serial
]
192 assert not self
._queue
194 class Request(object):
195 """Call Request.send() to send replies. When destroyed, sends a
196 stop message to the master."""
197 def __init__(self
, send
):
201 self
.send(EndOfResponses
)
203 class SlaveProxy(Proxy
):
204 """Methods invoked on MasterProxy.root will be invoked on
205 slave_object with a callback function as the first argument.
206 This may be called any number of times to send replies."""
def __init__(self, to_master, from_master, slave_object):
    """Set up the slave end of the link.

    to_master and from_master are handed unchanged to Proxy.__init__;
    slave_object is the object on which _dispatch looks up the
    requested methods.
    """
    Proxy.__init__(self, to_master, from_master)
    # Target of incoming calls.
    self.slave_object = slave_object
211 def _dispatch(self
, value
):
212 serial
, method
, args
= value
214 self
.write_object((serial
, value
))
215 request
= Request(send
)
217 getattr(self
.slave_object
, method
)(request
, *args
)