More scalable display_name handling (actually the length).
[gsh.git] / gsh / stdin.py
blobabbbd1a450a8bd05d6669f23605ea9e815d15c43
1 # This program is free software; you can redistribute it and/or modify
2 # it under the terms of the GNU General Public License as published by
3 # the Free Software Foundation; either version 2 of the License, or
4 # (at your option) any later version.
6 # This program is distributed in the hope that it will be useful,
7 # but WITHOUT ANY WARRANTY; without even the implied warranty of
8 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
9 # GNU Library General Public License for more details.
11 # You should have received a copy of the GNU General Public License
12 # along with this program; if not, write to the Free Software
13 # Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
15 # See the COPYING file for license information.
17 # Copyright (c) 2006, 2007, 2008 Guillaume Chazarain <guichaz@gmail.com>
19 import asyncore
20 import errno
21 import os
22 import readline # Just to say we want to use it with raw_input
23 import signal
24 import socket
25 import subprocess
26 import sys
27 import tempfile
28 import termios
29 from threading import Thread, Event, Lock
31 from gsh import dispatchers, remote_dispatcher
32 from gsh.console import console_output, set_last_status_length
33 from gsh import completion
class input_buffer(object):
    """Lock-protected text accumulator shared by the main thread and the
    stdin thread.

    Text is appended with add() and drained with get(); both operations hold
    the same lock for their whole duration.
    """

    def __init__(self):
        self.lock = Lock()
        self.buf = ''

    def add(self, data):
        """Append data to the pending text"""
        with self.lock:
            self.buf += data

    def get(self):
        """Return the pending text and leave the buffer empty.

        An empty string is returned when nothing is pending.
        """
        with self.lock:
            pending = self.buf
            if pending:
                self.buf = ''
            return pending
def ignore_sigchld(ignore):
    """Choose whether terminated children are reclaimed automatically.

    Typically we don't want to create zombies, so SIGCHLD is ignored. But
    when executing a user command (!command) the subprocess module relies on
    zombies not being automatically reclaimed, so the default disposition
    has to be put back meanwhile.

    ignore -- true to ignore SIGCHLD and reclaim the zombies created so far,
              false to restore the default SIGCHLD disposition.
    """
    if not ignore:
        signal.signal(signal.SIGCHLD, signal.SIG_DFL)
        return

    signal.signal(signal.SIGCHLD, signal.SIG_IGN)
    # Reclaim previously created zombies, until waitpid() reports that no
    # terminated child is left
    try:
        while True:
            if os.waitpid(-1, os.WNOHANG) == (0, 0):
                break
    except OSError as e:
        # ECHILD only means that there is no child at all
        if e.errno != errno.ECHILD:
            raise
def process_input_buffer():
    """Send the content of the input buffer to all remote processes, this must
    be called in the main thread.

    The buffered text is first logged, then handled according to its prefix:
    - ':' the text, without the leading ':' and without its last character,
      is passed to handle_control_command();
    - '!' the rest of the text is run locally through the shell, and a non
      zero exit status or a terminating signal is reported on the console;
    - otherwise the text is dispatched to every dispatcher instance. A
      dispatcher raising while taking the command is reported on the console
      and disconnected, asyncore.ExitNow being the only exception let
      through.
    """
    # Function-level import, presumably to avoid a circular import between
    # the two modules -- TODO confirm
    from gsh.control_commands_helpers import handle_control_command
    data = the_stdin_thread.input_buffer.get()
    remote_dispatcher.log('> ' + data)

    if data.startswith(':'):
        # Strip the ':' prefix and the last character (the stdin thread
        # appends a '\n' to every command it buffers)
        handle_control_command(data[1:-1])
        return

    if data.startswith('!'):
        # subprocess has to wait() for its child itself, so the child must
        # not be reclaimed automatically while the command runs
        ignore_sigchld(False)
        try:
            retcode = subprocess.call(data[1:], shell=True)
        except OSError as e:
            if e.errno != errno.EINTR:
                raise
            console_output('Child was interrupted\n')
            retcode = 0
        finally:
            # Go back to ignoring SIGCHLD even when an unexpected error
            # propagates, otherwise later children would remain zombies
            ignore_sigchld(True)
        if retcode > 0:
            console_output('Child returned %d\n' % retcode)
        elif retcode < 0:
            console_output('Child was terminated by signal %d\n' % -retcode)
        return

    for r in dispatchers.all_instances():
        try:
            r.dispatch_command(data)
        except asyncore.ExitNow:
            # Bare raise so the original traceback is kept
            raise
        except Exception as msg:
            console_output('%s for %s, disconnecting\n' % (msg, r.display_name))
            r.disconnect()
        else:
            if r.enabled and r.state is remote_dispatcher.STATE_IDLE:
                r.change_state(remote_dispatcher.STATE_RUNNING)
