Merge branch 'locking-urgent-for-linus' of git://git.kernel.org/pub/scm/linux/kernel...
[cris-mirror.git] / tools / testing / selftests / tc-testing / tdc.py
blobfc373fdf2bdc098c42b60ff6911804a977d49c13
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
4 """
5 tdc.py - Linux tc (Traffic Control) unit test driver
7 Copyright (C) 2017 Lucas Bates <lucasb@mojatatu.com>
8 """
10 import re
11 import os
12 import sys
13 import argparse
14 import json
15 import subprocess
16 from collections import OrderedDict
17 from string import Template
19 from tdc_config import *
20 from tdc_helper import *
# When true, test commands are run inside a dedicated network namespace:
# exec_cmd() prefixes them with 'ip netns exec $NS', and ns_create() /
# ns_destroy() set that namespace up and tear it down.
USE_NS = True
def replace_keywords(cmd):
    """
    Expand the $-style placeholders of an executable command.

    Placeholders naming a key of NAMES are replaced by the matching value.
    safe_substitute() is used, so placeholders that are unknown are left
    in the returned string as they are instead of raising.
    """
    return Template(cmd).safe_substitute(NAMES)
def exec_cmd(command, nsonly=True):
    """
    Perform any required modifications on an executable command, then run
    it in a subprocess and return the results.

    When both USE_NS and nsonly are true the command is prefixed with
    'ip netns exec $NS'.  If the resulting string contains a '$' it is
    passed through replace_keywords() before being handed to the shell.

    Returns a (proc, output) tuple.  proc is the finished subprocess.Popen
    object (callers read proc.returncode).  output is the decoded stderr
    when the command exited non-zero and wrote to stderr, otherwise the
    decoded stdout.
    """
    if USE_NS and nsonly:
        command = 'ip netns exec $NS ' + command

    if '$' in command:
        command = replace_keywords(command)

    # The context manager closes both pipes and waits for the child even
    # when communicate() is interrupted part-way through.
    with subprocess.Popen(command,
                          shell=True,
                          stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
        (rawout, serr) = proc.communicate()

    if proc.returncode != 0 and len(serr) > 0:
        selected = serr
    else:
        selected = rawout

    # Command output is not guaranteed to be valid UTF-8; substitute the
    # offending bytes rather than aborting with UnicodeDecodeError.
    foutput = selected.decode("utf-8", errors="replace")
    return proc, foutput
def prepare_env(cmdlist):
    """
    Execute the setup/teardown commands for a test case.

    Each entry of cmdlist is either a command string, which must exit
    with status 0, or a list whose first element is the command and whose
    remaining elements are the exit codes to accept.  Empty commands and
    empty lists are skipped.

    If a command finishes with an exit code that is not accepted, the
    command and its output are printed, ns_destroy() is called and the
    script exits with status 1.
    """
    for cmdinfo in cmdlist:
        if isinstance(cmdinfo, list):
            if not cmdinfo:
                # Nothing to run; indexing [0] below would raise.
                continue
            cmd = cmdinfo[0]
            exit_codes = cmdinfo[1:]
        else:
            cmd = cmdinfo
            exit_codes = [0]

        if len(cmd) == 0:
            continue

        (proc, foutput) = exec_cmd(cmd)

        if proc.returncode not in exit_codes:
            # A bare `print` is a no-op expression in Python 3; call it so
            # the blank separator line is really emitted.
            print()
            print("Could not execute:")
            print(cmd)
            print("\nError message:")
            print(foutput)
            print("\nAborting test run.")
            ns_destroy()
            sys.exit(1)
92 def test_runner(filtered_tests, args):
93 """
94 Driver function for the unit tests.
96 Prints information about the tests being run, executes the setup and
97 teardown commands and the command under test itself. Also determines
98 success/failure based on the information in the test case and generates
99 TAP output accordingly.
101 testlist = filtered_tests
102 tcount = len(testlist)
103 index = 1
104 tap = str(index) + ".." + str(tcount) + "\n"
106 for tidx in testlist:
107 result = True
108 tresult = ""
109 if "flower" in tidx["category"] and args.device == None:
110 continue
111 print("Test " + tidx["id"] + ": " + tidx["name"])
112 prepare_env(tidx["setup"])
113 (p, procout) = exec_cmd(tidx["cmdUnderTest"])
114 exit_code = p.returncode
116 if (exit_code != int(tidx["expExitCode"])):
117 result = False
118 print("exit:", exit_code, int(tidx["expExitCode"]))
119 print(procout)
120 else:
121 match_pattern = re.compile(str(tidx["matchPattern"]), re.DOTALL)
122 (p, procout) = exec_cmd(tidx["verifyCmd"])
123 match_index = re.findall(match_pattern, procout)
124 if len(match_index) != int(tidx["matchCount"]):
125 result = False
127 if result == True:
128 tresult += "ok "
129 else:
130 tresult += "not ok "
131 tap += tresult + str(index) + " " + tidx["id"] + " " + tidx["name"] + "\n"
133 if result == False:
134 tap += procout
136 prepare_env(tidx["teardown"])
137 index += 1
139 return tap
def ns_create():
    """
    Set up the test network namespace and its devices.

    Does nothing unless USE_NS is true.  Otherwise the namespace $NS is
    added, a $DEV0/$DEV1 veth pair is created with $DEV1 moved into the
    namespace, and $DEV2 is moved into the namespace too; each device is
    brought up.  All commands are run outside the namespace prefixing
    (nsonly=False) and their results are not checked.
    """
    if not USE_NS:
        return

    setup_cmds = (
        'ip netns add $NS',
        'ip link add $DEV0 type veth peer name $DEV1',
        'ip link set $DEV1 netns $NS',
        'ip link set $DEV0 up',
        'ip -n $NS link set $DEV1 up',
        'ip link set $DEV2 netns $NS',
        'ip -n $NS link set $DEV2 up',
    )
    for cmd in setup_cmds:
        exec_cmd(cmd, False)
def ns_destroy():
    """
    Delete the test network namespace $NS.

    Does nothing unless USE_NS is true.  The result of the delete command
    is not checked.
    """
    if not USE_NS:
        return
    exec_cmd('ip netns delete $NS', False)
def has_blank_ids(idlist):
    """
    Return True when at least one entry of idlist is empty (falsy),
    False otherwise.  An empty list has no blank IDs.
    """
    return any(not entry for entry in idlist)
def load_from_file(filename):
    """
    Read the test cases stored in a JSON file.

    The JSON objects are loaded as OrderedDict instances.  A file that
    cannot be decoded as JSON is reported on stdout and yields an empty
    list.  When any loaded test case has a blank ID, every test case from
    the file is tagged with a 'filename' key holding the file's path.
    """
    try:
        with open(filename) as test_data:
            testlist = json.load(test_data, object_pairs_hook=OrderedDict)
    except json.JSONDecodeError as jde:
        print('IGNORING test case file {}\n\tBECAUSE: {}'.format(filename, jde))
        return list()

    if has_blank_ids(get_id_list(testlist)):
        for case in testlist:
            case['filename'] = filename
    return testlist
def args_parse():
    """
    Build and return the (still option-less) argument parser for tdc.
    """
    return argparse.ArgumentParser(description='Linux TC unit tests')
def set_args(parser):
    """
    Register the tdc command line options on parser and return it.
    """
    # One (flags, keyword-arguments) pair per option, in registration order.
    option_table = (
        (('-p', '--path'),
         {'type': str,
          'help': 'The full path to the tc executable to use'}),
        (('-c', '--category'),
         {'type': str, 'nargs': '?', 'const': '+c',
          'help': 'Run tests only from the specified category, or if no category is specified, list known categories.'}),
        (('-f', '--file'),
         {'type': str,
          'help': 'Run tests from the specified file'}),
        (('-l', '--list'),
         {'type': str, 'nargs': '?', 'const': "++", 'metavar': 'CATEGORY',
          'help': 'List all test cases, or those only within the specified category'}),
        (('-s', '--show'),
         {'type': str, 'nargs': 1, 'metavar': 'ID', 'dest': 'showID',
          'help': 'Display the test case with specified id'}),
        (('-e', '--execute'),
         {'type': str, 'nargs': 1, 'metavar': 'ID',
          'help': 'Execute the single test case with specified ID'}),
        (('-i', '--id'),
         {'action': 'store_true', 'dest': 'gen_id',
          'help': 'Generate ID numbers for new test cases'}),
        (('-d', '--device'),
         {'help': 'Execute the test case in flower category'}),
    )
    for flags, options in option_table:
        parser.add_argument(*flags, **options)
    return parser
def check_default_settings(args):
    """
    Apply command line overrides to NAMES and validate the tc path.

    args.path, when given, replaces NAMES['TC']; args.device, when given,
    replaces NAMES['DEV2'].  If NAMES['TC'] then does not name an existing
    regular file, an error is printed and the script exits with status 1.
    """
    # NAMES is only mutated, never rebound, so no `global` statement is
    # required here.
    if args.path is not None:
        NAMES['TC'] = args.path
    if args.device is not None:
        NAMES['DEV2'] = args.device

    tc_path = NAMES['TC']
    if os.path.isfile(tc_path):
        return

    print("The specified tc path " + tc_path + " does not exist.")
    exit(1)
def get_id_list(alltests):
    """
    Return the "id" value of every test case, in input order.
    """
    case_ids = [case["id"] for case in alltests]
    return case_ids
def check_case_id(alltests):
    """
    Check for duplicate test case IDs.

    Returns a list holding every ID that occurs more than once, repeated
    once per occurrence and in input order; the list is empty when all
    IDs are unique.  IDs must be hashable.
    """
    from collections import Counter

    idl = get_id_list(alltests)
    # Count once up front instead of rescanning the list for every ID.
    occurrences = Counter(idl)
    return [x for x in idl if occurrences[x] > 1]
def does_id_exist(alltests, newid):
    """
    Return True if newid is already the ID of one of the test cases in
    alltests, False otherwise.
    """
    return newid in get_id_list(alltests)
def generate_case_ids(alltests):
    """
    If a test case has a blank ID field, generate a random hex ID for it
    and then write the test cases back to disk.

    New IDs are four lowercase hex digits and never collide with an ID
    already present in alltests.  Only test cases carrying a 'filename'
    key are written back: they are grouped by that key, the key is removed
    from each test case, and every group is dumped as JSON to its file.

    alltests is modified in place and also returned.
    """
    import random

    # Track the IDs in use in a set so that each candidate check is O(1).
    used_ids = {case["id"] for case in alltests}
    for case in alltests:
        if case["id"]:
            continue
        while True:
            newid = '%04x' % random.randrange(16**4)
            if newid not in used_ids:
                break
        case['id'] = newid
        used_ids.add(newid)

    # Group the tagged test cases by source file in a single pass.
    tests_by_file = {}
    for case in alltests:
        if 'filename' in case:
            tests_by_file.setdefault(case.pop('filename'), []).append(case)

    for fname, testlist in tests_by_file.items():
        # `with` guarantees the file is closed even if json.dump() raises.
        with open(fname, "w") as outfile:
            json.dump(testlist, outfile, indent=4)

    return alltests
def get_test_cases(args):
    """
    Collect the test cases to work with.

    With args.file set, only that file is loaded; if it is not an existing
    file an error is printed and the script exits with status 1.  Without
    it, every '*.json' file found below the 'tc-tests' directory is
    loaded.  Returns the concatenation of the per-file test lists.
    """
    import fnmatch

    if args.file is None:
        flist = [
            os.path.join(root, filename)
            for root, dirnames, filenames in os.walk('tc-tests')
            for filename in fnmatch.filter(filenames, '*.json')
        ]
    elif os.path.isfile(args.file):
        flist = [args.file]
    else:
        print("The specified test case file " + args.file + " does not exist.")
        exit(1)

    alltests = list()
    for casefile in flist:
        alltests = alltests + load_from_file(casefile)
    return alltests
def set_operation_mode(args):
    """
    Load the test case data and process remaining arguments to determine
    what the script should do for this run, and call the appropriate
    function.

    Modes, checked in this order:
      * args.gen_id: fill in blank test IDs (or report that there are
        none) and exit.
      * duplicate IDs in the loaded tests: report them and exit(1).
      * args.showID: display one test case and exit.
      * args.category given without a value: list categories and exit.
      * args.list: list all test cases, or those of one category, and
        exit.
      * otherwise: run tests (root required), restricted to
        args.category or to IDs containing args.execute when given.

    All selection errors are detected before the network namespace is
    created, and the namespace is destroyed even when the run is cut
    short, so no namespace is left behind by a failed invocation.
    """
    alltests = get_test_cases(args)

    if args.gen_id:
        idlist = get_id_list(alltests)
        if has_blank_ids(idlist):
            generate_case_ids(alltests)
        else:
            print("No empty ID fields found in test files.")
        sys.exit(0)

    duplicate_ids = check_case_id(alltests)
    if len(duplicate_ids) > 0:
        print("The following test case IDs are not unique:")
        print(str(set(duplicate_ids)))
        print("Please correct them before continuing.")
        sys.exit(1)

    ucat = get_test_categories(alltests)

    if args.showID:
        show_test_case_by_id(alltests, args.showID[0])
        sys.exit(0)

    target_id = args.execute[0] if args.execute else ""

    if args.category:
        if args.category == '+c':
            print("Available categories:")
            print_sll(ucat)
            sys.exit(0)
        target_category = args.category
    else:
        target_category = ""

    testcases = get_categorized_testlist(alltests, ucat)

    if args.list:
        if args.list == "++":
            list_test_cases(alltests)
            sys.exit(0)
        if args.list not in ucat:
            print("Unknown category " + args.list)
            print("Available categories:")
            print_sll(ucat)
            sys.exit(1)
        list_test_cases(testcases[args.list])
        sys.exit(0)

    if os.geteuid() != 0:
        print("This script must be run with root privileges.\n")
        sys.exit(1)

    # Resolve and validate the selection BEFORE creating the namespace so
    # that an invalid request cannot leave the namespace behind.
    if len(target_category) == 0:
        selected = alltests
        if len(target_id) > 0:
            selected = [x for x in alltests if target_id in x['id']]
            if len(selected) == 0:
                print("Cannot find a test case with ID matching " + target_id)
                sys.exit(1)
        heading = "All test results: "
    else:
        if target_category == "flower" and args.device is None:
            print("Please specify a NIC device (-d) to run category flower")
            sys.exit(1)
        if target_category not in ucat:
            print("Specified category is not present in this file.")
            sys.exit(1)
        selected = testcases[target_category]
        heading = "Category " + target_category

    ns_create()
    try:
        catresults = test_runner(selected, args)
        print(heading + "\n\n" + catresults)
    finally:
        # Runs on normal completion, on exceptions and on SystemExit.  If
        # the namespace was already removed by an aborting setup/teardown
        # step, this second delete simply fails inside exec_cmd().
        ns_destroy()
def main():
    """
    Script entry point: build the argument parser, parse the command
    line (unrecognised arguments are tolerated and ignored), apply the
    settings overrides and hand over to set_operation_mode().
    """
    parser = set_args(args_parse())
    args, _unparsed = parser.parse_known_args()
    check_default_settings(args)

    set_operation_mode(args)

    exit(0)


if __name__ == "__main__":
    main()