From 97bdffa9cb5a7f13d1e565dc29ba9c7f522b27ac Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Sun, 26 Sep 2004 16:36:28 +0000 Subject: [PATCH] Several incompatible changes to the experimental proxy API to make it simpler (Thomas Leonard): - Create_su_proxy now returns the MasterObject directly, not the MasterProxy. - You can call finish on the MasterObject (so the MasterProxy isn't needed). - Slave methods no longer take a 'request' argument. Instead, the return value of the function is returned. - Methods can only return one value. dequeue and dequeue_last have been replaced with a 'result' property. - Methods on MasterObject now return a RequestBlocker, not a Queue. This means you just yield the object itself, not object.blocker. Tasks API is no longer experimental, but proxy API is. git-svn-id: https://rox.svn.sourceforge.net/svnroot/rox/trunk/ROX-Lib2@3670 66de3db3-b00d-0410-b41b-f4738ad19bea --- Help/Changes | 15 +++++ python/rox/master_proxy.py | 94 +++++++++++++++++++++++++++++ python/rox/proxy.py | 143 +++------------------------------------------ python/rox/su.py | 10 ++-- python/rox/suchild.py | 51 ++++++++-------- python/rox/tasks.py | 3 - tests/python/testproxy.py | 130 +++++++++++++++++++---------------------- tests/python/testsu.py | 44 +++++++------- 8 files changed, 225 insertions(+), 265 deletions(-) create mode 100644 python/rox/master_proxy.py diff --git a/Help/Changes b/Help/Changes index 2b1409e..f4c1845 100644 --- a/Help/Changes +++ b/Help/Changes @@ -2,6 +2,21 @@ Shared code for ROX applications by Thomas Leonard http://rox.sourceforge.net +26-Sep-2004 +~~~~~~~~~~~ +Several incompatible changes to the experimental proxy API to make it simpler +(Thomas Leonard): + +- Create_su_proxy now returns the MasterObject directly, not the MasterProxy. +- You can call finish on the MasterObject (so the MasterProxy isn't needed). +- Slave methods no longer take a 'request' argument. Instead, the return + value of the function is returned. +- Methods can only return one value. dequeue and dequeue_last have been + replaced with a 'result' property. +- Methods on MasterObject now return a RequestBlocker, not a Queue. This + means you just yield the object itself, not object.blocker. + +Tasks API is no longer experimental, but proxy API is. 22-Sep-2004 ~~~~~~~~~~~ diff --git a/python/rox/master_proxy.py b/python/rox/master_proxy.py new file mode 100644 index 0000000..23bcd2a --- /dev/null +++ b/python/rox/master_proxy.py @@ -0,0 +1,94 @@ +"""This module allows a caller to invoke methods on another process. +It is really part of the proxy module, but separate because it imports some GTK +functions which a slave must not do. + +EXPERIMENTAL. +""" + +from __future__ import generators +from proxy import Proxy +import tasks # (imports rox, and thus gtk) + +class MasterObject(object): + """Invoking a method on a MasterObject invokes the corresponding + method on the slave object. The return value is a ResponseBlocker from + which the response can be read.""" + _serial = 0 + + def __init__(self, master): + self._master = master + + def __getattr__(self, name): + def method(*args): + self._serial += 1 + request = self._master._add_blocker(self._serial) + self._master.write_object((self._serial, name, args)) + return request + return method + + def finish_proxy(self): + """Calls MasterProxy.finish() for our MasterProxy""" + self._master.finish() + +class RequestBlocker(tasks.Blocker): + """The blocker is triggered when the slave object sends a reply + to our method call. You can then call get() to get the result, eg: + + blocker = master.method() + yield blocker + print blocker.result + + If the remote method raised an exception, accessing 'result' will raise + it rather than returning it. + """ + + def _error(self): + if self.error is not None: + raise self.error + raise Exception('No result yet! Yield this blocker first.') + + master = None + serial = None + error = None + result = property(_error) + + def __init__(self, master, serial): + tasks.Blocker.__init__(self) + self.master = master + self.serial = serial + + def add(self, data): + """Store the result and trigger our blocker.""" + assert not self.happened + self.master._remove_blocker(self.serial) + if isinstance(data, Exception): + self.error = data + else: + self.result = data + self.trigger() + +class MasterProxy(Proxy): + """Invoking operations on MasterProxy.root will invoke the same + operation on the SlaveProxy's slave_object.""" + + def __init__(self, to_slave, from_slave): + Proxy.__init__(self, to_slave, from_slave) + self.root = MasterObject(self) + self._queue = {} # Serial -> Queue + + def _dispatch(self, value): + serial, data = value + self._queue[serial].add(data) + + def _add_blocker(self, serial): + assert serial not in self._queue + request = RequestBlocker(self, serial) + self._queue[serial] = request + return request + + def _remove_blocker(self, serial): + del self._queue[serial] + + def finish(self): + Proxy.finish(self) + assert not self._queue diff --git a/python/rox/proxy.py b/python/rox/proxy.py index 6359b33..af2bb57 100644 --- a/python/rox/proxy.py +++ b/python/rox/proxy.py @@ -1,7 +1,10 @@ """Given a pair of pipes with a python process at each end, this module allows one end to make calls on the other. This is used by the su module to allow control of a subprocess running as another user, but it may also -be useful in other situations. +be useful in other situations. The caller end should use the master_proxy +module. + +EXPERIMENTAL. """ from __future__ import generators @@ -12,28 +15,6 @@ import fcntl from select import select import cPickle as pickle -class _EndOfResponses: - """Internal. Indicates that no more responses to this method will - follow.""" -EndOfResponses = _EndOfResponses() - -class MasterObject(object): - """Invoking a method on a MasterObject invokes the corresponding - method on the slave object. The return value is a Queue from - which the responses can be read.""" - _serial = 0 - - def __init__(self, master): - self._master = master - - def __getattr__(self, name): - def method(*args): - self._serial += 1 - queue = self._master._add_queue(self._serial) - self._master.write_object((self._serial, name, args)) - return queue - return method - class Proxy: def __init__(self, to_peer, from_peer, slave_object = None): if not hasattr(to_peer, 'fileno'): @@ -102,128 +83,20 @@ class Proxy: def lost_connection(self): raise Exception("Lost connection to peer!") -class Queue: - """A queue of responses to some method call. - Queue.blocker is triggered when the response queue becomes non-empty, - so yield that before trying to read from the queue, if using the - tasks module. - - For simple use (exactly one response), use: - data = Queue.dequeue_last() - - For sequences, read the next result with: - data = Queue.dequeue() - Will return EndOfResponses on the last call. - """ - master = None - serial = None - blocker = None - queue = None - _at_end = False - - def __init__(self, master, serial): - from rox import tasks # Don't require tasks for slaves - self.master = master - self.serial = serial - self.queue = [] - self.blocker = tasks.Blocker() - - def add(self, data): - """Add an item to the queue and trigger our current blocker.""" - self.queue.append(data) - if self._at_end: - # Auto-dequeue EndOfResponses item - self.dequeue() - else: - self.blocker.trigger() - - def dequeue(self): - """Returns the first item in the queue for this serial number. - Queue.blocker may change to a new blocker (if the queue is now - empty) or None (if no more responses will arrive), so be sure - to reread it after this.""" - assert self.blocker.happened - - data = self.queue.pop(0) - if isinstance(data, _EndOfResponses): - assert not self.queue - self.master._remove_queue(self.serial) - self.queue = None - self.blocker = None - return EndOfResponses - assert not self._at_end - if not self.queue: - # Queue is empty; create a new blocker - from rox import tasks - self.blocker = tasks.Blocker() - if isinstance(data, Exception): - raise data - return data - - def dequeue_last(self): - """Calls dequeue, and also sets a flag to indicate that - the next item will be EndOfResponses, which will be handled - automatically.""" - try: - data = self.dequeue() - return data - finally: - self._at_end = True - if self.queue: - self.dequeue() # Force cleanup now - -class MasterProxy(Proxy): - """Invoking operations on MasterProxy.root will invoke the same - operation on the SlaveProxy's slave_object.""" - - def __init__(self, to_slave, from_slave): - Proxy.__init__(self, to_slave, from_slave) - self.root = MasterObject(self) - self._queue = {} # Serial -> Queue - - def _dispatch(self, value): - serial, data = value - self._queue[serial].add(data) - - def _add_queue(self, serial): - assert serial not in self._queue - queue = Queue(self, serial) - self._queue[serial] = queue - return queue - - def _remove_queue(self, serial): - del self._queue[serial] - - def finish(self): - Proxy.finish(self) - assert not self._queue - -class Request(object): - """Call Request.send() to send replies. When destroyed, sends a - stop message to the master.""" - def __init__(self, send): - self.send = send - - def __del__(self): - self.send(EndOfResponses) - class SlaveProxy(Proxy): """Methods invoked on MasterProxy.root will be invoked on - slave_object with a callback function as the first argument. - This may be called any number of times to send replies.""" + slave_object. The result is a master_proxy.RequestBlocker.""" def __init__(self, to_master, from_master, slave_object): Proxy.__init__(self, to_master, from_master) self.slave_object = slave_object def _dispatch(self, value): serial, method, args = value - def send(value): - self.write_object((serial, value)) - request = Request(send) try: - getattr(self.slave_object, method)(request, *args) + result = getattr(self.slave_object, method)(*args) except Exception, e: - send(e) + result = e + self.write_object((serial, result)) def lost_connection(self): sys.exit() diff --git a/python/rox/su.py b/python/rox/su.py index 8565375..54bcdb7 100644 --- a/python/rox/su.py +++ b/python/rox/su.py @@ -5,7 +5,7 @@ which ones are available on the current platform.""" import os, sys, pwd import rox -from rox import g, _, proxy +from rox import g, _, master_proxy import traceback from select import select import fcntl @@ -18,13 +18,13 @@ _my_dir = os.path.abspath(os.path.dirname(__file__)) _child_script = os.path.join(_my_dir, 'suchild.sh') def create_su_proxy(message, uid = 0, confirm = True): - """Creates a new proxy object and starts the child process. - If necessary, the user is prompted for a password. If no + """Creates a new master_proxy.MasterObject and starts the child + process. If necessary, the user is prompted for a password. If no password is required, the user is simply asked to confirm, unless 'confirm' is False. Raises UserAbort if the user clicks Cancel.""" method = default_method(message, uid, confirm) - return method.get_master() + return method.get_master().root class Method: need_interaction = True @@ -82,7 +82,7 @@ class XtermMethod(Method): to_child.readable.close() assert self._master is None - self._master = proxy.MasterProxy(to_child.writeable, + self._master = master_proxy.MasterProxy(to_child.writeable, from_child.readable) return self._master diff --git a/python/rox/suchild.py b/python/rox/suchild.py index 1a82dee..60de4b0 100644 --- a/python/rox/suchild.py +++ b/python/rox/suchild.py @@ -38,50 +38,47 @@ class Slave: """This object runs as another user. Most methods behave in a similar way to the standard python methods of the same name.""" - def spawnvpe(self, request, mode, file, args, env = None): + def spawnvpe(self, mode, file, args, env = None): if env is None: - request.send(os.spawnvp(mode, file, args)) + return os.spawnvp(mode, file, args) else: - request.send(os.spawnvpe(mode, file, args, env)) + return os.spawnvpe(mode, file, args, env) - def waitpid(self, request, pid, flags): - request.send(os.waitpid(pid, flags)) + def waitpid(self, pid, flags): + return os.waitpid(pid, flags) - def getuid(self, request): - request.send(os.getuid()) + def getuid(self): + return os.getuid() - def setuid(self, request, uid): - request.send(os.setuid(uid)) + def setuid(self, uid): + return os.setuid(uid) - def rmtree(self, request, path): - shutil.rmtree(path) - request.send(None) + def rmtree(self, path): + return shutil.rmtree(path) - def unlink(self, request, path): - os.unlink(path) - request.send(None) + def unlink(self, path): + return os.unlink(path) - def open(self, request, path, mode = 'r'): + def open(self, path, mode = 'r'): stream = file(path, mode) streams[id(stream)] = stream - request.send(id(stream)) + return id(stream) - def close(self, request, stream): + def close(self, stream): streams[stream].close() del streams[stream] - request.send(None) - def read(self, request, stream, length = 0): - request.send(streams[stream].read(length)) + def read(self, stream, length = 0): + return streams[stream].read(length) - def write(self, request, stream, data): - request.send(streams[stream].write(data)) + def write(self, stream, data): + return streams[stream].write(data) - def rename(self, request, old, new): - request.send(os.rename(old, new)) + def rename(self, old, new): + return os.rename(old, new) - def chmod(self, request, path, mode): - request.send(os.chmod(path, mode)) + def chmod(self, path, mode): + return os.chmod(path, mode) if __name__ == '__main__': from select import select diff --git a/python/rox/tasks.py b/python/rox/tasks.py index 5ee2ea7..2081735 100644 --- a/python/rox/tasks.py +++ b/python/rox/tasks.py @@ -1,8 +1,5 @@ """The tasks module provides a simple light-weight alternative to threads. -THIS MODULE IS EXPERIMENTAL. Feedback on the API is appreciated. Things may -change in the next few versions, so watch out! - When you have a long-running job you will want to run it in the background, while the user does other things. There are four ways to do this: diff --git a/tests/python/testproxy.py b/tests/python/testproxy.py index 911663e..84721c5 100755 --- a/tests/python/testproxy.py +++ b/tests/python/testproxy.py @@ -9,15 +9,11 @@ import tempfile, shutil rox_lib = dirname(dirname(dirname(abspath(sys.argv[0])))) sys.path.insert(0, join(rox_lib, 'python')) -from rox import proxy, tasks, g, suchild +from rox import proxy, tasks, g, suchild, master_proxy class Slave(suchild.Slave): - def invoke(self, request): - request.send("Invoked") - - def count(self, request, a, b): - for x in range(a, b): - request.send(x) + def invoke(self): + return "Invoked" class TestProxy(unittest.TestCase): master = None @@ -26,7 +22,7 @@ class TestProxy(unittest.TestCase): def setUp(self): to_slave = os.pipe() from_slave = os.pipe() - self.master = proxy.MasterProxy(to_slave[1], from_slave[0]) + self.master = master_proxy.MasterProxy(to_slave[1], from_slave[0]) self.slave = proxy.SlaveProxy(from_slave[1], to_slave[0], Slave()) def tearDown(self): @@ -35,20 +31,16 @@ class TestProxy(unittest.TestCase): def testSetup(self): pass - + def testManual(self): - queue = self.master.root.invoke() + response = self.master.root.invoke() self.master.write_ready() self.slave.read_ready() self.slave.write_ready() - assert not queue.blocker.happened + assert not response.happened self.master.read_ready() - assert queue.blocker.happened - data = queue.dequeue() - self.assertEquals('Invoked', data) - data = queue.dequeue() - assert queue.blocker is None - assert data is proxy.EndOfResponses + assert response.happened + self.assertEquals('Invoked', response.result) def testSingle(self): blocker = self.master.root.invoke() @@ -56,38 +48,35 @@ class TestProxy(unittest.TestCase): self.slave.read_ready() self.slave.write_ready() self.master.read_ready() - data = blocker.dequeue_last() - self.assertEquals('Invoked', data) - - def testCount(self): + self.assertEquals('Invoked', blocker.result) + + def testMissing(self): def run(): - queue = self.master.root.count(1, 5) - self.sum = 0 - while queue.blocker: - yield queue.blocker - data = queue.dequeue() - if data is proxy.EndOfResponses: - assert not queue.blocker - else: - assert queue.blocker - self.sum += data + response = self.master.root.missing('foo') + yield response + try: + response.result + assert 0, 'Expected an exception!' + except AttributeError: + pass g.mainquit() tasks.Task(run()) g.mainloop() - self.assertEquals(self.sum, 10) - def testMissing(self): + def testTooSoon(self): def run(): - queue = self.master.root.missing('foo') - yield queue.blocker + response = self.master.root.invoke() try: - queue.dequeue_last() + response.result assert 0, 'Expected an exception!' - except AttributeError: + except Exception: pass + yield response + response.result g.mainquit() tasks.Task(run()) g.mainloop() + # spawnvpe, waitpid, setuid and getuid are tested in testsu.py @@ -95,9 +84,9 @@ class TestProxy(unittest.TestCase): tmp_dir = tempfile.mkdtemp('-roxlib-test') def run(): assert os.path.isdir(tmp_dir) - queue = self.master.root.rmtree(tmp_dir) - yield queue.blocker - queue.dequeue_last() + response = self.master.root.rmtree(tmp_dir) + yield response + assert response.result is None assert not os.path.exists(tmp_dir) g.mainquit() tasks.Task(run()) @@ -108,9 +97,9 @@ class TestProxy(unittest.TestCase): os.close(fd) def run(): assert os.path.isfile(tmp) - queue = self.master.root.unlink(tmp) - yield queue.blocker - queue.dequeue_last() + response = self.master.root.unlink(tmp) + yield response + assert response.result is None assert not os.path.exists(tmp) g.mainquit() tasks.Task(run()) @@ -122,18 +111,17 @@ class TestProxy(unittest.TestCase): tmp_file.flush() root = self.master.root def run(): - queue = root.open(tmp_file.name) - yield queue.blocker - stream = queue.dequeue_last() + response = root.open(tmp_file.name) + yield response + stream = response.result - queue = root.read(stream, 5) - yield queue.blocker - data = queue.dequeue_last() - assert data == 'Hello' + response = root.read(stream, 5) + yield response + assert "Hello" == response.result - queue = root.close(stream) - yield queue.blocker - queue.dequeue_last() + response = root.close(stream) + yield response + assert response.result is None g.mainquit() tasks.Task(run()) @@ -144,26 +132,26 @@ class TestProxy(unittest.TestCase): root = self.master.root tmp_file = join(tmp_dir, 'new') def run(): - queue = root.open(tmp_file, 'w') - yield queue.blocker - stream = queue.dequeue_last() + response = root.open(tmp_file, 'w') + yield response + stream = response.result assert os.path.isfile(tmp_file) - queue = root.write(stream, 'Hello\n') - yield queue.blocker - queue.dequeue_last() + response = root.write(stream, 'Hello\n') + yield response + assert response.result == None - queue = root.close(stream) - yield queue.blocker - queue.dequeue_last() + response = root.close(stream) + yield response + assert response.result is None assert file(tmp_file).read() == 'Hello\n' - queue = root.close(stream) - yield queue.blocker + response = root.close(stream) + yield response try: - queue.dequeue_last() + response.result assert 0, 'Expected an exception!' except KeyError: pass @@ -181,10 +169,10 @@ class TestProxy(unittest.TestCase): f.close() def run(): - queue = root.rename(join(tmp_dir, 'old'), + response = root.rename(join(tmp_dir, 'old'), join(tmp_dir, 'new')) - yield queue.blocker - queue.dequeue_last() + yield response + assert response.result == None assert file(join(tmp_dir, 'new')).read() == 'Hello\n' @@ -200,9 +188,9 @@ class TestProxy(unittest.TestCase): def run(): assert os.stat(tmp_file.name).st_mode & 0777 == 0700 - queue = root.chmod(tmp_file.name, 0655) - yield queue.blocker - queue.dequeue_last() + response = root.chmod(tmp_file.name, 0655) + yield response + response.result assert os.stat(tmp_file.name).st_mode & 0777 == 0655 g.mainquit() tasks.Task(run()) diff --git a/tests/python/testsu.py b/tests/python/testsu.py index d1ea4e4..067977d 100755 --- a/tests/python/testsu.py +++ b/tests/python/testsu.py @@ -13,43 +13,39 @@ assert os.getuid() != 0, "Can't run tests as root" class TestSU(unittest.TestCase): def testSu(self): - master = su.create_su_proxy('Need to become root to test this module.', + root = su.create_su_proxy('Need to become root to test this module.', confirm = False) - root = master.root def run(): - queue = root.spawnvpe(os.P_NOWAIT, 'false', ['false']) - yield queue.blocker - pid = queue.dequeue_last() + response = root.spawnvpe(os.P_NOWAIT, 'false', ['false']) + yield response + pid = response.result assert pid - queue = root.waitpid(pid, 0) - yield queue.blocker - (pid, status) = queue.dequeue_last() + response = root.waitpid(pid, 0) + yield response + (pid, status) = response.result assert status == 0x100 - queue = root.spawnvpe(os.P_WAIT, 'true', ['true']) - yield queue.blocker - status = queue.dequeue_last() - assert status == 0 + response = root.spawnvpe(os.P_WAIT, 'true', ['true']) + yield response + assert response.result == 0 - queue = root.getuid() - yield queue.blocker - uid = queue.dequeue_last() - assert uid == 0 + response = root.getuid() + yield response + assert response.result == 0 - queue = root.setuid(os.getuid()) - yield queue.blocker - queue.dequeue_last() + response = root.setuid(os.getuid()) + yield response + assert response.result is None - queue = root.getuid() - yield queue.blocker - uid = queue.dequeue_last() - assert uid == os.getuid() + response = root.getuid() + yield response + assert response.result == os.getuid() g.mainquit() tasks.Task(run()) g.mainloop() - master.finish() + root.finish() sys.argv.append('-v') unittest.main() -- 2.11.4.GIT