Introduce more bits of the termex testing framework
[centerim5.git] / tests / termex / termex.py
blob43e66077d75313aea536818bcf9234471ebf68a3
1 #!/usr/bin/env python3
3 # Copyright (C) 2016-2017 Petr Pavlu <setup@dagobah.cz>
5 # This file is part of CenterIM.
7 # CenterIM is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation; either version 2 of the License, or
10 # (at your option) any later version.
12 # CenterIM is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with CenterIM. If not, see <http://www.gnu.org/licenses/>.
20 """Termex terminal emulator and test framework."""
22 import argparse
23 import difflib
24 import errno
25 import os
26 import pty
27 import re
28 import selectors
29 import sys
30 import time
31 import tkinter
32 import xml.etree.ElementTree as ElementTree
# selectors.select() is expected to restart itself with a recomputed timeout
# whenever a signal interrupts it. Python gained that behaviour in version
# 3.5, so refuse to run on anything older.
if sys.version_info < (3, 5):
    print("This program requires at least Python 3.5.", file=sys.stderr)
    sys.exit(1)

# Dimensions of the emulated screen.
ROWS = 24
COLUMNS = 80
# Value exported as TERMINFO to the child program.
TERMINFO = '/path_to_terminfo'
# Timeout in seconds used when waiting on the child program.
CHILD_TIMEOUT = 5

# Character attributes (bit flags).
ATTR_NORMAL = 0
ATTR_REVERSE = 1

# Color identifiers.
COLOR_BLACK = 0
COLOR_RED = 1
COLOR_GREEN = 2
COLOR_YELLOW = 3
COLOR_BLUE = 4
COLOR_MAGENTA = 5
COLOR_CYAN = 6
COLOR_WHITE = 7
COLOR_DEFAULT = 9

# Pairs of (color identifier, color name); every lookup table below is derived
# from this register.
COLOR_REGISTER = (
    (COLOR_BLACK, 'black'),
    (COLOR_RED, 'red'),
    (COLOR_GREEN, 'green'),
    (COLOR_YELLOW, 'yellow'),
    (COLOR_BLUE, 'blue'),
    (COLOR_MAGENTA, 'magenta'),
    (COLOR_CYAN, 'cyan'),
    (COLOR_WHITE, 'white'),
    (COLOR_DEFAULT, 'default'),
)
COLOR_TO_STRING_MAP = dict(COLOR_REGISTER)
STRING_TO_COLOR_MAP = {
    name: code for code, name in COLOR_TO_STRING_MAP.items()}
COLORS = set(COLOR_TO_STRING_MAP)
# Names of all colors except the 'default' placeholder.
REAL_COLOR_NAMES = tuple(
    name for code, name in COLOR_REGISTER if code != COLOR_DEFAULT)

# Concrete colors that stand in for COLOR_DEFAULT.
COLOR_DEFAULT_FOREGROUND = COLOR_BLACK
COLOR_DEFAULT_BACKGROUND = COLOR_WHITE

# Character sequences sent to the child program for special keys.
CODE_ENTER = '\x0d'
CODE_FN = (
    '\x1bOP', '\x1bOQ', '\x1bOR', '\x1bOS', '\x1b[15~', '\x1b[17~',
    '\x1b[18~', '\x1b[19~', '\x1b[20~', '\x1b[21~', '\x1b[23~')
CODE_PAGE_UP = '\x1b[5~'
CODE_PAGE_DOWN = '\x1b[6~'
def attr_to_string(attr):
    """
    Render an attribute bit mask as a '|'-separated list of attribute names.
    An empty string is returned when the ATTR_REVERSE bit is not set.
    """

    names = ['reverse'] if attr & ATTR_REVERSE else []
    return '|'.join(names)
def string_to_attr(string):
    """
    Parse a '|'-separated list of attribute names into an attribute bit mask.
    The recognized names are 'normal' and 'reverse'; any other name results
    in exception ValueError.
    """

    flags = ATTR_NORMAL
    for name in string.split('|'):
        if name == 'reverse':
            flags |= ATTR_REVERSE
        elif name != 'normal':
            raise ValueError("Unrecognized attribute '{}'".format(name))
    return flags
def color_to_string(color):
    """Look up the name of a color identifier in COLOR_TO_STRING_MAP."""

    name = COLOR_TO_STRING_MAP[color]
    return name
def string_to_color(string):
    """
    Translate a color name to its color identifier. An unknown name is
    reported by raising exception ValueError.
    """

    try:
        color = STRING_TO_COLOR_MAP[string]
    except KeyError:
        raise ValueError("Unrecognized color '{}'".format(string))
    return color
