More unit tests
[codimension.git] / thirdparty / pyqtermwidget / pyqterm / backend.py
blob8bb90a96ea5c52e9b6b347f19cb44dd353aa67d5
1 # -*- coding: utf-8 -*-
2 # This code is based on AjaxTerm/Web-Shell which included a fairly complete
3 # vt100 implementation as well as a stable process multiplexer.
4 # I made some small fixes, improved some small parts and added a Session class
5 # which can be used by the widget.
6 # License: GPL2
8 import sys
9 import os
10 import fcntl
11 import array
12 import threading
13 import time
14 from termios import TIOCSWINSZ
15 import pty
16 from signal import signal, SIGCHLD, SIG_IGN, SIGTERM
17 import struct
18 from select import select
19 from subprocess import Popen
__version__ = "0.1"

# Code points substituted for 0x60..0x7f while the DEC special graphics
# character set is active (indexed by ``char - 0x60``).
VT100_CHARSET_GRAPH = (
    0x25ca, 0x2026, 0x2022, 0x3f,
    0xb6, 0x3f, 0xb0, 0xb1,
    0x3f, 0x3f, 0x2b, 0x2b,
    0x2b, 0x2b, 0x2b, 0xaf,
    0x2014, 0x2014, 0x2014, 0x5f,
    0x2b, 0x2b, 0x2b, 0x2b,
    0x7c, 0x2264, 0x2265, 0xb6,
    0x2260, 0xa3, 0xb7, 0x7f,
)

# Key filter tables: the widget sends '~' followed by one selector character;
# Terminal.pipe() replaces the pair with the escape sequence listed here.
# This table is used while cursor-key mode is off (ANSI sequences).
VT100_KEYFILTER_ANSIKEYS = {
    '~': '~',
    'A': '\x1b[A',
    'B': '\x1b[B',
    'C': '\x1b[C',
    'D': '\x1b[D',
    'F': '\x1b[F',
    'H': '\x1b[H',
    '1': '\x1b[5~',
    '2': '\x1b[6~',
    '3': '\x1b[2~',
    '4': '\x1b[3~',
    'a': '\x1bOP',
    'b': '\x1bOQ',
    'c': '\x1bOR',
    'd': '\x1bOS',
    'e': '\x1b[15~',
    'f': '\x1b[17~',
    'g': '\x1b[18~',
    'h': '\x1b[19~',
    'i': '\x1b[20~',
    'j': '\x1b[21~',
    'k': '\x1b[23~',
    'l': '\x1b[24~',
}

# Same selectors, used while cursor-key (application) mode is on; only the
# arrow/Home/End entries differ from the ANSI table.
VT100_KEYFILTER_APPKEYS = {
    '~': '~',
    'A': '\x1bOA',
    'B': '\x1bOB',
    'C': '\x1bOC',
    'D': '\x1bOD',
    'F': '\x1bOF',
    'H': '\x1bOH',
    '1': '\x1b[5~',
    '2': '\x1b[6~',
    '3': '\x1b[2~',
    '4': '\x1b[3~',
    'a': '\x1bOP',
    'b': '\x1bOQ',
    'c': '\x1bOR',
    'd': '\x1bOS',
    'e': '\x1b[15~',
    'f': '\x1b[17~',
    'g': '\x1b[18~',
    'h': '\x1b[19~',
    'i': '\x1b[20~',
    'j': '\x1b[21~',
    'k': '\x1b[23~',
    'l': '\x1b[24~',
}
try:
    _unichr = unichr  # Python 2
except NameError:
    # Interpreters without unichr(): chr() already accepts any code point.
    _unichr = chr


