WIP FPC-III support
[linux/fpc-iii.git] / tools / testing / kunit / kunit_parser.py
blob6614ec4d08989696b82cc67c1b8c3a92707da740
1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
9 import re
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import List, Optional, Tuple
# Result of parsing a whole log: the aggregate status, the list of parsed
# suites, and the line list that was handed to the parser.
TestResult = namedtuple('TestResult', 'status suites log')
class TestSuite(object):
    """A parsed test suite: its status, its name and its test cases."""

    def __init__(self):
        # status and name start out unset; the parser fills them in later.
        self.status = None
        self.name = None
        self.cases = []

    def __str__(self):
        # status and name may be None or a non-str object (e.g. an Enum
        # member), so convert each part explicitly; plain '+' concatenation
        # would raise TypeError for those values.
        return 'TestSuite(' + str(self.status) + ',' + str(self.name) + \
            ',' + str(self.cases) + ')'

    def __repr__(self):
        return str(self)
class TestCase(object):
    """A parsed test case: its status, its name and the log lines kept for it."""

    def __init__(self):
        # status starts out unset; the parser assigns it once a result
        # (or a crash diagnostic) has been seen.
        self.status = None
        self.name = ''
        self.log = []

    def __str__(self):
        # status may be None or a non-str object (e.g. an Enum member), so
        # convert each part explicitly; plain '+' concatenation would raise
        # TypeError for those values.
        return 'TestCase(' + str(self.status) + ',' + str(self.name) + \
            ',' + str(self.log) + ')'

    def __repr__(self):
        return str(self)
class TestStatus(Enum):
    """Statuses assigned to test cases, test suites and whole parse results."""

    # Values are spelled out; they are the same 1..5 sequence auto() yields.
    SUCCESS = 1
    FAILURE = 2
    TEST_CRASHED = 3
    NO_TESTS = 4
    FAILURE_TO_PARSE_TESTS = 5
# A line ending in "TAP version <N>" marks the start of the test output.
kunit_start_re = re.compile(r'TAP version [0-9]+$')

# Either of these kernel messages marks the end of the test output.
kunit_end_re = re.compile(
    r'(List of all partitions:|Kernel panic - not syncing: VFS:)')


def isolate_kunit_output(kernel_output):
    """Yield only the test-output portion of kernel_output.

    Lines before the first start marker are dropped.  From the start marker
    on, each line is yielded with trailing whitespace removed and with as
    many leading characters cut off as preceded 'TAP version' on the most
    recent start line.  Iteration stops at the first end-marker line.
    """
    # None until a start line has been seen; afterwards the prefix width.
    prefix_len = None
    for raw_line in kernel_output:
        line = raw_line.rstrip()  # drop the trailing newline
        if kunit_start_re.search(line):
            prefix_len = len(line.split('TAP version')[0])
            yield line[prefix_len:]
        elif kunit_end_re.search(line):
            break
        elif prefix_len is not None:
            yield line[prefix_len:]
def raw_output(kernel_output):
    """Print each line of kernel_output with trailing whitespace removed."""
    for raw_line in kernel_output:
        stripped = raw_line.rstrip()
        print(stripped)
# Horizontal rule: sixty '=' characters.
DIVIDER = 60 * '='

# Escape sequence appended after coloured text by red()/yellow()/green().
RESET = '\033[0;0m'
def red(text):
    """Return text wrapped between the '1;31' colour escape and RESET."""
    return ''.join(('\033[1;31m', text, RESET))
def yellow(text):
    """Return text wrapped between the '1;33' colour escape and RESET."""
    return ''.join(('\033[1;33m', text, RESET))
def green(text):
    """Return text wrapped between the '1;32' colour escape and RESET."""
    return ''.join(('\033[1;32m', text, RESET))
def print_with_timestamp(message):
    """Print message prefixed with the current local time as [HH:MM:SS]."""
    stamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (stamp, message))
def format_suite_divider(message):
    """Return message framed by eight '=' characters and a space on each side."""
    return ''.join(('======== ', message, ' ========'))
def print_suite_divider(message):
    """Print a timestamped DIVIDER line, then the timestamped suite banner."""
    print_with_timestamp(DIVIDER)
    banner = format_suite_divider(message)
    print_with_timestamp(banner)
def print_log(log):
    """Print every entry of log on its own timestamped line."""
    for entry in log:
        print_with_timestamp(entry)
# Lines the parser treats as TAP entries: a "TAP" header, an (optionally
# indented) "ok"/"not ok" result, a "<n>..<m>" plan, or a "#" diagnostic.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')


def _count_leading_non_tap(lines: List[str]) -> int:
    """Return how many lines at the front of lines do not match TAP_ENTRIES."""
    for index, line in enumerate(lines):
        if TAP_ENTRIES.match(line):
            return index
    return len(lines)