class TermChar:
    """One screen cell: a character with its attributes and colors."""

    def __init__(self, char=" ", attr=ATTR_NORMAL, fgcolor=COLOR_DEFAULT,
                 bgcolor=COLOR_DEFAULT):
        self.char = char
        self.attr = attr
        self.fgcolor = fgcolor
        self.bgcolor = bgcolor

    def __eq__(self, other):
        # Two cells are equal when all of their instance attributes match.
        return other.__dict__ == self.__dict__

    def _get_translated_fgcolor(self):
        """
        Resolve the foreground color, substituting COLOR_DEFAULT_FOREGROUND
        for COLOR_DEFAULT.
        """

        return (COLOR_DEFAULT_FOREGROUND if self.fgcolor == COLOR_DEFAULT
                else self.fgcolor)

    def _get_translated_bgcolor(self):
        """
        Resolve the background color, substituting COLOR_DEFAULT_BACKGROUND
        for COLOR_DEFAULT.
        """

        return (COLOR_DEFAULT_BACKGROUND if self.bgcolor == COLOR_DEFAULT
                else self.bgcolor)

    def get_tag_foreground(self):
        """
        Name of the color that the character is finally drawn with. The
        foreground and background are swapped when ATTR_REVERSE is set.
        """

        reverse = self.attr & ATTR_REVERSE
        color = (self._get_translated_bgcolor() if reverse
                 else self._get_translated_fgcolor())
        return color_to_string(color)

    def get_tag_background(self):
        """
        Name of the color that is finally drawn behind the character. The
        foreground and background are swapped when ATTR_REVERSE is set.
        """

        reverse = self.attr & ATTR_REVERSE
        color = (self._get_translated_fgcolor() if reverse
                 else self._get_translated_bgcolor())
        return color_to_string(color)
