1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
11 from collections
import namedtuple
12 from datetime
import datetime
13 from enum
import Enum
, auto
14 from functools
import reduce
15 from typing
import List
# Outcome of parsing a whole log: the aggregate status, the list of parsed
# suites, and the list of lines that was handed to the parser.
TestResult = namedtuple('TestResult', 'status suites log')
19 class TestSuite(object):
26 return 'TestSuite(' + self
.status
+ ',' + self
.name
+ ',' + str(self
.cases
) + ')'
31 class TestCase(object):
38 return 'TestCase(' + self
.status
+ ',' + self
.name
+ ',' + str(self
.log
) + ')'
43 class TestStatus(Enum
):
# Line patterns that isolate_kunit_output() checks each kernel log line
# against: a "TAP version <N>" header line, and the "List of all partitions:"
# line.
kunit_start_re = re.compile(r'^TAP version [0-9]+$')
kunit_end_re = re.compile('List of all partitions:')
52 def isolate_kunit_output(kernel_output
):
54 for line
in kernel_output
:
55 if kunit_start_re
.match(line
):
58 elif kunit_end_re
.match(line
):
63 def raw_output(kernel_output
):
64 for line
in kernel_output
:
72 return '\033[1;31m' + text
+ RESET
75 return '\033[1;33m' + text
+ RESET
78 return '\033[1;32m' + text
+ RESET
def print_with_timestamp(message):
    """Print ``message`` prefixed with the current local time as ``[HH:MM:SS]``."""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (timestamp, message))
def format_suite_divider(message):
    """Return ``message`` framed by a run of eight '=' and a space on each side."""
    rule = '=' * 8
    return rule + ' ' + message + ' ' + rule
def print_suite_divider(message):
    """Print the DIVIDER line, then ``message`` framed as a suite divider.

    Both lines go through print_with_timestamp(), so each carries a timestamp.
    """
    print_with_timestamp(DIVIDER)
    banner = format_suite_divider(message)
    print_with_timestamp(banner)
92 print_with_timestamp(m
)
# Recognises a line that starts like a TAP entry: the "TAP" header, an
# "ok"/"not ok" result, an "N..M" plan, or a "#" diagnostic. All but the header
# may be preceded by a single tab. Lines that do not match are treated as
# non-TAP noise by consume_non_diagnositic() and save_non_diagnositic().
TAP_ENTRIES = re.compile(
    r'^(TAP|\t?ok|\t?not ok|\t?[0-9]+\.\.[0-9]+|\t?#).*$')
96 def consume_non_diagnositic(lines
: List
[str]) -> None:
97 while lines
and not TAP_ENTRIES
.match(lines
[0]):
100 def save_non_diagnositic(lines
: List
[str], test_case
: TestCase
) -> None:
101 while lines
and not TAP_ENTRIES
.match(lines
[0]):
102 test_case
.log
.append(lines
[0])
# Record type with the fields is_ok, description and text.
OkNotOkResult = namedtuple('OkNotOkResult', 'is_ok description text')

# Result line of a test case inside a suite: one leading tab, "ok" or
# "not ok", a number, " - ", then the description (captured as group 2).
OK_NOT_OK_SUBTEST = re.compile(r'^\t(ok|not ok) [0-9]+ - (.*)$')

# Result line of a whole suite: same shape as above but with no leading tab.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) [0-9]+ - (.*)$')
111 def parse_ok_not_ok_test_case(lines
: List
[str],
113 expecting_test_case
: bool) -> bool:
114 save_non_diagnositic(lines
, test_case
)
116 if expecting_test_case
:
117 test_case
.status
= TestStatus
.TEST_CRASHED
122 match
= OK_NOT_OK_SUBTEST
.match(line
)
124 test_case
.log
.append(lines
.pop(0))
125 test_case
.name
= match
.group(2)
126 if test_case
.status
== TestStatus
.TEST_CRASHED
:
128 if match
.group(1) == 'ok':
129 test_case
.status
= TestStatus
.SUCCESS
131 test_case
.status
= TestStatus
.FAILURE
# Tab-indented diagnostic line of the form "\t# <prefix>: <message>"; the
# prefix is matched non-greedily and <message> is captured as group 1.
SUBTEST_DIAGNOSTIC = re.compile(r'^\t# .*?: (.*)$')

