Linux 4.19.133: tools/testing/selftests/tc-testing/tdc.py
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0

"""
tdc.py - Linux tc (Traffic Control) unit test driver

Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
"""

import re
import os
import sys
import argparse
import importlib
import json
import subprocess
import time
import traceback
from collections import OrderedDict
from string import Template

from tdc_config import *
from tdc_helper import *

import TdcPlugin


class PluginMgrTestFail(Exception):
    def __init__(self, stage, output, message):
        self.stage = stage
        self.output = output
        self.message = message

class PluginMgr:
    def __init__(self, argparser):
        super().__init__()
        self.plugins = {}
        self.plugin_instances = []
        self.args = []
        self.argparser = argparser

        # TODO, put plugins in order
        plugindir = os.getenv('TDC_PLUGIN_DIR', './plugins')
        for dirpath, dirnames, filenames in os.walk(plugindir):
            for fn in filenames:
                if (fn.endswith('.py') and
                    not fn == '__init__.py' and
                    not fn.startswith('#') and
                    not fn.startswith('.#')):
                    mn = fn[0:-3]
                    foo = importlib.import_module('plugins.' + mn)
                    self.plugins[mn] = foo
                    self.plugin_instances.append(foo.SubPlugin())

    def call_pre_suite(self, testcount, testidlist):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_suite(testcount, testidlist)

    def call_post_suite(self, index):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_suite(index)

    def call_pre_case(self, test_ordinal, testid):
        for pgn_inst in self.plugin_instances:
            try:
                pgn_inst.pre_case(test_ordinal, testid)
            except Exception as ee:
                print('exception {} in call to pre_case for {} plugin'.
                      format(ee, pgn_inst.__class__))
                print('test_ordinal is {}'.format(test_ordinal))
                print('testid is {}'.format(testid))
                raise

    def call_post_case(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_case()

    def call_pre_execute(self):
        for pgn_inst in self.plugin_instances:
            pgn_inst.pre_execute()

    def call_post_execute(self):
        for pgn_inst in reversed(self.plugin_instances):
            pgn_inst.post_execute()

    def call_add_args(self, parser):
        for pgn_inst in self.plugin_instances:
            parser = pgn_inst.add_args(parser)
        return parser

    def call_check_args(self, args, remaining):
        for pgn_inst in self.plugin_instances:
            pgn_inst.check_args(args, remaining)

    def call_adjust_command(self, stage, command):
        for pgn_inst in self.plugin_instances:
            command = pgn_inst.adjust_command(stage, command)
        return command

    def _make_argparser(self, args):
        self.argparser = argparse.ArgumentParser(
            description='Linux TC unit tests')
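
# Rough sketch of how the hooks above are driven by the code below (this is
# just a reading of test_runner()/run_one_test(), not extra documentation):
#
#   call_pre_suite(count, ids)
#   for each selected test:
#       call_pre_case(index, id)
#       setup commands ... call_pre_execute() -> cmdUnderTest -> call_post_execute()
#       verify command, teardown commands, call_post_case()
#   call_post_suite(index)
#
# call_adjust_command() runs on every command issued through exec_cmd(), and
# call_add_args()/call_check_args() hook into argument parsing in main().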


def replace_keywords(cmd):
    """
    For a given executable command, substitute any known
    variables contained within NAMES with the correct values
    """
    tcmd = Template(cmd)
    subcmd = tcmd.safe_substitute(NAMES)
    return subcmd
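
# Illustration only: the real keyword/value pairs come from NAMES, defined in
# tdc_config.py. Assuming NAMES contained {'TC': '/sbin/tc', 'DEV1': 'v0p1'},
#     replace_keywords('$TC qdisc add dev $DEV1 ingress')
# would return '/sbin/tc qdisc add dev v0p1 ingress'. Unknown keywords are
# left untouched because Template.safe_substitute() never raises KeyError.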


def exec_cmd(args, pm, stage, command):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.
    """
    if len(command.strip()) == 0:
        return None, None
    if '$' in command:
        command = replace_keywords(command)

    command = pm.call_adjust_command(stage, command)
    if args.verbose > 0:
        print('command "{}"'.format(command))
    proc = subprocess.Popen(command,
        shell=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=ENVIR)
    (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        foutput = serr.decode("utf-8", errors="ignore")
    else:
        foutput = rawout.decode("utf-8", errors="ignore")

    proc.stdout.close()
    proc.stderr.close()
    return proc, foutput
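
# Return contract (as the callers below rely on it): an empty command yields
# (None, None); otherwise the completed Popen object is returned together with
# decoded output, which is stderr rather than stdout when the command exited
# non-zero and wrote error text.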


def prepare_env(args, pm, stage, prefix, cmdlist, output = None):
    """
    Execute the setup/teardown commands for a test case.
    Optionally terminate test execution if the command fails.
    """
    if args.verbose > 0:
        print('{}'.format(prefix))
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            exit_codes = cmdinfo[1:]
            cmd = cmdinfo[0]
        else:
            exit_codes = [0]
            cmd = cmdinfo

        if not cmd:
            continue

        (proc, foutput) = exec_cmd(args, pm, stage, cmd)

        if proc and (proc.returncode not in exit_codes):
            print('', file=sys.stderr)
            print("{} *** Could not execute: \"{}\"".format(prefix, cmd),
                  file=sys.stderr)
            print("\n{} *** Error message: \"{}\"".format(prefix, foutput),
                  file=sys.stderr)
            print("returncode {}; expected {}".format(proc.returncode,
                  exit_codes))
            print("\n{} *** Aborting test run.".format(prefix), file=sys.stderr)
            print("\n\n{} *** stdout ***".format(proc.stdout), file=sys.stderr)
            print("\n\n{} *** stderr ***".format(proc.stderr), file=sys.stderr)
            raise PluginMgrTestFail(
                stage, output,
                '"{}" did not complete successfully'.format(prefix))
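
# The cmdlist handled above comes straight from a test case's "setup" or
# "teardown" array. Each entry is either a plain command string (expected to
# exit 0) or a list whose first element is the command and whose remaining
# elements are acceptable exit codes. A hypothetical example:
#
#     "setup": [
#         "$TC qdisc add dev $DEV1 ingress",
#         ["$TC actions flush action gact", 0, 1, 255]
#     ]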

def run_one_test(pm, args, index, tidx):
    """
    Run a single test case: set up, run the command under test, verify the
    result, tear down, and return the TAP output for this test.
    """
    global NAMES
    result = True
    tresult = ""
    tap = ""
    if args.verbose > 0:
        print("\t====================\n=====> ", end="")
    print("Test " + tidx["id"] + ": " + tidx["name"])

    # populate NAMES with TESTID for this test
    NAMES['TESTID'] = tidx['id']

    pm.call_pre_case(index, tidx['id'])
    prepare_env(args, pm, 'setup', "-----> prepare stage", tidx["setup"])

    if (args.verbose > 0):
        print('-----> execute stage')
    pm.call_pre_execute()
    (p, procout) = exec_cmd(args, pm, 'execute', tidx["cmdUnderTest"])
    if p:
        exit_code = p.returncode
    else:
        exit_code = None

    pm.call_post_execute()

    if (exit_code is None or exit_code != int(tidx["expExitCode"])):
        result = False
        print("exit: {!r}".format(exit_code))
        print("exit: {}".format(int(tidx["expExitCode"])))
        #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
        print(procout)
    else:
        if args.verbose > 0:
            print('-----> verify stage')
        match_pattern = re.compile(
            str(tidx["matchPattern"]), re.DOTALL | re.MULTILINE)
        (p, procout) = exec_cmd(args, pm, 'verify', tidx["verifyCmd"])
        if procout:
            match_index = re.findall(match_pattern, procout)
            if len(match_index) != int(tidx["matchCount"]):
                result = False
        elif int(tidx["matchCount"]) != 0:
            result = False

    if not result:
        tresult += 'not '
    tresult += 'ok {} - {} # {}\n'.format(str(index), tidx['id'], tidx['name'])
    tap += tresult

    if not result:
        if procout:
            tap += procout
        else:
            tap += 'No output!\n'

    prepare_env(args, pm, 'teardown', '-----> teardown stage', tidx['teardown'], procout)
    pm.call_post_case()

    index += 1

    # remove TESTID from NAMES
    del(NAMES['TESTID'])
    return tap

def test_runner(pm, args, filtered_tests):
    """
    Driver function for the unit tests.

    Prints information about the tests being run, executes the setup and
    teardown commands and the command under test itself. Also determines
    success/failure based on the information in the test case and generates
    TAP output accordingly.
    """
    testlist = filtered_tests
    tcount = len(testlist)
    index = 1
    tap = ''
    badtest = None
    stage = None
    emergency_exit = False
    emergency_exit_message = ''

    if args.notap:
        if args.verbose:
            tap = 'notap requested: omitting test plan\n'
    else:
        tap = str(index) + ".." + str(tcount) + "\n"
    try:
        pm.call_pre_suite(tcount, [tidx['id'] for tidx in testlist])
    except Exception as ee:
        ex_type, ex, ex_tb = sys.exc_info()
        print('Exception {} {} (caught in pre_suite).'.
              format(ex_type, ex))
        # when the extra print statements are uncommented,
        # the traceback does not appear between them
        # (it appears way earlier in the tdc.py output)
        # so don't bother ...
        # print('--------------------(')
        # print('traceback')
        traceback.print_tb(ex_tb)
        # print('--------------------)')
        emergency_exit_message = 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type, ex)
        emergency_exit = True
        stage = 'pre-SUITE'

    if emergency_exit:
        pm.call_post_suite(index)
        return emergency_exit_message
    if args.verbose > 1:
        print('give test rig 2 seconds to stabilize')
    time.sleep(2)
    for tidx in testlist:
        if "flower" in tidx["category"] and args.device == None:
            if args.verbose > 1:
                print('Not executing test {} {} because DEV2 not defined'.
                      format(tidx['id'], tidx['name']))
            continue
        try:
            badtest = tidx  # in case it goes bad
            tap += run_one_test(pm, args, index, tidx)
        except PluginMgrTestFail as pmtf:
            ex_type, ex, ex_tb = sys.exc_info()
            stage = pmtf.stage
            message = pmtf.message
            output = pmtf.output
            print(message)
            print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
                  format(ex_type, ex, index, tidx['id'], tidx['name'], stage))
            print('---------------')
            print('traceback')
            traceback.print_tb(ex_tb)
            print('---------------')
            if stage == 'teardown':
                print('accumulated output for this test:')
                if pmtf.output:
                    print(pmtf.output)
            print('---------------')
            break
        index += 1

    # if we failed in setup or teardown,
    # fill in the remaining tests with ok-skipped
    count = index
    if not args.notap:
        tap += 'about to flush the tap output if tests need to be skipped\n'
        if tcount + 1 != index:
            for tidx in testlist[index - 1:]:
                msg = 'skipped - previous {} failed'.format(stage)
                tap += 'ok {} - {} # {} {} {}\n'.format(
                    count, tidx['id'], msg, index, badtest.get('id', '--Unknown--'))
                count += 1

        tap += 'done flushing skipped test tap output\n'

    if args.pause:
        print('Want to pause\nPress enter to continue ...')
        if input(sys.stdin):
            print('got something on stdin')

    pm.call_post_suite(index)

    return tap
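
# For reference, the TAP stream assembled above starts with the plan line
# "1..N" and then carries one line per test; ids and names below are made up:
#
#     1..2
#     ok 1 - d052 # Add basic gact action
#     not ok 2 - 3a1f # Replace filter priority
#     <captured output of the failing command>
#
# If setup or teardown raises PluginMgrTestFail, the loop stops and every
# remaining test is emitted as
#     ok <n> - <id> # skipped - previous <stage> failed ...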

def has_blank_ids(idlist):
    """
    Search the list for empty ID fields and return true/false accordingly.
    """
    return not(all(k for k in idlist))


def load_from_file(filename):
    """
    Open the JSON file containing the test cases and return them
    as list of ordered dictionary objects.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        testlist = list()
    else:
        idlist = get_id_list(testlist)
        if (has_blank_ids(idlist)):
            for k in testlist:
                k['filename'] = filename
    return testlist
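
# A minimal sketch of the test case shape the rest of this script expects;
# the field names are the ones referenced by run_one_test(), the values are
# invented for illustration:
#
#     {
#         "id": "d052",
#         "name": "Add basic gact pass action",
#         "category": ["actions", "gact"],
#         "setup": ["$TC actions flush action gact"],
#         "cmdUnderTest": "$TC actions add action pass index 8",
#         "expExitCode": "0",
#         "verifyCmd": "$TC actions list action gact",
#         "matchPattern": "action order [0-9]*: gact action pass.*index 8",
#         "matchCount": "1",
#         "teardown": ["$TC actions flush action gact"]
#     }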


def args_parse():
    """
    Create the argument parser.
    """
    parser = argparse.ArgumentParser(description='Linux TC unit tests')
    return parser


def set_args(parser):
    """
    Set the command line arguments for tdc.
    """
    parser.add_argument(
        '-p', '--path', type=str,
        help='The full path to the tc executable to use')
    sg = parser.add_argument_group(
        'selection', 'select which test cases: ' +
        'files plus directories; filtered by categories plus testids')
    ag = parser.add_argument_group(
        'action', 'select action to perform on selected test cases')

    sg.add_argument(
        '-D', '--directory', nargs='+', metavar='DIR',
        help='Collect tests from the specified directory(ies) ' +
        '(default [tc-tests])')
    sg.add_argument(
        '-f', '--file', nargs='+', metavar='FILE',
        help='Run tests from the specified file(s)')
    sg.add_argument(
        '-c', '--category', nargs='*', metavar='CATG', default=['+c'],
        help='Run tests only from the specified category/ies, ' +
        'or if no category/ies is/are specified, list known categories.')
    sg.add_argument(
        '-e', '--execute', nargs='+', metavar='ID',
        help='Execute the specified test cases with specified IDs')
    ag.add_argument(
        '-l', '--list', action='store_true',
        help='List all test cases, or those only within the specified category')
    ag.add_argument(
        '-s', '--show', action='store_true', dest='showID',
        help='Display the selected test cases')
    ag.add_argument(
        '-i', '--id', action='store_true', dest='gen_id',
        help='Generate ID numbers for new test cases')
    parser.add_argument(
        '-v', '--verbose', action='count', default=0,
        help='Show the commands that are being run')
    parser.add_argument(
        '-N', '--notap', action='store_true',
        help='Suppress tap results for command under test')
    parser.add_argument('-d', '--device',
                        help='Execute the test case in flower category')
    parser.add_argument(
        '-P', '--pause', action='store_true',
        help='Pause execution just before post-suite stage')
    return parser
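
# A few example invocations of the options defined above (file names, ids and
# paths are placeholders):
#
#     ./tdc.py -l                    # list every known test case
#     ./tdc.py -c                    # list the known categories
#     ./tdc.py -c actions -v         # run one category, showing commands
#     ./tdc.py -e d052 -p /sbin/tc   # run one test id with a specific tc
#     ./tdc.py -f my-tests.json -N   # run one file, suppress TAP output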


def check_default_settings(args, remaining, pm):
    """
    Process any arguments overriding the default settings,
    and ensure the settings are correct.
    """
    # Allow for overriding specific settings
    global NAMES

    if args.path != None:
        NAMES['TC'] = args.path
    if args.device != None:
        NAMES['DEV2'] = args.device
    if not os.path.isfile(NAMES['TC']):
        print("The specified tc path " + NAMES['TC'] + " does not exist.")
        exit(1)

    pm.call_check_args(args, remaining)


def get_id_list(alltests):
    """
    Generate a list of all IDs in the test cases.
    """
    return [x["id"] for x in alltests]


def check_case_id(alltests):
    """
    Check for duplicate test case IDs.
    """
    idl = get_id_list(alltests)
    return [x for x in idl if idl.count(x) > 1]


def does_id_exist(alltests, newid):
    """
    Check if a given ID already exists in the list of test cases.
    """
    idl = get_id_list(alltests)
    return (any(newid == x for x in idl))


def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.
    """
    import random
    for c in alltests:
        if (c["id"] == ""):
            while True:
                newid = str('{:04x}'.format(random.randrange(16**4)))
                if (does_id_exist(alltests, newid)):
                    continue
                else:
                    c['id'] = newid
                    break

    ufilename = []
    for c in alltests:
        if ('filename' in c):
            ufilename.append(c['filename'])
    ufilename = get_unique_item(ufilename)
    for f in ufilename:
        testlist = []
        for t in alltests:
            if 'filename' in t:
                if t['filename'] == f:
                    del t['filename']
                    testlist.append(t)
        outfile = open(f, "w")
        json.dump(testlist, outfile, indent=4)
        outfile.write("\n")
        outfile.close()

def filter_tests_by_id(args, testlist):
    """
    Remove tests from testlist that are not in the named id list.
    If id list is empty, return empty list.
    """
    newlist = list()
    if testlist and args.execute:
        target_ids = args.execute

        if isinstance(target_ids, list) and (len(target_ids) > 0):
            newlist = list(filter(lambda x: x['id'] in target_ids, testlist))
    return newlist

def filter_tests_by_category(args, testlist):
    """
    Remove tests from testlist that are not in a named category.
    """
    answer = list()
    if args.category and testlist:
        test_ids = list()
        for catg in set(args.category):
            if catg == '+c':
                continue
            print('considering category {}'.format(catg))
            for tc in testlist:
                if catg in tc['category'] and tc['id'] not in test_ids:
                    answer.append(tc)
                    test_ids.append(tc['id'])

    return answer

def get_test_cases(args):
    """
    If a test case file is specified, retrieve tests from that file.
    Otherwise, glob for all json files in subdirectories and load from
    each one.
    Also, if requested, filter by category, and add tests matching
    certain ids.
    """
    import fnmatch

    flist = []
    testdirs = ['tc-tests']

    if args.file:
        # at least one file was specified - remove the default directory
        testdirs = []

        for ff in args.file:
            if not os.path.isfile(ff):
                print("IGNORING file " + ff + "\n\tBECAUSE does not exist.")
            else:
                flist.append(os.path.abspath(ff))

    if args.directory:
        testdirs = args.directory

    for testdir in testdirs:
        for root, dirnames, filenames in os.walk(testdir):
            for filename in fnmatch.filter(filenames, '*.json'):
                candidate = os.path.abspath(os.path.join(root, filename))
                if candidate not in testdirs:
                    flist.append(candidate)

    alltestcases = list()
    for casefile in flist:
        alltestcases = alltestcases + (load_from_file(casefile))

    allcatlist = get_test_categories(alltestcases)
    allidlist = get_id_list(alltestcases)

    testcases_by_cats = get_categorized_testlist(alltestcases, allcatlist)
    idtestcases = filter_tests_by_id(args, alltestcases)
    cattestcases = filter_tests_by_category(args, alltestcases)

    cat_ids = [x['id'] for x in cattestcases]
    if args.execute:
        if args.category:
            alltestcases = cattestcases + [x for x in idtestcases if x['id'] not in cat_ids]
        else:
            alltestcases = idtestcases
    else:
        if cat_ids:
            alltestcases = cattestcases
        else:
            # just accept the existing value of alltestcases,
            # which has been filtered by file/directory
            pass

    return allcatlist, allidlist, testcases_by_cats, alltestcases
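
# Selection summary for the block above: with -e, the id-filtered tests are
# used (unioned with any category matches when -c is also given); without -e,
# the category filter wins if it selected anything, and otherwise the
# file/directory selection is returned unchanged.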


def set_operation_mode(pm, args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.
    """
    ucat, idlist, testcases, alltests = get_test_cases(args)

    if args.gen_id:
        if (has_blank_ids(idlist)):
            alltests = generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        exit(0)

    duplicate_ids = check_case_id(alltests)
    if (len(duplicate_ids) > 0):
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        exit(1)

    if args.showID:
        for atest in alltests:
            print_test_case(atest)
        exit(0)

    if isinstance(args.category, list) and (len(args.category) == 0):
        print("Available categories:")
        print_sll(ucat)
        exit(0)
    if args.list:
        list_test_cases(alltests)
        exit(0)

    if len(alltests):
        catresults = test_runner(pm, args, alltests)
    else:
        catresults = 'No tests found\n'
    if args.notap:
        print('Tap output suppression requested\n')
    else:
        print('All test results: \n\n{}'.format(catresults))

def main():
    """
    Start of execution; set up argument parser and get the arguments,
    and start operations.
    """
    parser = args_parse()
    parser = set_args(parser)
    pm = PluginMgr(parser)
    parser = pm.call_add_args(parser)
    (args, remaining) = parser.parse_known_args()
    args.NAMES = NAMES
    check_default_settings(args, remaining, pm)
    if args.verbose > 2:
        print('args is {}'.format(args))

    set_operation_mode(pm, args)

    exit(0)


if __name__ == "__main__":
    main()