3 ## This file is part of the sigrok-test project.
5 ## Copyright (C) 2013 Bert Vermeulen <bert@biot.com>
7 ## This program is free software: you can redistribute it and/or modify
8 ## it under the terms of the GNU General Public License as published by
9 ## the Free Software Foundation, either version 3 of the License, or
10 ## (at your option) any later version.
12 ## This program is distributed in the hope that it will be useful,
13 ## but WITHOUT ANY WARRANTY; without even the implied warranty of
14 ## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 ## GNU General Public License for more details.
17 ## You should have received a copy of the GNU General Public License
18 ## along with this program. If not, see <http://www.gnu.org/licenses/>.
24 from getopt import getopt
25 from tempfile import mkstemp
26 from subprocess import Popen, PIPE
27 from difflib import unified_diff
28 from hashlib import md5
29 from shutil import copy
# NOTE(review): this chunk is extraction-garbled -- the original file's
# line numbers are fused onto each line and intervening lines are missing.
# Code is kept byte-identical; only comments are added.
# Parser exceptions for test.conf parsing: judging from the except handlers
# in parse_testfile() below, E_badline flags a malformed key/value line and
# E_syntax flags an unrecognized line. Class bodies (presumably "pass") are
# missing from this view -- TODO confirm against the full file.
35 class E_syntax(Exception):
37 class E_badline(Exception):
# Console output helpers and usage text. Lines are missing between the
# fragments below (fused original numbers jump 40 -> 52 -> 57), so the
# ERR()/DBG() definitions referenced elsewhere in this file are presumably
# in the gaps -- TODO confirm against the full file.
40 def INFO(msg, end='\n'):
# Fragment, likely from ERR(): writes the message to stderr.
52 print(msg, file=sys.stderr)
# Fragment, likely from usage(): prints an optional error message, then the
# usage/help text, for the "testpd" command-line tool.
57 print(msg.strip() + '\n')
58 print("""Usage: testpd [-dvalsrfcR] [<test1> <test2> ...]
65 -f Fix failed test(s) / create initial output for new test(s)
66 -c Report decoder code coverage
67 -R <directory> Save test reports to <directory>
68 <test> Protocol decoder name ("i2c") and optionally test name ("i2c/rtc")""")
# Body fragment of a testcase validator (the def line is missing from this
# view; run_tests-era callers name it check_tclist). It sanity-checks a
# parsed testcase dict: requires a non-empty 'pdlist', 'input' and 'output',
# and returns a human-readable error string on failure. The success-path
# return (presumably None) is in a missing line -- TODO confirm.
73 if 'pdlist' not in tc or not tc['pdlist']:
74 return("No protocol decoders")
75 if 'input' not in tc or not tc['input']:
77 if 'output' not in tc or not tc['output']:
# Each output entry is checked too; only the error return survives here.
79 for op in tc['output']:
81 return("No match in output")
# Parse a test.conf file at 'path' into a list of testcase dicts, optionally
# narrowing to one testcase 'tc', output type 'op_type' and output class
# 'op_class'. Many lines are missing from this view (fused original numbers
# jump, e.g. 95 -> 109), so the full key dispatch and the return statement
# are not visible -- hedged comments only below.
86 def parse_testfile(path, pd, tc, op_type, op_class):
87 DBG("Opening '%s'" % path)
# Iterate the file line by line; blank lines and '#' comments are skipped.
89 for line in open(path).read().split('\n'):
92 if len(line) == 0 or line[0] == "#":
# A file must start with a "test" line before anything else.
95 if not tclist and f[0] != "test":
# 'protocol-decoder' lines build a pd_spec with channels/options/initial
# pins given as opt=val pairs.
109 elif key == 'protocol-decoder':
120 # Always needs <key> <value>
126 opt, val = b.split('=')
132 pd_spec['channels'].append([opt, val])
134 pd_spec['options'].append([opt, val])
135 elif a == 'initial_pin':
140 pd_spec['initial_pins'].append([opt, val])
143 tclist[-1]['pdlist'].append(pd_spec)
# 'stack' and 'input' lines attach directly to the current testcase.
147 tclist[-1]['stack'] = f
151 tclist[-1]['input'] = f[0]
# 'output' lines build an op_spec appended to the testcase's output list.
152 elif key == 'output':
159 # Always needs <key> <value>
169 tclist[-1]['output'].append(op_spec)
# Malformed vs. unrecognized lines are reported separately.
172 except E_badline as e:
173 ERR("Invalid syntax in %s: line '%s'" % (path, line))
175 except E_syntax as e:
176 ERR("Unable to parse %s: unknown line '%s'" % (path, line))
179 # If a specific testcase was requested, keep only that one.
186 # ...and a specific output type
187 if op_type is not None:
189 for op in target_tc['output']:
190 if op['type'] == op_type:
191 # ...and a specific output class
192 if op_class is None or ('class' in op and op['class'] == op_class):
193 target_oplist.append(op)
194 DBG("match on [%s]" % str(op))
195 target_tc['output'] = target_oplist
196 if target_tc is None:
# Each resulting testcase is validated via check_tclist(); errors abort.
201 error = check_tclist(t)
203 ERR("Error in %s: %s" % (path, error))
# Build a dict of tests keyed by protocol-decoder name from a list of
# testspec strings ("pd", or "pd/testcase/type/class"). Lines are missing
# from this view (e.g. the unpacking of 'ts' into pd/tc/op_type/op_class,
# and the final return of 'tests') -- TODO confirm against the full file.
209 def get_tests(testnames):
211 for testspec in testnames:
212 # Optional testspec in the form pd/testcase/type/class
213 tc = op_type = op_class = None
214 ts = testspec.strip("/").split("/")
# The PD must exist as a directory under tests_dir...
223 path = os.path.join(tests_dir, pd)
224 if not os.path.isdir(path):
225 # User specified non-existent PD
226 raise Exception("%s not found." % path)
# ...and is silently skipped if it has no test.conf yet.
227 path = os.path.join(tests_dir, pd, "test.conf")
228 if not os.path.exists(path):
229 # PD doesn't have any tests yet
231 tests[pd].append(parse_testfile(path, pd, tc, op_type, op_class))
# Textually diff two files with difflib.unified_diff, keeping only the
# added/removed lines (no headers, no context lines). The trailing return
# statement (presumably "return diff") is missing from this view.
236 def diff_text(f1, f2):
237 t1 = open(f1).readlines()
238 t2 = open(f2).readlines()
239 diff = list(unified_diff(t1, t2))
240 diff = diff[2:] # Strip two from/to filename lines with "+++"/"---".
241 diff = [d.strip() for d in diff if d[0] in ('+', '-')]
# Compare two files byte-for-byte via md5 digests. The construction of the
# h1/h2 hash objects (presumably md5()) and the return statements are in
# lines missing from this view -- on mismatch a one-line message list is
# produced, mirroring diff_text()'s return shape.
245 def compare_binary(f1, f2):
247 h1.update(open(f1, 'rb').read())
249 h2.update(open(f2, 'rb').read())
250 if h1.digest() == h2.digest():
253 result = ["Binary output does not match."]
258 # runtc's stdout can have lines like:
259 # coverage: lines=161 missed=2 coverage=99%
# Parse such key=value stat lines into a dict of lists of dicts, keyed by
# the leading word (colon stripped). The splitting of each field into k/v
# and the return of 'stats' are in lines missing from this view.
260 def parse_stats(text):
262 for line in text.strip().split('\n'):
263 fields = line.split()
264 key = fields.pop(0).strip(':')
# One record per stat line; fields accumulate into the newest record.
267 stats[key].append({})
270 stats[key][-1][k] = v
275 # take result set of all tests in a PD, and summarize which lines
276 # were not covered by any of the tests.
# Returns (lines, final_missed): the total line count (taken from the last
# record -- 'lines' is overwritten each iteration) and the line specs missed
# by every record. Initialization of missed/missed_lines/final_missed is in
# lines missing from this view.
277 def coverage_sum(cvglist):
281 for record in cvglist:
282 lines = int(record['lines'])
283 missed += int(record['missed'])
284 if 'missed_lines' not in record:
# Count how many records missed each "file:line" spec.
286 for linespec in record['missed_lines'].split(','):
287 if linespec not in missed_lines:
288 missed_lines[linespec] = 1
290 missed_lines[linespec] += 1
292 # keep only those lines that didn't show up in every non-summary record
294 for linespec in missed_lines:
295 if missed_lines[linespec] != len(cvglist):
297 final_missed.append(linespec)
299 return lines, final_missed
# Run every output of every testcase of every PD through the external
# 'runtc' binary, compare against stored match files, and collect results.
# With fix=True, a mismatching output overwrites the match file instead of
# failing. Returns (results, errors). Many lines are missing from this view
# (loop setup, temp-file cleanup, counters), so comments below are hedged.
302 def run_tests(tests, fix=False):
305 cmd = [os.path.join(runtc_dir, 'runtc')]
# Coverage data is written to a temp file when requested ('-c').
307 fd, coverage = mkstemp()
309 cmd.extend(['-c', coverage])
312 for pd in sorted(tests.keys()):
314 for tclist in tests[pd]:
319 # Set up PD stack for this test.
320 for spd in tc['pdlist']:
321 args.extend(['-P', spd['name']])
322 for label, channel in spd['channels']:
323 args.extend(['-p', "%s=%d" % (label, channel)])
324 for option, value in spd['options']:
325 args.extend(['-o', "%s=%s" % (option, value)])
326 for label, initial_pin in spd['initial_pins']:
327 args.extend(['-N', "%s=%d" % (label, initial_pin)])
328 args.extend(['-i', os.path.join(dumps_dir, tc['input'])])
# One runtc invocation per requested output ('-O pd:type[:class]').
329 for op in tc['output']:
330 name = "%s/%s/%s" % (pd, tc['name'], op['type'])
331 opargs = ['-O', "%s:%s" % (op['pd'], op['type'])]
333 opargs[-1] += ":%s" % op['class']
334 name += "/%s" % op['class']
# Progress line padded with dots to a fixed width.
336 dots = '.' * (77 - len(name) - 2)
337 INFO("%s %s " % (name, dots), end='')
# Decoder output goes to a fresh temp file ('-f').
342 fd, outfile = mkstemp()
344 opargs.extend(['-f', outfile])
345 DBG("Running %s" % (' '.join(args + opargs)))
346 p = Popen(args + opargs, stdout=PIPE, stderr=PIPE)
347 stdout, stderr = p.communicate()
349 # statistics and coverage data on stdout
350 results[-1].update(parse_stats(stdout.decode('utf-8')))
# stderr text or a nonzero exit code is recorded as an error.
352 results[-1]['error'] = stderr.decode('utf-8').strip()
354 elif p.returncode != 0:
355 # runtc indicated an error, but didn't output a
356 # message on stderr about it
357 results[-1]['error'] = "Unknown error: runtc %d" % p.returncode
# Only compare output when runtc itself succeeded.
358 if 'error' not in results[-1]:
359 matchfile = os.path.join(tests_dir, op['pd'], op['match'])
360 DBG("Comparing with %s" % matchfile)
362 diff = diff_error = None
363 if op['type'] in ('annotation', 'python'):
364 diff = diff_text(matchfile, outfile)
365 elif op['type'] == 'binary':
366 diff = compare_binary(matchfile, outfile)
368 diff = ["Unsupported output type '%s'." % op['type']]
369 except Exception as e:
# fix mode: overwrite the match file with the new output instead of
# recording the mismatch.
372 if diff or diff_error:
373 copy(outfile, matchfile)
374 DBG("Wrote %s" % matchfile)
377 results[-1]['diff'] = diff
378 elif diff_error is not None:
380 except Exception as e:
381 results[-1]['error'] = str(e)
384 results[-1]['coverage_report'] = coverage
# 'exception' outputs EXPECT a matching srd error; strip it if found.
386 if op['type'] == 'exception' and 'error' in results[-1]:
387 # filter out the exception we were looking for
388 reg = "^Error: srd: %s:" % op['match']
389 if re.match(reg, results[-1]['error']):
390 # found it, not an error
391 results[-1].pop('error')
# Per-test console verdict: mismatch / error / coverage / (OK in a
# missing line, presumably).
394 if 'diff' in results[-1]:
395 INFO("Output mismatch")
396 elif 'error' in results[-1]:
397 error = results[-1]['error']
399 error = error[:17] + '...'
401 elif 'coverage' in results[-1]:
402 # report coverage of this PD
403 for record in results[-1]['coverage']:
404 # but not others used in the stack
405 # as part of the test.
406 if record['scope'] == pd:
407 INFO(record['coverage'])
411 gen_report(results[-1])
414 # only keep track of coverage records for this PD,
415 # not others in the stack just used for testing.
416 for cvg in results[-1]['coverage']:
417 if cvg['scope'] == pd:
# With multiple tests per PD, summarize total coverage across them.
419 if opt_coverage and len(pd_cvg) > 1:
420 # report total coverage of this PD, across all the tests
421 # that were done on it.
422 total_lines, missed_lines = coverage_sum(pd_cvg)
423 pd_coverage = 100 - (float(len(missed_lines)) / total_lines * 100)
425 dots = '.' * (54 - len(pd) - 2)
426 INFO("%s total %s %d%%" % (pd, dots, pd_coverage))
428 # generate a missing lines list across all the files in
# Group missed "file:line" entries per file, sorted numerically, and
# write the summary to <report_dir>/<pd>_total.
431 for entry in missed_lines:
432 filename, line = entry.split(':')
433 if filename not in files:
435 files[filename].append(line)
437 for filename in sorted(files.keys()):
438 line_list = ','.join(sorted(files[filename], key=int))
439 text += "%s: %s\n" % (filename, line_list)
440 open(os.path.join(report_dir, pd + "_total"), 'w').write(text)
443 return results, errors
445 def get_run_tests_error_diff_counts(results):
446 """Get error and diff counters from run_tests() results."""
# Counter initialization, the diff branch and the return are in lines
# missing from this view -- presumably tallies 'error'/'diff' keys.
449 for result in results:
450 if 'error' in result:
# Write a per-testcase report file into report_dir, named after the
# testcase with '/' replaced by '_'. Initialization of 'out' and some
# branches are in lines missing from this view.
457 def gen_report(result):
459 if 'error' in result:
461 out.append(result['error'])
464 out.append("Test output mismatch:")
465 out.extend(result['diff'])
467 if 'coverage_report' in result:
468 out.append(open(result['coverage_report'], 'r').read())
472 text = "Testcase: %s\n" % result['testcase']
473 text += '\n'.join(out)
478 filename = result['testcase'].replace('/', '_')
479 open(os.path.join(report_dir, filename), 'w').write(text)
# Pretty-print every parsed testcase: its PD stack with channels, options
# and initial pins, plus input file and requested outputs. NOTE(review):
# the inner loop reuses the name 'pd', shadowing the outer PD key -- kept
# byte-identical here. Some lines are missing from this view.
484 def show_tests(tests):
485 for pd in sorted(tests.keys()):
486 for tclist in tests[pd]:
488 print("Testcase: %s/%s" % (tc['pd'], tc['name']))
489 for pd in tc['pdlist']:
490 print(" Protocol decoder: %s" % pd['name'])
491 for label, channel in pd['channels']:
492 print(" Channel %s=%d" % (label, channel))
493 for option, value in pd['options']:
494 print(" Option %s=%s" % (option, value))
495 for label, initial_pin in pd['initial_pins']:
496 print(" Initial pin %s=%d" % (label, initial_pin))
498 print(" Stack: %s" % ' '.join(tc['stack']))
499 print(" Input: %s" % tc['input'])
500 for op in tc['output']:
501 print(" Output:\n Protocol decoder: %s" % op['pd'])
502 print(" Type: %s" % op['type'])
504 print(" Class: %s" % op['class'])
505 print(" Match: %s" % op['match'])
# Print one "pd/testcase/type[/class]" line per requested output -- the
# same testspec format get_tests() accepts. The final print of 'line' is
# in a line missing from this view.
509 def list_tests(tests):
510 for pd in sorted(tests.keys()):
511 for tclist in tests[pd]:
513 for op in tc['output']:
514 line = "%s/%s/%s" % (tc['pd'], tc['name'], op['type'])
516 line += "/%s" % op['class']
# Top-level script body: locate the runtc binary, the sigrok-dumps capture
# repository and the tests directory relative to this script, parse the
# command line with getopt, then dispatch to run/show/list. Option handling
# (the per-flag assignments) and parts of the dispatch are in lines missing
# from this view -- comments below are hedged accordingly.
525 runtc_dir = os.path.abspath(os.path.dirname(sys.argv[0]))
526 base_dir = os.path.abspath(os.path.join(os.curdir, runtc_dir, os.path.pardir))
527 dumps_dir = os.path.abspath(os.path.join(base_dir, os.path.pardir, 'sigrok-dumps'))
528 tests_dir = os.path.abspath(os.path.join(runtc_dir, 'test'))
# No arguments at all presumably prints usage.
530 if len(sys.argv) == 1:
533 opt_all = opt_run = opt_show = opt_list = opt_fix = opt_coverage = False
536 opts, args = getopt(sys.argv[1:], "dvarslfcR:S:")
537 except Exception as e:
538 usage('error while parsing command line arguments: {}'.format(e))
539 for opt, arg in opts:
# Mutually exclusive / invalid option combinations abort with usage().
561 if opt_run and opt_show:
562 usage("Use either -s or -r, not both.")
564 usage("Specify either -a or tests, not both.")
565 if report_dir is not None and not os.path.isdir(report_dir):
566 usage("%s is not a directory" % report_dir)
# Explicit testspecs, or all PDs found under tests_dir.
571 testlist = get_tests(args)
572 elif opt_all or opt_list:
573 testlist = get_tests(os.listdir(tests_dir))
575 usage("Specify either -a or tests.")
# Running tests requires the sigrok-dumps capture repository on disk.
578 if not os.path.isdir(dumps_dir):
579 ERR("Could not find sigrok-dumps repository at %s" % dumps_dir)
581 results, errors = run_tests(testlist, fix=opt_fix)
583 errs, diffs = get_run_tests_error_diff_counts(results)
# Presumably the fix-mode path re-runs with fix=True -- context missing.
593 run_tests(testlist, fix=True)
596 except Exception as e:
597 print("Error: %s" % str(e))