From b92b94c43148496e863bf3f8db08994e76bf7964 Mon Sep 17 00:00:00 2001 From: Petr Pavlu Date: Tue, 21 Feb 2017 22:05:14 +0000 Subject: [PATCH] Introduce more bits of the termex testing framework --- tests/termex/termex.py | 1023 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 775 insertions(+), 248 deletions(-) diff --git a/tests/termex/termex.py b/tests/termex/termex.py index d0a0f63..43e6607 100755 --- a/tests/termex/termex.py +++ b/tests/termex/termex.py @@ -17,20 +17,31 @@ # You should have received a copy of the GNU General Public License # along with CenterIM. If not, see . +"""Termex terminal emulator and test framework.""" + import argparse +import difflib import errno import os import pty import re +import selectors import sys -import threading import time import tkinter import xml.etree.ElementTree as ElementTree +# The module relies on selectors.select() to automatically retry the operation +# with a recomputed timeout when it gets interrupted by a signal. This +# behaviour was introduced in Python 3.5. +if sys.hexversion < 0x03050000: + print("This program requires at least Python 3.5.", file=sys.stderr) + sys.exit(1) + ROWS = 24 COLUMNS = 80 TERMINFO = '/path_to_terminfo' +CHILD_TIMEOUT = 5 ATTR_NORMAL = 0 ATTR_REVERSE = 1 @@ -44,30 +55,47 @@ COLOR_MAGENTA = 5 COLOR_CYAN = 6 COLOR_WHITE = 7 COLOR_DEFAULT = 9 -COLORS = (COLOR_BLACK, COLOR_RED, COLOR_GREEN, COLOR_YELLOW, COLOR_BLUE, - COLOR_MAGENTA, COLOR_CYAN, COLOR_WHITE, COLOR_DEFAULT) +COLOR_REGISTER = ((COLOR_BLACK, 'black'), (COLOR_RED, 'red'), + (COLOR_GREEN, 'green'), (COLOR_YELLOW, 'yellow'), + (COLOR_BLUE, 'blue'), (COLOR_MAGENTA, 'magenta'), + (COLOR_CYAN, 'cyan'), (COLOR_WHITE, 'white'), + (COLOR_DEFAULT, 'default')) +COLOR_TO_STRING_MAP = {id_: name for id_, name in COLOR_REGISTER} +STRING_TO_COLOR_MAP = {name: id_ for id_, name in COLOR_REGISTER} +COLORS = {id_ for id_, _ in COLOR_REGISTER} +REAL_COLOR_NAMES = tuple( + name for id_, name in COLOR_REGISTER if id_ != COLOR_DEFAULT) COLOR_DEFAULT_FOREGROUND = COLOR_BLACK COLOR_DEFAULT_BACKGROUND = COLOR_WHITE +CODE_ENTER = '\x0d' +CODE_FN = ('\x1bOP', '\x1bOQ', '\x1bOR', '\x1bOS', '\x1b[15~', '\x1b[17~', + '\x1b[18~', '\x1b[19~', '\x1b[20~', '\x1b[21~', '\x1b[23~') +CODE_PAGE_UP = '\x1b[5~' +CODE_PAGE_DOWN = '\x1b[6~' + def attr_to_string(attr): """Get string representation of given attributes.""" res = [] if (attr & ATTR_REVERSE) != 0: - res.append("reverse") - return "|".join(res) + res.append('reverse') + return '|'.join(res) def string_to_attr(string): - """Convert a string to attributes.""" + """ + Convert a string to attributes. Exception ValueError is raised if some + attribute is invalid. + """ res = ATTR_NORMAL - for attr in string.split("|"): - if attr == "normal": + for attr in string.split('|'): + if attr == 'normal': pass - elif attr == "reverse": + elif attr == 'reverse': res |= ATTR_REVERSE else: raise ValueError("Unrecognized attribute '{}'".format(attr)) @@ -77,48 +105,24 @@ def string_to_attr(string): def color_to_string(color): """Get string representation of a given color.""" - if color == COLOR_BLACK: - return "black" - if color == COLOR_RED: - return "red" - if color == COLOR_GREEN: - return "green" - if color == COLOR_YELLOW: - return "yellow" - if color == COLOR_BLUE: - return "blue" - if color == COLOR_MAGENTA: - return "magenta" - if color == COLOR_CYAN: - return "cyan" - if color == COLOR_WHITE: - return "white" - return None + return COLOR_TO_STRING_MAP[color] def string_to_color(string): - """Convert a string to a color.""" - - if string == "black": - return COLOR_BLACK - if string == "red": - return COLOR_RED - if string == "green": - return COLOR_GREEN - if string == "yellow": - return COLOR_YELLOW - if string == "blue": - return COLOR_BLUE - if string == "magenta": - return COLOR_MAGENTA - if string == "cyan": - return COLOR_CYAN - if string == "white": - return COLOR_WHITE - raise ValueError("Unrecognized color '{}'".format(string)) + """ + Convert a string to a color. Exception ValueError is raised if the color + name is not recognized. + """ + + try: + return STRING_TO_COLOR_MAP[string] + except KeyError: + raise ValueError("Unrecognized color '{}'".format(string)) class TermChar: + """On-screen character.""" + def __init__(self, char=" ", attr=ATTR_NORMAL, fgcolor=COLOR_DEFAULT, bgcolor=COLOR_DEFAULT): self.char = char @@ -126,17 +130,35 @@ class TermChar: self.fgcolor = fgcolor self.bgcolor = bgcolor + def __eq__(self, other): + return self.__dict__ == other.__dict__ + def _get_translated_fgcolor(self): + """ + Return the foreground color. If the current color is COLOR_DEFAULT then + COLOR_DEFAULT_FOREGROUND is returned. + """ + if self.fgcolor == COLOR_DEFAULT: return COLOR_DEFAULT_FOREGROUND return self.fgcolor def _get_translated_bgcolor(self): + """ + Return the background color. If the current color is COLOR_DEFAULT then + COLOR_DEFAULT_BACKGROUND is returned. + """ + if self.bgcolor == COLOR_DEFAULT: return COLOR_DEFAULT_BACKGROUND return self.bgcolor def get_tag_foreground(self): + """ + Return a name of the final foreground color that should be used to + display the character on the screen. + """ + if self.attr & ATTR_REVERSE: color = self._get_translated_bgcolor() else: @@ -144,6 +166,11 @@ class TermChar: return color_to_string(color) def get_tag_background(self): + """ + Return a name of the final background color that should be used to + display the character on the screen. + """ + if self.attr & ATTR_REVERSE: color = self._get_translated_fgcolor() else: @@ -151,27 +178,26 @@ class TermChar: return color_to_string(color) -class Term(tkinter.Frame): - def __init__(self, root, command, record=False): - super().__init__(root) - self._root = root - self._root.title("Termex") +class Term: + """Termex terminal emulator.""" - self._record = record - if self._record: - self._test_e = ElementTree.Element("test") + MODE_RUN = 0 + MODE_RECORD = 1 + MODE_TEST = 2 - self._charbuf = b'' - self._fd_lock = threading.Lock() - self._initialized = threading.Event() + class _TerminalConnectionException(Exception): + """ + Exception reported when communication with the pseudo-terminal fails. + """ + pass - # Fork and connect the child's controlling terminal to a - # pseudo-terminal. - env = {'PATH': '/bin:/usr/bin', 'TERM': 'termex', 'TERMINFO': TERMINFO, - 'LC_ALL': 'en_US.UTF-8'} - pid, self._fd = pty.fork() - if pid == 0: - os.execle(command, command, env) + def __init__(self, root, program, mode): + self._root = root + self._program = program + self._mode = mode + + self._child_pid = None + self._fd = None self._screen = None self._cur_y = 0 @@ -179,60 +205,206 @@ class Term(tkinter.Frame): self._attr = ATTR_NORMAL self._fgcolor = COLOR_DEFAULT self._bgcolor = COLOR_DEFAULT + self._charbuf = b'' + + # Initialize the GUI if requested. + if self._root: + self._root.title("Termex") + self._frame = tkinter.Frame(self._root) + + self._text = tkinter.Text(self._root, height=ROWS, width=COLUMNS) + self._text.config( + foreground=color_to_string(COLOR_DEFAULT_FOREGROUND), + background=color_to_string(COLOR_DEFAULT_BACKGROUND)) + self._text.pack() + + # Configure tag values. + for fgcolor_str in REAL_COLOR_NAMES: + for bgcolor_str in REAL_COLOR_NAMES: + tag = 'tag_{}-{}'.format(fgcolor_str, bgcolor_str) + self._text.tag_config(tag, foreground=fgcolor_str, + background=bgcolor_str) - self._text = tkinter.Text(self._root, height=ROWS, width=COLUMNS) - self._text.config(foreground=color_to_string(COLOR_DEFAULT_FOREGROUND), - background=color_to_string(COLOR_DEFAULT_BACKGROUND)) - self._text.pack() self._erase_all() - self._root.createfilehandler(self._fd, tkinter.READABLE, - self._pty_callback) - self._root.bind('', self._key) - self._root.bind('<>', lambda e: self._quit()) - self._root.protocol('WM_DELETE_WINDOW', self._quit) - self._root.after(0, self._term_initialized) + if self._mode == self.MODE_RECORD: + self._test_e = ElementTree.Element('test') + + def _start_program(self): + """ + Fork, connect the child's controlling terminal to a pseudo-terminal and + start the selected child program. + + Parent behaviour: Returns True when the fork was successful, False + otherwise. Note that the returned value does not provide information + whether the exec call in the child process was successful or not. That + must be determined by attempting communication with the child. + + Child behaviour: Execs the selected program and does not return if the + call was successful, returns False otherwise. + """ + + # Fork and connect the child's controlling terminal to a + # pseudo-terminal. + try: + self._child_pid, self._fd = pty.fork() + except OSError as e: + print("Fork to run '{}' failed: {}".format(self._program, e), + file=sys.stderr) + return False + if self._child_pid == 0: + try: + env = {'PATH': '/bin:/usr/bin', 'TERM': 'termex', + 'TERMINFO': TERMINFO, 'LC_ALL': 'en_US.UTF-8'} + os.execle(self._program, self._program, env) + except OSError as e: + print("Failed to execute '{}': {}".format(self._program, e), + file=sys.stderr) + return False + + return True - def run_mainloop(self): - # Run the main loop. - self._root.mainloop() + def _finalize_program(self): + """ + Close the connection to the pseudo-terminal and wait for the child + program to complete. Returns True when the connection was successfully + closed and the child completed in the timeout limit, False otherwise. + """ - self._fd_lock.acquire() - self._root.deletefilehandler(self._fd) - os.close(self._fd) - self._fd = -1 - self._fd_lock.release() + res = True + + # Close the file descriptor that is connected to the child's + # controlling terminal. + try: + os.close(self._fd) + except OSError as e: + print("Failed to close file descriptor '{}' that is connected to " + "the child's controlling terminal: {}.".format(self._fd, e), + file=sys.stderr) + res = False # Wait for the child to finish. It should terminate now that its input # was closed. - os.wait() + for _ in range(CHILD_TIMEOUT): + try: + pid, _status = os.waitpid(self._child_pid, os.WNOHANG) + except OSError as e: + print("Failed to wait on child '{}' to complete: " + "{}.".format(pid, e), file=sys.stderr) + res = False + break + if pid != 0: + break + time.sleep(1) + else: + print("Child '{}' has not completed.".format(self._child_pid), + file=sys.stderr) + res = False + + return res + + def run_gui_mainloop(self): + """Start the selected child program and run the tkinter's main loop.""" - def close(self): - # FIXME Do not send the event if the main loop already exited. - self._root.event_generate('<>', when='tail') + assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD - def _quit(self): + # Start the specified program. + if not self._start_program(): + return + + try: + # Prepare for running the main loop. + self._root.createfilehandler( + self._fd, tkinter.READABLE, + lambda fd, mask: self._pty_callback()) + self._root.bind('', self._tk_key) + self._root.bind('<>', lambda e: self._quit_gui_mainloop()) + self._root.protocol('WM_DELETE_WINDOW', self._quit_gui_mainloop) + + # Run the main loop. + try: + self._root.mainloop() + except self._TerminalConnectionException as e: + print("{}.".format(e), file=sys.stderr) + + self._root.deletefilehandler(self._fd) + finally: + # Finalize the run of the child program. + self._finalize_program() + + def _quit_gui_mainloop(self): + """Exit the tkinter's main loop.""" + + assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD self._root.quit() - def _pty_callback(self, f, mask): + def _pty_callback(self): + """ + Process a data event from the pseudo-terminal. Returns True when the + connection to the pseudo-terminal was closed, False otherwise. + Exception _TerminalConnectionException is raised if the read of the new + data from the pseudo-terminal fails. + """ + + closed = False try: - char = os.read(f, 1) + char = os.read(self._fd, 1) except OSError as e: if e.errno == errno.EIO: - print("Connection to terminal lost.") + closed = True + else: + raise self._TerminalConnectionException( + "Error reading from file descriptor '{}' that is " + "connected to the child's controlling terminal: " + "{}".format(self._fd, e)) + + # Check whether the descriptor referring to the pseudo-terminal slave + # has been closed or end of file was reached. + if closed or len(char) == 0: + if self._root: self._root.quit() - return - raise + return True self._charbuf += char if self._handle_sequence(self._charbuf): self._charbuf = b'' else: - print("Unmatched {}.".format(self._charbuf)) + # print("Unmatched {}.".format(self._charbuf), file=sys.stderr) pass + return False + + def _send_key(self, chars, name): + """ + Write the specified characters that represent one key to the + pseudo-terminal. If the recording mode is enabled then the specified + key name is recorded in the test playbook. Exception + _TerminalConnectionException is raised if the write to the + pseudo-terminal fails. + """ + + if self._mode == self.MODE_RECORD: + # Record the key. + action_e = ElementTree.SubElement(self._test_e, 'action') + action_e.set('key', name) + print("Recorded key '{}'.".format(name)) + + # Send the key to the terminal. + try: + os.write(self._fd, str.encode(chars)) + except OSError as e: + raise self._TerminalConnectionException( + "Error writing characters '{}' to file descriptor '{}' that " + "is connected to the child's controlling terminal: " + "{}".format(chars, self._fd, e)) def _handle_sequence(self, seq): + """ + Process a byte sequence received from the pseudo-terminal. Returns True + when the sequence was recognized and successfully handled, False + otherwise. + """ + if re.fullmatch(b'[^\x01-\x1f]+', seq): try: uchar = seq.decode('utf-8') @@ -247,7 +419,8 @@ class Term(tkinter.Frame): if seq == b'\x07': # Bell. - self.bell() + if self._root: + self._root.bell() return True if seq == b'\x08': # Backspace non-destructively. @@ -321,6 +494,11 @@ class Term(tkinter.Frame): return False def _cursor_down(self): + """ + Move the screen cursor one line down. The screen is scrolled if the + cursor points to the last line. + """ + if self._cur_y < ROWS - 1: self._cur_y += 1 else: @@ -330,54 +508,63 @@ class Term(tkinter.Frame): del self._screen[0] self._screen.append([TermChar() for x in range(COLUMNS)]) - self._text.config(state=tkinter.NORMAL) - self._text.delete('1.0', '2.0') - self._text.insert(tkinter.END, "\n" + " " * COLUMNS) - self._text.config(state=tkinter.DISABLED) + if self._root: + self._text.config(state=tkinter.NORMAL) + self._text.delete('1.0', '2.0') + self._text.insert(tkinter.END, "\n" + " " * COLUMNS) + self._text.config(state=tkinter.DISABLED) def _erase_all(self): + """Completely clear the terminal's screen.""" + self._screen = [[TermChar() for x in range(COLUMNS)] for y in range(ROWS)] - self._text.config(state=tkinter.NORMAL) - self._text.delete('1.0', tkinter.END) - self._text.insert('1.0', "\n".join([" " * COLUMNS] * ROWS)) - self._text.config(state=tkinter.DISABLED) + + if self._root: + self._text.config(state=tkinter.NORMAL) + self._text.delete('1.0', tkinter.END) + self._text.insert('1.0', "\n".join([" " * COLUMNS] * ROWS)) + self._text.config(state=tkinter.DISABLED) def _insert_blanks(self, w): + """ + Replace the specified number of characters on the current screen line + with blanks. + """ + del self._screen[self._cur_y][-w:] pre = self._screen[self._cur_y][:self._cur_x] post = self._screen[self._cur_y][self._cur_x:] self._screen[self._cur_y] = pre + [TermChar() for x in range(w)] + post - self._text.config(state=tkinter.NORMAL) - self._text.delete('{}.end-{}c'.format(self._cur_y + 1, w), - '{}.end'.format(self._cur_y + 1)) - self._text.insert('{}.{}'.format(self._cur_y + 1, self._cur_x), - " " * w) - self._text.config(state=tkinter.DISABLED) + if self._root: + self._text.config(state=tkinter.NORMAL) + self._text.delete('{}.end-{}c'.format(self._cur_y + 1, w), + '{}.end'.format(self._cur_y + 1)) + self._text.insert('{}.{}'.format(self._cur_y + 1, self._cur_x), + " " * w) + self._text.config(state=tkinter.DISABLED) def _print_char_at(self, y, x, char): + """Output one character on the screen at the specified coordinates.""" + # Record the character in the internal screen representation. self._screen[y][x] = char - # Add the character to the terminal text widget. - self._text.config(state=tkinter.NORMAL) - pos = '{}.{}'.format(y + 1, x) - self._text.delete(pos) - - # Configure tag. There is one tag for each used combination of - # foreground+background color. The tags are currently never deleted. - # Their maximum count is colors# * colors#, which is 256. - foreground = char.get_tag_foreground() - background = char.get_tag_background() - tag = "tag_{}-{}".format(foreground, background) - self._text.tag_config(tag, foreground=foreground, - background=background) + if self._root: + # Add the character to the terminal text widget. + self._text.config(state=tkinter.NORMAL) + pos = '{}.{}'.format(y + 1, x) + self._text.delete(pos) - self._text.insert(pos, char.char, tag) - self._text.config(state=tkinter.DISABLED) + tag = 'tag_{}-{}'.format(char.get_tag_foreground(), + char.get_tag_background()) + self._text.insert(pos, char.char, tag) + self._text.config(state=tkinter.DISABLED) def _print_char(self, char): + """Output one character on the screen at the cursor position.""" + self._print_char_at(self._cur_y, self._cur_x, char) # Advance the cursor. @@ -386,216 +573,556 @@ class Term(tkinter.Frame): self._cur_x = 0 self._cursor_down() - def _key(self, event): + def _tk_key(self, event): + """Process a key pressed by the user.""" + if len(event.char) != 0: - if event.char == '\x0d': - self._send_key('Enter', event.char) + if event.char == CODE_ENTER: + self._send_key(event.char, 'Enter') else: self._send_key(event.char, event.char) - else: - # A special key was pressed. - if event.keysym == 'F1': - self._send_key(event.keysym, '\x1bOP') - elif event.keysym == 'F2': - self._send_key(event.keysym, '\x1bOQ') - elif event.keysym == 'F3': - self._send_key(event.keysym, '\x1bOR') - elif event.keysym == 'F4': - self._send_key(event.keysym, '\x1bOS') - elif event.keysym == 'F5': - self._send_key(event.keysym, '\x1b[15~') - elif event.keysym == 'F6': - self._send_key(event.keysym, '\x1b[17~') - elif event.keysym == 'F7': - self._send_key(event.keysym, '\x1b[18~') - elif event.keysym == 'F8': - self._send_key(event.keysym, '\x1b[19~') - elif event.keysym == 'F9': - self._send_key(event.keysym, '\x1b[20~') - elif event.keysym == 'F10': - self._send_key(event.keysym, '\x1b[21~') - elif event.keysym == 'F11': - self._send_key(event.keysym, '\x1b[23~') - elif event.keysym == 'F12': - self.record_expected_screen() - elif event.keysym == 'Prior': - self._send_key('PageUp', '\x1b[5~') - elif event.keysym == 'Next': - self._send_key('PageDown', '\x1b[6~') - else: - print("Unrecognized key {}.".format(event.keysym)) - - def _term_initialized(self): - self._initialized.set() - - def wait_for_init(self): - # Wait for the main loop to start. - self._initialized.wait() + return - def _send_key(self, name, chars): - if self._record: - # Record the key. - action_e = ElementTree.SubElement(self._test_e, "action") - action_e.set("key", name) - print("Recorded key '{}'.".format(name)) + # A special key was pressed. + if event.keysym == 'F12': + self._record_expected_screen() + return + if event.keysym == 'Prior': + self._send_key(CODE_PAGE_UP, 'PageUp') + return + if event.keysym == 'Next': + self._send_key(CODE_PAGE_DOWN, 'PageDown') + return + match = re.fullmatch('F([0-9]+)', event.keysym) + if match: + # F1 to F11. + fnum = int(match.group(1)) + if fnum >= 1 and fnum <= len(CODE_FN): + self._send_key(CODE_FN[fnum - 1], event.keysym) + return - # Send the key to the terminal. - self._fd_lock.acquire() - if self._fd != -1: - os.write(self._fd, str.encode(chars)) - else: - print("Error writing to terminal because it is closed.", - file=sys.stderr) - self._fd_lock.release() + print("Unrecognized key {}.".format(event.keysym), file=sys.stderr) - def record_expected_screen(self): - if not self._record: - print("Recording is not enabled.", file=sys.stderr) - return + def _get_screen_xml(self, screen): + """ + Return an ElementTree.Element that represents the current screen + content. + """ - expect_e = ElementTree.SubElement(self._test_e, "expect") - data_e = ElementTree.SubElement(expect_e, "data") + expect_e = ElementTree.Element('expect') + data_e = ElementTree.SubElement(expect_e, 'data') colors = {} - new_key = "a" + new_key = 'a' # Print content of the screen. for y in range(ROWS): - line_e = ElementTree.SubElement(data_e, "line") - line_e.text = "" + line_e = ElementTree.SubElement(data_e, 'line') + line_e.text = '' - attr = "" + attr = '' for x in range(COLUMNS): - term_char = self._screen[y][x] + term_char = screen[y][x] line_e.text += term_char.char color = (term_char.attr, term_char.fgcolor, term_char.bgcolor) if color == (ATTR_NORMAL, COLOR_DEFAULT, COLOR_DEFAULT): - key = " " + key = ' ' elif color in colors: key = colors[color] else: key = new_key colors[color] = key - assert new_key != "z" + assert new_key != 'z' new_key = chr(ord(new_key) + 1) attr += key # Record any non-default attributes/colors. - if attr != " " * COLUMNS: - attr_e = ElementTree.SubElement(data_e, "attr") + if attr != ' ' * COLUMNS: + attr_e = ElementTree.SubElement(data_e, 'attr') attr_e.text = attr # Record used color schemes. if colors: - scheme_e = ElementTree.SubElement(expect_e, "scheme") + scheme_e = ElementTree.SubElement(expect_e, 'scheme') for color, key in sorted(colors.items(), key=lambda x: x[1]): attr, fgcolor, bgcolor = color - color_e = ElementTree.SubElement(scheme_e, "color") - color_e.set("key", key) + color_e = ElementTree.SubElement(scheme_e, 'color') + color_e.set('key', key) attr_str = attr_to_string(attr) if attr_str: - color_e.set("attributes", attr_str) + color_e.set('attributes', attr_str) fgcolor_str = color_to_string(fgcolor) if fgcolor_str: - color_e.set("foreground", fgcolor_str) + color_e.set('foreground', fgcolor_str) bgcolor_str = color_to_string(bgcolor) if bgcolor_str: - color_e.set("background", bgcolor_str) + color_e.set('background', bgcolor_str) + + return expect_e + def _record_expected_screen(self): + """ + Record the current screen content as an expected screen in the test + playbook that is being created. + """ + + assert self._mode == self.MODE_RUN or self._mode == self.MODE_RECORD + + if self._mode != self.MODE_RECORD: + print("Recording is not enabled.", file=sys.stderr) + return + + expect_e = self._get_screen_xml(self._screen) + self._test_e.append(expect_e) print("Recorded expected screen.") + # Method _indent_xml() is based on a code from + # http://effbot.org/zone/element-lib.htm#prettyprint. + def _indent_xml(self, elem, level=0): + """ + Indent elements of a given ElementTree so it can be pretty-printed. + """ + + i = '\n' + '\t' * level + if len(elem): + if not elem.text or not elem.text.strip(): + elem.text = i + '\t' + for e in elem: + self._indent_xml(e, level+1) + if not e.tail or not e.tail.strip(): + e.tail = i + if not elem.tail or not elem.tail.strip(): + elem.tail = i + def output_test(self, filename): - assert self._record - - # Function indent_xml() is based on a code from - # http://effbot.org/zone/element-lib.htm#prettyprint. - def indent_xml(elem, level=0): - """Prepare a given ElementTree element for pretty-printing.""" - - i = "\n" + "\t" * level - if len(elem): - if not elem.text or not elem.text.strip(): - elem.text = i + "\t" - for e in elem: - indent_xml(e, level+1) - if not e.tail or not e.tail.strip(): - e.tail = i - if not elem.tail or not elem.tail.strip(): - elem.tail = i + """ + Output a recorded playbook to a file with the given name. Returns True + when the writing of the test data succeeded, False otherwise. + """ + + assert self._mode == self.MODE_RECORD # Pretty-format the XML tree. - indent_xml(self._test_e) + self._indent_xml(self._test_e) # Output the test. tree = ElementTree.ElementTree(self._test_e) - tree.write(filename, 'unicode', True) + try: + tree.write(filename, 'unicode', True) + except Exception as e: + print("Failed to write playbook file '{}': {}.".format( + filename, e), file=sys.stderr) + return False + return True -def command_run(args): - """Handle the 'run' command.""" + class _TestFailure(Exception): + """Exception reported when a test failed.""" + pass - # Start the terminal GUI. - term = Term(tkinter.Tk(), args.prog) - term.run_mainloop() + def _playbook_key(self, cmd_e): + """ + Parse a description of one key action and send the key to the terminal. + Exception _TestFailure is raised if the description is malformed or + incomplete, exception _TerminalConnectionException can be thrown when + communication with the pseudo-terminal fails. + """ + assert self._mode == self.MODE_TEST -def command_record(args): - """Handle the 'record' command.""" + try: + key = cmd_e.attrib['key'] + except KeyError: + raise self._TestFailure("Element 'action' is missing required " + "attribute 'key'") + + # Handle simple characters. + if len(key) == 1: + self._send_key(key, key) + return - # Start the terminal in the record mode. - term = Term(tkinter.Tk(), args.prog, True) - term.run_mainloop() + # Handle special keys. + if key == 'Enter': + self._send_key(CODE_ENTER, key) + return + if key == 'PageUp': + self._send_key(CODE_PAGE_UP, key) + return + if key == 'PageDown': + self._send_key(CODE_PAGE_DOWN, key) + match = re.fullmatch('F([0-9]+)', key) + if match: + # F1 to F11. + fnum = int(match.group(1)) + if fnum >= 1 and fnum <= len(CODE_FN): + self._send_key(CODE_FN[fnum - 1], key) + return - # Get the recorded test data and write them to a file. - term.output_test(args.playbook) + raise self._TestFailure( + "Element 'action' specifies unrecognized key '{}'".format(key)) + def _parse_color_scheme(self, scheme_e): + """ + Parse color scheme of one expected screen. Dictionary with + {'key': (attr, fgcolor, bgcolor), ...} is returned on success, + exception _TestFailure is raised if the description is malformed or + incomplete. + """ -def command_test(args): - """Handle the 'test' command.""" + assert self._mode == self.MODE_TEST - pass + colors = {} + for color_e in scheme_e: + try: + key = color_e.attrib['key'] + except KeyError: + raise self._TestFailure( + "Element 'color' is missing required attribute 'key'") + + attr = None + if 'attributes' in color_e.attrib: + try: + attr = color_e.attrib['attributes'] + except ValueError as e: + raise self._TestFailure( + "Value of attribute 'attributes' is invalid: " + "{}".format(e)) + + fgcolor = None + if 'foreground' in color_e.attrib: + try: + attr = color_e.attrib['foreground'] + except ValueError as e: + raise self._TestFailure( + "Value of attribute 'foreground' is invalid: " + "{}".format(e)) + + bgcolor = None + if 'background' in color_e.attrib: + try: + attr = color_e.attrib['background'] + except ValueError as e: + raise self._TestFailure( + "Value of attribute 'background' is invalid: " + "{}".format(e)) + + colors[key] = (attr, fgcolor, bgcolor) + + return colors + + def _parse_screen_data(self, data_e, colors): + """ + Parse screen lines of one expected screen. Internal screen + representation is returned on success, exception _TestFailure is raised + if the description is malformed or incomplete. + """ + + assert self._mode == self.MODE_TEST + + NEW_LINE = 0 + NEW_LINE_OR_ATTR = 1 + state = NEW_LINE + line = None + expected_screen = [] + + for data_sub_e in data_e: + # Do common processing for both states. + if data_sub_e.tag == 'line': + # Append the previous line. + if line: + expected_screen.append(line) + # Parse the new line. + line = [TermChar(char) for char in data_sub_e.text] + + if state == NEW_LINE and data_sub_e.tag != 'line': + raise self._TestFailure("Element '{}' is invalid, expected " + "'line'".format(data_sub_e.tag)) + + elif state == NEW_LINE_OR_ATTR: + if data_sub_e.tag == 'attr': + if len(data_sub_e.text) != len(line): + raise self._TestFailure( + "Element 'attr' does not match the previous line, " + "expected '{}' attribute characters but got " + "'{}'".format(len(line), len(data_sub_e.text))) + + for i, key in enumerate(data_sub_e.text): + try: + attr, fgcolor, bgcolor = colors[key] + except KeyError: + raise self._TestFailure("Color attribute '{}' is " + "not defined".format(key)) + line[i].attr = attr + line[i].fgcolor = fgcolor + line[i].bgcolor = bgcolor + elif data_sub_e.tag != 'line': + raise self._TestFailure( + "Element '{}' is invalid, expected 'line' or " + "'attr'".format(data_sub_e.tag)) + + # Append the final line. + if line: + expected_screen.append(line) + + return expected_screen + + def _parse_expected_screen(self, expect_e): + """ + Parse a description of one expected screen. Internal screen + representation is returned on success, exception _TestFailure is raised + if the description is malformed or incomplete. + """ + + assert self._mode == self.MODE_TEST + + data_e = None + scheme_e = None + for sub_e in expect_e: + if sub_e.tag == 'data': + if data_e: + raise self._TestFailure("Element 'expect' contains " + "multiple 'data' sub-elements") + data_e = sub_e + elif sub_e.tag == 'scheme': + if scheme_e: + raise self._TestFailure("Element 'expect' contains " + "multiple 'scheme' sub-elements") + scheme_e = sub_e + + if not data_e: + raise self._TestFailure( + "Element 'expect' is missing required sub-element 'data'") + + # Parse the color scheme. + if scheme_e: + colors = self._parse_color_scheme(scheme_e) + else: + colors = {} + + # Parse the screen data. + return self._parse_screen_data(data_e, colors) + + def _report_failed_expectation(self, expected_screen): + """ + Report that the expected screen state has not been reached. The output + consists of the expected screen, the current screen content, followed + by differences between the two screens. + """ + + assert self._mode == self.MODE_TEST + + # Print the expected screen. The output is not verbatim as it was + # specified in the input file, but instead the screen is printed in the + # same way the current screen gets output. This allows to properly show + # differences between the two screens. + + expected_screen_e = self._get_screen_xml(expected_screen) + self._indent_xml(expected_screen_e) + expected_screen_str = ElementTree.tostring( + expected_screen_e, 'unicode') + print("Expected (normalized) screen:", file=sys.stderr) + print(expected_screen_str, file=sys.stderr) + + # Print the current screen. + current_screen_e = self._get_screen_xml(self._screen) + self._indent_xml(current_screen_e) + current_screen_str = ElementTree.tostring(current_screen_e, 'unicode') + print("Current screen:", file=sys.stderr) + print(current_screen_str, file=sys.stderr) + + # Print the delta. + print("Differences:", file=sys.stderr) + sys.stderr.writelines(difflib.unified_diff( + expected_screen_str.splitlines(keepends=True), + current_screen_str.splitlines(keepends=True), + fromfile="Expected screen", tofile="Current screen")) + + def _execute_playbook(self, test_e): + """ + Run the main loop and execute the given test playbook. Normal return + from the method indicates that the test succeeded. Exception + _TestFailure is raised when the test fails and exception + _TerminalConnectionException can be thrown when communication with the + pseudo-terminal fails. + """ + + assert self._mode == self.MODE_TEST + + if test_e.tag != 'test': + raise self._TestFailure("Root element '{}' is invalid, expected " + "'test'".format(test_e.tag)) + cmd_iter = iter(test_e) + + # Start the main loop. + with selectors.DefaultSelector() as sel: + sel.register(self._fd, selectors.EVENT_READ) + + expected_screen = None + more_commands = True + while True: + # Process any actions and find an expected screen. + while not expected_screen and more_commands: + try: + cmd_e = next(cmd_iter) + if cmd_e.tag == 'action': + self._playbook_key(cmd_e) + elif cmd_e.tag == 'expect': + expected_screen = self._parse_expected_screen( + cmd_e) + # Stop processing more commands for now and wait + # for the expected screen to appear. + break + else: + raise self._TestFailure( + "Element '{}' is invalid, expected 'action' " + "or 'expect'".format(cmd_e.tag)) + except StopIteration: + # No more commands. + more_commands = False + + # Wait for the expected screen. + events = sel.select(CHILD_TIMEOUT) + if not events: + if expected_screen: + self._report_failed_expectation(expected_screen) + raise self._TestFailure( + "Timeout reached. No event received in the last {} " + "second(s)".format(CHILD_TIMEOUT)) + + # Expect only an event on self._fd. + assert len(events) == 1 + event = events[0] + key, _mask = event + assert key.fd == self._fd + + closed = self._pty_callback() + if closed: + if more_commands: + raise self._TestFailure( + "Connection to the terminal was closed but the " + "playbook contains more commands") + break + + # Check if the expected screen is present. + if self._screen == expected_screen: + expected_screen = None + + def execute_test(self, filename): + """ + Load test data from a given file, start the program under the test and + execute the test playbook. Returns True when the test succeeded, False + otherwise. + """ + + assert self._mode == self.MODE_TEST + + # Read the test data. + try: + tree = ElementTree.ElementTree(file=filename) + except Exception as e: + print("Failed to read playbook file '{}': {}.".format(filename, e), + file=sys.stderr) + return False + + # Start the specified program. + if not self._start_program(): + return False + + # Execute the test playbook. + res = True + try: + self._execute_playbook(tree.getroot()) + except (self._TerminalConnectionException, self._TestFailure) as e: + print("{}.".format(e), file=sys.stderr) + res = False + finally: + # Finalize the run of the child program. + if not self._finalize_program(): + res = False + + # Return whether the test passed. + return res def main(): + """ + Parse command line arguments and execute the operation that the user + selected. Returns 0 if the operation was successful and a non-zero value + otherwise. + """ + # Parse command line arguments. parser = argparse.ArgumentParser() parser.set_defaults(func=None) subparsers = parser.add_subparsers(dest='command') subparsers.required = True - prog_parser = argparse.ArgumentParser(add_help=False) - prog_parser.add_argument("prog") + program_parser = argparse.ArgumentParser(add_help=False) + program_parser.add_argument('program') - # Create the parser for the "run" command. + # Create the parser for the 'run' command. parser_run = subparsers.add_parser( - "run", parents=[prog_parser], help="run a command") - parser_run.set_defaults(func=command_run) + 'run', parents=[program_parser], help="run a command") + parser_run.set_defaults(mode=Term.MODE_RUN) - # Create the parser for the "record" command. + # Create the parser for the 'record' command. parser_record = subparsers.add_parser( - "record", parents=[prog_parser], help="record a test") - parser_record.set_defaults(func=command_record) + 'record', parents=[program_parser], help="record a test") + parser_record.set_defaults(mode=Term.MODE_RECORD) parser_record.add_argument( - "-p", "--playbook", metavar="FILE", required=True, + '-p', '--playbook', metavar='FILE', required=True, help="output playbook file") - # Create the parser for the "test" command. + # Create the parser for the 'test' command. parser_test = subparsers.add_parser( - "test", parents=[prog_parser], help="run a test") - parser_test.set_defaults(func=command_test) + 'test', parents=[program_parser], help="run a test") + parser_test.set_defaults(mode=Term.MODE_TEST) parser_test.add_argument( - "-p", "--playbook", metavar="FILE", required=True, + '-p', '--playbook', metavar='FILE', required=True, help="input playbook file") args = parser.parse_args() - args.func(args) + + tk_root = None + if args.mode in (Term.MODE_RUN, Term.MODE_RECORD): + # Start the terminal GUI. + try: + tk_root = tkinter.Tk() + except tkinter.TclError as e: + print("Failed to initialize GUI: {}.".format(e), file=sys.stderr) + return 1 + + term = Term(tk_root, args.program, args.mode) + if tk_root: + # Start the GUI main loop. + term.run_gui_mainloop() + else: + # Execute and check the playbook, without running GUI. + ok = term.execute_test(args.playbook) + if ok: + msg = "succeeded" + res = 0 + else: + msg = "failed" + res = 1 + + print("Run of '{}' using playbook '{}' {}.".format( + args.program, args.playbook, msg)) + return res + + if args.mode == Term.MODE_RECORD: + # Get the recorded test data and write them to a file. + if not term.output_test(args.playbook): + return 1 + + return 0 if __name__ == '__main__': - main() + sys.exit(main()) -- 2.11.4.GIT