116 # The stdin thread uses a synchronous (with ACK) socket to communicate with the
117 # main thread, which is most of the time waiting in the poll() loop.
118 # Socket character protocol:
119 # d: there is new data to send
120 # A: ACK, same reply for every message, communications are synchronous, so the
121 # stdin thread sends a character to the socket, the main thread processes it,
122 # sends the ACK, and the stdin thread can go on.
class socket_notification_reader(asyncore.dispatcher):
    """Main thread side of the notification socket written to by the stdin
    thread"""

    def __init__(self):
        asyncore.dispatcher.__init__(self, the_stdin_thread.socket_read)

    def _do(self, c):
        """Execute the action associated with the character command c"""
        if c != 'd':
            raise Exception('Unknown code: %s' % (c))
        process_input_buffer()

    def handle_read(self):
        """Handle all the available character commands in the socket, sending
        one ACK after each of them"""
        while True:
            try:
                code = self.recv(1)
            except socket.error as why:
                if why.args[0] == errno.EWOULDBLOCK:
                    # Every pending command has been consumed
                    return
                raise
            self._do(code)
            # The ACK is sent with the socket switched to blocking mode, then
            # the socket goes back to non-blocking for the next recv()
            self.socket.setblocking(True)
            self.send('A')
            self.socket.setblocking(False)

    def writable(self):
        """Our writes are blocking, so write events are never requested"""
        return False
def write_main_socket(c):
    """Synchronous write to the main socket: send the character command c,
    then block until the ACK has been read.

    Reading the ACK is retried when it fails with EINTR; any other socket
    error is propagated to the caller.
    """
    the_stdin_thread.socket_write.send(c)
    acked = False
    while not acked:
        try:
            the_stdin_thread.socket_write.recv(1)
        except socket.error as e:
            if e.args[0] != errno.EINTR:
                raise
        else:
            acked = True
# This file descriptor is used to interrupt readline in raw_input().
# /dev/null is not enough as it does not get out of a 'Ctrl-R' reverse-i-search.
# A Ctrl-C seems to make raw_input() return in all cases, and avoids printing
# a newline.
# The temporary file is created when this module is imported. Its name is
# removed right away, so it is only reachable through tempfile_fd, and its
# whole content is the single character chr(3), i.e. Ctrl-C.
tempfile_fd, tempfile_name = tempfile.mkstemp()
os.remove(tempfile_name) # Only the open file descriptor is needed
os.write(tempfile_fd, chr(3)) # chr(3) is Ctrl-C
def get_stdin_pid(cached_result=None):
    """Try to get the PID of the stdin thread, otherwise get the whole process
    ID.

    cached_result -- when not None, it is returned as is and nothing is
                     looked up. Nothing is remembered between two calls.

    The thread is looked up in /proc/self/task: the entry named after
    os.getpid() is discarded and exactly one other entry is expected to
    remain (this is asserted). When /proc/self/task does not exist,
    os.getpid() is returned instead.
    """
    if cached_result is not None:
        return cached_result

    try:
        task_ids = os.listdir('/proc/self/task')
    except OSError as e:
        if e.errno != errno.ENOENT:
            raise
        # No per-thread information available, use the process itself
        return os.getpid()

    # What is left once the entry of os.getpid() is removed should be the
    # stdin thread alone
    task_ids.remove(str(os.getpid()))
    assert len(task_ids) == 1
    return int(task_ids[0])
def interrupt_stdin_thread():
    """The stdin thread may be in raw_input(), get out of it.

    File descriptor 0 is temporarily replaced by the rewound temporary file
    (tempfile_fd, which contains a Ctrl-C character), SIGWINCH is sent to the
    ID returned by get_stdin_pid(), and the function blocks until the stdin
    thread has set its out_of_raw_input event. The real stdin is then put
    back on file descriptor 0.

    interrupt_asked is True for the duration of the operation; the stdin
    thread reads this flag to tell a forced return of raw_input() from one
    triggered by the user. The statements below depend on each other's
    order.
    """
    dupped_stdin = os.dup(0) # Backup the stdin fd
    assert not the_stdin_thread.interrupt_asked # Sanity check
    the_stdin_thread.interrupt_asked = True # Not user triggered
    os.lseek(tempfile_fd, 0, 0) # Rewind in the temp file
    os.dup2(tempfile_fd, 0) # This will make raw_input() return
    pid = get_stdin_pid()
    os.kill(pid, signal.SIGWINCH) # Try harder to wake up raw_input()
    the_stdin_thread.out_of_raw_input.wait() # Wait for this return
    the_stdin_thread.interrupt_asked = False # Restore sanity
    os.dup2(dupped_stdin, 0) # Restore stdin
    os.close(dupped_stdin) # Cleanup
# Echo state last applied through set_echo()
echo_enabled = True

