# SPDX-License-Identifier: GPL-2.0
#
# Parses test results from a kernel dmesg log.
#
# Copyright (C) 2019, Google LLC.
# Author: Felix Guo <felixguoxiuping@gmail.com>
# Author: Brendan Higgins <brendanhiggins@google.com>
import re

from collections import namedtuple
from datetime import datetime
from enum import Enum, auto
from functools import reduce
from typing import List, Optional, Tuple
# Result of parsing a whole log. Fields, in order:
#   status - overall status value (parse_test_result passes a TestStatus)
#   suites - list of parsed test suites (empty when nothing was parsed)
#   log    - the list of lines handed to parse_test_result
TestResult = namedtuple('TestResult', ['status', 'suites', 'log'])
19 class TestSuite(object):
26 return 'TestSuite(' + self
.status
+ ',' + self
.name
+ ',' + str(self
.cases
) + ')'
31 class TestCase(object):
38 return 'TestCase(' + self
.status
+ ',' + self
.name
+ ',' + str(self
.log
) + ')'
43 class TestStatus(Enum
):
48 FAILURE_TO_PARSE_TESTS
= auto()
# Matches a line that ends with the TAP version banner. isolate_kunit_output
# uses search(), so any prefix in front of the banner is tolerated.
kunit_start_re = re.compile(r'TAP version [0-9]+$')

# Kernel messages that isolate_kunit_output searches for on lines that are
# not the start banner (presumably marking the end of KUnit output -- confirm
# against the body of isolate_kunit_output).
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:)')
54 def isolate_kunit_output(kernel_output
):
56 for line
in kernel_output
:
57 line
= line
.rstrip() # line always has a trailing \n
58 if kunit_start_re
.search(line
):
59 prefix_len
= len(line
.split('TAP version')[0])
61 yield line
[prefix_len
:] if prefix_len
> 0 else line
62 elif kunit_end_re
.search(line
):
65 yield line
[prefix_len
:] if prefix_len
> 0 else line
67 def raw_output(kernel_output
):
68 for line
in kernel_output
:
76 return '\033[1;31m' + text
+ RESET
79 return '\033[1;33m' + text
+ RESET
82 return '\033[1;32m' + text
+ RESET
def print_with_timestamp(message):
    """Print ``message`` to stdout, prefixed with the current time.

    The prefix is the local wall-clock time formatted as ``[HH:MM:SS]``,
    followed by a single space.
    """
    timestamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (timestamp, message))
def format_suite_divider(message):
    """Return ``message`` framed as a suite heading.

    The result is ``'======== '`` + message + ``' ========'``.
    """
    return '======== ' + message + ' ========'
def print_suite_divider(message):
    """Print a two-line, timestamped heading for a test suite.

    The first line is the module-level DIVIDER; the second is ``message``
    wrapped by format_suite_divider(). Both go through
    print_with_timestamp().
    """
    print_with_timestamp(DIVIDER)
    print_with_timestamp(format_suite_divider(message))
96 print_with_timestamp(m
)
# Matches any line that looks like TAP output: a line starting with "TAP",
# or (after optional whitespace) "ok", "not ok", a plan such as "1..3", or a
# "#" diagnostic. The consume/save_non_diagnositic helpers test the line at
# the head of the list against this pattern.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')
100 def consume_non_diagnositic(lines
: List
[str]) -> None:
101 while lines
and not TAP_ENTRIES
.match(lines
[0]):
104 def save_non_diagnositic(lines
: List
[str], test_case
: TestCase
) -> None:
105 while lines
and not TAP_ENTRIES
.match(lines
[0]):
106 test_case
.log
.append(lines
[0])
# Record with the fields is_ok, description and text.
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok', 'description', 'text'])

# Indented result line of a single test case, e.g. "    ok 1 - name".
# group(1) is "ok" or "not ok"; group(2) is the text after " - ".
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line of a whole suite, e.g. "not ok 2 - name".
# group(1) is "ok" or "not ok"; group(2) is the index; group(3) is the text
# after " - ".
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
115 def parse_ok_not_ok_test_case(lines
: List
[str], test_case
: TestCase
) -> bool:
116 save_non_diagnositic(lines
, test_case
)
118 test_case
.status
= TestStatus
.TEST_CRASHED
121 match
= OK_NOT_OK_SUBTEST
.match(line
)
122 while not match
and lines
:
124 match
= OK_NOT_OK_SUBTEST
.match(line
)
126 test_case
.log
.append(lines
.pop(0))
127 test_case
.name
= match
.group(2)
128 if test_case
.status
== TestStatus
.TEST_CRASHED
:
130 if match
.group(1) == 'ok':
131 test_case
.status
= TestStatus
.SUCCESS
133 test_case
.status
= TestStatus
.FAILURE
# Indented diagnostic line ("    # text"); group(1) is the text after "# ".
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')

