Fix to let documentation build.
[rox-lib.git] / python / rox / su.py
blob5e805b318de3f8e3240cbbdf07671a8996465048
1 """The su (switch user) module allows you to execute a command as some
2 other user (normally 'root'). It supports a variety of methods to perform
3 the switch (using su, xsu, sudo, etc) so you don't have to worry about
4 which ones are available on the current platform."""
6 import os, sys, pwd
7 import rox
8 from rox import g, _, proxy
9 import traceback
10 from select import select
11 import fcntl
12 try:
13 import pty
14 except ImportError:
15 pty = None
# Absolute path of the 'suchild.py' helper script, which is expected to sit
# in the same directory as this module.
child_script = os.path.join(os.path.dirname(os.path.abspath(__file__)),
                            'suchild.py')
def create_su_proxy(message, uid = 0, confirm = True):
    """Start the child process for user 'uid' and return its master proxy.

    message -- text displayed in the dialog box shown to the user.
    uid     -- passed to the default switching method.
    confirm -- when the method needs no interaction (no password), a
               confirmation box is still shown unless this is False.

    Returns whatever the method's get_master() returns.
    Raises UserAbort if the user clicks Cancel."""
    method = default_method(uid)

    # Nothing to ask and no confirmation wanted: go straight to the proxy.
    if not (method.need_interaction or confirm):
        return method.get_master()

    dialog = SwitchDialog(message, method)
    dialog.show()
    g.gdk.flush()
    if method.need_interaction:
        # Let the method add its own widgets (e.g. a password prompt).
        method.add_interaction_ui(dialog)
    # Runs the dialog until it succeeds; raises UserAbort on cancel.
    dialog.do_interaction()

    return method.get_master()
class SwitchDialog(rox.Dialog):
    """Dialog asking the user to confirm (and possibly authenticate) a
    switch of user. It shows a message with Cancel/OK buttons, plus a
    password prompt which stays hidden until set_password_prompt() is
    called."""

    def __init__(self, message, method):
        """message is the text to display; method is the switching method
        whose done_interaction() is called when the user clicks OK."""
        rox.Dialog.__init__(self)
        self.method = method
        self.set_has_separator(False)
        self.set_position(g.WIN_POS_CENTER)

        text = g.Label(message)
        text.set_padding(10, 10)
        self.vbox.pack_start(text)

        for stock, response in ((g.STOCK_CANCEL, g.RESPONSE_CANCEL),
                                (g.STOCK_OK, g.RESPONSE_OK)):
            self.add_button(stock, response)
        self.set_default_response(g.RESPONSE_OK)

        # Show what exists so far; the password widgets are added
        # afterwards so that they are not shown by this call.
        self.vbox.show_all()
        self._add_password_widgets()

    def _add_password_widgets(self):
        """Create and pack the password label and entry without showing them."""
        self.password_message = g.Label('')
        self.vbox.pack_start(self.password_message, False, True, 0)

        entry = g.Entry()
        self.password_entry = entry
        self.vbox.pack_start(entry, False, True, 0)
        entry.set_visibility(False)
        entry.set_activates_default(True)

    def set_password_prompt(self, message):
        """Show the password label (set to 'message') and the password
        entry, and give the entry the keyboard focus."""
        prompt = self.password_message
        prompt.set_text(message)
        prompt.show()

        entry = self.password_entry
        entry.show()
        entry.grab_focus()

    def get_password(self):
        """Return the current text of the password entry."""
        return self.password_entry.get_text()

    def do_interaction(self):
        """Run the dialog repeatedly until the interaction succeeds.

        Any response other than OK destroys the dialog and raises
        rox.UserAbort. On OK, the method's done_interaction() is called
        if the method needs interaction; if that raises, the error is
        reported and the dialog is run again. The dialog is destroyed
        once the interaction has succeeded."""
        finished = False
        while not finished:
            if self.run() != g.RESPONSE_OK:
                self.destroy()
                raise rox.UserAbort()
            try:
                if self.method.need_interaction:
                    self.method.done_interaction(self)
                finished = True
            except:
                rox.report_exception()
        self.destroy()