def consume_non_diagnositic(lines: List[str]) -> None:
    """Remove, in place, the leading lines that are not TAP entries.

    The list is left starting at its first TAP entry, or empty if it
    contains none.
    """
    # One scan plus one slice deletion instead of repeated pop(0), which
    # shifts the whole list on every removed line.
    skip = _count_leading_non_tap(lines)
    del lines[:skip]


def save_non_diagnositic(lines: List[str], test_case: 'TestCase') -> None:
    """Move the leading non-TAP lines from lines into test_case.log.

    The moved lines are appended to test_case.log in their original order
    and removed from lines in place.
    """
    skip = _count_leading_non_tap(lines)
    test_case.log.extend(lines[:skip])
    del lines[:skip]
OkNotOkResult = namedtuple('OkNotOkResult', 'is_ok description text')

# Indented result line "<ws>ok|not ok <n> - <name>"; groups: result, name.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line "ok|not ok <n> - <name>"; groups: result, index, name.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
    """Consume the next subtest result line and record it on test_case.

    Leading non-TAP lines are first saved to test_case.log.  If no lines
    remain, the case is marked TEST_CRASHED and True is returned.  Otherwise
    lines are consumed until one matches OK_NOT_OK_SUBTEST; that line is
    appended to the log, the case name is taken from it, and the status is
    set to SUCCESS or FAILURE unless the case was already marked
    TEST_CRASHED.

    Returns True when a result was recorded (or the crash was recorded),
    False when the input ran out without a subtest result line.
    """
    save_non_diagnositic(lines, test_case)
    if not lines:
        test_case.status = TestStatus.TEST_CRASHED
        return True
    while lines:
        # Pop each candidate exactly once.  Previously, a result line found
        # after skipping other lines had already been popped, and a second
        # pop then logged and dropped the line after it (or raised
        # IndexError at the end of the input).
        line = lines.pop(0)
        match = OK_NOT_OK_SUBTEST.match(line)
        if not match:
            continue
        test_case.log.append(line)
        test_case.name = match.group(2)
        # A crash reported by an earlier diagnostic outranks the ok/not ok.
        if test_case.status == TestStatus.TEST_CRASHED:
            return True
        if match.group(1) == 'ok':
            test_case.status = TestStatus.SUCCESS
        else:
            test_case.status = TestStatus.FAILURE
        return True
    return False
# Indented diagnostic line "<ws># <text>"; group 1 is the text.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')

# Indented diagnostic line ending in ": kunit test case crashed!".
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
    """Consume one subtest diagnostic line, if one is next.

    Leading non-TAP lines are first saved to test_case.log.  If the next
    line matches SUBTEST_DIAGNOSTIC it is moved into the log, and the case
    is marked TEST_CRASHED when it is also a crash message.

    Returns True if a diagnostic line was consumed, False otherwise.
    """
    save_non_diagnositic(lines, test_case)
    if not lines or not SUBTEST_DIAGNOSTIC.match(lines[0]):
        return False
    diagnostic = lines.pop(0)
    test_case.log.append(diagnostic)
    if DIAGNOSTIC_CRASH_MESSAGE.match(diagnostic):
        test_case.status = TestStatus.TEST_CRASHED
    return True
def parse_test_case(lines: List[str]) -> Optional[TestCase]:
    """Parse one test case from the front of lines.

    Saves leading non-TAP lines, consumes every diagnostic line that
    follows, then parses the result line.  Returns the populated TestCase,
    or None if no result line could be parsed.
    """
    case = TestCase()
    save_non_diagnositic(lines, case)
    # Drain all consecutive diagnostics before looking for the result line.
    more_diagnostics = True
    while more_diagnostics:
        more_diagnostics = parse_diagnostic(lines, case)
    return case if parse_ok_not_ok_test_case(lines, case) else None
# Indented "<ws># Subtest: <name>" line; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
def parse_subtest_header(lines: List[str]) -> Optional[str]:
    """Consume a subtest header line and return the name it carries.

    Leading non-TAP lines are discarded first.  Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not a
    subtest header.
    """
    consume_non_diagnositic(lines)
    header = SUBTEST_HEADER.match(lines[0]) if lines else None
    if not header:
        return None
    lines.pop(0)
    return header.group(1)
# Indented plan line "<ws><n>..<m>"; group 1 is the upper bound <m>.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
def parse_subtest_plan(lines: List[str]) -> Optional[int]:
    """Consume a subtest plan line and return the number it announces.

    Leading non-TAP lines are discarded first.  Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not a
    subtest plan.
    """
    consume_non_diagnositic(lines)
    # A truncated log can leave nothing to inspect; report "no plan"
    # instead of raising IndexError on lines[0].
    if not lines:
        return None
    match = SUBTEST_PLAN.match(lines[0])
    if match is None:
        return None
    lines.pop(0)
    return int(match.group(1))
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
    """Return the more severe of two statuses.

    TEST_CRASHED outranks FAILURE, which outranks everything else.  When
    neither applies, the first operand that is not SUCCESS is returned
    (left before right); SUCCESS is returned only when both are SUCCESS.
    """
    for severe in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
        if left == severe or right == severe:
            return severe
    if left != TestStatus.SUCCESS:
        return left
    if right != TestStatus.SUCCESS:
        return right
    return TestStatus.SUCCESS
