1 # SPDX-License-Identifier: GPL-2.0
3 # Parses test results from a kernel dmesg log.
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
# Overall outcome of a parse: the aggregate status, the list of parsed
# suites, and the line stream that was parsed (kept as ``log``).
TestResult = namedtuple('TestResult', ['status', 'suites', 'log'])
19 class TestSuite(object):
20 def __init__(self) -> None:
21 self.status = TestStatus.SUCCESS
23 self.cases = [] # type: List[TestCase]
25 def __str__(self) -> str:
26 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
28 def __repr__(self) -> str:
31 class TestCase(object):
32 def __init__(self) -> None:
33 self.status = TestStatus.SUCCESS
35 self.log = [] # type: List[str]
37 def __str__(self) -> str:
38 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
40 def __repr__(self) -> str:
43 class TestStatus(Enum):
49 FAILURE_TO_PARSE_TESTS = auto()
52 """Provides a peek()/pop() interface over an iterator of (line#, text)."""
53 _lines: Iterator[Tuple[int, str]]
54 _next: Tuple[int, str]
57 def __init__(self, lines: Iterator[Tuple[int, str]]):
63 def _get_next(self) -> None:
65 self._next = next(self._lines)
69 def peek(self) -> str:
77 def __bool__(self) -> bool:
80 # Only used by kunit_tool_test.py.
81 def __iter__(self) -> Iterator[str]:
85 def line_number(self) -> int:
# Marks the start of KUnit output: a line that ends with "TAP version <N>".
# It is applied with .search(), so arbitrary text may precede the header on
# the same line.
kunit_start_re = re.compile(r'TAP version [0-9]+$')

# Kernel messages that are tested for (also with .search()) on lines that
# are not a start header.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:|reboot: System halted)')
92 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
93 def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
96 for line in kernel_output:
98 line = line.rstrip() # line always has a trailing \n
99 if kunit_start_re.search(line):
100 prefix_len = len(line.split('TAP version')[0])
102 yield line_num, line[prefix_len:]
103 elif kunit_end_re.search(line):
106 yield line_num, line[prefix_len:]
107 return LineStream(lines=isolate_kunit_output(kernel_output))
def red(text: str) -> str:
    """Return *text* wrapped in the bold-red ANSI escape, followed by RESET."""
    return '\033[1;31m' + text + RESET
def yellow(text: str) -> str:
    """Return *text* wrapped in the bold-yellow ANSI escape, followed by RESET."""
    return '\033[1;33m' + text + RESET
def green(text: str) -> str:
    """Return *text* wrapped in the bold-green ANSI escape, followed by RESET."""
    return '\033[1;32m' + text + RESET
def print_with_timestamp(message) -> None:
    """Print *message* to stdout prefixed with the current time as ``[HH:MM:SS]``."""
    timestamp = datetime.now().strftime('%H:%M:%S')
    print('[%s] %s' % (timestamp, message))
def format_suite_divider(message: str) -> str:
    """Return *message* framed by ``========`` on both sides, for use as a heading."""
    return '======== ' + message + ' ========'
def print_suite_divider(message: str) -> None:
    """Print a timestamped DIVIDER line, then *message* as a timestamped heading."""
    print_with_timestamp(DIVIDER)
    print_with_timestamp(format_suite_divider(message))
132 def print_log(log) -> None:
134 print_with_timestamp(m)
# Matches any line the parser treats as a structural TAP entry:
#   - a line starting with "TAP",
#   - an (optionally indented) "ok" / "not ok" result line,
#   - an (optionally indented) plan line "N..M",
#   - an (optionally indented) "# Subtest:" header or
#     "# <anything>: kunit test case crashed!" message.
# consume_non_diagnostic() and save_non_diagnostic() loop while the next
# line does NOT match this pattern.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*# (Subtest:|.*: kunit test case crashed!)).*$')
138 def consume_non_diagnostic(lines: LineStream) -> None:
139 while lines and not TAP_ENTRIES.match(lines.peek()):
142 def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
143 while lines and not TAP_ENTRIES.match(lines.peek()):
144 test_case.log.append(lines.peek())
# Record type with the fields ``is_ok``, ``description`` and ``text``.
OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])