class Terminal(object):
    """In-memory VT100/ANSI terminal emulator.

    The screen is a flat ``array('i')`` of ``w * h`` cells.  Each cell packs
    the character code in its low 16 bits and the attribute word above them
    (mask ``0x0XFB0000``: X = flags, F = foreground, B = background).

    Data coming from the child process goes through :meth:`write`, keyboard
    data going to the child goes through :meth:`pipe`, replies generated by
    the emulator are fetched with :meth:`read`, and :meth:`dump` renders the
    screen for the widget.
    """

    def __init__(self, w, h):
        """Create a terminal of ``w`` columns by ``h`` rows."""
        self.w = w
        self.h = h
        # Dispatch table for "ESC <intermediates> <final>" sequences.
        self.vt100_esc = {
            '#8': self.esc_DECALN,
            '(A': self.esc_G0_0,
            '(B': self.esc_G0_1,
            '(0': self.esc_G0_2,
            '(1': self.esc_G0_3,
            '(2': self.esc_G0_4,
            ')A': self.esc_G1_0,
            ')B': self.esc_G1_1,
            ')0': self.esc_G1_2,
            ')1': self.esc_G1_3,
            ')2': self.esc_G1_4,
            '7': self.esc_DECSC,
            '8': self.esc_DECRC,
            '=': self.esc_DECKPAM,
            '>': self.esc_DECKPNM,
            'D': self.esc_IND,
            'E': self.esc_NEL,
            'H': self.esc_HTS,
            'M': self.esc_RI,
            'N': self.esc_SS2,
            'O': self.esc_SS3,
            'P': self.esc_DCS,
            'X': self.esc_SOS,
            'Z': self.esc_DECID,
            '[': self.esc_CSI,
            '\\': self.esc_ST,
            ']': self.esc_OSC,
            '^': self.esc_PM,
            '_': self.esc_APC,
            'c': self.reset_hard,
        }
        # Dispatch table for "CSI <params> <intermediates> <final>" sequences.
        self.vt100_csi = {
            '@': self.csi_ICH,
            'A': self.csi_CUU,
            'B': self.csi_CUD,
            'C': self.csi_CUF,
            'D': self.csi_CUB,
            'E': self.csi_CNL,
            'F': self.csi_CPL,
            'G': self.csi_CHA,
            'H': self.csi_CUP,
            'I': self.csi_CHT,
            'J': self.csi_ED,
            'K': self.csi_EL,
            'L': self.csi_IL,
            'M': self.csi_DL,
            'P': self.csi_DCH,
            'S': self.csi_SU,
            'T': self.csi_SD,
            'W': self.csi_CTC,
            'X': self.csi_ECH,
            'Z': self.csi_CBT,
            '`': self.csi_HPA,
            'a': self.csi_HPR,
            'b': self.csi_REP,
            'c': self.csi_DA,
            'd': self.csi_VPA,
            'e': self.csi_VPR,
            'f': self.csi_HVP,
            'g': self.csi_TBC,
            'h': self.csi_SM,
            'l': self.csi_RM,
            'm': self.csi_SGR,
            'n': self.csi_DSR,
            'r': self.csi_DECSTBM,
            's': self.csi_SCP,
            'u': self.csi_RCP,
            'x': self.csi_DECREQTPARM,
            '!p': self.csi_DECSTR,
        }
        self.reset_hard()

    # Reset functions
    def reset_hard(self):
        """Reset the whole emulator: decoder, parser, screen and modes."""
        # Attribute mask: 0x0XFB0000
        #   X: Bit 0 - Underlined
        #      Bit 1 - Negative
        #      Bit 2 - Concealed
        #   F: Foreground
        #   B: Background
        self.attr = 0x00fe0000
        # UTF-8 decoder
        self.utf8_units_count = 0
        self.utf8_units_received = 0
        self.utf8_char = 0
        # Key filter
        self.vt100_keyfilter_escape = False
        # Last char
        self.vt100_lastchar = 0
        # Control sequences
        self.vt100_parse_len = 0
        self.vt100_parse_state = ""
        self.vt100_parse_func = ""
        self.vt100_parse_param = ""
        # Buffers
        self.vt100_out = ""
        # Position restored by CSI u; initialised so that a restore without
        # a prior CSI s goes to the home position instead of raising.
        self.vt100_saved_cx = 0
        self.vt100_saved_cy = 0
        # Invoke other resets
        self.reset_screen()
        self.reset_soft()

    def reset_soft(self):
        """Reset attributes, scroll region, character sets and modes."""
        self.attr = 0x00fe0000
        # Scroll parameters
        self.scroll_area_y0 = 0
        self.scroll_area_y1 = self.h
        # Character sets
        self.vt100_charset_is_single_shift = False
        self.vt100_charset_is_graphical = False
        self.vt100_charset_g_sel = 0
        self.vt100_charset_g = [0, 0]
        # Modes
        self.vt100_mode_insert = False
        self.vt100_mode_lfnewline = False
        self.vt100_mode_cursorkey = False
        self.vt100_mode_column_switch = False
        self.vt100_mode_inverse = False
        self.vt100_mode_origin = False
        self.vt100_mode_autowrap = True
        self.vt100_mode_cursor = True
        self.vt100_mode_alt_screen = False
        self.vt100_mode_backspace = False
        # Init DECSC state for both the normal and the alternate screen;
        # esc_DECSC() builds a fresh dict each time, so the two are distinct.
        self.esc_DECSC()
        self.vt100_saved2 = self.vt100_saved
        self.esc_DECSC()

    def reset_screen(self):
        """Blank both screens and reset scroll region, cursor and tabs."""
        blank = self.attr | 0x20
        cells = self.w * self.h
        # Screen (primary and alternate)
        self.screen = array.array('i', [blank] * cells)
        self.screen2 = array.array('i', [blank] * cells)
        # Scroll parameters
        self.scroll_area_y0 = 0
        self.scroll_area_y1 = self.h
        # Cursor position
        self.cx = 0
        self.cy = 0
        # Tab stops; must be a real list because csi_CTC mutates it.
        self.tab_stops = list(range(0, self.w, 8))

    # UTF-8 functions
    def utf8_decode(self, d):
        """Incrementally decode UTF-8 data into a string.

        Decoder state is kept on the instance, so a multi-byte sequence may
        be split across calls.  Malformed input yields '?' characters and
        code points of 0x10000 and above are dropped.
        """
        out = []
        for c in d:
            if isinstance(c, int):
                # Iterating bytes/bytearray objects yields integers.
                char, c = c, chr(c)
            else:
                char = ord(c)
            if self.utf8_units_count != self.utf8_units_received:
                # In the middle of a multi-byte sequence.
                self.utf8_units_received += 1
                if (char & 0xc0) == 0x80:
                    self.utf8_char = (self.utf8_char << 6) | (char & 0x3f)
                    if self.utf8_units_count == self.utf8_units_received:
                        if self.utf8_char < 0x10000:
                            out.append(_unichr(self.utf8_char))
                        self.utf8_units_count = self.utf8_units_received = 0
                else:
                    # Not a continuation byte: one '?' for the lead byte plus
                    # one per unit consumed so far.
                    out.append('?')
                    while self.utf8_units_received:
                        out.append('?')
                        self.utf8_units_received -= 1
                    self.utf8_units_count = 0
            elif (char & 0x80) == 0x00:
                out.append(c)
            elif (char & 0xe0) == 0xc0:
                self.utf8_units_count = 1
                self.utf8_char = char & 0x1f
            elif (char & 0xf0) == 0xe0:
                self.utf8_units_count = 2
                self.utf8_char = char & 0x0f
            elif (char & 0xf8) == 0xf0:
                self.utf8_units_count = 3
                self.utf8_char = char & 0x07
            else:
                out.append('?')
        return ''.join(out)

    @staticmethod
    def utf8_charwidth(char):
        """Return the display width (1 or 2 cells) used for code point ``char``."""
        if char >= 0x2e80:
            return 2
        return 1

    # Low-level terminal functions
    def peek(self, y0, x0, y1, x1):
        """Return the cells from (y0, x0) up to, not including, (y1 - 1, x1)."""
        return self.screen[self.w * y0 + x0:self.w * (y1 - 1) + x1]

    def poke(self, y, x, s):
        """Overwrite cells starting at (y, x) with the array ``s``."""
        pos = self.w * y + x
        self.screen[pos:pos + len(s)] = s

    def fill(self, y0, x0, y1, x1, char):
        """Fill the span from (y0, x0) to (y1 - 1, x1) with cell value ``char``."""
        n = self.w * (y1 - y0 - 1) + (x1 - x0)
        self.poke(y0, x0, array.array('i', [char] * n))

    def clear(self, y0, x0, y1, x1):
        """Fill the span with blanks in the current attribute."""
        self.fill(y0, x0, y1, x1, self.attr | 0x20)

    # Scrolling functions
    def scroll_area_up(self, y0, y1, n=1):
        """Scroll rows y0..y1-1 up by ``n`` rows, blanking the bottom."""
        n = min(y1 - y0, n)
        self.poke(y0, 0, self.peek(y0 + n, 0, y1, self.w))
        self.clear(y1 - n, 0, y1, self.w)

    def scroll_area_down(self, y0, y1, n=1):
        """Scroll rows y0..y1-1 down by ``n`` rows, blanking the top."""
        n = min(y1 - y0, n)
        self.poke(y0 + n, 0, self.peek(y0, 0, y1 - n, self.w))
        self.clear(y0, 0, y0 + n, self.w)

    def scroll_area_set(self, y0, y1):
        """Set the scroll region to rows y0..y1-1 (ignored when empty)."""
        y0 = max(0, min(self.h - 1, y0))
        y1 = max(1, min(self.h, y1))
        if y1 > y0:
            self.scroll_area_y0 = y0
            self.scroll_area_y1 = y1

    def scroll_line_right(self, y, x, n=1):
        """Shift the rest of row ``y`` right by ``n`` cells starting at ``x``."""
        if x < self.w:
            # Clamp against the column being shifted, not the cursor.
            n = min(self.w - x, n)
            self.poke(y, x + n, self.peek(y, x, y + 1, self.w - n))
            self.clear(y, x, y + 1, x + n)

    def scroll_line_left(self, y, x, n=1):
        """Shift the rest of row ``y`` left by ``n`` cells onto column ``x``."""
        if x < self.w:
            n = min(self.w - x, n)
            self.poke(y, x, self.peek(y, x + n, y + 1, self.w))
            self.clear(y, self.w - n, y + 1, self.w)

    # Cursor functions
    def cursor_line_width(self, next_char):
        """Return ``(display width, cell count)`` of the current row up to
        the cursor, with the width of ``next_char`` included in the first."""
        wx = self.utf8_charwidth(next_char)
        lx = min(self.cx, self.w)
        row_start = self.w * self.cy
        for x in range(lx):
            wx += self.utf8_charwidth(self.screen[row_start + x] & 0xffff)
        return wx, lx

    def cursor_up(self, n=1):
        self.cy = max(self.scroll_area_y0, self.cy - n)

    def cursor_down(self, n=1):
        self.cy = min(self.scroll_area_y1 - 1, self.cy + n)

    def cursor_left(self, n=1):
        self.cx = max(0, self.cx - n)

    def cursor_right(self, n=1):
        self.cx = min(self.w - 1, self.cx + n)

    def cursor_set_x(self, x):
        # No upper clamp: cx == w is the "wrap pending" position.
        self.cx = max(0, x)

    def cursor_set_y(self, y):
        self.cy = max(0, min(self.h - 1, y))

    def cursor_set(self, y, x):
        self.cursor_set_x(x)
        self.cursor_set_y(y)

    # Dumb terminal
    def ctrl_BS(self):
        """Backspace: step one cell left, wrapping to the previous row."""
        delta_y, cx = divmod(self.cx - 1, self.w)
        cy = max(self.scroll_area_y0, self.cy + delta_y)
        self.cursor_set(cy, cx)

    def ctrl_HT(self, n=1):
        """Move ``n`` tab stops forward; ``n <= 0`` moves backward."""
        if n > 0 and self.cx >= self.w:
            return
        if n <= 0 and self.cx == 0:
            return
        # Index of the last tab stop at or before the cursor.
        ts = 0
        for i, stop in enumerate(self.tab_stops):
            if self.cx >= stop:
                ts = i
        ts += n
        if ts < 0:
            # Tabbing back past the first stop ends at the line start,
            # not at the right margin.
            self.cursor_set_x(0)
        elif ts < len(self.tab_stops):
            self.cursor_set_x(self.tab_stops[ts])
        else:
            self.cursor_set_x(self.w - 1)

    def ctrl_LF(self):
        """Line feed, scrolling when on the last row of the scroll region."""
        if self.vt100_mode_lfnewline:
            self.ctrl_CR()
        if self.cy == self.scroll_area_y1 - 1:
            self.scroll_area_up(self.scroll_area_y0, self.scroll_area_y1)
        else:
            self.cursor_down()

    def ctrl_CR(self):
        self.cursor_set_x(0)

    def dumb_write(self, char):
        """Handle a C0 control; return True when ``char`` was consumed."""
        if char >= 32:
            return False
        if char == 8:
            self.ctrl_BS()
        elif char == 9:
            self.ctrl_HT()
        elif 10 <= char <= 12:
            self.ctrl_LF()
        elif char == 13:
            self.ctrl_CR()
        # Every other control character is swallowed silently.
        return True

    def dumb_echo(self, char):
        """Put a printable character at the cursor and advance it."""
        # Check right bound
        wx, cx = self.cursor_line_width(char)
        # Newline
        if wx > self.w:
            if self.vt100_mode_autowrap:
                self.ctrl_CR()
                self.ctrl_LF()
            else:
                self.cx = cx - 1
        if self.vt100_mode_insert:
            self.scroll_line_right(self.cy, self.cx)
        if self.vt100_charset_is_single_shift:
            self.vt100_charset_is_single_shift = False
        elif self.vt100_charset_is_graphical and (char & 0xffe0) == 0x0060:
            char = VT100_CHARSET_GRAPH[char - 0x60]
        self.poke(self.cy, self.cx, array.array('i', [self.attr | char]))
        self.cursor_set_x(self.cx + 1)

    # VT100 CTRL, ESC, CSI handlers
    def vt100_charset_update(self):
        self.vt100_charset_is_graphical = (
            self.vt100_charset_g[self.vt100_charset_g_sel] == 2)

    def vt100_charset_set(self, g):
        # Invoke active character set
        self.vt100_charset_g_sel = g
        self.vt100_charset_update()

    def vt100_charset_select(self, g, charset):
        # Select charset
        self.vt100_charset_g[g] = charset
        self.vt100_charset_update()

    def vt100_setmode(self, p, state):
        """Set (``state`` True) or reset the modes listed in parameter string ``p``."""
        for m in self.vt100_parse_params(p, [], False):
            if m == '4':
                # Insertion replacement mode
                self.vt100_mode_insert = state
            elif m == '20':
                # Linefeed/new line mode
                self.vt100_mode_lfnewline = state
            elif m == '?1':
                # Cursor key mode
                self.vt100_mode_cursorkey = state
            elif m == '?3':
                # Column mode (only honoured when column switching is on)
                if self.vt100_mode_column_switch:
                    if state:
                        self.w = 132
                    else:
                        self.w = 80
                    self.reset_screen()
            elif m == '?5':
                # Screen mode
                self.vt100_mode_inverse = state
            elif m == '?6':
                # Region origin mode
                self.vt100_mode_origin = state
                if state:
                    self.cursor_set(self.scroll_area_y0, 0)
                else:
                    self.cursor_set(0, 0)
            elif m == '?7':
                # Autowrap mode
                self.vt100_mode_autowrap = state
            elif m == '?25':
                # Text cursor enable mode
                self.vt100_mode_cursor = state
            elif m == '?40':
                # Column switch control
                self.vt100_mode_column_switch = state
            elif m == '?47':
                # Alternate screen mode: swap buffers only on a real change
                if bool(state) != bool(self.vt100_mode_alt_screen):
                    self.screen, self.screen2 = self.screen2, self.screen
                    self.vt100_saved, self.vt100_saved2 = (
                        self.vt100_saved2, self.vt100_saved)
                self.vt100_mode_alt_screen = state
            elif m == '?67':
                # Backspace/delete
                self.vt100_mode_backspace = state

    def ctrl_SO(self):
        # Shift out
        self.vt100_charset_set(1)

    def ctrl_SI(self):
        # Shift in
        self.vt100_charset_set(0)

    def esc_CSI(self):
        # CSI start sequence
        self.vt100_parse_reset('csi')

    def esc_DECALN(self):
        # Screen alignment display
        self.fill(0, 0, self.h, self.w, 0x00fe0045)

    def esc_G0_0(self):
        self.vt100_charset_select(0, 0)

    def esc_G0_1(self):
        self.vt100_charset_select(0, 1)

    def esc_G0_2(self):
        self.vt100_charset_select(0, 2)

    def esc_G0_3(self):
        self.vt100_charset_select(0, 3)

    def esc_G0_4(self):
        self.vt100_charset_select(0, 4)

    def esc_G1_0(self):
        self.vt100_charset_select(1, 0)

    def esc_G1_1(self):
        self.vt100_charset_select(1, 1)

    def esc_G1_2(self):
        self.vt100_charset_select(1, 2)

    def esc_G1_3(self):
        self.vt100_charset_select(1, 3)

    def esc_G1_4(self):
        self.vt100_charset_select(1, 4)

    def esc_DECSC(self):
        # Store cursor
        self.vt100_saved = {
            'cx': self.cx,
            'cy': self.cy,
            'attr': self.attr,
            'charset_g_sel': self.vt100_charset_g_sel,
            'charset_g': self.vt100_charset_g[:],
            'mode_autowrap': self.vt100_mode_autowrap,
            'mode_origin': self.vt100_mode_origin,
        }

    def esc_DECRC(self):
        # Restore cursor
        saved = self.vt100_saved
        self.cx = saved['cx']
        self.cy = saved['cy']
        self.attr = saved['attr']
        self.vt100_charset_g_sel = saved['charset_g_sel']
        self.vt100_charset_g = saved['charset_g'][:]
        self.vt100_charset_update()
        self.vt100_mode_autowrap = saved['mode_autowrap']
        self.vt100_mode_origin = saved['mode_origin']

    def esc_DECKPAM(self):
        # Application keypad mode
        pass

    def esc_DECKPNM(self):
        # Numeric keypad mode
        pass

    def esc_IND(self):
        # Index
        self.ctrl_LF()

    def esc_NEL(self):
        # Next line
        self.ctrl_CR()
        self.ctrl_LF()

    def esc_HTS(self):
        # Character tabulation set
        self.csi_CTC('0')

    def esc_RI(self):
        # Reverse line feed
        if self.cy == self.scroll_area_y0:
            self.scroll_area_down(self.scroll_area_y0, self.scroll_area_y1)
        else:
            self.cursor_up()

    def esc_SS2(self):
        # Single-shift two
        self.vt100_charset_is_single_shift = True

    def esc_SS3(self):
        # Single-shift three
        self.vt100_charset_is_single_shift = True

    def esc_DCS(self):
        # Device control string
        self.vt100_parse_reset('str')

    def esc_SOS(self):
        # Start of string
        self.vt100_parse_reset('str')

    def esc_DECID(self):
        # Identify terminal
        self.csi_DA('0')

    def esc_ST(self):
        # String terminator
        pass

    def esc_OSC(self):
        # Operating system command
        self.vt100_parse_reset('str')

    def esc_PM(self):
        # Privacy message
        self.vt100_parse_reset('str')

    def esc_APC(self):
        # Application program command
        self.vt100_parse_reset('str')

    def csi_ICH(self, p):
        # Insert character (a count of 0 means 1, as in the other handlers)
        p = self.vt100_parse_params(p, [1])
        self.scroll_line_right(self.cy, self.cx, max(1, p[0]))

    def csi_CUU(self, p):
        # Cursor up
        p = self.vt100_parse_params(p, [1])
        self.cursor_up(max(1, p[0]))

    def csi_CUD(self, p):
        # Cursor down
        p = self.vt100_parse_params(p, [1])
        self.cursor_down(max(1, p[0]))

    def csi_CUF(self, p):
        # Cursor right
        p = self.vt100_parse_params(p, [1])
        self.cursor_right(max(1, p[0]))

    def csi_CUB(self, p):
        # Cursor left
        p = self.vt100_parse_params(p, [1])
        self.cursor_left(max(1, p[0]))

    def csi_CNL(self, p):
        # Cursor next line
        self.csi_CUD(p)
        self.ctrl_CR()

    def csi_CPL(self, p):
        # Cursor preceding line
        self.csi_CUU(p)
        self.ctrl_CR()

    def csi_CHA(self, p):
        # Cursor character absolute
        p = self.vt100_parse_params(p, [1])
        self.cursor_set_x(p[0] - 1)

    def csi_CUP(self, p):
        # Set cursor position
        p = self.vt100_parse_params(p, [1, 1])
        if self.vt100_mode_origin:
            self.cursor_set(self.scroll_area_y0 + p[0] - 1, p[1] - 1)
        else:
            self.cursor_set(p[0] - 1, p[1] - 1)

    def csi_CHT(self, p):
        # Cursor forward tabulation
        p = self.vt100_parse_params(p, [1])
        self.ctrl_HT(max(1, p[0]))

    def csi_ED(self, p):
        # Erase in display
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '0':
            self.clear(self.cy, self.cx, self.h, self.w)
        elif p[0] == '1':
            # Clamp: cx may be parked at w, which would erase one cell of
            # the following row (or grow the array on the last row).
            self.clear(0, 0, self.cy + 1, min(self.cx + 1, self.w))
        elif p[0] == '2':
            self.clear(0, 0, self.h, self.w)

    def csi_EL(self, p):
        # Erase in line
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '0':
            self.clear(self.cy, self.cx, self.cy + 1, self.w)
        elif p[0] == '1':
            # Same clamp as in csi_ED.
            self.clear(self.cy, 0, self.cy + 1, min(self.cx + 1, self.w))
        elif p[0] == '2':
            self.clear(self.cy, 0, self.cy + 1, self.w)

    def csi_IL(self, p):
        # Insert line
        p = self.vt100_parse_params(p, [1])
        if self.scroll_area_y0 <= self.cy < self.scroll_area_y1:
            self.scroll_area_down(self.cy, self.scroll_area_y1, max(1, p[0]))

    def csi_DL(self, p):
        # Delete line
        p = self.vt100_parse_params(p, [1])
        if self.scroll_area_y0 <= self.cy < self.scroll_area_y1:
            self.scroll_area_up(self.cy, self.scroll_area_y1, max(1, p[0]))

    def csi_DCH(self, p):
        # Delete characters
        p = self.vt100_parse_params(p, [1])
        self.scroll_line_left(self.cy, self.cx, max(1, p[0]))

    def csi_SU(self, p):
        # Scroll up
        p = self.vt100_parse_params(p, [1])
        self.scroll_area_up(
            self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))

    def csi_SD(self, p):
        # Scroll down
        p = self.vt100_parse_params(p, [1])
        self.scroll_area_down(
            self.scroll_area_y0, self.scroll_area_y1, max(1, p[0]))

    def csi_CTC(self, p):
        # Cursor tabulation control
        p = self.vt100_parse_params(p, ['0'], False)
        for m in p:
            if m == '0':
                # Set a stop at the cursor column
                if self.cx not in self.tab_stops:
                    self.tab_stops.append(self.cx)
                    self.tab_stops.sort()
            elif m == '2':
                # Clear the stop at the cursor column
                try:
                    self.tab_stops.remove(self.cx)
                except ValueError:
                    pass
            elif m == '5':
                # Clear all stops
                self.tab_stops = [0]

    def csi_ECH(self, p):
        # Erase character
        p = self.vt100_parse_params(p, [1])
        n = min(self.w - self.cx, max(1, p[0]))
        self.clear(self.cy, self.cx, self.cy + 1, self.cx + n)

    def csi_CBT(self, p):
        # Cursor backward tabulation
        p = self.vt100_parse_params(p, [1])
        self.ctrl_HT(1 - max(1, p[0]))

    def csi_HPA(self, p):
        # Character position absolute
        p = self.vt100_parse_params(p, [1])
        self.cursor_set_x(p[0] - 1)

    def csi_HPR(self, p):
        # Character position forward
        self.csi_CUF(p)

    def csi_REP(self, p):
        # Repeat the last printed character (at most 2000 times)
        p = self.vt100_parse_params(p, [1])
        if self.vt100_lastchar < 32:
            return
        for _ in range(min(2000, max(1, p[0]))):
            self.dumb_echo(self.vt100_lastchar)
        self.vt100_lastchar = 0

    def csi_DA(self, p):
        # Device attributes
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '0':
            self.vt100_out = "\x1b[?1;2c"
        elif p[0] == '>0' or p[0] == '>':
            self.vt100_out = "\x1b[>0;184;0c"

    def csi_VPA(self, p):
        # Line position absolute
        p = self.vt100_parse_params(p, [1])
        self.cursor_set_y(p[0] - 1)

    def csi_VPR(self, p):
        # Line position forward
        self.csi_CUD(p)

    def csi_HVP(self, p):
        # Character and line position
        self.csi_CUP(p)

    def csi_TBC(self, p):
        # Tabulation clear
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '0':
            self.csi_CTC('2')
        elif p[0] == '3':
            self.csi_CTC('5')

    def csi_SM(self, p):
        # Set mode
        self.vt100_setmode(p, True)

    def csi_RM(self, p):
        # Reset mode
        self.vt100_setmode(p, False)

    def csi_SGR(self, p):
        # Select graphic rendition
        for m in self.vt100_parse_params(p, [0]):
            if not isinstance(m, int):
                # A trailing empty parameter stays a string in
                # vt100_parse_params; it never matched a rendition, so skip
                # it rather than compare a string with numbers.
                continue
            if m == 0:
                # Reset
                self.attr = 0x00fe0000
            elif m == 4:
                # Underlined
                self.attr |= 0x01000000
            elif m == 7:
                # Negative
                self.attr |= 0x02000000
            elif m == 8:
                # Concealed
                self.attr |= 0x04000000
            elif m == 24:
                # Not underlined
                self.attr &= 0x7eff0000
            elif m == 27:
                # Positive
                self.attr &= 0x7dff0000
            elif m == 28:
                # Revealed
                self.attr &= 0x7bff0000
            elif 30 <= m <= 37:
                # Foreground
                self.attr = (self.attr & 0x7f0f0000) | ((m - 30) << 20)
            elif m == 39:
                # Default fg color
                self.attr = (self.attr & 0x7f0f0000) | 0x00f00000
            elif 40 <= m <= 47:
                # Background
                self.attr = (self.attr & 0x7ff00000) | ((m - 40) << 16)
            elif m == 49:
                # Default bg color
                self.attr = (self.attr & 0x7ff00000) | 0x000e0000

    def csi_DSR(self, p):
        # Device status report
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '5':
            self.vt100_out = "\x1b[0n"
        elif p[0] == '6':
            x = self.cx + 1
            y = self.cy + 1
            self.vt100_out = '\x1b[%d;%dR' % (y, x)
        elif p[0] == '7':
            self.vt100_out = 'WebShell'
        elif p[0] == '8':
            self.vt100_out = __version__
        elif p[0] == '?6':
            x = self.cx + 1
            y = self.cy + 1
            self.vt100_out = '\x1b[?%d;%dR' % (y, x)
        elif p[0] == '?15':
            self.vt100_out = '\x1b[?13n'
        elif p[0] == '?25':
            self.vt100_out = '\x1b[?20n'
        elif p[0] == '?26':
            self.vt100_out = '\x1b[?27;1n'
        elif p[0] == '?53':
            self.vt100_out = '\x1b[?53n'

    def csi_DECSTBM(self, p):
        # Set top and bottom margins
        p = self.vt100_parse_params(p, [1, self.h])
        self.scroll_area_set(p[0] - 1, p[1])
        if self.vt100_mode_origin:
            self.cursor_set(self.scroll_area_y0, 0)
        else:
            self.cursor_set(0, 0)

    def csi_SCP(self, p):
        # Save cursor position
        self.vt100_saved_cx = self.cx
        self.vt100_saved_cy = self.cy

    def csi_RCP(self, p):
        # Restore cursor position (home when nothing was saved yet)
        self.cx = self.vt100_saved_cx
        self.cy = self.vt100_saved_cy

    def csi_DECREQTPARM(self, p):
        # Request terminal parameters.  The default of '0' keeps a bare
        # "CSI x" from indexing an empty parameter list.
        p = self.vt100_parse_params(p, ['0'], False)
        if p[0] == '0':
            self.vt100_out = "\x1b[2;1;1;112;112;1;0x"
        elif p[0] == '1':
            self.vt100_out = "\x1b[3;1;1;112;112;1;0x"

    def csi_DECSTR(self, p):
        # Soft terminal reset
        self.reset_soft()

    # VT100 Parser
    def vt100_parse_params(self, p, d, to_int=True):
        """Split parameter string ``p`` on ';' and apply the defaults ``d``.

        A leading private marker ('<', '=', '>' or '?') is stripped and
        prepended to every parameter.  With ``to_int`` each parameter is
        converted to int; one that cannot be converted takes its default
        from ``d`` when there is one and is otherwise left as a string.
        The result has ``max(len(params), len(d))`` entries.
        """
        prefix = ''
        if p:
            if '<' <= p[0] <= '?':
                prefix = p[0]
                p = p[1:]
            p = p.split(';')
        else:
            p = ''
        # Process parameters
        result = []
        for i in range(max(len(p), len(d))):
            value_def = False
            if i < len(p):
                value = prefix + p[i]
                value_def = True
                if to_int:
                    try:
                        value = int(value)
                    except ValueError:
                        value_def = False
            if (not value_def) and i < len(d):
                value = d[i]
            result.append(value)
        return result

    def vt100_parse_reset(self, vt100_parse_state=""):
        """Start a fresh sequence in ``vt100_parse_state`` ('' = idle)."""
        self.vt100_parse_state = vt100_parse_state
        self.vt100_parse_len = 0
        self.vt100_parse_func = ""
        self.vt100_parse_param = ""

    def vt100_parse_process(self):
        """Dispatch the completed ESC or CSI sequence; unknown ones are ignored."""
        if self.vt100_parse_state == 'esc':
            # ESC mode
            handler = self.vt100_esc.get(self.vt100_parse_func)
            if handler is not None:
                handler()
            # A handler may have switched state (e.g. to 'csi' or 'str').
            if self.vt100_parse_state == 'esc':
                self.vt100_parse_reset()
        else:
            # CSI mode
            handler = self.vt100_csi.get(self.vt100_parse_func)
            if handler is not None:
                handler(self.vt100_parse_param)
            if self.vt100_parse_state == 'csi':
                self.vt100_parse_reset()

    def vt100_write(self, char):
        """Feed one code point to the escape-sequence parser.

        Returns True when the parser consumed ``char`` and False when the
        caller should treat it as a control or printable character.
        """
        if char < 32:
            if char == 27:
                self.vt100_parse_reset('esc')
                return True
            elif char == 14:
                self.ctrl_SO()
            elif char == 15:
                self.ctrl_SI()
        elif (char & 0xffe0) == 0x0080:
            # 8-bit C1 control: handled as ESC followed by (char - 0x40)
            self.vt100_parse_reset('esc')
            self.vt100_parse_func = chr(char - 0x40)
            self.vt100_parse_process()
            return True

        if self.vt100_parse_state:
            if self.vt100_parse_state == 'str':
                # String payloads are skipped until a control char arrives
                if char >= 32:
                    return True
                self.vt100_parse_reset()
            elif char < 32:
                # CAN / SUB abort the sequence in progress
                if char == 24 or char == 26:
                    self.vt100_parse_reset()
                    return True
            else:
                self.vt100_parse_len += 1
                if self.vt100_parse_len > 32:
                    # Overlong sequence: give up and echo this character
                    self.vt100_parse_reset()
                else:
                    char_msb = char & 0xf0
                    if char_msb == 0x20:
                        # Intermediate bytes (added to function)
                        self.vt100_parse_func += _unichr(char)
                    elif char_msb == 0x30 and self.vt100_parse_state == 'csi':
                        # Parameter byte
                        self.vt100_parse_param += _unichr(char)
                    else:
                        # Function byte
                        self.vt100_parse_func += _unichr(char)
                        self.vt100_parse_process()
                    return True
        self.vt100_lastchar = char
        return False

    # External interface
    def set_size(self, w, h):
        """Resize to ``w`` x ``h`` and clear the screen.

        Returns False, changing nothing, unless both are within 2..256.
        """
        if w < 2 or w > 256 or h < 2 or h > 256:
            return False
        self.w = w
        self.h = h
        self.reset_screen()
        return True

    def read(self):
        """Return and clear the pending reply generated by the emulator."""
        d = self.vt100_out
        self.vt100_out = ""
        return d

    def write(self, d):
        """Process UTF-8 output ``d`` of the child process; returns True."""
        for c in self.utf8_decode(d):
            char = ord(c)
            if self.vt100_write(char):
                continue
            if self.dumb_write(char):
                continue
            if char <= 0xffff:
                self.dumb_echo(char)
        return True

    def pipe(self, d):
        """Translate keyboard data ``d`` into what is sent to the child.

        '~' followed by a selector is replaced through the key filter table
        for the current cursor-key mode (unknown selectors are dropped), DEL
        follows the backspace mode, and CR gains an LF in new-line mode.
        """
        out = []
        for c in d:
            char = ord(c)
            if self.vt100_keyfilter_escape:
                self.vt100_keyfilter_escape = False
                if self.vt100_mode_cursorkey:
                    table = VT100_KEYFILTER_APPKEYS
                else:
                    table = VT100_KEYFILTER_ANSIKEYS
                try:
                    out.append(table[c])
                except KeyError:
                    pass
            elif c == '~':
                self.vt100_keyfilter_escape = True
            elif char == 127:
                if self.vt100_mode_backspace:
                    out.append(chr(8))
                else:
                    out.append(chr(127))
            else:
                out.append(c)
                if self.vt100_mode_lfnewline and char == 13:
                    out.append(chr(10))
        return ''.join(out)

    def dump(self):
        """Render the screen for display.

        Returns ``((cx, cy), screen)`` where ``screen`` has one list per
        row.  A row alternates text runs with ``(fg, bg, underline)`` tuples
        that apply to the run following them; the attribute state carries
        over from one row to the next.
        """
        screen = []
        attr_ = -1
        cx, cy = min(self.cx, self.w - 1), self.cy
        for y in range(self.h):
            wx = 0
            line = [""]
            for x in range(self.w):
                d = self.screen[y * self.w + x]
                char = d & 0xffff
                attr = d >> 16
                # Cursor
                if cy == y and cx == x and self.vt100_mode_cursor:
                    attr = attr & 0xfff0 | 0x000c
                # Attributes
                if attr != attr_:
                    if attr_ != -1:
                        line.append("")
                    bg = attr & 0x000f
                    fg = (attr & 0x00f0) >> 4
                    # Inverse: the cell flag XOR the screen-wide mode
                    inv = attr & 0x0200
                    inv2 = self.vt100_mode_inverse
                    if (inv and not inv2) or (inv2 and not inv):
                        fg, bg = bg, fg
                    # Concealed
                    if attr & 0x0400:
                        fg = 0xc
                    # Underline
                    ul = bool(attr & 0x0100)
                    line.append((fg, bg, ul))
                    line.append("")
                    attr_ = attr
                wx += self.utf8_charwidth(char)
                if wx <= self.w:
                    line[-1] += _unichr(char)
            screen.append(line)

        return (cx, cy), screen