class Method:
    """Abstract base class for a way of switching to another user.

    need_interaction tells callers whether add_interaction_ui() should be
    called and the user prompted before get_master() is used."""
    need_interaction = True

    def __init__(self, uid):
        """uid is stored as self.uid for use by subclasses."""
        self.uid = uid

    def get_master(self):
        """Return the master proxy object. Subclasses must override this.

        Raises NotImplementedError in the base class."""
        # NotImplemented is a comparison constant, not an exception, and
        # calling it raises TypeError; NotImplementedError is what is meant.
        raise NotImplementedError()    # Abstract

    def add_interaction_ui(self, box):
        """Add a text entry to the dialog 'box' (packed into box.vbox) and
        remember it as self.password_entry."""
        self.password_entry = g.Entry()
        box.vbox.pack_start(self.password_entry, True, True, 0)
class PtyMethod(Method):
    """Switch user by running 'su' on a pseudo-terminal.

    The child is started as soon as the object is created. self.fd is the
    master side of the pty and is used both to answer su's password prompt
    and, afterwards, as the channel for the proxy.

    NOTE(review): 'su' is run without a user name, so self.uid is only used
    to choose the name shown in the password prompt -- confirm that this is
    intended for uids other than 0."""
    _master = None

    def __init__(self, uid):
        """Fork 'su' on a new pty and work out whether a password is needed.

        Sets self.need_interaction to False only if the child sends the
        special '\\1' control character within two seconds."""
        Method.__init__(self, uid)
        (child, self.fd) = pty.fork()
        if child == 0:
            # In the child: replace ourselves with su, which runs the
            # helper script. The paths are passed through the environment
            # so that the command string needs no quoting of its own.
            try:
                try:
                    os.environ['roxlib_python'] = sys.executable
                    os.environ['roxlib_script'] = child_script
                    # Run su directly. (A leftover debugging wrapper used
                    # to trace this to /tmp/log; it has been removed.)
                    os.execlp('su', 'su', '-c',
                              '"$roxlib_python" "$roxlib_script"')
                except:
                    traceback.print_exc()
            finally:
                # Never return into the parent's code from the child.
                os._exit(1)

        # Don't echo input back at us
        import tty
        import termios
        tty.setraw(self.fd, termios.TCSANOW)

        # Wait two seconds to see if we get a prompt
        print("Waiting for prompt...")
        ready = select([self.fd], [], [], 2)[0]

        # Non-interactive (already running as the new user) if the
        # child process sends us the special control character.
        # Otherwise (prompt, or no output), we need to ask the user for
        # a password.
        print("Checking whether we need to interact with the user...")
        self.need_interaction = not ready or os.read(self.fd, 1) != '\1'
        print("need_interaction = %s" % (self.need_interaction,))

    def get_master(self):
        """Return the MasterProxy for the pty, creating it on first use."""
        if self._master is None:
            self._master = proxy.MasterProxy(self.fd, self.fd)
        return self._master

    def add_interaction_ui(self, box):
        """Make the dialog 'box' show a password prompt naming the user
        that self.uid maps to."""
        user_name = pwd.getpwuid(self.uid)[0]
        box.set_password_prompt(_("Enter %s's password:") % user_name)

    def done_interaction(self, box):
        """Send the password entered in 'box' to su and wait for the result.

        Returns normally once the child sends the '\\1' control character.
        Raises Exception if the su process has gone away, or if any other
        output (taken to be an error message) arrives instead."""
        s = box.get_password() + '\r\n'

        # Clear any output
        while True:
            print("Checking for extra output...")
            ready = select([self.fd], [], [], 0)[0]
            if not ready:
                print("Done")
                break
            print("Reading output...")
            discard = os.read(self.fd, 100)
            if not discard:
                raise Exception('Su process quit!')
            print("Discarding %s" % (discard,))

        # os.write() may accept only part of the data, so loop until the
        # whole line has gone. The password itself is deliberately never
        # printed.
        print("Sending password...")
        while s:
            n = os.write(self.fd, s)
            s = s[n:]

        print("Reading")
        while True:
            response = os.read(self.fd, 1)
            if not response:
                # End of file. This must be tested before the newline
                # check below, because '' in '\r\n' is true and the loop
                # would otherwise never end.
                raise Exception('Su process quit!')
            if response in '\r\n':
                continue
            if response == '\1':
                return    # OK!
            break

        # Give su a moment to finish writing its error message.
        import time
        time.sleep(0.5)
        raise Exception('Authentication failed:\n' +
                        (response + os.read(self.fd, 1000)).strip())

    def finish(self):
        """Finish the proxy (if one was created) and close the pty.
        Calling this more than once is harmless."""
        if self.fd != -1:
            if self._master is not None:
                self._master.finish()
            os.close(self.fd)
            self.fd = -1
