2 # SPDX-License-Identifier: GPL-2.0
5 tdc.py - Linux tc (Traffic Control) unit test driver
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
16 from collections
import OrderedDict
17 from string
import Template
19 from tdc_config
import *
20 from tdc_helper
import *
26 def replace_keywords(cmd
):
28 For a given executable command, substitute any known
29 variables contained within NAMES with the correct values
32 subcmd
= tcmd
.safe_substitute(NAMES
)
36 def exec_cmd(command
, nsonly
=True):
38 Perform any required modifications on an executable command, then run
39 it in a subprocess and return the results.
41 if (USE_NS
and nsonly
):
42 command
= 'ip netns exec $NS ' + command
45 command
= replace_keywords(command
)
47 proc
= subprocess
.Popen(command
,
49 stdout
=subprocess
.PIPE
,
50 stderr
=subprocess
.PIPE
)
51 (rawout
, serr
) = proc
.communicate()
53 if proc
.returncode
!= 0 and len(serr
) > 0:
54 foutput
= serr
.decode("utf-8")
56 foutput
= rawout
.decode("utf-8")
63 def prepare_env(cmdlist
):
65 Execute the setup/teardown commands for a test case. Optionally
66 terminate test execution if the command fails.
68 for cmdinfo
in cmdlist
:
69 if (type(cmdinfo
) == list):
70 exit_codes
= cmdinfo
[1:]
79 (proc
, foutput
) = exec_cmd(cmd
)
81 if proc
.returncode
not in exit_codes
:
83 print("Could not execute:")
85 print("\nError message:")
87 print("\nAborting test run.")
92 def test_runner(filtered_tests
, args
):
94 Driver function for the unit tests.
96 Prints information about the tests being run, executes the setup and
97 teardown commands and the command under test itself. Also determines
98 success/failure based on the information in the test case and generates
99 TAP output accordingly.
101 testlist
= filtered_tests
102 tcount
= len(testlist
)
104 tap
= str(index
) + ".." + str(tcount
) + "\n"
106 for tidx
in testlist
:
109 if "flower" in tidx
["category"] and args
.device
== None:
111 print("Test " + tidx
["id"] + ": " + tidx
["name"])
112 prepare_env(tidx
["setup"])
113 (p
, procout
) = exec_cmd(tidx
["cmdUnderTest"])
114 exit_code
= p
.returncode
116 if (exit_code
!= int(tidx
["expExitCode"])):
118 print("exit:", exit_code
, int(tidx
["expExitCode"]))
121 match_pattern
= re
.compile(str(tidx
["matchPattern"]), re
.DOTALL
)
122 (p
, procout
) = exec_cmd(tidx
["verifyCmd"])
123 match_index
= re
.findall(match_pattern
, procout
)
124 if len(match_index
) != int(tidx
["matchCount"]):
131 tap
+= tresult
+ str(index
) + " " + tidx
["id"] + " " + tidx
["name"] + "\n"
136 prepare_env(tidx
["teardown"])
144 Create the network namespace in which the tests will be run and set up
145 the required network devices for it.
148 cmd
= 'ip netns add $NS'
150 cmd
= 'ip link add $DEV0 type veth peer name $DEV1'
152 cmd
= 'ip link set $DEV1 netns $NS'
154 cmd
= 'ip link set $DEV0 up'
156 cmd
= 'ip -n $NS link set $DEV1 up'
158 cmd
= 'ip link set $DEV2 netns $NS'
160 cmd
= 'ip -n $NS link set $DEV2 up'
166 Destroy the network namespace for testing (and any associated network
170 cmd
= 'ip netns delete $NS'
174 def has_blank_ids(idlist
):
176 Search the list for empty ID fields and return true/false accordingly.
178 return not(all(k
for k
in idlist
))
181 def load_from_file(filename
):
183 Open the JSON file containing the test cases and return them
184 as list of ordered dictionary objects.
187 with
open(filename
) as test_data
:
188 testlist
= json
.load(test_data
, object_pairs_hook
=OrderedDict
)
189 except json
.JSONDecodeError
as jde
:
190 print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename
, jde
))
193 idlist
= get_id_list(testlist
)
194 if (has_blank_ids(idlist
)):
196 k
['filename'] = filename
202 Create the argument parser.
204 parser
= argparse
.ArgumentParser(description
='Linux TC unit tests')
208 def set_args(parser
):
210 Set the command line arguments for tdc.
212 parser
.add_argument('-p', '--path', type=str,
213 help='The full path to the tc executable to use')
214 parser
.add_argument('-c', '--category', type=str, nargs
='?', const
='+c',
215 help='Run tests only from the specified category, or if no category is specified, list known categories.')
216 parser
.add_argument('-f', '--file', type=str,
217 help='Run tests from the specified file')
218 parser
.add_argument('-l', '--list', type=str, nargs
='?', const
="++", metavar
='CATEGORY',
219 help='List all test cases, or those only within the specified category')
220 parser
.add_argument('-s', '--show', type=str, nargs
=1, metavar
='ID', dest
='showID',
221 help='Display the test case with specified id')
222 parser
.add_argument('-e', '--execute', type=str, nargs
=1, metavar
='ID',
223 help='Execute the single test case with specified ID')
224 parser
.add_argument('-i', '--id', action
='store_true', dest
='gen_id',
225 help='Generate ID numbers for new test cases')
226 parser
.add_argument('-d', '--device',
227 help='Execute the test case in flower category')
231 def check_default_settings(args
):
233 Process any arguments overriding the default settings, and ensure the
234 settings are correct.
236 # Allow for overriding specific settings
239 if args
.path
!= None:
240 NAMES
['TC'] = args
.path
241 if args
.device
!= None:
242 NAMES
['DEV2'] = args
.device
243 if not os
.path
.isfile(NAMES
['TC']):
244 print("The specified tc path " + NAMES
['TC'] + " does not exist.")
248 def get_id_list(alltests
):
250 Generate a list of all IDs in the test cases.
252 return [x
["id"] for x
in alltests
]
255 def check_case_id(alltests
):
257 Check for duplicate test case IDs.
259 idl
= get_id_list(alltests
)
260 return [x
for x
in idl
if idl
.count(x
) > 1]
263 def does_id_exist(alltests
, newid
):
265 Check if a given ID already exists in the list of test cases.
267 idl
= get_id_list(alltests
)
268 return (any(newid
== x
for x
in idl
))
271 def generate_case_ids(alltests
):
273 If a test case has a blank ID field, generate a random hex ID for it
274 and then write the test cases back to disk.
280 newid
= str('%04x' % random
.randrange(16**4))
281 if (does_id_exist(alltests
, newid
)):
289 if ('filename' in c
):
290 ufilename
.append(c
['filename'])
291 ufilename
= get_unique_item(ufilename
)
296 if t
['filename'] == f
:
299 outfile
= open(f
, "w")
300 json
.dump(testlist
, outfile
, indent
=4)
304 def get_test_cases(args
):
306 If a test case file is specified, retrieve tests from that file.
307 Otherwise, glob for all json files in subdirectories and load from
311 if args
.file != None:
312 if not os
.path
.isfile(args
.file):
313 print("The specified test case file " + args
.file + " does not exist.")
318 for root
, dirnames
, filenames
in os
.walk('tc-tests'):
319 for filename
in fnmatch
.filter(filenames
, '*.json'):
320 flist
.append(os
.path
.join(root
, filename
))
322 for casefile
in flist
:
323 alltests
= alltests
+ (load_from_file(casefile
))
327 def set_operation_mode(args
):
329 Load the test case data and process remaining arguments to determine
330 what the script should do for this run, and call the appropriate
333 alltests
= get_test_cases(args
)
336 idlist
= get_id_list(alltests
)
337 if (has_blank_ids(idlist
)):
338 alltests
= generate_case_ids(alltests
)
340 print("No empty ID fields found in test files.")
343 duplicate_ids
= check_case_id(alltests
)
344 if (len(duplicate_ids
) > 0):
345 print("The following test case IDs are not unique:")
346 print(str(set(duplicate_ids
)))
347 print("Please correct them before continuing.")
350 ucat
= get_test_categories(alltests
)
353 show_test_case_by_id(alltests
, args
.showID
[0])
357 target_id
= args
.execute
[0]
362 if (args
.category
== '+c'):
363 print("Available categories:")
367 target_category
= args
.category
372 testcases
= get_categorized_testlist(alltests
, ucat
)
375 if (args
.list == "++"):
376 list_test_cases(alltests
)
378 elif(len(args
.list) > 0):
379 if (args
.list not in ucat
):
380 print("Unknown category " + args
.list)
381 print("Available categories:")
384 list_test_cases(testcases
[args
.list])
387 if (os
.geteuid() != 0):
388 print("This script must be run with root privileges.\n")
393 if (len(target_category
) == 0):
394 if (len(target_id
) > 0):
395 alltests
= list(filter(lambda x
: target_id
in x
['id'], alltests
))
396 if (len(alltests
) == 0):
397 print("Cannot find a test case with ID matching " + target_id
)
399 catresults
= test_runner(alltests
, args
)
400 print("All test results: " + "\n\n" + catresults
)
401 elif (len(target_category
) > 0):
402 if (target_category
== "flower") and args
.device
== None:
403 print("Please specify a NIC device (-d) to run category flower")
405 if (target_category
not in ucat
):
406 print("Specified category is not present in this file.")
409 catresults
= test_runner(testcases
[target_category
], args
)
410 print("Category " + target_category
+ "\n\n" + catresults
)
417 Start of execution; set up argument parser and get the arguments,
418 and start operations.
420 parser
= args_parse()
421 parser
= set_args(parser
)
422 (args
, remaining
) = parser
.parse_known_args()
423 check_default_settings(args
)
425 set_operation_mode(args
)
430 if __name__
== "__main__":