def set_echo(echo):
    """Enable or disable the echo of the typed characters on stdin.

    echo -- the wanted echo state. Nothing is done, and the terminal is not
            even queried, when it equals the state recorded in the
            echo_enabled global.
    """
    global echo_enabled
    if echo == echo_enabled:
        return

    stdin_fd = sys.stdin.fileno()
    settings = termios.tcgetattr(stdin_fd)
    # Index 3 of the attribute list holds the local mode flags (lflag)
    if echo:
        settings[3] = settings[3] | termios.ECHO
    else:
        settings[3] = settings[3] & ~termios.ECHO
    termios.tcsetattr(stdin_fd, termios.TCSANOW, settings)
    echo_enabled = echo
class stdin_thread(Thread):
    """The stdin thread, used to call raw_input().

    The state used by the methods is not created by __init__() but stored on
    the module-level the_stdin_thread instance by activate():
    - input_buffer: text read by this thread, consumed by the main thread
    - raw_input_wanted: event set by want_raw_input() to let run() call
      raw_input()
    - in_raw_input: event set by run() just before calling raw_input() and
      cleared once raw_input() is over
    - out_of_raw_input: event set while run() is not in raw_input()
    - socket_read, socket_write: the two ends of the notification socket pair
    - interrupt_asked: True while interrupt_stdin_thread() is forcing
      raw_input() to return
    - prepend_text: text to put back in the next readline line, or None
    """
    def __init__(self):
        Thread.__init__(self, name='stdin thread')
        completion.install_completion_handler()

    @staticmethod
    def activate(interactive):
        """Activate the thread at initialization time.

        The input buffer is always created. The events, the socket pair, the
        readline pre-input hook and the thread itself (started as a daemon
        thread) are only set up when interactive is true.
        """
        the_stdin_thread.input_buffer = input_buffer()
        if interactive:
            the_stdin_thread.raw_input_wanted = Event()
            the_stdin_thread.in_raw_input = Event()
            the_stdin_thread.out_of_raw_input = Event()
            # The thread starts outside of raw_input()
            the_stdin_thread.out_of_raw_input.set()
            s1, s2 = socket.socketpair()
            the_stdin_thread.socket_read, the_stdin_thread.socket_write = s1, s2
            the_stdin_thread.interrupt_asked = False
            the_stdin_thread.setDaemon(True)
            the_stdin_thread.start()
            # Created after the socket pair: its constructor reads socket_read
            the_stdin_thread.socket_notification = socket_notification_reader()
            the_stdin_thread.prepend_text = None
            readline.set_pre_input_hook(the_stdin_thread.prepend_previous_text)

    def prepend_previous_text(self):
        """readline pre-input hook: insert in the new line the text saved in
        prepend_text, if any, then forget it"""
        if self.prepend_text:
            readline.insert_text(self.prepend_text)
            readline.redisplay()
            self.prepend_text = None

    def want_raw_input(self):
        """Let the stdin thread call raw_input() with an up to date prompt, and
        return once the thread has flagged that it is about to do so"""
        nr, total = dispatchers.count_awaited_processes()
        if nr:
            prompt = 'waiting (%d/%d)> ' % (nr, total)
        else:
            prompt = 'ready (%d)> ' % total
        self.prompt = prompt
        set_last_status_length(len(prompt))
        self.raw_input_wanted.set()
        # Process the notifications that may already be in the socket
        self.socket_notification.handle_read()
        self.in_raw_input.wait()
        self.raw_input_wanted.clear()

    def no_raw_input(self):
        """Make the stdin thread leave raw_input() if it is flagged as being in
        it"""
        if not self.out_of_raw_input.isSet():
            interrupt_stdin_thread()

    # Beware of races
    def run(self):
        """Thread main loop: wait until input is wanted, read one line with
        raw_input(), then hand it over to the main thread"""
        while True:
            self.raw_input_wanted.wait()
            # NOTE(review): out_of_raw_input is set then cleared around
            # in_raw_input.set(); the ordering looks deliberate, confirm
            # before changing it
            self.out_of_raw_input.set()
            self.in_raw_input.set()
            self.out_of_raw_input.clear()
            cmd = None
            try:
                cmd = raw_input(self.prompt)
            except EOFError:
                if self.interrupt_asked:
                    # Forced return: keep what was typed so far
                    cmd = readline.get_line_buffer()
                else:
                    cmd = chr(4) # Ctrl-D
            if self.interrupt_asked:
                # Not a user command: save the text for the next prompt
                # (see prepend_previous_text) and send nothing
                self.prepend_text = cmd
                cmd = None
            self.in_raw_input.clear()
            self.out_of_raw_input.set()
            if cmd:
                # The history is only updated for input typed with echo on
                if echo_enabled:
                    completion.add_to_history(cmd)
                else:
                    completion.remove_last_history_item()
            set_echo(True)
            if cmd is not None:
                # Buffer the command, then notify the main thread and wait
                # for its ACK
                self.input_buffer.add(cmd + '\n')
                write_main_socket('d')
# The single stdin_thread instance, created when the module is imported.
# The thread itself is only started by stdin_thread.activate(), in
# interactive mode.
the_stdin_thread = stdin_thread()