class Pipe:
    """Contains Python file objects for two pipe ends.
    Wrapping the FDs in this way ensures that they will be freed on error.

    readable  -- file object for the read end, opened with mode 'r'.
    writeable -- file object for the write end, opened with mode 'w'."""
    readable = None
    writeable = None

    def __init__(self):
        """Create the pipe and wrap both ends.

        If wrapping either end fails, both ends are closed before the
        error is re-raised."""
        r, w = os.pipe()
        try:
            self.readable = os.fdopen(r, 'r')
        except:
            # Neither descriptor is owned by a file object yet, so both
            # must be closed by hand.
            os.close(r)
            os.close(w)
            raise
        try:
            self.writeable = os.fdopen(w, 'w')
        except:
            os.close(w)
            # The read end is already wrapped; close it through its file
            # object rather than waiting for it to be garbage collected.
            self.readable.close()
            raise
class XtermMethod(Method):
    """Switch user by running 'su' inside an xterm window.

    The xterm handles the password prompt itself, so no interaction
    through our own dialog is needed. Communication with the child uses
    two pipes whose descriptor numbers are passed to the helper script
    on its command line."""
    need_interaction = False
    _master = None

    def get_master(self):
        """Fork the xterm child and return a MasterProxy connected to it.

        May only be called once per object (enforced with an assert)."""
        assert self._master is None

        to_child = Pipe()
        from_child = Pipe()

        if os.fork() == 0:
            # In the child: drop the parent's ends of the pipes, then
            # exec. Whatever happens, never return into the parent's code.
            try:
                try:
                    to_child.writeable.close()
                    from_child.readable.close()
                    self.exec_child(from_child.writeable,
                                    to_child.readable)
                except:
                    traceback.print_exc()
            finally:
                os._exit(1)

        # In the parent: drop the child's ends of the pipes.
        from_child.writeable.close()
        to_child.readable.close()

        self._master = proxy.MasterProxy(to_child.writeable,
                                         from_child.readable)
        return self._master

    def exec_child(self, to_parent, from_parent):
        """Replace the current process with xterm running su.

        to_parent and from_parent are the child's ends of the two pipes;
        their descriptor numbers are given to the helper script as its
        two arguments, in that order. Only returns if the exec fails, in
        which case the exception from os.execlpe() propagates."""
        # Clear the descriptor flags (including close-on-exec) so that
        # both pipes survive the exec.
        fcntl.fcntl(to_parent, fcntl.F_SETFD, 0)
        fcntl.fcntl(from_parent, fcntl.F_SETFD, 0)

        # Start from the inherited environment and then set our own
        # variables, so that a stale roxlib_* value already present in
        # the environment cannot override the ones needed here.
        env = dict(os.environ)
        env['roxlib_python'] = sys.executable
        env['roxlib_prog'] = child_script

        os.execlpe('xterm', 'xterm', '-title', 'Enter root password', '-e',
                   'su', '-c', '"$roxlib_python" "$roxlib_prog" %d %d' %
                   (to_parent.fileno(), from_parent.fileno()),
                   env)
# The Method subclass that create_su_proxy() instantiates, as
# default_method(uid), to perform the switch.
default_method = XtermMethod