1 # SPDX-License-Identifier: GPL-2.0
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
4 # results with reader-friendly format. Stores and returns test results in a
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
12 from __future__
import annotations
13 from dataclasses
import dataclass
17 from enum
import Enum
, auto
18 from typing
import Iterable
, Iterator
, List
, Optional
, Tuple
20 from kunit_printer
import Printer
, stdout
24 A class to represent a test parsed from KTAP results. All KTAP
25 results within a test log are stored in a main Test object as
29 status : TestStatus - status of the test
30 name : str - name of the test
31 expected_count : int - expected number of subtests (0 if single
32 test case and None if unknown expected number of subtests)
33 subtests : List[Test] - list of subtests
34 log : List[str] - log of KTAP lines that correspond to the test
35 counts : TestCounts - counts of the test statuses and errors of
36 subtests or of the test itself if the test is a single
39 def __init__(self
) -> None:
40 """Creates Test object with default attributes."""
41 self
.status
= TestStatus
.TEST_CRASHED
43 self
.expected_count
= 0 # type: Optional[int]
44 self
.subtests
= [] # type: List[Test]
45 self
.log
= [] # type: List[str]
46 self
.counts
= TestCounts()
def __str__(self) -> str:
    """Returns string representation of a Test class object.

    The fields appear in the order status, name, expected_count,
    subtests, log, counts.
    """
    # Build the two halves separately so each line stays short.
    head = f'Test({self.status}, {self.name}, {self.expected_count}, '
    tail = f'{self.subtests}, {self.log}, {self.counts})'
    return head + tail
53 def __repr__(self
) -> str:
54 """Returns string representation of a Test class object."""
def add_error(self, printer: Printer, error_message: str) -> None:
    """Records an error that occurred while parsing this test.

    Increments the error count stored in self.counts and prints an
    '[ERROR]' line naming this test.

    Parameters:
    printer - Printer object to output the error
    error_message - description of the error
    """
    self.counts.errors += 1
    # Take the color from the printer that emits the line rather than
    # from the global stdout printer, so the color codes always match
    # the stream that the message is written to.
    tag = printer.red('[ERROR]')
    printer.print_with_timestamp(tag + f' Test: {self.name}: {error_message}')
def ok_status(self) -> bool:
    """Returns true if the status was ok, i.e. passed or skipped."""
    current = self.status
    if current == TestStatus.SUCCESS:
        return True
    return current == TestStatus.SKIPPED
66 class TestStatus(Enum
):
67 """An enumeration class to represent the status of a test."""
73 FAILURE_TO_PARSE_TESTS
= auto()
78 Tracks the counts of statuses of all test cases and any errors within
def __str__(self) -> str:
    """Returns the string representation of a TestCounts object.

    The total comes first, followed by every category whose count is
    greater than zero.
    """
    statuses = (
        ('passed', self.passed),
        ('failed', self.failed),
        ('crashed', self.crashed),
        ('skipped', self.skipped),
        ('errors', self.errors),
    )
    summary = f'Ran {self.total()} tests: '
    shown = []
    for s, n in statuses:
        # Categories with a zero count are left out of the summary.
        if n > 0:
            shown.append(f'{s}: {n}')
    return summary + ', '.join(shown)