# Serialises the one-time creation of per-instance locks in synchronized().
_SYNCHRONIZED_GUARD = threading.Lock()


def synchronized(func):
    """Decorator running a method while holding the instance's ``lock``.

    The lock is a ``threading.RLock`` stored in ``self.lock``, so decorated
    methods may call each other.  When the instance has no lock yet, one is
    created under a module-level guard; without the guard two threads making
    their first call at the same time could each install a different lock
    and then run concurrently.
    """
    import functools

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        lock = getattr(self, 'lock', None)
        if lock is None:
            with _SYNCHRONIZED_GUARD:
                # Re-check: another thread may have created it meanwhile.
                lock = getattr(self, 'lock', None)
                if lock is None:
                    lock = self.lock = threading.RLock()
        with lock:
            return func(self, *args, **kwargs)
    return wrapper
class Multiplexer(object):
    """Runs child processes on ptys and feeds their output to Terminals.

    ``self.session`` maps a session id to a dict with the keys 'state'
    ('unborn', 'alive' or 'dead'), 'term', 'time', 'w', 'h' and, while the
    process lives, 'pid' and 'fd'.  A supervisor thread started by the
    constructor polls the ptys and buries sessions that were not kept alive
    for ``timeout`` seconds.
    """

    def __init__(self, cmd="/bin/bash", env_term="xterm-color", timeout=60 * 60 * 24):
        # Created before the supervisor thread starts, so the thread and the
        # caller can never race to create it lazily in @synchronized.
        self.lock = threading.RLock()
        # Set Linux signal handler ("linux2"/"linux3" on older interpreters,
        # plain "linux" on newer ones)
        if sys.platform.startswith("linux"):
            self.sigchldhandler = signal(SIGCHLD, SIG_IGN)
        # Session
        self.session = {}
        self.cmd = cmd
        self.env_term = env_term
        self.timeout = timeout

        # Supervisor thread
        self.signal_stop = 0
        self.thread = threading.Thread(target=self.proc_thread)
        self.thread.start()

    def stop(self):
        """Ask the supervisor thread to stop and wait for it to finish."""
        self.signal_stop = 1
        self.thread.join()

    def proc_resize(self, sid, w, h):
        """Resize the pty and the Terminal of session ``sid``."""
        fd = self.session[sid]['fd']
        # Set terminal size; the request number is reinterpreted as a signed
        # int before being handed to ioctl().
        try:
            fcntl.ioctl(fd,
                        struct.unpack('i',
                                      struct.pack('I', TIOCSWINSZ)
                                      )[0],
                        struct.pack("HHHH", h, w, 0, 0))
        except (IOError, OSError):
            pass
        self.session[sid]['term'].set_size(w, h)
        self.session[sid]['w'] = w
        self.session[sid]['h'] = h

    @synchronized
    def proc_keepalive(self, sid, w, h, cmd=None):
        """Create session ``sid`` if needed, else refresh and resize it.

        Returns True when the session is alive (or was just spawned) and
        False when it is dead or could not be spawned.
        """
        if sid not in self.session:
            # Start a new session
            self.session[sid] = {
                'state': 'unborn',
                'term': Terminal(w, h),
                'time': time.time(),
                'w': w,
                'h': h}
            return self.proc_spawn(sid, cmd)
        session = self.session[sid]
        if session['state'] != 'alive':
            return False
        session['time'] = time.time()
        # Update terminal size
        if session['w'] != w or session['h'] != h:
            self.proc_resize(sid, w, h)
        return True

    def proc_spawn(self, sid, cmd=None):
        """Fork the process of session ``sid`` on a new pty.

        Returns True in the parent once the session is set up and False
        when forking failed.  The child never returns from this method.
        """
        # Session
        self.session[sid]['state'] = 'alive'
        w, h = self.session[sid]['w'], self.session[sid]['h']
        # Fork new process
        try:
            pid, fd = pty.fork()
        except (IOError, OSError):
            self.session[sid]['state'] = 'dead'
            return False
        if pid == 0:
            # Child: whatever happens, never fall back into the parent's
            # code (the caller's stack was duplicated by the fork).
            try:
                self._run_child(cmd or self.cmd, w, h)
            finally:
                os._exit(0)
        # Store session vars
        self.session[sid]['pid'] = pid
        self.session[sid]['fd'] = fd
        # Set file control
        fcntl.fcntl(fd, fcntl.F_SETFL, os.O_NONBLOCK)
        # Set terminal size
        self.proc_resize(sid, w, h)
        return True

    def _run_child(self, cmd, w, h):
        """Run ``cmd`` inside the forked child and wait for it to finish."""
        import shlex
        # Safe way to make it work under BSD and Linux
        lang = os.environ.get('LANG', '').split('.')
        if len(lang) < 2:
            lang = ['en_US', 'UTF-8']
        env = dict(os.environ)
        env['COLUMNS'] = str(w)
        env['LINES'] = str(h)
        env['TERM'] = self.env_term
        env['LANG'] = lang[0] + '.UTF-8'
        # With shell=False a command *string* is taken as the program name,
        # so "prog --flag" must be split into an argument list first.
        if isinstance(cmd, (list, tuple)):
            argv = list(cmd)
        else:
            argv = shlex.split(cmd)
        try:
            Popen(argv, shell=False, env=env).wait()
        except (IOError, OSError):
            pass

    def proc_waitfordeath(self, sid):
        """Close the pty of ``sid``, reap its process and mark it dead."""
        try:
            os.close(self.session[sid]['fd'])
        except (KeyError, IOError, OSError):
            pass
        if sid in self.session:
            self.session[sid].pop('fd', None)
        try:
            os.waitpid(self.session[sid]['pid'], 0)
        except (KeyError, IOError, OSError):
            pass
        if sid in self.session:
            self.session[sid].pop('pid', None)
            self.session[sid]['state'] = 'dead'
        return True

    @synchronized
    def proc_bury(self, sid):
        """Terminate session ``sid`` and forget it.

        Returns True when the session was removed and False when ``sid`` is
        unknown.  Takes the (re-entrant) instance lock like the other
        session-mutating methods, since it is also called from outside the
        supervisor thread.
        """
        if sid not in self.session:
            return False
        if self.session[sid]['state'] == 'alive':
            try:
                os.kill(self.session[sid]['pid'], SIGTERM)
            except (KeyError, IOError, OSError):
                pass
        self.proc_waitfordeath(sid)
        self.session.pop(sid, None)
        return True

    @synchronized
    def proc_buryall(self):
        """Bury every session."""
        # Iterate over a copy: proc_bury() deletes entries.
        for sid in list(self.session.keys()):
            self.proc_bury(sid)

    @synchronized
    def proc_read(self, sid):
        """
        Read from process
        """
        if sid not in self.session:
            return False
        elif self.session[sid]['state'] != 'alive':
            return False
        try:
            fd = self.session[sid]['fd']
            d = os.read(fd, 65536)
            if not d:
                # Process finished, BSD
                self.proc_waitfordeath(sid)
                return False
        except (IOError, OSError):
            # Process finished, Linux
            self.proc_waitfordeath(sid)
            return False
        term = self.session[sid]['term']
        term.write(d)
        # Read terminal response
        d = term.read()
        if d:
            try:
                os.write(fd, d)
            except (IOError, OSError):
                return False
        return True

    @synchronized
    def proc_write(self, sid, d):
        " Write to process "
        if sid not in self.session:
            return False
        if self.session[sid]['state'] != 'alive':
            return False
        try:
            term = self.session[sid]['term']
            d = term.pipe(d)
            fd = self.session[sid]['fd']
            os.write(fd, d)
        except (IOError, OSError):
            return False
        return True

    @synchronized
    def proc_dump(self, sid):
        " Dump terminal output "
        if sid not in self.session:
            return False
        return self.session[sid]['term'].dump()

    @synchronized
    def proc_getalive(self):
        """
        Get alive sessions, bury timed out ones
        """
        fds = []
        fd2sid = {}
        now = time.time()
        # Iterate over a copy: proc_bury() deletes entries.
        for sid in list(self.session.keys()):
            then = self.session[sid]['time']
            if (now - then) > self.timeout:
                self.proc_bury(sid)
            elif self.session[sid]['state'] == 'alive':
                fd = self.session[sid]['fd']
                fds.append(fd)
                fd2sid[fd] = sid
        return (fds, fd2sid)

    def proc_thread(self):
        """
        Supervisor thread
        """
        import select as select_module
        # select() reports a descriptor closed behind its back through
        # select.error or ValueError depending on the interpreter.
        wait_errors = (IOError, OSError, ValueError, select_module.error)
        while not self.signal_stop:
            # Read fds
            (fds, fd2sid) = self.proc_getalive()
            try:
                readable = select(fds, [], [], 1.0)[0]
            except wait_errors:
                readable = []
            for fd in readable:
                sid = fd2sid[fd]
                self.proc_read(sid)
                # The session may have been buried by another thread since
                # proc_getalive() returned.
                session = self.session.get(sid)
                if session is not None:
                    session["changed"] = time.time()
            if readable:
                time.sleep(0.002)
        self.proc_buryall()