def parse_ok_not_ok_test_suite(lines: List[str],
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
    """Consume the suite-level result line and record it on test_suite.

    Leading non-TAP lines are discarded first.  If no lines remain, the
    suite is marked TEST_CRASHED and False is returned.  If the next line
    matches OK_NOT_OK_MODULE it is consumed, the suite status is set to
    SUCCESS or FAILURE, and an error is printed when the index on the line
    differs from expected_suite_index.

    Returns True if a suite result line was consumed, False otherwise.
    """
    consume_non_diagnositic(lines)
    if not lines:
        test_suite.status = TestStatus.TEST_CRASHED
        return False
    result = OK_NOT_OK_MODULE.match(lines[0])
    if not result:
        return False
    lines.pop(0)
    passed = result.group(1) == 'ok'
    test_suite.status = TestStatus.SUCCESS if passed else TestStatus.FAILURE
    actual_index = int(result.group(2))
    if actual_index != expected_suite_index:
        print_with_timestamp(
            red('[ERROR] ') + 'expected_suite_index ' +
            str(expected_suite_index) + ', but got ' +
            str(actual_index))
    return True
def bubble_up_errors(to_status, status_container_list) -> TestStatus:
    """Fold the statuses of all containers into the most severe one.

    to_status is called on each element of status_container_list and the
    results are combined with max_status, starting from SUCCESS.
    """
    worst = TestStatus.SUCCESS
    for container in status_container_list:
        worst = max_status(worst, to_status(container))
    return worst
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Combine the suite's own status with the statuses of its test cases."""
    def status_of(test_case):
        return test_case.status

    worst_case_status = bubble_up_errors(status_of, test_suite.cases)
    return max_status(worst_case_status, test_suite.status)
def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
    """Parse one test suite from the front of lines.

    A suite consists of a subtest header, a subtest plan, up to the planned
    number of test cases, and a suite-level result line whose index is
    checked against expected_suite_index.

    Returns the parsed TestSuite.  If the input ends before the suite
    result line, an error is printed and the partially parsed suite is
    still returned.  Returns None when lines is empty, when the header or
    plan is missing, or when the suite result line cannot be parsed.
    """
    if not lines:
        return None
    consume_non_diagnositic(lines)
    test_suite = TestSuite()
    test_suite.status = TestStatus.SUCCESS
    name = parse_subtest_header(lines)
    if not name:
        return None
    test_suite.name = name
    # A log truncated right after the header has no plan to read; bail out
    # here instead of letting the plan parser index into an empty list.
    consume_non_diagnositic(lines)
    if not lines:
        return None
    expected_test_case_num = parse_subtest_plan(lines)
    if expected_test_case_num is None:
        return None
    while expected_test_case_num > 0:
        test_case = parse_test_case(lines)
        if not test_case:
            break
        test_suite.cases.append(test_case)
        expected_test_case_num -= 1
    if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
        test_suite.status = bubble_up_test_case_errors(test_suite)
        return test_suite
    if not lines:
        print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
        return test_suite
    # Report through the same timestamped error format as the branch above,
    # with a separator between the message and the offending line.
    print_with_timestamp(
        red('[ERROR] ') + 'failed to parse end of suite: ' + lines[0])
    return None
# Exactly the line "TAP version 14".
TAP_HEADER = re.compile(r'^TAP version 14$')
def parse_tap_header(lines: List[str]) -> bool:
    """Consume the TAP header line if it is next.

    Leading non-TAP lines are discarded first.  Returns True if a
    TAP_HEADER line was consumed; returns False, leaving the next line in
    place, when lines is exhausted or the next line is not the header.
    """
    consume_non_diagnositic(lines)
    # Nothing left means no header; avoid IndexError on lines[0].
    if not lines or not TAP_HEADER.match(lines[0]):
        return False
    lines.pop(0)
    return True
# Unindented plan line "<n>..<m>"; group 1 is the upper bound <m>.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
def parse_test_plan(lines: List[str]) -> Optional[int]:
    """Consume the top-level plan line and return the number it announces.

    Leading non-TAP lines are discarded first.  Returns None, leaving the
    next line in place, when lines is exhausted or the next line is not a
    test plan.
    """
    consume_non_diagnositic(lines)
    # A log that ends right after the TAP header has no plan line; report
    # "no plan" instead of raising IndexError on lines[0].
    if not lines:
        return None
    match = TEST_PLAN.match(lines[0])
    if match is None:
        return None
    lines.pop(0)
    return int(match.group(1))
def bubble_up_suite_errors(test_suite_list: List[TestSuite]) -> TestStatus:
    """Return the most severe status found among the given suites."""
    def status_of(test_suite):
        return test_suite.status

    return bubble_up_errors(status_of, test_suite_list)
def parse_test_result(lines: List[str]) -> TestResult:
    """Parse a complete TAP log into a TestResult.

    lines is consumed in place and is also stored as the result's log.

    The status is NO_TESTS when there is no TAP header, when the plan
    announces zero suites, or when no suite could be parsed;
    FAILURE_TO_PARSE_TESTS when the plan line is missing; otherwise the
    most severe status among the parsed suites.
    """
    consume_non_diagnositic(lines)
    if not lines or not parse_tap_header(lines):
        return TestResult(TestStatus.NO_TESTS, [], lines)
    # A log that ends right after the header has no plan to read; report a
    # parse failure rather than indexing into an empty list.
    consume_non_diagnositic(lines)
    if not lines:
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    expected_test_suite_num = parse_test_plan(lines)
    if expected_test_suite_num is None:
        return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
    # A valid plan of zero suites parsed fine; there is just nothing to run.
    if expected_test_suite_num == 0:
        return TestResult(TestStatus.NO_TESTS, [], lines)
    test_suites = []
    for i in range(1, expected_test_suite_num + 1):
        test_suite = parse_test_suite(lines, i)
        if test_suite:
            test_suites.append(test_suite)
        else:
            # Report the number of suites actually parsed so far (the old
            # 'i - 2' was off by one and printed -1 on a first-suite failure).
            print_with_timestamp(
                red('[ERROR] ') + ' expected ' +
                str(expected_test_suite_num) +
                ' test suites, but got ' + str(len(test_suites)))
            break
    # Anything that still parses as a suite is beyond what the plan announced.
    test_suite = parse_test_suite(lines, -1)
    if test_suite:
        print_with_timestamp(red('[ERROR] ') +
            'got unexpected test suite: ' + test_suite.name)
    if test_suites:
        return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
    return TestResult(TestStatus.NO_TESTS, [], lines)
def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
    """Print a per-suite, per-case report and tally the outcomes.

    Each suite gets a coloured divider banner and each case a coloured
    status line; for cases that did not succeed, the saved log is printed
    in yellow followed by an empty timestamped line.

    Returns (total_tests, failed_tests, crashed_tests).
    """
    total_tests = failed_tests = crashed_tests = 0
    for suite in test_result.suites:
        if suite.status == TestStatus.SUCCESS:
            banner = green('[PASSED] ') + suite.name
        elif suite.status == TestStatus.TEST_CRASHED:
            banner = red('[CRASHED] ' + suite.name)
        else:
            banner = red('[FAILED] ') + suite.name
        print_suite_divider(banner)
        for case in suite.cases:
            total_tests += 1
            if case.status == TestStatus.SUCCESS:
                print_with_timestamp(green('[PASSED] ') + case.name)
                continue
            if case.status == TestStatus.TEST_CRASHED:
                crashed_tests += 1
                print_with_timestamp(red('[CRASHED] ' + case.name))
            else:
                failed_tests += 1
                print_with_timestamp(red('[FAILED] ') + case.name)
            # Crashed and failed cases both dump their saved log.
            print_log(map(yellow, case.log))
            print_with_timestamp('')
    return total_tests, failed_tests, crashed_tests
def parse_run_tests(kernel_output) -> TestResult:
    """Parse kernel_output, print a report and summary, and return the result.

    The test output is isolated from kernel_output and parsed.  An error is
    printed when no tests ran or the results could not be parsed; otherwise
    the per-suite report is printed.  A divider and a summary line (green
    on overall success, red otherwise) are always printed last.
    """
    counts = (0, 0, 0)  # (total, failed, crashed)
    tap_lines = list(isolate_kunit_output(kernel_output))
    test_result = parse_test_result(tap_lines)
    if test_result.status == TestStatus.NO_TESTS:
        print(red('[ERROR] ') + yellow('no tests run!'))
    elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
        print(red('[ERROR] ') + yellow('could not parse test results!'))
    else:
        total_tests, failed_tests, crashed_tests = print_and_count_results(test_result)
        counts = (total_tests, failed_tests, crashed_tests)
    print_with_timestamp(DIVIDER)
    if test_result.status == TestStatus.SUCCESS:
        fmt = green
    else:
        fmt = red
    summary = 'Testing complete. %d tests run. %d failed. %d crashed.' % counts
    print_with_timestamp(fmt(summary))
    return test_result