class Term:
    """
    Termex terminal emulator.

    The emulator starts a child program on a pseudo-terminal, interprets the
    byte sequences the child writes and keeps an internal ROWS x COLUMNS
    screen of TermChar cells. Depending on the mode it either mirrors the
    screen in a tkinter window (MODE_RUN, MODE_RECORD) or drives the child
    from an XML playbook and checks the expected screens (MODE_TEST).
    """

    MODE_RUN = 0
    MODE_RECORD = 1
    MODE_TEST = 2

    class _TerminalConnectionException(Exception):
        """
        Exception reported when communication with the pseudo-terminal fails.
        """
        pass

    def __init__(self, root, program, mode):
        """
        Create the emulator. Parameter root is the tkinter root window or
        None when no GUI is wanted, program is the path of the child program
        and mode is one of MODE_RUN, MODE_RECORD and MODE_TEST.
        """

        self._root = root
        self._program = program
        self._mode = mode

        self._child_pid = None
        self._fd = None

        self._screen = None
        self._cur_y = 0
        self._cur_x = 0
        self._attr = ATTR_NORMAL
        self._fgcolor = COLOR_DEFAULT
        self._bgcolor = COLOR_DEFAULT
        # Bytes received from the child that do not yet form a recognized
        # sequence.
        self._charbuf = b''

        # Initialize the GUI if requested.
        if self._root:
            self._root.title("Termex")
            self._frame = tkinter.Frame(self._root)

            self._text = tkinter.Text(self._root, height=ROWS, width=COLUMNS)
            self._text.config(
                foreground=color_to_string(COLOR_DEFAULT_FOREGROUND),
                background=color_to_string(COLOR_DEFAULT_BACKGROUND))
            self._text.pack()

            # Configure tag values.
            for fgcolor_str in REAL_COLOR_NAMES:
                for bgcolor_str in REAL_COLOR_NAMES:
                    tag = 'tag_{}-{}'.format(fgcolor_str, bgcolor_str)
                    self._text.tag_config(tag, foreground=fgcolor_str,
                                          background=bgcolor_str)

        self._erase_all()

        if self._mode == self.MODE_RECORD:
            self._test_e = ElementTree.Element('test')

    def _start_program(self):
        """
        Fork, connect the child's controlling terminal to a pseudo-terminal and
        start the selected child program.

        Parent behaviour: Returns True when the fork was successful, False
        otherwise. Note that the returned value does not provide information
        whether the exec call in the child process was successful or not. That
        must be determined by attempting communication with the child.

        Child behaviour: Execs the selected program and does not return if the
        call was successful, returns False otherwise.
        """

        # Fork and connect the child's controlling terminal to a
        # pseudo-terminal.
        try:
            self._child_pid, self._fd = pty.fork()
        except OSError as e:
            print("Fork to run '{}' failed: {}".format(self._program, e),
                  file=sys.stderr)
            return False
        if self._child_pid == 0:
            try:
                env = {'PATH': '/bin:/usr/bin', 'TERM': 'termex',
                       'TERMINFO': TERMINFO, 'LC_ALL': 'en_US.UTF-8'}
                os.execle(self._program, self._program, env)
            except OSError as e:
                print("Failed to execute '{}': {}".format(self._program, e),
                      file=sys.stderr)
                return False

        return True

    def _finalize_program(self):
        """
        Close the connection to the pseudo-terminal and wait for the child
        program to complete. Returns True when the connection was successfully
        closed and the child completed in the timeout limit, False otherwise.
        """

        res = True

        # Close the file descriptor that is connected to the child's
        # controlling terminal.
        try:
            os.close(self._fd)
        except OSError as e:
            print("Failed to close file descriptor '{}' that is connected to "
                  "the child's controlling terminal: {}.".format(self._fd, e),
                  file=sys.stderr)
            res = False

        # Wait for the child to finish. It should terminate now that its input
        # was closed.
        for _ in range(CHILD_TIMEOUT):
            try:
                pid, _status = os.waitpid(self._child_pid, os.WNOHANG)
            except OSError as e:
                # Report self._child_pid here; the local 'pid' is not bound
                # when the very first waitpid() call fails.
                print("Failed to wait on child '{}' to complete: "
                      "{}.".format(self._child_pid, e), file=sys.stderr)
                res = False
                break
            if pid != 0:
                break
            time.sleep(1)
        else:
            print("Child '{}' has not completed.".format(self._child_pid),
                  file=sys.stderr)
            res = False

        return res

    def run_gui_mainloop(self):
        """Start the selected child program and run the tkinter's main loop."""

        assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD

        # Start the specified program.
        if not self._start_program():
            return

        try:
            # Prepare for running the main loop.
            self._root.createfilehandler(
                self._fd, tkinter.READABLE,
                lambda fd, mask: self._pty_callback())
            self._root.bind('<Key>', self._tk_key)
            self._root.bind('<<Quit>>', lambda e: self._quit_gui_mainloop())
            self._root.protocol('WM_DELETE_WINDOW', self._quit_gui_mainloop)

            # Run the main loop.
            try:
                self._root.mainloop()
            except self._TerminalConnectionException as e:
                print("{}.".format(e), file=sys.stderr)

            self._root.deletefilehandler(self._fd)
        finally:
            # Finalize the run of the child program.
            self._finalize_program()

    def _quit_gui_mainloop(self):
        """Exit the tkinter's main loop."""

        assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD
        self._root.quit()

    def _pty_callback(self):
        """
        Process a data event from the pseudo-terminal. Returns True when the
        connection to the pseudo-terminal was closed, False otherwise.
        Exception _TerminalConnectionException is raised if the read of the new
        data from the pseudo-terminal fails.
        """

        closed = False
        try:
            char = os.read(self._fd, 1)
        except OSError as e:
            if e.errno == errno.EIO:
                closed = True
            else:
                raise self._TerminalConnectionException(
                    "Error reading from file descriptor '{}' that is "
                    "connected to the child's controlling terminal: "
                    "{}".format(self._fd, e))

        # Check whether the descriptor referring to the pseudo-terminal slave
        # has been closed or end of file was reached.
        if closed or len(char) == 0:
            if self._root:
                self._root.quit()
            return True

        self._charbuf += char

        # Keep accumulating bytes until they form a recognized sequence.
        if self._handle_sequence(self._charbuf):
            self._charbuf = b''
        return False

    def _send_key(self, chars, name):
        """
        Write the specified characters that represent one key to the
        pseudo-terminal. If the recording mode is enabled then the specified
        key name is recorded in the test playbook. Exception
        _TerminalConnectionException is raised if the write to the
        pseudo-terminal fails.
        """

        if self._mode == self.MODE_RECORD:
            # Record the key.
            action_e = ElementTree.SubElement(self._test_e, 'action')
            action_e.set('key', name)
            print("Recorded key '{}'.".format(name))

        # Send the key to the terminal.
        try:
            os.write(self._fd, str.encode(chars))
        except OSError as e:
            raise self._TerminalConnectionException(
                "Error writing characters '{}' to file descriptor '{}' that "
                "is connected to the child's controlling terminal: "
                "{}".format(chars, self._fd, e))

    def _handle_sequence(self, seq):
        """
        Process a byte sequence received from the pseudo-terminal. Returns True
        when the sequence was recognized and successfully handled, False
        otherwise.
        """

        if re.fullmatch(b'[^\x01-\x1f]+', seq):
            try:
                uchar = seq.decode('utf-8')
                self._print_char(
                    TermChar(uchar, self._attr, self._fgcolor, self._bgcolor))
                return True
            except UnicodeError:
                # Continue on the assumption that it is not yet a complete
                # character. This assumption is wrong if the received text is
                # actually malformed.
                return False

        if seq == b'\x07':
            # Bell.
            if self._root:
                self._root.bell()
            return True
        if seq == b'\x08':
            # Backspace non-destructively.
            self._cur_x -= 1
            return True
        if seq == b'\x0d':
            # Go to beginning of line.
            self._cur_x = 0
            return True
        if seq == b'\x0a':
            # Move cursor down one line.
            self._cursor_down()
            return True

        # Controls beginning with ESC.

        # Control sequences.
        match = re.fullmatch(b'\x1b\\[([0-9]+)@', seq)
        if match:
            # Insert blank characters.
            self._insert_blanks(int(match.group(1)))
            return True
        if seq == b'\x1b[H':
            # Set cursor position to the default (top left).
            self._cur_y = 0
            self._cur_x = 0
            return True
        match = re.fullmatch(b'\x1b\\[([0-9]+);([0-9]+)H', seq)
        if match:
            # Set cursor position to (y,x).
            self._cur_y = int(match.group(1))
            self._cur_x = int(match.group(2))
            return True
        # Test the 'seq' parameter like every other branch does, not the
        # instance buffer.
        if seq == b'\x1b[K':
            # Erase in line to right.
            for x in range(self._cur_x, COLUMNS):
                self._print_char_at(self._cur_y, x, TermChar())
            return True
        if seq == b'\x1b[2J':
            # Erase display completely.
            self._erase_all()
            return True
        if seq == b'\x1b[m':
            # Normal character attribute (all attributes off).
            self._attr = ATTR_NORMAL
            return True
        if seq == b'\x1b[7m':
            # Inverse character attribute.
            self._attr |= ATTR_REVERSE
            return True
        match = re.fullmatch(b'\x1b\\[3([0-9]+)m', seq)
        if match:
            # Set foreground color.
            color = int(match.group(1))
            if color in COLORS:
                self._fgcolor = color
                return True
            return False
        match = re.fullmatch(b'\x1b\\[4([0-9]+)m', seq)
        if match:
            # Set background color.
            color = int(match.group(1))
            if color in COLORS:
                self._bgcolor = color
                return True
            return False
        if seq == b'\x1b[?25l':
            # Hide cursor.
            return True

        return False

    def _cursor_down(self):
        """
        Move the screen cursor one line down. The screen is scrolled if the
        cursor points to the last line.
        """

        if self._cur_y < ROWS - 1:
            self._cur_y += 1
        else:
            assert self._cur_y == ROWS - 1

            # At the last line of the terminal, scroll up the screen.
            del self._screen[0]
            self._screen.append([TermChar() for x in range(COLUMNS)])

            if self._root:
                self._text.config(state=tkinter.NORMAL)
                self._text.delete('1.0', '2.0')
                self._text.insert(tkinter.END, "\n" + " " * COLUMNS)
                self._text.config(state=tkinter.DISABLED)

    def _erase_all(self):
        """Completely clear the terminal's screen."""

        self._screen = [[TermChar() for x in range(COLUMNS)]
                        for y in range(ROWS)]

        if self._root:
            self._text.config(state=tkinter.NORMAL)
            self._text.delete('1.0', tkinter.END)
            self._text.insert('1.0', "\n".join([" " * COLUMNS] * ROWS))
            self._text.config(state=tkinter.DISABLED)

    def _insert_blanks(self, w):
        """
        Replace the specified number of characters on the current screen line
        with blanks.
        """

        # Drop w cells from the end of the line and insert w blank cells at
        # the cursor so the line length stays the same.
        del self._screen[self._cur_y][-w:]
        pre = self._screen[self._cur_y][:self._cur_x]
        post = self._screen[self._cur_y][self._cur_x:]
        self._screen[self._cur_y] = pre + [TermChar() for x in range(w)] + post

        if self._root:
            self._text.config(state=tkinter.NORMAL)
            self._text.delete('{}.end-{}c'.format(self._cur_y + 1, w),
                              '{}.end'.format(self._cur_y + 1))
            self._text.insert('{}.{}'.format(self._cur_y + 1, self._cur_x),
                              " " * w)
            self._text.config(state=tkinter.DISABLED)

    def _print_char_at(self, y, x, char):
        """Output one character on the screen at the specified coordinates."""

        # Record the character in the internal screen representation.
        self._screen[y][x] = char

        if self._root:
            # Add the character to the terminal text widget.
            self._text.config(state=tkinter.NORMAL)
            pos = '{}.{}'.format(y + 1, x)
            self._text.delete(pos)

            tag = 'tag_{}-{}'.format(char.get_tag_foreground(),
                                     char.get_tag_background())
            self._text.insert(pos, char.char, tag)
            self._text.config(state=tkinter.DISABLED)

    def _print_char(self, char):
        """Output one character on the screen at the cursor position."""

        self._print_char_at(self._cur_y, self._cur_x, char)

        # Advance the cursor.
        self._cur_x += 1
        if self._cur_x == COLUMNS:
            self._cur_x = 0
            self._cursor_down()

    def _tk_key(self, event):
        """Process a key pressed by the user."""

        if len(event.char) != 0:
            if event.char == CODE_ENTER:
                self._send_key(event.char, 'Enter')
            else:
                self._send_key(event.char, event.char)
            return

        # A special key was pressed.
        if event.keysym == 'F12':
            self._record_expected_screen()
            return
        if event.keysym == 'Prior':
            self._send_key(CODE_PAGE_UP, 'PageUp')
            return
        if event.keysym == 'Next':
            self._send_key(CODE_PAGE_DOWN, 'PageDown')
            return
        match = re.fullmatch('F([0-9]+)', event.keysym)
        if match:
            # F1 to F11.
            fnum = int(match.group(1))
            if fnum >= 1 and fnum <= len(CODE_FN):
                self._send_key(CODE_FN[fnum - 1], event.keysym)
                return

        print("Unrecognized key {}.".format(event.keysym), file=sys.stderr)

    def _get_screen_xml(self, screen):
        """
        Return an ElementTree.Element that represents the given screen
        content.
        """

        expect_e = ElementTree.Element('expect')
        data_e = ElementTree.SubElement(expect_e, 'data')

        # Maps (attr, fgcolor, bgcolor) to the one-letter key used for it.
        colors = {}
        new_key = 'a'

        # Print content of the screen.
        for y in range(ROWS):
            line_e = ElementTree.SubElement(data_e, 'line')
            line_e.text = ''

            attr = ''

            for x in range(COLUMNS):
                term_char = screen[y][x]

                line_e.text += term_char.char

                color = (term_char.attr, term_char.fgcolor, term_char.bgcolor)
                if color == (ATTR_NORMAL, COLOR_DEFAULT, COLOR_DEFAULT):
                    # A space stands for the default attributes and colors.
                    key = ' '
                elif color in colors:
                    key = colors[color]
                else:
                    key = new_key
                    colors[color] = key
                    assert new_key != 'z'
                    new_key = chr(ord(new_key) + 1)
                attr += key

            # Record any non-default attributes/colors.
            if attr != ' ' * COLUMNS:
                attr_e = ElementTree.SubElement(data_e, 'attr')
                attr_e.text = attr

        # Record used color schemes.
        if colors:
            scheme_e = ElementTree.SubElement(expect_e, 'scheme')
            for color, key in sorted(colors.items(), key=lambda x: x[1]):
                attr, fgcolor, bgcolor = color
                color_e = ElementTree.SubElement(scheme_e, 'color')
                color_e.set('key', key)

                attr_str = attr_to_string(attr)
                if attr_str:
                    color_e.set('attributes', attr_str)

                fgcolor_str = color_to_string(fgcolor)
                if fgcolor_str:
                    color_e.set('foreground', fgcolor_str)

                bgcolor_str = color_to_string(bgcolor)
                if bgcolor_str:
                    color_e.set('background', bgcolor_str)

        return expect_e

    def _record_expected_screen(self):
        """
        Record the current screen content as an expected screen in the test
        playbook that is being created.
        """

        assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD

        if self._mode != self.MODE_RECORD:
            print("Recording is not enabled.", file=sys.stderr)
            return

        expect_e = self._get_screen_xml(self._screen)
        self._test_e.append(expect_e)
        print("Recorded expected screen.")

    # Method _indent_xml() is based on a code from
    # http://effbot.org/zone/element-lib.htm#prettyprint.
    def _indent_xml(self, elem, level=0):
        """
        Indent elements of a given ElementTree so it can be pretty-printed.
        """

        i = '\n' + '\t' * level
        if len(elem):
            if not elem.text or not elem.text.strip():
                elem.text = i + '\t'
            for e in elem:
                self._indent_xml(e, level+1)
            if not e.tail or not e.tail.strip():
                e.tail = i
        if not elem.tail or not elem.tail.strip():
            elem.tail = i

    def output_test(self, filename):
        """
        Output a recorded playbook to a file with the given name. Returns True
        when the writing of the test data succeeded, False otherwise.
        """

        assert self._mode == self.MODE_RECORD

        # Pretty-format the XML tree.
        self._indent_xml(self._test_e)

        # Output the test.
        tree = ElementTree.ElementTree(self._test_e)
        try:
            tree.write(filename, 'unicode', True)
        except Exception as e:
            print("Failed to write playbook file '{}': {}.".format(
                filename, e), file=sys.stderr)
            return False

        return True

    class _TestFailure(Exception):
        """Exception reported when a test failed."""
        pass

    def _playbook_key(self, cmd_e):
        """
        Parse a description of one key action and send the key to the terminal.
        Exception _TestFailure is raised if the description is malformed or
        incomplete, exception _TerminalConnectionException can be thrown when
        communication with the pseudo-terminal fails.
        """

        assert self._mode == self.MODE_TEST

        try:
            key = cmd_e.attrib['key']
        except KeyError:
            raise self._TestFailure("Element 'action' is missing required "
                                    "attribute 'key'")

        # Handle simple characters.
        if len(key) == 1:
            self._send_key(key, key)
            return

        # Handle special keys.
        if key == 'Enter':
            self._send_key(CODE_ENTER, key)
            return
        if key == 'PageUp':
            self._send_key(CODE_PAGE_UP, key)
            return
        if key == 'PageDown':
            self._send_key(CODE_PAGE_DOWN, key)
            # Return here, otherwise the key falls through to the
            # 'unrecognized key' failure below.
            return
        match = re.fullmatch('F([0-9]+)', key)
        if match:
            # F1 to F11.
            fnum = int(match.group(1))
            if fnum >= 1 and fnum <= len(CODE_FN):
                self._send_key(CODE_FN[fnum - 1], key)
                return

        raise self._TestFailure(
            "Element 'action' specifies unrecognized key '{}'".format(key))

    def _parse_color_scheme(self, scheme_e):
        """
        Parse color scheme of one expected screen. Dictionary with
        {'key': (attr, fgcolor, bgcolor), ...} is returned on success,
        exception _TestFailure is raised if the description is malformed or
        incomplete. An absent 'attributes' value means ATTR_NORMAL and an
        absent 'foreground' or 'background' value means COLOR_DEFAULT.
        """

        assert self._mode == self.MODE_TEST

        colors = {}
        for color_e in scheme_e:
            try:
                key = color_e.attrib['key']
            except KeyError:
                raise self._TestFailure(
                    "Element 'color' is missing required attribute 'key'")

            # The recorder omits 'attributes' for normal characters, so that
            # is the value to assume when the XML attribute is absent.
            attr = ATTR_NORMAL
            if 'attributes' in color_e.attrib:
                try:
                    attr = string_to_attr(color_e.attrib['attributes'])
                except ValueError as e:
                    raise self._TestFailure(
                        "Value of attribute 'attributes' is invalid: "
                        "{}".format(e))

            fgcolor = COLOR_DEFAULT
            if 'foreground' in color_e.attrib:
                try:
                    fgcolor = string_to_color(color_e.attrib['foreground'])
                except ValueError as e:
                    raise self._TestFailure(
                        "Value of attribute 'foreground' is invalid: "
                        "{}".format(e))

            bgcolor = COLOR_DEFAULT
            if 'background' in color_e.attrib:
                try:
                    bgcolor = string_to_color(color_e.attrib['background'])
                except ValueError as e:
                    raise self._TestFailure(
                        "Value of attribute 'background' is invalid: "
                        "{}".format(e))

            colors[key] = (attr, fgcolor, bgcolor)

        return colors

    def _parse_screen_data(self, data_e, colors):
        """
        Parse screen lines of one expected screen. Internal screen
        representation is returned on success, exception _TestFailure is raised
        if the description is malformed or incomplete.

        Each 'line' element may be followed by at most one 'attr' element
        that assigns a color key to every character of the line. The key ' '
        selects the default attributes and colors; any other key must be
        present in the colors dictionary.
        """

        assert self._mode == self.MODE_TEST

        # Only a 'line' element is acceptable next.
        NEW_LINE = 0
        # A 'line' element was just read, its 'attr' element may follow.
        NEW_LINE_OR_ATTR = 1
        state = NEW_LINE
        line = None
        expected_screen = []

        for data_sub_e in data_e:
            if data_sub_e.tag == 'line':
                # Append the previous line. An empty line is a valid line,
                # therefore test against None and not for truthiness.
                if line is not None:
                    expected_screen.append(line)
                # Parse the new line. The text of an empty element is None.
                line = [TermChar(char) for char in data_sub_e.text or '']
                state = NEW_LINE_OR_ATTR

            elif state == NEW_LINE:
                raise self._TestFailure("Element '{}' is invalid, expected "
                                        "'line'".format(data_sub_e.tag))

            elif data_sub_e.tag == 'attr':
                attr_text = data_sub_e.text or ''
                if len(attr_text) != len(line):
                    raise self._TestFailure(
                        "Element 'attr' does not match the previous line, "
                        "expected '{}' attribute characters but got "
                        "'{}'".format(len(line), len(attr_text)))

                for i, key in enumerate(attr_text):
                    if key == ' ':
                        # Default attributes and colors, which the freshly
                        # created TermChar already carries.
                        continue
                    try:
                        attr, fgcolor, bgcolor = colors[key]
                    except KeyError:
                        raise self._TestFailure("Color attribute '{}' is "
                                                "not defined".format(key))
                    line[i].attr = attr
                    line[i].fgcolor = fgcolor
                    line[i].bgcolor = bgcolor
                # The attributes of this line are complete.
                state = NEW_LINE

            else:
                raise self._TestFailure(
                    "Element '{}' is invalid, expected 'line' or "
                    "'attr'".format(data_sub_e.tag))

        # Append the final line.
        if line is not None:
            expected_screen.append(line)

        return expected_screen

    def _parse_expected_screen(self, expect_e):
        """
        Parse a description of one expected screen. Internal screen
        representation is returned on success, exception _TestFailure is raised
        if the description is malformed or incomplete.
        """

        assert self._mode == self.MODE_TEST

        # Compare elements against None explicitly. The truth value of an
        # Element depends on whether it has children, so a childless element
        # would otherwise be mistaken for a missing one.
        data_e = None
        scheme_e = None
        for sub_e in expect_e:
            if sub_e.tag == 'data':
                if data_e is not None:
                    raise self._TestFailure("Element 'expect' contains "
                                            "multiple 'data' sub-elements")
                data_e = sub_e
            elif sub_e.tag == 'scheme':
                if scheme_e is not None:
                    raise self._TestFailure("Element 'expect' contains "
                                            "multiple 'scheme' sub-elements")
                scheme_e = sub_e

        if data_e is None:
            raise self._TestFailure(
                "Element 'expect' is missing required sub-element 'data'")

        # Parse the color scheme.
        if scheme_e is not None:
            colors = self._parse_color_scheme(scheme_e)
        else:
            colors = {}

        # Parse the screen data.
        return self._parse_screen_data(data_e, colors)

    def _report_failed_expectation(self, expected_screen):
        """
        Report that the expected screen state has not been reached. The output
        consists of the expected screen, the current screen content, followed
        by differences between the two screens.
        """

        assert self._mode == self.MODE_TEST

        # Print the expected screen. The output is not verbatim as it was
        # specified in the input file, but instead the screen is printed in the
        # same way the current screen gets output. This allows to properly show
        # differences between the two screens.

        expected_screen_e = self._get_screen_xml(expected_screen)
        self._indent_xml(expected_screen_e)
        expected_screen_str = ElementTree.tostring(
            expected_screen_e, 'unicode')
        print("Expected (normalized) screen:", file=sys.stderr)
        print(expected_screen_str, file=sys.stderr)

        # Print the current screen.
        current_screen_e = self._get_screen_xml(self._screen)
        self._indent_xml(current_screen_e)
        current_screen_str = ElementTree.tostring(current_screen_e, 'unicode')
        print("Current screen:", file=sys.stderr)
        print(current_screen_str, file=sys.stderr)

        # Print the delta.
        print("Differences:", file=sys.stderr)
        sys.stderr.writelines(difflib.unified_diff(
            expected_screen_str.splitlines(keepends=True),
            current_screen_str.splitlines(keepends=True),
            fromfile="Expected screen", tofile="Current screen"))

    def _execute_playbook(self, test_e):
        """
        Run the main loop and execute the given test playbook. Normal return
        from the method indicates that the test succeeded. Exception
        _TestFailure is raised when the test fails and exception
        _TerminalConnectionException can be thrown when communication with the
        pseudo-terminal fails.
        """

        assert self._mode == self.MODE_TEST

        if test_e.tag != 'test':
            raise self._TestFailure("Root element '{}' is invalid, expected "
                                    "'test'".format(test_e.tag))
        cmd_iter = iter(test_e)

        # Start the main loop.
        with selectors.DefaultSelector() as sel:
            sel.register(self._fd, selectors.EVENT_READ)

            # None means that no expectation is pending. An empty list is a
            # (never matching) parsed screen and must not be confused with
            # that, hence the explicit comparisons against None below.
            expected_screen = None
            more_commands = True
            while True:
                # Process any actions and find an expected screen.
                while expected_screen is None and more_commands:
                    try:
                        cmd_e = next(cmd_iter)
                        if cmd_e.tag == 'action':
                            self._playbook_key(cmd_e)
                        elif cmd_e.tag == 'expect':
                            expected_screen = self._parse_expected_screen(
                                cmd_e)
                            # Stop processing more commands for now and wait
                            # for the expected screen to appear.
                            break
                        else:
                            raise self._TestFailure(
                                "Element '{}' is invalid, expected 'action' "
                                "or 'expect'".format(cmd_e.tag))
                    except StopIteration:
                        # No more commands.
                        more_commands = False

                # Wait for the expected screen.
                events = sel.select(CHILD_TIMEOUT)
                if not events:
                    if expected_screen is not None:
                        self._report_failed_expectation(expected_screen)
                    raise self._TestFailure(
                        "Timeout reached. No event received in the last {} "
                        "second(s)".format(CHILD_TIMEOUT))

                # Expect only an event on self._fd.
                assert len(events) == 1
                event = events[0]
                key, _mask = event
                assert key.fd == self._fd

                closed = self._pty_callback()
                if closed:
                    if more_commands:
                        raise self._TestFailure(
                            "Connection to the terminal was closed but the "
                            "playbook contains more commands")
                    break

                # Check if the expected screen is present.
                if self._screen == expected_screen:
                    expected_screen = None

    def execute_test(self, filename):
        """
        Load test data from a given file, start the program under the test and
        execute the test playbook. Returns True when the test succeeded, False
        otherwise.
        """

        assert self._mode == self.MODE_TEST

        # Read the test data.
        try:
            tree = ElementTree.ElementTree(file=filename)
        except Exception as e:
            print("Failed to read playbook file '{}': {}.".format(filename, e),
                  file=sys.stderr)
            return False

        # Start the specified program.
        if not self._start_program():
            return False

        # Execute the test playbook.
        res = True
        try:
            self._execute_playbook(tree.getroot())
        except (self._TerminalConnectionException, self._TestFailure) as e:
            print("{}.".format(e), file=sys.stderr)
            res = False
        finally:
            # Finalize the run of the child program.
            if not self._finalize_program():
                res = False

        # Return whether the test passed.
        return res