95 def total(self
) -> int:
96 """Returns the total number of test cases within a test
97 object, where a test case is a test with no subtests.
99 return (self
.passed
+ self
.failed
+ self
.crashed
+
def add_subtest_counts(self, counts: TestCounts) -> None:
    """
    Adds every counter of another TestCounts object to the matching
    counter of this one.

    Parameters:
    counts - a different TestCounts object whose counts
        will be added to the counts of this TestCounts object
    """
    for field in ('passed', 'failed', 'crashed', 'skipped', 'errors'):
        setattr(self, field, getattr(self, field) + getattr(counts, field))
118 def get_status(self
) -> TestStatus
:
119 """Returns the aggregated status of a Test using test
122 if self
.total() == 0:
123 return TestStatus
.NO_TESTS
125 # Crashes should take priority.
126 return TestStatus
.TEST_CRASHED
128 return TestStatus
.FAILURE
130 # No failures or crashes, looks good!
131 return TestStatus
.SUCCESS
132 # We have only skipped tests.
133 return TestStatus
.SKIPPED
135 def add_status(self
, status
: TestStatus
) -> None:
136 """Increments the count for `status`."""
137 if status
== TestStatus
.SUCCESS
:
139 elif status
== TestStatus
.FAILURE
:
141 elif status
== TestStatus
.SKIPPED
:
143 elif status
!= TestStatus
.NO_TESTS
:
148 A class to represent the lines of kernel output.
149 Provides a lazy peek()/pop() interface over an iterator of
152 _lines
: Iterator
[Tuple
[int, str]]
153 _next
: Tuple
[int, str]
157 def __init__(self
, lines
: Iterator
[Tuple
[int, str]]):
158 """Creates a new LineStream that wraps the given iterator."""
161 self
._need
_next
= True
164 def _get_next(self
) -> None:
165 """Advances the LineStream to the next line, if necessary."""
166 if not self
._need
_next
:
169 self
._next
= next(self
._lines
)
170 except StopIteration:
173 self
._need
_next
= False
175 def peek(self
) -> str:
176 """Returns the current line, without advancing the LineStream.
181 def pop(self
) -> str:
182 """Returns the current line and advances the LineStream to
187 raise ValueError(f
'LineStream: going past EOF, last line was {s}')
188 self
._need
_next
= True
191 def __bool__(self
) -> bool:
192 """Returns True if stream has more lines."""
194 return not self
._done
196 # Only used by kunit_tool_test.py.
197 def __iter__(self
) -> Iterator
[str]:
198 """Empties all lines stored in LineStream object into
199 Iterator object and returns the Iterator object.
204 def line_number(self
) -> int:
205 """Returns the line number of the current line."""
209 # Parsing helper methods:
# 'KTAP version N' / 'TAP version N' at the end of a line; group 1 is
# the version number.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')

# Kernel messages that mark the end of the KTAP output.
KTAP_END = re.compile(
    r'\s*(List of all partitions:|'
    'Kernel panic - not syncing: VFS:|reboot: System halted)')

# Message from the kunit executor; group 1 is the message text.
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
217 def extract_tap_lines(kernel_output
: Iterable
[str]) -> LineStream
:
218 """Extracts KTAP lines from the kernel output."""
219 def isolate_ktap_output(kernel_output
: Iterable
[str]) \
220 -> Iterator
[Tuple
[int, str]]:
223 for line
in kernel_output
:
225 line
= line
.rstrip() # remove trailing \n
226 if not started
and KTAP_START
.search(line
):
227 # start extracting KTAP lines and set prefix
228 # to number of characters before version line
230 line
.split('KTAP version')[0])
232 yield line_num
, line
[prefix_len
:]
233 elif not started
and TAP_START
.search(line
):
234 # start extracting KTAP lines and set prefix
235 # to number of characters before version line
236 prefix_len
= len(line
.split('TAP version')[0])
238 yield line_num
, line
[prefix_len
:]
239 elif started
and KTAP_END
.search(line
):
240 # stop extracting KTAP lines
243 # remove the prefix, if any.
244 line
= line
[prefix_len
:]
246 elif EXECUTOR_ERROR
.search(line
):
248 return LineStream(lines
=isolate_ktap_output(kernel_output
))
# TAP versions passed to check_version() as the accepted range.
TAP_VERSIONS = [13, 14]
def check_version(version_num: int, accepted_versions: List[int],
                  version_type: str, test: Test, printer: Printer) -> None:
    """
    Adds error to test object if version number is too high or too
    low.

    Only the bounds are checked: a version between the lowest and the
    highest accepted version is never reported.

    Parameters:
    version_num - The inputted version number from the parsed KTAP or TAP
        header line
    accepted_versions - List of accepted KTAP or TAP versions
    version_type - 'KTAP' or 'TAP' depending on the type of
        version line.
    test - Test object for current test being parsed
    printer - Printer object to output error
    """
    if version_num < min(accepted_versions):
        test.add_error(printer, f'{version_type} version lower than expected!')
    elif version_num > max(accepted_versions):
        test.add_error(printer, f'{version_type} version higher than expected!')
273 def parse_ktap_header(lines
: LineStream
, test
: Test
, printer
: Printer
) -> bool:
275 Parses KTAP/TAP header line and checks version number.
276 Returns False if fails to parse KTAP/TAP header line.
279 - 'KTAP version [version number]'
280 - 'TAP version [version number]'
283 lines - LineStream of KTAP output to parse
284 test - Test object for current test being parsed
285 printer - Printer object to output results
288 True if successfully parsed KTAP/TAP header line
290 ktap_match
= KTAP_START
.match(lines
.peek())
291 tap_match
= TAP_START
.match(lines
.peek())
293 version_num
= int(ktap_match
.group(1))
294 check_version(version_num
, KTAP_VERSIONS
, 'KTAP', test
, printer
)
296 version_num
= int(tap_match
.group(1))
297 check_version(version_num
, TAP_VERSIONS
, 'TAP', test
, printer
)
# '# Subtest: [test name]' header line; group 1 is the test name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
305 def parse_test_header(lines
: LineStream
, test
: Test
) -> bool:
307 Parses test header and stores test name in test object.
308 Returns False if fails to parse test header line.
311 - '# Subtest: [test name]'
314 lines - LineStream of KTAP output to parse
315 test - Test object for current test being parsed
318 True if successfully parsed test header line
320 match
= TEST_HEADER
.match(lines
.peek())
323 test
.name
= match
.group(1)
# '1..[number of subtests]' plan line; group 1 is the subtest count.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
329 def parse_test_plan(lines
: LineStream
, test
: Test
) -> bool:
331 Parses test plan line and stores the expected number of subtests in
332 test object. Reports an error if expected count is 0.
333 Returns False and sets expected_count to None if there is no valid test
337 - '1..[number of subtests]'
340 lines - LineStream of KTAP output to parse
341 test - Test object for current test being parsed
344 True if successfully parsed test plan line
346 match
= TEST_PLAN
.match(lines
.peek())
348 test
.expected_count
= None
350 expected_count
= int(match
.group(1))
351 test
.expected_count
= expected_count
# Result line: '[ok|not ok] [test number] [-] [test name] [# directive]'.
# Groups: 1 = 'ok' or 'not ok', 2 = test number, 4 = test name,
# 5 = optional ' # ...' suffix.
TEST_RESULT = re.compile(
    r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')

# Result line carrying a '# SKIP' directive.
# Groups 1, 2 and 4 are as in TEST_RESULT; 5 = text following '# SKIP'.
TEST_RESULT_SKIP = re.compile(
    r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
359 def peek_test_name_match(lines
: LineStream
, test
: Test
) -> bool:
361 Matches current line with the format of a test result line and checks
362 if the name matches the name of the current test.
363 Returns False if fails to match format or name.
366 - '[ok|not ok] [test number] [-] [test name] [optional skip
370 lines - LineStream of KTAP output to parse
371 test - Test object for current test being parsed
374 True if matched a test result line and the name matching the
378 match
= TEST_RESULT
.match(line
)
381 name
= match
.group(4)
382 return name
== test
.name
384 def parse_test_result(lines
: LineStream
, test
: Test
,
385 expected_num
: int, printer
: Printer
) -> bool:
387 Parses test result line and stores the status and name in the test
388 object. Reports an error if the test number does not match expected
390 Returns False if fails to parse test result line.
392 Note that the SKIP directive is the only directive that causes a
396 - '[ok|not ok] [test number] [-] [test name] [optional skip
400 lines - LineStream of KTAP output to parse
401 test - Test object for current test being parsed
402 expected_num - expected test number for current test
403 printer - Printer object to output results
406 True if successfully parsed a test result line.
409 match
= TEST_RESULT
.match(line
)
410 skip_match
= TEST_RESULT_SKIP
.match(line
)
412 # Check if line matches test result line format
417 # Set name of test object
419 test
.name
= skip_match
.group(4)
421 test
.name
= match
.group(4)
424 num
= int(match
.group(2))
425 if num
!= expected_num
:
426 test
.add_error(printer
, f
'Expected test number {expected_num} but found {num}')
428 # Set status of test object
429 status
= match
.group(1)
431 test
.status
= TestStatus
.SKIPPED
433 test
.status
= TestStatus
.SUCCESS
435 test
.status
= TestStatus
.FAILURE
438 def parse_diagnostic(lines
: LineStream
) -> List
[str]:
440 Parse lines that do not match the format of a test result line or
441 test header line and returns them in list.
443 Line formats that are not parsed:
444 - '# Subtest: [test name]'
445 - '[ok|not ok] [test number] [-] [test name] [optional skip
447 - 'KTAP version [version number]'
450 lines - LineStream of KTAP output to parse
453 Log of diagnostic lines
455 log
= [] # type: List[str]
456 non_diagnostic_lines
= [TEST_RESULT
, TEST_HEADER
, KTAP_START
, TAP_START
, TEST_PLAN
]
457 while lines
and not any(re
.match(lines
.peek())
458 for re
in non_diagnostic_lines
):
459 log
.append(lines
.pop())
463 # Printing helper methods:
467 def format_test_divider(message
: str, len_message
: int) -> str:
469 Returns string with message centered in fixed width divider.
472 '===================== message example ====================='
475 message - message to be centered in divider line
476 len_message - length of the message to be printed such that
477 any characters of the color codes are not counted
480 String containing message centered in fixed width divider
482 default_count
= 3 # default number of dashes
483 len_1
= default_count
484 len_2
= default_count
485 difference
= len(DIVIDER
) - len_message
- 2 # 2 spaces added
487 # calculate number of dashes for each side of the divider
488 len_1
= int(difference
/ 2)
489 len_2
= difference
- len_1
490 return ('=' * len_1
) + f
' {message} ' + ('=' * len_2
)
492 def print_test_header(test
: Test
, printer
: Printer
) -> None:
494 Prints test header with test name and optionally the expected number
498 '=================== example (2 subtests) ==================='
501 test - Test object representing current test being printed
502 printer - Printer object to output results
506 # Add a leading space before the subtest counts only if a test name
507 # is provided using a "# Subtest" header line.
509 if test
.expected_count
:
510 if test
.expected_count
== 1:
511 message
+= '(1 subtest)'
513 message
+= f
'({test.expected_count} subtests)'
514 printer
.print_with_timestamp(format_test_divider(message
, len(message
)))
def print_log(log: Iterable[str], printer: Printer) -> None:
    """Prints all strings in saved log for test in yellow.

    The entries are joined and dedented as one text, so indentation
    common to all lines is removed and relative indentation is kept.
    """
    joined = '\n'.join(log)
    for entry in textwrap.dedent(joined).splitlines():
        colored = printer.yellow(entry)
        printer.print_with_timestamp(colored)
def format_test_result(test: Test, printer: Printer) -> str:
    """
    Returns string with formatted test result with colored status and test
    name.

    For any status other than SUCCESS, SKIPPED and NO_TESTS, the saved
    log of the test is printed through print_log() before the string is
    returned.

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results

    Return:
    String containing formatted test result
    """
    status = test.status
    if status == TestStatus.SUCCESS:
        return printer.green('[PASSED] ') + test.name
    if status == TestStatus.SKIPPED:
        return printer.yellow('[SKIPPED] ') + test.name
    if status == TestStatus.NO_TESTS:
        return printer.yellow('[NO TESTS RUN] ') + test.name

    # Everything left is reported as a crash or a failure; show the log
    # first so that it precedes the result line.
    print_log(test.log, printer)
    if status == TestStatus.TEST_CRASHED:
        # Color with the given printer, like every other branch. Callers
        # subtract printer.color_len() from the length of this string, so
        # the color codes must come from the same printer.
        return printer.red('[CRASHED] ') + test.name
    return printer.red('[FAILED] ') + test.name
def print_test_result(test: Test, printer: Printer) -> None:
    """
    Prints result line with status of test.

    The line is the string returned by format_test_result().

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object
    """
    result_line = format_test_result(test, printer)
    printer.print_with_timestamp(result_line)
def print_test_footer(test: Test, printer: Printer) -> None:
    """
    Prints test footer with status of test.

    Example:
    '===================== [PASSED] example ====================='

    Parameters:
    test - Test object representing current test being printed
    printer - Printer object to output results
    """
    result = format_test_result(test, printer)
    # The divider is sized from the message length minus the length the
    # printer reports for its color codes.
    visible_len = len(result) - printer.color_len()
    footer = format_test_divider(result, visible_len)
    printer.print_with_timestamp(footer)
577 def print_test(test
: Test
, failed_only
: bool, printer
: Printer
) -> None:
579 Prints Test object to given printer. For a child test, the result line is
580 printed. For a parent test, the test header, all child test results, and
581 the test footer are all printed. If failed_only is true, only failed/crashed
582 tests will be printed.
585 test - Test object to print
586 failed_only - True if only failed/crashed tests should be printed.
587 printer - Printer object to output results
589 if test
.name
== "main":
590 printer
.print_with_timestamp(DIVIDER
)
591 for subtest
in test
.subtests
:
592 print_test(subtest
, failed_only
, printer
)
593 printer
.print_with_timestamp(DIVIDER
)
594 elif test
.subtests
!= []:
595 if not failed_only
or not test
.ok_status():
596 print_test_header(test
, printer
)
597 for subtest
in test
.subtests
:
598 print_test(subtest
, failed_only
, printer
)
599 print_test_footer(test
, printer
)
601 if not failed_only
or not test
.ok_status():
602 print_test_result(test
, printer
)
604 def _summarize_failed_tests(test
: Test
) -> str:
605 """Tries to summarize all the failing subtests in `test`."""
607 def failed_names(test
: Test
, parent_name
: str) -> List
[str]:
608 # Note: we use 'main' internally for the top-level test.
609 if not parent_name
or parent_name
== 'main':
610 full_name
= test
.name
612 full_name
= parent_name
+ '.' + test
.name
614 if not test
.subtests
: # this is a leaf node
617 # If all the children failed, just say this subtest failed.
618 # Don't summarize it down "the top-level test failed", though.
619 failed_subtests
= [sub
for sub
in test
.subtests
if not sub
.ok_status()]
620 if parent_name
and len(failed_subtests
) == len(test
.subtests
):
623 all_failures
= [] # type: List[str]
624 for t
in failed_subtests
:
625 all_failures
.extend(failed_names(t
, full_name
))
628 failures
= failed_names(test
, '')
629 # If there are too many failures, printing them out will just be noisy.
630 if len(failures
) > 10: # this is an arbitrary limit
633 return 'Failures: ' + ', '.join(failures
)
636 def print_summary_line(test
: Test
, printer
: Printer
) -> None:
638 Prints summary line of test object. Color of line is dependent on
639 status of test. Color is green if test passes, yellow if test is
640 skipped, and red if the test fails or crashes. Summary line contains
641 counts of the statuses of the tests subtests or the test itself if it
645 "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
648 test - Test object representing current test being printed
649 printer - Printer object to output results
651 if test
.status
== TestStatus
.SUCCESS
:
653 elif test
.status
in (TestStatus
.SKIPPED
, TestStatus
.NO_TESTS
):
654 color
= stdout
.yellow
657 printer
.print_with_timestamp(color(f
'Testing complete. {test.counts}'))
659 # Summarize failures that might have gone off-screen since we had a lot
660 # of tests (arbitrarily defined as >=100 for now).
661 if test
.ok_status() or test
.counts
.total() < 100:
663 summarized
= _summarize_failed_tests(test
)
666 printer
.print_with_timestamp(color(summarized
))
670 def bubble_up_test_results(test
: Test
) -> None:
672 If the test has subtests, add the test counts of the subtests to the
673 test and check if any of the tests crashed and if so set the test
674 status to crashed. Otherwise if the test has no subtests add the
675 status of the test to the test counts.
678 test - Test object for current test being parsed
680 subtests
= test
.subtests
684 counts
.add_subtest_counts(t
.counts
)
685 if counts
.total() == 0:
686 counts
.add_status(status
)
687 elif test
.counts
.get_status() == TestStatus
.TEST_CRASHED
:
688 test
.status
= TestStatus
.TEST_CRASHED
690 def parse_test(lines
: LineStream
, expected_num
: int, log
: List
[str], is_subtest
: bool, printer
: Printer
) -> Test
:
692 Finds next test to parse in LineStream, creates new Test object,
693 parses any subtests of the test, populates Test object with all
694 information (status, name) about the test and the Test objects for
695 any subtests, and then returns the Test object. The method accepts
696 three formats of tests:
698 Accepted test formats:
700 - Main KTAP/TAP header
708 - Subtest header (must include either the KTAP version line or
709 "# Subtest" header line)
711 Example (preferred format with both KTAP version line and
720 Example (only "# Subtest" line):
727 Example (only KTAP version line, compliant with KTAP v1 spec):
741 lines - LineStream of KTAP output to parse
742 expected_num - expected test number for test to be parsed
743 log - list of strings containing any preceding diagnostic lines
744 corresponding to the current test
745 is_subtest - boolean indicating whether test is a subtest
746 printer - Printer object to output results
749 Test object populated with characteristics and any subtests
754 # Parse any errors prior to parsing tests
755 err_log
= parse_diagnostic(lines
)
756 test
.log
.extend(err_log
)
759 # If parsing the main/top-level test, parse KTAP version line and
762 ktap_line
= parse_ktap_header(lines
, test
, printer
)
763 test
.log
.extend(parse_diagnostic(lines
))
764 parse_test_plan(lines
, test
)
767 # If not the main test, attempt to parse a test header containing
768 # the KTAP version line and/or subtest header line
769 ktap_line
= parse_ktap_header(lines
, test
, printer
)
770 subtest_line
= parse_test_header(lines
, test
)
771 parent_test
= (ktap_line
or subtest_line
)
773 # If KTAP version line and/or subtest header is found, attempt
774 # to parse test plan and print test header
775 test
.log
.extend(parse_diagnostic(lines
))
776 parse_test_plan(lines
, test
)
777 print_test_header(test
, printer
)
778 expected_count
= test
.expected_count
781 while parent_test
and (expected_count
is None or test_num
<= expected_count
):
782 # Loop to parse any subtests.
783 # Break after parsing expected number of tests or
784 # if expected number of tests is unknown break when test
785 # result line with matching name to subtest header is found
786 # or no more lines in stream.
787 sub_log
= parse_diagnostic(lines
)
789 if not lines
or (peek_test_name_match(lines
, test
) and
791 if expected_count
and test_num
<= expected_count
:
792 # If parser reaches end of test before
793 # parsing expected number of subtests, print
794 # crashed subtest and record error
795 test
.add_error(printer
, 'missing expected subtest!')
796 sub_test
.log
.extend(sub_log
)
797 test
.counts
.add_status(
798 TestStatus
.TEST_CRASHED
)
799 print_test_result(sub_test
, printer
)
801 test
.log
.extend(sub_log
)
804 sub_test
= parse_test(lines
, test_num
, sub_log
, True, printer
)
805 subtests
.append(sub_test
)
807 test
.subtests
= subtests
809 # If not main test, look for test result line
810 test
.log
.extend(parse_diagnostic(lines
))
811 if test
.name
!= "" and not peek_test_name_match(lines
, test
):
812 test
.add_error(printer
, 'missing subtest result line!')
814 parse_test_result(lines
, test
, expected_num
, printer
)
816 # Check for there being no subtests within parent test
817 if parent_test
and len(subtests
) == 0:
818 # Don't override a bad status if this test had one reported.
819 # Assumption: no subtests means CRASHED is from Test.__init__()
820 if test
.status
in (TestStatus
.TEST_CRASHED
, TestStatus
.SUCCESS
):
821 print_log(test
.log
, printer
)
822 test
.status
= TestStatus
.NO_TESTS
823 test
.add_error(printer
, '0 tests run!')
825 # Add statuses to TestCounts attribute in Test object
826 bubble_up_test_results(test
)
827 if parent_test
and is_subtest
:
828 # If test has subtests and is not the main test object, print
830 print_test_footer(test
, printer
)
832 print_test_result(test
, printer
)
835 def parse_run_tests(kernel_output
: Iterable
[str], printer
: Printer
) -> Test
:
837 Using kernel output, extract KTAP lines, parse the lines for test
838 results and print condensed test results and summary line.
841 kernel_output - Iterable object contains lines of kernel output
842 printer - Printer object to output results
845 Test - the main test object with all subtests.
847 printer
.print_with_timestamp(DIVIDER
)
848 lines
= extract_tap_lines(kernel_output
)
851 test
.name
= '<missing>'
852 test
.add_error(printer
, 'Could not find any KTAP output. Did any KUnit tests run?')
853 test
.status
= TestStatus
.FAILURE_TO_PARSE_TESTS
855 test
= parse_test(lines
, 0, [], False, printer
)
856 if test
.status
!= TestStatus
.NO_TESTS
:
857 test
.status
= test
.counts
.get_status()
858 printer
.print_with_timestamp(DIVIDER
)