def ssh_command(login, executable="ssh"):
    """Build the ssh command line that logs ``login`` in on localhost.

    The login name is shell-quoted, so a name containing whitespace or shell
    metacharacters stays a single argument when the command is split or run
    through a shell.  Ordinary login names are emitted unchanged.
    """
    try:
        from shlex import quote
    except ImportError:  # older interpreters keep it in ``pipes``
        from pipes import quote
    return ' '.join([
        executable,
        '-oPreferredAuthentications=keyboard-interactive,password',
        '-oNoHostAuthenticationForLocalhost=yes',
        '-oLogLevel=FATAL',
        '-F/dev/null',
        '-l' + quote(login),
        'localhost',
    ])
class Session(object):
    """Handle on one terminal session of the shared Multiplexer.

    All sessions share a single Multiplexer, created when the first
    Session is constructed and kept in ``Session._mux``.
    """
    _mux = None

    @classmethod
    def close_all(cls):
        """Stop the shared multiplexer; a no-op when none was ever created."""
        if Session._mux is not None:
            Session._mux.stop()

    def __init__(self, cmd=None, width=80, height=24):
        if not Session._mux:
            Session._mux = Multiplexer()
        self._session_id = "%s-%s" % (time.time(), id(self))
        # Remembered so that start() can fall back to it; None lets the
        # multiplexer use its own default command.
        self.cmd = cmd
        self._width = width
        self._height = height
        self._started = False

    def resize(self, width, height):
        """Record the new size and, once started, apply it to the session."""
        self._width = width
        self._height = height
        if self._started:
            self.keepalive()

    def start(self, cmd=None):
        """Start the session running ``cmd`` (default: the constructor's cmd).

        Returns the result of the multiplexer's ``proc_keepalive``.
        """
        self._started = Session._mux.proc_keepalive(
            self._session_id, self._width, self._height, cmd or self.cmd)
        return self._started

    def close(self):
        """Bury this session in the multiplexer."""
        return Session._mux.proc_bury(self._session_id)

    stop = close

    def is_alive(self):
        return Session._mux.session.get(self._session_id, {}).get('state') == 'alive'

    def keepalive(self):
        return Session._mux.proc_keepalive(self._session_id, self._width, self._height)

    def dump(self):
        """Return the multiplexer's screen dump, or None when not alive."""
        if self.keepalive():
            return Session._mux.proc_dump(self._session_id)

    def write(self, data):
        """Send ``data`` to the process if the session is alive."""
        if self.keepalive():
            Session._mux.proc_write(self._session_id, data)

    def last_change(self):
        return Session._mux.session.get(self._session_id, {}).get("changed", None)

    def pid(self):
        return Session._mux.session.get(self._session_id, {}).get("pid", None)
if __name__ == "__main__":
    # Smoke test: run one command in a session and print the screen dump.
    w, h = (80, 24)
    cmd = "/bin/ls --color=yes"
    multiplex = Multiplexer(cmd)
    sessionID = "session-id-%s" % os.getpid()
    try:
        if multiplex.proc_keepalive(sessionID, w, h):
            # Give the supervisor thread time to read the command's output.
            time.sleep(1)
            print("Output: %s" % (multiplex.proc_dump(sessionID),))
    finally:
        # Always stop the supervisor thread; it is not a daemon thread, so
        # leaving it running would keep the interpreter from exiting.
        multiplex.stop()