# Indented diagnostic line that ends with ": kunit test case crashed!".
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
141 def parse_diagnostic(lines
: List
[str], test_case
: TestCase
) -> bool:
142 save_non_diagnositic(lines
, test_case
)
146 match
= SUBTEST_DIAGNOSTIC
.match(line
)
148 test_case
.log
.append(lines
.pop(0))
149 crash_match
= DIAGNOSTIC_CRASH_MESSAGE
.match(line
)
151 test_case
.status
= TestStatus
.TEST_CRASHED
156 def parse_test_case(lines
: List
[str]) -> Optional
[TestCase
]:
157 test_case
= TestCase()
158 save_non_diagnositic(lines
, test_case
)
159 while parse_diagnostic(lines
, test_case
):
161 if parse_ok_not_ok_test_case(lines
, test_case
):
# Indented "# Subtest: <name>" line; group(1) is the text after "Subtest: ".
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
168 def parse_subtest_header(lines
: List
[str]) -> Optional
[str]:
169 consume_non_diagnositic(lines
)
172 match
= SUBTEST_HEADER
.match(lines
[0])
175 return match
.group(1)
# Indented plan line such as "    1..3"; group(1) is the number after "..".
# There is no "^": parse_subtest_plan uses match(), which anchors at the
# start of the line by itself.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
181 def parse_subtest_plan(lines
: List
[str]) -> Optional
[int]:
182 consume_non_diagnositic(lines
)
183 match
= SUBTEST_PLAN
.match(lines
[0])
186 return int(match
.group(1))
190 def max_status(left
: TestStatus
, right
: TestStatus
) -> TestStatus
:
191 if left
== TestStatus
.TEST_CRASHED
or right
== TestStatus
.TEST_CRASHED
:
192 return TestStatus
.TEST_CRASHED
193 elif left
== TestStatus
.FAILURE
or right
== TestStatus
.FAILURE
:
194 return TestStatus
.FAILURE
195 elif left
!= TestStatus
.SUCCESS
:
197 elif right
!= TestStatus
.SUCCESS
:
200 return TestStatus
.SUCCESS
202 def parse_ok_not_ok_test_suite(lines
: List
[str],
203 test_suite
: TestSuite
,
204 expected_suite_index
: int) -> bool:
205 consume_non_diagnositic(lines
)
207 test_suite
.status
= TestStatus
.TEST_CRASHED
210 match
= OK_NOT_OK_MODULE
.match(line
)
213 if match
.group(1) == 'ok':
214 test_suite
.status
= TestStatus
.SUCCESS
216 test_suite
.status
= TestStatus
.FAILURE
217 suite_index
= int(match
.group(2))
218 if suite_index
!= expected_suite_index
:
219 print_with_timestamp(
220 red('[ERROR] ') + 'expected_suite_index ' +
221 str(expected_suite_index
) + ', but got ' +
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Combine the statuses of several objects into one TestStatus.

    to_status is called on each element of status_container_list to obtain
    its status. The statuses are then folded pairwise with max_status(),
    starting from TestStatus.SUCCESS, so an empty list yields
    TestStatus.SUCCESS.
    """
    statuses = (to_status(container) for container in status_container_list)
    return reduce(max_status, statuses, TestStatus.SUCCESS)
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Return the suite's status combined with the statuses of its cases.

    The statuses of test_suite.cases are folded with bubble_up_errors(),
    and the result is merged with test_suite.status via max_status().
    """
    max_test_case_status = bubble_up_errors(lambda case: case.status,
                                            test_suite.cases)
    return max_status(max_test_case_status, test_suite.status)
235 def parse_test_suite(lines
: List
[str], expected_suite_index
: int) -> Optional
[TestSuite
]:
238 consume_non_diagnositic(lines
)
239 test_suite
= TestSuite()
240 test_suite
.status
= TestStatus
.SUCCESS
241 name
= parse_subtest_header(lines
)
244 test_suite
.name
= name
245 expected_test_case_num
= parse_subtest_plan(lines
)
246 if expected_test_case_num
is None:
248 while expected_test_case_num
> 0:
249 test_case
= parse_test_case(lines
)
252 test_suite
.cases
.append(test_case
)
253 expected_test_case_num
-= 1
254 if parse_ok_not_ok_test_suite(lines
, test_suite
, expected_suite_index
):
255 test_suite
.status
= bubble_up_test_case_errors(test_suite
)
258 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
261 print('failed to parse end of suite' + lines
[0])
# The exact, unindented header line "TAP version 14".
TAP_HEADER = re.compile(r'^TAP version 14$')
266 def parse_tap_header(lines
: List
[str]) -> bool:
267 consume_non_diagnositic(lines
)
268 if TAP_HEADER
.match(lines
[0]):
# Unindented plan line such as "1..2"; group(1) is the number after "..".
# parse_test_plan uses match(), so the pattern is anchored at the start of
# the line even without "^".
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
276 def parse_test_plan(lines
: List
[str]) -> Optional
[int]:
277 consume_non_diagnositic(lines
)
278 match
= TEST_PLAN
.match(lines
[0])
281 return int(match
.group(1))
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the combined status of all suites in test_suite_list.

    Delegates to bubble_up_errors(), reading each suite's ``status``
    attribute.
    """
    return bubble_up_errors(lambda suite: suite.status, test_suite_list)
288 def parse_test_result(lines
: List
[str]) -> TestResult
:
289 consume_non_diagnositic(lines
)
290 if not lines
or not parse_tap_header(lines
):
291 return TestResult(TestStatus
.NO_TESTS
, [], lines
)
292 expected_test_suite_num
= parse_test_plan(lines
)
293 if not expected_test_suite_num
:
294 return TestResult(TestStatus
.FAILURE_TO_PARSE_TESTS
, [], lines
)
296 for i
in range(1, expected_test_suite_num
+ 1):
297 test_suite
= parse_test_suite(lines
, i
)
299 test_suites
.append(test_suite
)
301 print_with_timestamp(
302 red('[ERROR] ') + ' expected ' +
303 str(expected_test_suite_num
) +
304 ' test suites, but got ' + str(i
- 2))
306 test_suite
= parse_test_suite(lines
, -1)
308 print_with_timestamp(red('[ERROR] ') +
309 'got unexpected test suite: ' + test_suite
.name
)
311 return TestResult(bubble_up_suite_errors(test_suites
), test_suites
, lines
)
313 return TestResult(TestStatus
.NO_TESTS
, [], lines
)
315 def print_and_count_results(test_result
: TestResult
) -> Tuple
[int, int, int]:
319 for test_suite
in test_result
.suites
:
320 if test_suite
.status
== TestStatus
.SUCCESS
:
321 print_suite_divider(green('[PASSED] ') + test_suite
.name
)
322 elif test_suite
.status
== TestStatus
.TEST_CRASHED
:
323 print_suite_divider(red('[CRASHED] ' + test_suite
.name
))
325 print_suite_divider(red('[FAILED] ') + test_suite
.name
)
326 for test_case
in test_suite
.cases
:
328 if test_case
.status
== TestStatus
.SUCCESS
:
329 print_with_timestamp(green('[PASSED] ') + test_case
.name
)
330 elif test_case
.status
== TestStatus
.TEST_CRASHED
:
332 print_with_timestamp(red('[CRASHED] ' + test_case
.name
))
333 print_log(map(yellow
, test_case
.log
))
334 print_with_timestamp('')
337 print_with_timestamp(red('[FAILED] ') + test_case
.name
)
338 print_log(map(yellow
, test_case
.log
))
339 print_with_timestamp('')
340 return total_tests
, failed_tests
, crashed_tests
342 def parse_run_tests(kernel_output
) -> TestResult
:
346 test_result
= parse_test_result(list(isolate_kunit_output(kernel_output
)))
347 if test_result
.status
== TestStatus
.NO_TESTS
:
348 print(red('[ERROR] ') + yellow('no tests run!'))
349 elif test_result
.status
== TestStatus
.FAILURE_TO_PARSE_TESTS
:
350 print(red('[ERROR] ') + yellow('could not parse test results!'))
354 crashed_tests
) = print_and_count_results(test_result
)
355 print_with_timestamp(DIVIDER
)
356 fmt
= green
if test_result
.status
== TestStatus
.SUCCESS
else red
357 print_with_timestamp(
358 fmt('Testing complete. %d tests run. %d failed. %d crashed.' %
359 (total_tests
, failed_tests
, crashed_tests
)))