2 # SPDX-License-Identifier: GPL-2.0
5 tdc.py - Linux tc (Traffic Control) unit test driver
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
19 from collections
import OrderedDict
20 from string
import Template
22 from tdc_config
import *
23 from tdc_helper
import *
28 class PluginMgrTestFail(Exception):
29 def __init__(self
, stage
, output
, message
):
32 self
.message
= message
35 def __init__(self
, argparser
):
38 self
.plugin_instances
= []
40 self
.argparser
= argparser
42 # TODO, put plugins in order
43 plugindir
= os
.getenv('TDC_PLUGIN_DIR', './plugins')
44 for dirpath
, dirnames
, filenames
in os
.walk(plugindir
):
46 if (fn
.endswith('.py') and
47 not fn
== '__init__.py' and
48 not fn
.startswith('#') and
49 not fn
.startswith('.#')):
51 foo
= importlib
.import_module('plugins.' + mn
)
52 self
.plugins
[mn
] = foo
53 self
.plugin_instances
.append(foo
.SubPlugin())
55 def call_pre_suite(self
, testcount
, testidlist
):
56 for pgn_inst
in self
.plugin_instances
:
57 pgn_inst
.pre_suite(testcount
, testidlist
)
59 def call_post_suite(self
, index
):
60 for pgn_inst
in reversed(self
.plugin_instances
):
61 pgn_inst
.post_suite(index
)
63 def call_pre_case(self
, test_ordinal
, testid
):
64 for pgn_inst
in self
.plugin_instances
:
66 pgn_inst
.pre_case(test_ordinal
, testid
)
67 except Exception as ee
:
68 print('exception {} in call to pre_case for {} plugin'.
69 format(ee
, pgn_inst
.__class
__))
70 print('test_ordinal is {}'.format(test_ordinal
))
71 print('testid is {}'.format(testid
))
74 def call_post_case(self
):
75 for pgn_inst
in reversed(self
.plugin_instances
):
78 def call_pre_execute(self
):
79 for pgn_inst
in self
.plugin_instances
:
80 pgn_inst
.pre_execute()
82 def call_post_execute(self
):
83 for pgn_inst
in reversed(self
.plugin_instances
):
84 pgn_inst
.post_execute()
86 def call_add_args(self
, parser
):
87 for pgn_inst
in self
.plugin_instances
:
88 parser
= pgn_inst
.add_args(parser
)
91 def call_check_args(self
, args
, remaining
):
92 for pgn_inst
in self
.plugin_instances
:
93 pgn_inst
.check_args(args
, remaining
)
95 def call_adjust_command(self
, stage
, command
):
96 for pgn_inst
in self
.plugin_instances
:
97 command
= pgn_inst
.adjust_command(stage
, command
)
101 def _make_argparser(args
):
102 self
.argparser
= argparse
.ArgumentParser(
103 description
='Linux TC unit tests')
106 def replace_keywords(cmd
):
108 For a given executable command, substitute any known
109 variables contained within NAMES with the correct values
112 subcmd
= tcmd
.safe_substitute(NAMES
)
116 def exec_cmd(args
, pm
, stage
, command
):
118 Perform any required modifications on an executable command, then run
119 it in a subprocess and return the results.
121 if len(command
.strip()) == 0:
124 command
= replace_keywords(command
)
126 command
= pm
.call_adjust_command(stage
, command
)
128 print('command "{}"'.format(command
))
129 proc
= subprocess
.Popen(command
,
131 stdout
=subprocess
.PIPE
,
132 stderr
=subprocess
.PIPE
,
134 (rawout
, serr
) = proc
.communicate()
136 if proc
.returncode
!= 0 and len(serr
) > 0:
137 foutput
= serr
.decode("utf-8", errors
="ignore")
139 foutput
= rawout
.decode("utf-8", errors
="ignore")
146 def prepare_env(args
, pm
, stage
, prefix
, cmdlist
, output
= None):
148 Execute the setup/teardown commands for a test case.
149 Optionally terminate test execution if the command fails.
152 print('{}'.format(prefix
))
153 for cmdinfo
in cmdlist
:
154 if isinstance(cmdinfo
, list):
155 exit_codes
= cmdinfo
[1:]
164 (proc
, foutput
) = exec_cmd(args
, pm
, stage
, cmd
)
166 if proc
and (proc
.returncode
not in exit_codes
):
167 print('', file=sys
.stderr
)
168 print("{} *** Could not execute: \"{}\"".format(prefix
, cmd
),
170 print("\n{} *** Error message: \"{}\"".format(prefix
, foutput
),
172 print("returncode {}; expected {}".format(proc
.returncode
,
174 print("\n{} *** Aborting test run.".format(prefix
), file=sys
.stderr
)
175 print("\n\n{} *** stdout ***".format(proc
.stdout
), file=sys
.stderr
)
176 print("\n\n{} *** stderr ***".format(proc
.stderr
), file=sys
.stderr
)
177 raise PluginMgrTestFail(
179 '"{}" did not complete successfully'.format(prefix
))
181 def run_one_test(pm
, args
, index
, tidx
):
187 print("\t====================\n=====> ", end
="")
188 print("Test " + tidx
["id"] + ": " + tidx
["name"])
190 # populate NAMES with TESTID for this test
191 NAMES
['TESTID'] = tidx
['id']
193 pm
.call_pre_case(index
, tidx
['id'])
194 prepare_env(args
, pm
, 'setup', "-----> prepare stage", tidx
["setup"])
196 if (args
.verbose
> 0):
197 print('-----> execute stage')
198 pm
.call_pre_execute()
199 (p
, procout
) = exec_cmd(args
, pm
, 'execute', tidx
["cmdUnderTest"])
201 exit_code
= p
.returncode
205 pm
.call_post_execute()
207 if (exit_code
is None or exit_code
!= int(tidx
["expExitCode"])):
209 print("exit: {!r}".format(exit_code
))
210 print("exit: {}".format(int(tidx
["expExitCode"])))
211 #print("exit: {!r} {}".format(exit_code, int(tidx["expExitCode"])))
215 print('-----> verify stage')
216 match_pattern
= re
.compile(
217 str(tidx
["matchPattern"]), re
.DOTALL | re
.MULTILINE
)
218 (p
, procout
) = exec_cmd(args
, pm
, 'verify', tidx
["verifyCmd"])
220 match_index
= re
.findall(match_pattern
, procout
)
221 if len(match_index
) != int(tidx
["matchCount"]):
223 elif int(tidx
["matchCount"]) != 0:
228 tresult
+= 'ok {} - {} # {}\n'.format(str(index
), tidx
['id'], tidx
['name'])
235 tap
+= 'No output!\n'
237 prepare_env(args
, pm
, 'teardown', '-----> teardown stage', tidx
['teardown'], procout
)
242 # remove TESTID from NAMES
246 def test_runner(pm
, args
, filtered_tests
):
248 Driver function for the unit tests.
250 Prints information about the tests being run, executes the setup and
251 teardown commands and the command under test itself. Also determines
252 success/failure based on the information in the test case and generates
253 TAP output accordingly.
255 testlist
= filtered_tests
256 tcount
= len(testlist
)
261 emergency_exit
= False
262 emergency_exit_message
= ''
266 tap
= 'notap requested: omitting test plan\n'
268 tap
= str(index
) + ".." + str(tcount
) + "\n"
270 pm
.call_pre_suite(tcount
, [tidx
['id'] for tidx
in testlist
])
271 except Exception as ee
:
272 ex_type
, ex
, ex_tb
= sys
.exc_info()
273 print('Exception {} {} (caught in pre_suite).'.
275 # when the extra print statements are uncommented,
276 # the traceback does not appear between them
277 # (it appears way earlier in the tdc.py output)
278 # so don't bother ...
279 # print('--------------------(')
281 traceback
.print_tb(ex_tb
)
282 # print('--------------------)')
283 emergency_exit_message
= 'EMERGENCY EXIT, call_pre_suite failed with exception {} {}\n'.format(ex_type
, ex
)
284 emergency_exit
= True
288 pm
.call_post_suite(index
)
289 return emergency_exit_message
291 print('give test rig 2 seconds to stabilize')
293 for tidx
in testlist
:
294 if "flower" in tidx
["category"] and args
.device
== None:
296 print('Not executing test {} {} because DEV2 not defined'.
297 format(tidx
['id'], tidx
['name']))
300 badtest
= tidx
# in case it goes bad
301 tap
+= run_one_test(pm
, args
, index
, tidx
)
302 except PluginMgrTestFail
as pmtf
:
303 ex_type
, ex
, ex_tb
= sys
.exc_info()
305 message
= pmtf
.message
308 print('Exception {} {} (caught in test_runner, running test {} {} {} stage {})'.
309 format(ex_type
, ex
, index
, tidx
['id'], tidx
['name'], stage
))
310 print('---------------')
312 traceback
.print_tb(ex_tb
)
313 print('---------------')
314 if stage
== 'teardown':
315 print('accumulated output for this test:')
318 print('---------------')
322 # if we failed in setup or teardown,
323 # fill in the remaining tests with ok-skipped
326 tap
+= 'about to flush the tap output if tests need to be skipped\n'
327 if tcount
+ 1 != index
:
328 for tidx
in testlist
[index
- 1:]:
329 msg
= 'skipped - previous {} failed'.format(stage
)
330 tap
+= 'ok {} - {} # {} {} {}\n'.format(
331 count
, tidx
['id'], msg
, index
, badtest
.get('id', '--Unknown--'))
334 tap
+= 'done flushing skipped test tap output\n'
337 print('Want to pause\nPress enter to continue ...')
339 print('got something on stdin')
341 pm
.call_post_suite(index
)
345 def has_blank_ids(idlist
):
347 Search the list for empty ID fields and return true/false accordingly.
349 return not(all(k
for k
in idlist
))
352 def load_from_file(filename
):
354 Open the JSON file containing the test cases and return them
355 as list of ordered dictionary objects.
358 with
open(filename
) as test_data
:
359 testlist
= json
.load(test_data
, object_pairs_hook
=OrderedDict
)
360 except json
.JSONDecodeError
as jde
:
361 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename
, jde
))
364 idlist
= get_id_list(testlist
)
365 if (has_blank_ids(idlist
)):
367 k
['filename'] = filename
373 Create the argument parser.
375 parser
= argparse
.ArgumentParser(description
='Linux TC unit tests')
379 def set_args(parser
):
381 Set the command line arguments for tdc.
384 '-p', '--path', type=str,
385 help='The full path to the tc executable to use')
386 sg
= parser
.add_argument_group(
387 'selection', 'select which test cases: ' +
388 'files plus directories; filtered by categories plus testids')
389 ag
= parser
.add_argument_group(
390 'action', 'select action to perform on selected test cases')
393 '-D', '--directory', nargs
='+', metavar
='DIR',
394 help='Collect tests from the specified directory(ies) ' +
395 '(default [tc-tests])')
397 '-f', '--file', nargs
='+', metavar
='FILE',
398 help='Run tests from the specified file(s)')
400 '-c', '--category', nargs
='*', metavar
='CATG', default
=['+c'],
401 help='Run tests only from the specified category/ies, ' +
402 'or if no category/ies is/are specified, list known categories.')
404 '-e', '--execute', nargs
='+', metavar
='ID',
405 help='Execute the specified test cases with specified IDs')
407 '-l', '--list', action
='store_true',
408 help='List all test cases, or those only within the specified category')
410 '-s', '--show', action
='store_true', dest
='showID',
411 help='Display the selected test cases')
413 '-i', '--id', action
='store_true', dest
='gen_id',
414 help='Generate ID numbers for new test cases')
416 '-v', '--verbose', action
='count', default
=0,
417 help='Show the commands that are being run')
419 '-N', '--notap', action
='store_true',
420 help='Suppress tap results for command under test')
421 parser
.add_argument('-d', '--device',
422 help='Execute the test case in flower category')
424 '-P', '--pause', action
='store_true',
425 help='Pause execution just before post-suite stage')
429 def check_default_settings(args
, remaining
, pm
):
431 Process any arguments overriding the default settings,
432 and ensure the settings are correct.
434 # Allow for overriding specific settings
437 if args
.path
!= None:
438 NAMES
['TC'] = args
.path
439 if args
.device
!= None:
440 NAMES
['DEV2'] = args
.device
441 if not os
.path
.isfile(NAMES
['TC']):
442 print("The specified tc path " + NAMES
['TC'] + " does not exist.")
445 pm
.call_check_args(args
, remaining
)
448 def get_id_list(alltests
):
450 Generate a list of all IDs in the test cases.
452 return [x
["id"] for x
in alltests
]
455 def check_case_id(alltests
):
457 Check for duplicate test case IDs.
459 idl
= get_id_list(alltests
)
460 return [x
for x
in idl
if idl
.count(x
) > 1]
463 def does_id_exist(alltests
, newid
):
465 Check if a given ID already exists in the list of test cases.
467 idl
= get_id_list(alltests
)
468 return (any(newid
== x
for x
in idl
))
471 def generate_case_ids(alltests
):
473 If a test case has a blank ID field, generate a random hex ID for it
474 and then write the test cases back to disk.
480 newid
= str('{:04x}'.format(random
.randrange(16**4)))
481 if (does_id_exist(alltests
, newid
)):
489 if ('filename' in c
):
490 ufilename
.append(c
['filename'])
491 ufilename
= get_unique_item(ufilename
)
496 if t
['filename'] == f
:
499 outfile
= open(f
, "w")
500 json
.dump(testlist
, outfile
, indent
=4)
504 def filter_tests_by_id(args
, testlist
):
506 Remove tests from testlist that are not in the named id list.
507 If id list is empty, return empty list.
510 if testlist
and args
.execute
:
511 target_ids
= args
.execute
513 if isinstance(target_ids
, list) and (len(target_ids
) > 0):
514 newlist
= list(filter(lambda x
: x
['id'] in target_ids
, testlist
))
517 def filter_tests_by_category(args
, testlist
):
519 Remove tests from testlist that are not in a named category.
522 if args
.category
and testlist
:
524 for catg
in set(args
.category
):
527 print('considering category {}'.format(catg
))
529 if catg
in tc
['category'] and tc
['id'] not in test_ids
:
531 test_ids
.append(tc
['id'])
535 def get_test_cases(args
):
537 If a test case file is specified, retrieve tests from that file.
538 Otherwise, glob for all json files in subdirectories and load from
540 Also, if requested, filter by category, and add tests matching
546 testdirs
= ['tc-tests']
549 # at least one file was specified - remove the default directory
553 if not os
.path
.isfile(ff
):
554 print("IGNORING file " + ff
+ "\n\tBECAUSE does not exist.")
556 flist
.append(os
.path
.abspath(ff
))
559 testdirs
= args
.directory
561 for testdir
in testdirs
:
562 for root
, dirnames
, filenames
in os
.walk(testdir
):
563 for filename
in fnmatch
.filter(filenames
, '*.json'):
564 candidate
= os
.path
.abspath(os
.path
.join(root
, filename
))
565 if candidate
not in testdirs
:
566 flist
.append(candidate
)
568 alltestcases
= list()
569 for casefile
in flist
:
570 alltestcases
= alltestcases
+ (load_from_file(casefile
))
572 allcatlist
= get_test_categories(alltestcases
)
573 allidlist
= get_id_list(alltestcases
)
575 testcases_by_cats
= get_categorized_testlist(alltestcases
, allcatlist
)
576 idtestcases
= filter_tests_by_id(args
, alltestcases
)
577 cattestcases
= filter_tests_by_category(args
, alltestcases
)
579 cat_ids
= [x
['id'] for x
in cattestcases
]
582 alltestcases
= cattestcases
+ [x
for x
in idtestcases
if x
['id'] not in cat_ids
]
584 alltestcases
= idtestcases
587 alltestcases
= cattestcases
589 # just accept the existing value of alltestcases,
590 # which has been filtered by file/directory
593 return allcatlist
, allidlist
, testcases_by_cats
, alltestcases
596 def set_operation_mode(pm
, args
):
598 Load the test case data and process remaining arguments to determine
599 what the script should do for this run, and call the appropriate
602 ucat
, idlist
, testcases
, alltests
= get_test_cases(args
)
605 if (has_blank_ids(idlist
)):
606 alltests
= generate_case_ids(alltests
)
608 print("No empty ID fields found in test files.")
611 duplicate_ids
= check_case_id(alltests
)
612 if (len(duplicate_ids
) > 0):
613 print("The following test case IDs are not unique:")
614 print(str(set(duplicate_ids
)))
615 print("Please correct them before continuing.")
619 for atest
in alltests
:
620 print_test_case(atest
)
623 if isinstance(args
.category
, list) and (len(args
.category
) == 0):
624 print("Available categories:")
630 list_test_cases(alltests
)
634 catresults
= test_runner(pm
, args
, alltests
)
636 catresults
= 'No tests found\n'
638 print('Tap output suppression requested\n')
640 print('All test results: \n\n{}'.format(catresults
))
644 Start of execution; set up argument parser and get the arguments,
645 and start operations.
647 parser
= args_parse()
648 parser
= set_args(parser
)
649 pm
= PluginMgr(parser
)
650 parser
= pm
.call_add_args(parser
)
651 (args
, remaining
) = parser
.parse_known_args()
653 check_default_settings(args
, remaining
, pm
)
655 print('args is {}'.format(args
))
657 set_operation_mode(pm
, args
)
662 if __name__
== "__main__":