# Result line (indented or not) that carries a "# SKIP" directive.
# Groups: 1 = "ok" / "not ok", 2 = description, 3 = text following "# SKIP".
OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')

# Test-case result line: leading whitespace is required.
# Groups: 1 = "ok" / "not ok", 2 = remainder of the line after " - ".
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Suite-level result line: no leading whitespace is allowed.
# Groups: 1 = "ok" / "not ok", 2 = index, 3 = remainder after " - ".
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
155 def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
156 save_non_diagnostic(lines, test_case)
158 test_case.status = TestStatus.TEST_CRASHED
161 match = OK_NOT_OK_SUBTEST.match(line)
162 while not match and lines:
164 match = OK_NOT_OK_SUBTEST.match(line)
166 test_case.log.append(lines.pop())
167 test_case.name = match.group(2)
168 skip_match = OK_NOT_OK_SKIP.match(line)
170 test_case.status = TestStatus.SKIPPED
172 if test_case.status == TestStatus.TEST_CRASHED:
174 if match.group(1) == 'ok':
175 test_case.status = TestStatus.SUCCESS
177 test_case.status = TestStatus.FAILURE
# Indented diagnostic line "# <text>"; group 1 is the text after "# ".
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')

# Indented diagnostic line that ends with ": kunit test case crashed!".
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
185 def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
186 save_non_diagnostic(lines, test_case)
190 match = SUBTEST_DIAGNOSTIC.match(line)
192 test_case.log.append(lines.pop())
193 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
195 test_case.status = TestStatus.TEST_CRASHED
200 def parse_test_case(lines: LineStream) -> Optional[TestCase]:
201 test_case = TestCase()
202 save_non_diagnostic(lines, test_case)
203 while parse_diagnostic(lines, test_case):
205 if parse_ok_not_ok_test_case(lines, test_case):
# Indented "# Subtest: <name>" header; group 1 is the name, which
# parse_subtest_header() returns.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
212 def parse_subtest_header(lines: LineStream) -> Optional[str]:
213 consume_non_diagnostic(lines)
216 match = SUBTEST_HEADER.match(lines.peek())
219 return match.group(1)
# Indented plan line "N..M"; group 1 is M, which parse_subtest_plan()
# converts to int. There is no explicit '^': the pattern is applied with
# .match(), which already anchors at the start of the line.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
225 def parse_subtest_plan(lines: LineStream) -> Optional[int]:
226 consume_non_diagnostic(lines)
227 match = SUBTEST_PLAN.match(lines.peek())
230 return int(match.group(1))
234 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
237 elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
238 return TestStatus.TEST_CRASHED
239 elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
240 return TestStatus.FAILURE
241 elif left == TestStatus.SKIPPED:
246 def parse_ok_not_ok_test_suite(lines: LineStream,
247 test_suite: TestSuite,
248 expected_suite_index: int) -> bool:
249 consume_non_diagnostic(lines)
251 test_suite.status = TestStatus.TEST_CRASHED
254 match = OK_NOT_OK_MODULE.match(line)
257 if match.group(1) == 'ok':
258 test_suite.status = TestStatus.SUCCESS
260 test_suite.status = TestStatus.FAILURE
261 skip_match = OK_NOT_OK_SKIP.match(line)
263 test_suite.status = TestStatus.SKIPPED
264 suite_index = int(match.group(2))
265 if suite_index != expected_suite_index:
266 print_with_timestamp(
267 red('[ERROR] ') + 'expected_suite_index ' +
268 str(expected_suite_index) + ', but got ' +
def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
    """Combine all statuses in *status_list* into one using max_status().

    The fold starts from TestStatus.SKIPPED, so an empty iterable yields
    TestStatus.SKIPPED.
    """
    return reduce(max_status, status_list, TestStatus.SKIPPED)
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
    """Return the suite's status combined with the statuses of all its cases.

    The case statuses are folded together first with bubble_up_errors();
    that result is then merged with ``test_suite.status`` via max_status().
    """
    max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
    return max_status(max_test_case_status, test_suite.status)
281 def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
284 consume_non_diagnostic(lines)
285 test_suite = TestSuite()
286 test_suite.status = TestStatus.SUCCESS
287 name = parse_subtest_header(lines)
290 test_suite.name = name
291 expected_test_case_num = parse_subtest_plan(lines)
292 if expected_test_case_num is None:
294 while expected_test_case_num > 0:
295 test_case = parse_test_case(lines)
298 test_suite.cases.append(test_case)
299 expected_test_case_num -= 1
300 if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
301 test_suite.status = bubble_up_test_case_errors(test_suite)
304 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
307 print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
# The only accepted TAP header: exactly "TAP version 14", with nothing
# before or after it on the line.
TAP_HEADER = re.compile(r'^TAP version 14$')
312 def parse_tap_header(lines: LineStream) -> bool:
313 consume_non_diagnostic(lines)
314 if TAP_HEADER.match(lines.peek()):
# Top-level (unindented) plan line "N..M"; group 1 is M, which
# parse_test_plan() converts to int. The pattern is applied with .match(),
# so it is anchored at the start of the line.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
322 def parse_test_plan(lines: LineStream) -> Optional[int]:
323 consume_non_diagnostic(lines)
324 match = TEST_PLAN.match(lines.peek())
327 return int(match.group(1))
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
    """Return the combined status of all *test_suites* via bubble_up_errors()."""
    return bubble_up_errors(x.status for x in test_suites)
334 def parse_test_result(lines: LineStream) -> TestResult:
335 consume_non_diagnostic(lines)
336 if not lines or not parse_tap_header(lines):
337 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
338 expected_test_suite_num = parse_test_plan(lines)
339 if expected_test_suite_num == 0:
340 return TestResult(TestStatus.NO_TESTS, [], lines)
341 elif expected_test_suite_num is None:
342 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
344 for i in range(1, expected_test_suite_num + 1):
345 test_suite = parse_test_suite(lines, i)
347 test_suites.append(test_suite)
349 print_with_timestamp(
350 red('[ERROR] ') + ' expected ' +
351 str(expected_test_suite_num) +
352 ' test suites, but got ' + str(i - 2))
354 test_suite = parse_test_suite(lines, -1)
356 print_with_timestamp(red('[ERROR] ') +
357 'got unexpected test suite: ' + test_suite.name)
359 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
361 return TestResult(TestStatus.NO_TESTS, [], lines)
def total(self) -> int:
    """Return the sum of the passed, failed, crashed and skipped counters."""
    return self.passed + self.failed + self.crashed + self.skipped
378 def print_and_count_results(test_result: TestResult) -> TestCounts:
379 counts = TestCounts()
380 for test_suite in test_result.suites:
381 if test_suite.status == TestStatus.SUCCESS:
382 print_suite_divider(green('[PASSED] ') + test_suite.name)
383 elif test_suite.status == TestStatus.SKIPPED:
384 print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
385 elif test_suite.status == TestStatus.TEST_CRASHED:
386 print_suite_divider(red('[CRASHED] ' + test_suite.name))
388 print_suite_divider(red('[FAILED] ') + test_suite.name)
389 for test_case in test_suite.cases:
390 if test_case.status == TestStatus.SUCCESS:
392 print_with_timestamp(green('[PASSED] ') + test_case.name)
393 elif test_case.status == TestStatus.SKIPPED:
395 print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
396 elif test_case.status == TestStatus.TEST_CRASHED:
398 print_with_timestamp(red('[CRASHED] ' + test_case.name))
399 print_log(map(yellow, test_case.log))
400 print_with_timestamp('')
403 print_with_timestamp(red('[FAILED] ') + test_case.name)
404 print_log(map(yellow, test_case.log))
405 print_with_timestamp('')
408 def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
409 counts = TestCounts()
410 lines = extract_tap_lines(kernel_output)
411 test_result = parse_test_result(lines)
412 if test_result.status == TestStatus.NO_TESTS:
413 print(red('[ERROR] ') + yellow('no tests run!'))
414 elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
415 print(red('[ERROR] ') + yellow('could not parse test results!'))
417 counts = print_and_count_results(test_result)
418 print_with_timestamp(DIVIDER)
419 if test_result.status == TestStatus.SUCCESS:
421 elif test_result.status == TestStatus.SKIPPED:
425 print_with_timestamp(
426 fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
427 (counts.total(), counts.failed, counts.crashed, counts.skipped)))