# Diagnostic message that parse_diagnostic() treats as a crash report: when
# group 1 equals this text the test case is marked TEST_CRASHED.
DIAGNOSTIC_CRASH_MESSAGE = 'kunit test case crashed!'
139 def parse_diagnostic(lines
: List
[str], test_case
: TestCase
) -> bool:
140 save_non_diagnositic(lines
, test_case
)
144 match
= SUBTEST_DIAGNOSTIC
.match(line
)
146 test_case
.log
.append(lines
.pop(0))
147 if match
.group(1) == DIAGNOSTIC_CRASH_MESSAGE
:
148 test_case
.status
= TestStatus
.TEST_CRASHED
153 def parse_test_case(lines
: List
[str], expecting_test_case
: bool) -> TestCase
:
154 test_case
= TestCase()
155 save_non_diagnositic(lines
, test_case
)
156 while parse_diagnostic(lines
, test_case
):
158 if parse_ok_not_ok_test_case(lines
, test_case
, expecting_test_case
):
# Suite header line, "\t# Subtest: <name>"; the name is captured as group 1
# and is what parse_subtest_header() returns.
SUBTEST_HEADER = re.compile(r'^\t# Subtest: (.*)$')
165 def parse_subtest_header(lines
: List
[str]) -> str:
166 consume_non_diagnositic(lines
)
169 match
= SUBTEST_HEADER
.match(lines
[0])
172 return match
.group(1)
# Tab-indented plan line, "\t<first>..<last>"; <last> is captured as group 1
# and parse_subtest_plan() converts it to int. The pattern has no trailing
# anchor, so text after the plan is tolerated.
SUBTEST_PLAN = re.compile(r'\t[0-9]+\.\.([0-9]+)')
178 def parse_subtest_plan(lines
: List
[str]) -> int:
179 consume_non_diagnositic(lines
)
180 match
= SUBTEST_PLAN
.match(lines
[0])
183 return int(match
.group(1))
187 def max_status(left
: TestStatus
, right
: TestStatus
) -> TestStatus
:
188 if left
== TestStatus
.TEST_CRASHED
or right
== TestStatus
.TEST_CRASHED
:
189 return TestStatus
.TEST_CRASHED
190 elif left
== TestStatus
.FAILURE
or right
== TestStatus
.FAILURE
:
191 return TestStatus
.FAILURE
192 elif left
!= TestStatus
.SUCCESS
:
194 elif right
!= TestStatus
.SUCCESS
:
197 return TestStatus
.SUCCESS
199 def parse_ok_not_ok_test_suite(lines
: List
[str], test_suite
: TestSuite
) -> bool:
200 consume_non_diagnositic(lines
)
202 test_suite
.status
= TestStatus
.TEST_CRASHED
205 match
= OK_NOT_OK_MODULE
.match(line
)
208 if match
.group(1) == 'ok':
209 test_suite
.status
= TestStatus
.SUCCESS
211 test_suite
.status
= TestStatus
.FAILURE
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of a collection into one status via max_status().

    ``to_status`` is applied to every element of ``status_container_list`` and
    the results are combined left to right with max_status(), starting from
    TestStatus.SUCCESS (which is also the result for an empty collection).
    """
    combined = TestStatus.SUCCESS
    for container in status_container_list:
        combined = max_status(combined, to_status(container))
    return combined
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine the statuses of a suite's cases with the suite's own status.

    The statuses of ``test_suite.cases`` are folded with bubble_up_errors();
    that result is then combined with ``test_suite.status`` via max_status().
    """
    def status_of(test_case):
        return test_case.status

    worst_case_status = bubble_up_errors(status_of, test_suite.cases)
    return max_status(worst_case_status, test_suite.status)
224 def parse_test_suite(lines
: List
[str]) -> TestSuite
:
227 consume_non_diagnositic(lines
)
228 test_suite
= TestSuite()
229 test_suite
.status
= TestStatus
.SUCCESS
230 name
= parse_subtest_header(lines
)
233 test_suite
.name
= name
234 expected_test_case_num
= parse_subtest_plan(lines
)
235 if not expected_test_case_num
:
237 test_case
= parse_test_case(lines
, expected_test_case_num
> 0)
238 expected_test_case_num
-= 1
240 test_suite
.cases
.append(test_case
)
241 test_case
= parse_test_case(lines
, expected_test_case_num
> 0)
242 expected_test_case_num
-= 1
243 if parse_ok_not_ok_test_suite(lines
, test_suite
):
244 test_suite
.status
= bubble_up_test_case_errors(test_suite
)
247 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
250 print('failed to parse end of suite' + lines
[0])
# The only TAP header line parse_tap_header() accepts: exactly "TAP version 14".
TAP_HEADER = re.compile(r'^TAP version 14$')
255 def parse_tap_header(lines
: List
[str]) -> bool:
256 consume_non_diagnositic(lines
)
257 if TAP_HEADER
.match(lines
[0]):
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Fold the ``status`` of every suite in the list via bubble_up_errors()."""
    def status_of(suite):
        return suite.status

    return bubble_up_errors(status_of, test_suite_list)
266 def parse_test_result(lines
: List
[str]) -> TestResult
:
268 return TestResult(TestStatus
.NO_TESTS
, [], lines
)
269 consume_non_diagnositic(lines
)
270 if not parse_tap_header(lines
):
273 test_suite
= parse_test_suite(lines
)
275 test_suites
.append(test_suite
)
276 test_suite
= parse_test_suite(lines
)
277 return TestResult(bubble_up_suite_errors(test_suites
), test_suites
, lines
)
279 def parse_run_tests(kernel_output
) -> TestResult
:
283 test_result
= parse_test_result(list(isolate_kunit_output(kernel_output
)))
284 for test_suite
in test_result
.suites
:
285 if test_suite
.status
== TestStatus
.SUCCESS
:
286 print_suite_divider(green('[PASSED] ') + test_suite
.name
)
287 elif test_suite
.status
== TestStatus
.TEST_CRASHED
:
288 print_suite_divider(red('[CRASHED] ' + test_suite
.name
))
290 print_suite_divider(red('[FAILED] ') + test_suite
.name
)
291 for test_case
in test_suite
.cases
:
293 if test_case
.status
== TestStatus
.SUCCESS
:
294 print_with_timestamp(green('[PASSED] ') + test_case
.name
)
295 elif test_case
.status
== TestStatus
.TEST_CRASHED
:
297 print_with_timestamp(red('[CRASHED] ' + test_case
.name
))
298 print_log(map(yellow
, test_case
.log
))
299 print_with_timestamp('')
302 print_with_timestamp(red('[FAILED] ') + test_case
.name
)
303 print_log(map(yellow
, test_case
.log
))
304 print_with_timestamp('')
305 print_with_timestamp(DIVIDER
)
306 fmt
= green
if test_result
.status
== TestStatus
.SUCCESS
else red
307 print_with_timestamp(
308 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
309 (total_tests
, failed_tests
, crashed_tests
)))