def main():
    """
    Entry point of the program. The command line selects one of the 'run',
    'record' and 'test' commands, which is then carried out. The returned
    value is 0 when the operation succeeded and non-zero when it failed.
    """

    # Build the command line parser.
    parser = argparse.ArgumentParser()
    parser.set_defaults(func=None)
    subparsers = parser.add_subparsers(dest='command')
    subparsers.required = True

    # Every command takes the program to start as a positional argument.
    program_parser = argparse.ArgumentParser(add_help=False)
    program_parser.add_argument('program')
    common = [program_parser]

    # Command 'run'.
    run_cmd = subparsers.add_parser(
        'run', parents=common, help="run a command")
    run_cmd.set_defaults(mode=Term.MODE_RUN)

    # Command 'record'.
    record_cmd = subparsers.add_parser(
        'record', parents=common, help="record a test")
    record_cmd.set_defaults(mode=Term.MODE_RECORD)
    record_cmd.add_argument(
        '-p', '--playbook', metavar='FILE', required=True,
        help="output playbook file")

    # Command 'test'.
    test_cmd = subparsers.add_parser(
        'test', parents=common, help="run a test")
    test_cmd.set_defaults(mode=Term.MODE_TEST)
    test_cmd.add_argument(
        '-p', '--playbook', metavar='FILE', required=True,
        help="input playbook file")

    args = parser.parse_args()

    # Only the 'run' and 'record' commands show the terminal GUI.
    tk_root = None
    if args.mode == Term.MODE_RUN or args.mode == Term.MODE_RECORD:
        try:
            tk_root = tkinter.Tk()
        except tkinter.TclError as e:
            print("Failed to initialize GUI: {}.".format(e), file=sys.stderr)
            return 1

    term = Term(tk_root, args.program, args.mode)

    if not tk_root:
        # Execute and check the playbook, without running GUI.
        if term.execute_test(args.playbook):
            msg, res = "succeeded", 0
        else:
            msg, res = "failed", 1

        print("Run of '{}' using playbook '{}' {}.".format(
            args.program, args.playbook, msg))
        return res

    # Interactive session in the GUI main loop.
    term.run_gui_mainloop()

    # Store the recorded test data when a recording was requested.
    if args.mode == Term.MODE_RECORD and not term.output_test(args.playbook):
        return 1

    return 0
# Run main() only when the file is executed as a script and hand its result
# over as the process exit status.
if __name__ == '__main__':
    raise SystemExit(main())