GNU Linux-libre 5.15.137-gnu
[releases.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
16
17 TestResult = namedtuple('TestResult', ['status','suites','log'])
18
19 class TestSuite(object):
20         def __init__(self) -> None:
21                 self.status = TestStatus.SUCCESS
22                 self.name = ''
23                 self.cases = []  # type: List[TestCase]
24
25         def __str__(self) -> str:
26                 return 'TestSuite(' + str(self.status) + ',' + self.name + ',' + str(self.cases) + ')'
27
28         def __repr__(self) -> str:
29                 return str(self)
30
31 class TestCase(object):
32         def __init__(self) -> None:
33                 self.status = TestStatus.SUCCESS
34                 self.name = ''
35                 self.log = []  # type: List[str]
36
37         def __str__(self) -> str:
38                 return 'TestCase(' + str(self.status) + ',' + self.name + ',' + str(self.log) + ')'
39
40         def __repr__(self) -> str:
41                 return str(self)
42
43 class TestStatus(Enum):
44         SUCCESS = auto()
45         FAILURE = auto()
46         SKIPPED = auto()
47         TEST_CRASHED = auto()
48         NO_TESTS = auto()
49         FAILURE_TO_PARSE_TESTS = auto()
50
51 class LineStream:
52         """Provides a peek()/pop() interface over an iterator of (line#, text)."""
53         _lines: Iterator[Tuple[int, str]]
54         _next: Tuple[int, str]
55         _done: bool
56
57         def __init__(self, lines: Iterator[Tuple[int, str]]):
58                 self._lines = lines
59                 self._done = False
60                 self._next = (0, '')
61                 self._get_next()
62
63         def _get_next(self) -> None:
64                 try:
65                         self._next = next(self._lines)
66                 except StopIteration:
67                         self._done = True
68
69         def peek(self) -> str:
70                 return self._next[1]
71
72         def pop(self) -> str:
73                 n = self._next
74                 self._get_next()
75                 return n[1]
76
77         def __bool__(self) -> bool:
78                 return not self._done
79
80         # Only used by kunit_tool_test.py.
81         def __iter__(self) -> Iterator[str]:
82                 while bool(self):
83                         yield self.pop()
84
85         def line_number(self) -> int:
86                 return self._next[0]
87
88 kunit_start_re = re.compile(r'TAP version [0-9]+$')
89 kunit_end_re = re.compile('(List of all partitions:|'
90                           'Kernel panic - not syncing: VFS:|reboot: System halted)')
91
92 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
93         def isolate_kunit_output(kernel_output: Iterable[str]) -> Iterator[Tuple[int, str]]:
94                 line_num = 0
95                 started = False
96                 for line in kernel_output:
97                         line_num += 1
98                         line = line.rstrip()  # line always has a trailing \n
99                         if kunit_start_re.search(line):
100                                 prefix_len = len(line.split('TAP version')[0])
101                                 started = True
102                                 yield line_num, line[prefix_len:]
103                         elif kunit_end_re.search(line):
104                                 break
105                         elif started:
106                                 yield line_num, line[prefix_len:]
107         return LineStream(lines=isolate_kunit_output(kernel_output))
108
109 DIVIDER = '=' * 60
110
111 RESET = '\033[0;0m'
112
113 def red(text) -> str:
114         return '\033[1;31m' + text + RESET
115
116 def yellow(text) -> str:
117         return '\033[1;33m' + text + RESET
118
119 def green(text) -> str:
120         return '\033[1;32m' + text + RESET
121
122 def print_with_timestamp(message) -> None:
123         print('[%s] %s' % (datetime.now().strftime('%H:%M:%S'), message))
124
125 def format_suite_divider(message) -> str:
126         return '======== ' + message + ' ========'
127
128 def print_suite_divider(message) -> None:
129         print_with_timestamp(DIVIDER)
130         print_with_timestamp(format_suite_divider(message))
131
132 def print_log(log) -> None:
133         for m in log:
134                 print_with_timestamp(m)
135
136 TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*# (Subtest:|.*: kunit test case crashed!)).*$')
137
138 def consume_non_diagnostic(lines: LineStream) -> None:
139         while lines and not TAP_ENTRIES.match(lines.peek()):
140                 lines.pop()
141
142 def save_non_diagnostic(lines: LineStream, test_case: TestCase) -> None:
143         while lines and not TAP_ENTRIES.match(lines.peek()):
144                 test_case.log.append(lines.peek())
145                 lines.pop()
146
147 OkNotOkResult = namedtuple('OkNotOkResult', ['is_ok','description', 'text'])
148
149 OK_NOT_OK_SKIP = re.compile(r'^[\s]*(ok|not ok) [0-9]+ - (.*) # SKIP(.*)$')
150
151 OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')
152
153 OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
154
155 def parse_ok_not_ok_test_case(lines: LineStream, test_case: TestCase) -> bool:
156         save_non_diagnostic(lines, test_case)
157         if not lines:
158                 test_case.status = TestStatus.TEST_CRASHED
159                 return True
160         line = lines.peek()
161         match = OK_NOT_OK_SUBTEST.match(line)
162         while not match and lines:
163                 line = lines.pop()
164                 match = OK_NOT_OK_SUBTEST.match(line)
165         if match:
166                 test_case.log.append(lines.pop())
167                 test_case.name = match.group(2)
168                 skip_match = OK_NOT_OK_SKIP.match(line)
169                 if skip_match:
170                         test_case.status = TestStatus.SKIPPED
171                         return True
172                 if test_case.status == TestStatus.TEST_CRASHED:
173                         return True
174                 if match.group(1) == 'ok':
175                         test_case.status = TestStatus.SUCCESS
176                 else:
177                         test_case.status = TestStatus.FAILURE
178                 return True
179         else:
180                 return False
181
182 SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
183 DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')
184
185 def parse_diagnostic(lines: LineStream, test_case: TestCase) -> bool:
186         save_non_diagnostic(lines, test_case)
187         if not lines:
188                 return False
189         line = lines.peek()
190         match = SUBTEST_DIAGNOSTIC.match(line)
191         if match:
192                 test_case.log.append(lines.pop())
193                 crash_match = DIAGNOSTIC_CRASH_MESSAGE.match(line)
194                 if crash_match:
195                         test_case.status = TestStatus.TEST_CRASHED
196                 return True
197         else:
198                 return False
199
200 def parse_test_case(lines: LineStream) -> Optional[TestCase]:
201         test_case = TestCase()
202         save_non_diagnostic(lines, test_case)
203         while parse_diagnostic(lines, test_case):
204                 pass
205         if parse_ok_not_ok_test_case(lines, test_case):
206                 return test_case
207         else:
208                 return None
209
210 SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')
211
212 def parse_subtest_header(lines: LineStream) -> Optional[str]:
213         consume_non_diagnostic(lines)
214         if not lines:
215                 return None
216         match = SUBTEST_HEADER.match(lines.peek())
217         if match:
218                 lines.pop()
219                 return match.group(1)
220         else:
221                 return None
222
223 SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')
224
225 def parse_subtest_plan(lines: LineStream) -> Optional[int]:
226         consume_non_diagnostic(lines)
227         match = SUBTEST_PLAN.match(lines.peek())
228         if match:
229                 lines.pop()
230                 return int(match.group(1))
231         else:
232                 return None
233
234 def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
235         if left == right:
236                 return left
237         elif left == TestStatus.TEST_CRASHED or right == TestStatus.TEST_CRASHED:
238                 return TestStatus.TEST_CRASHED
239         elif left == TestStatus.FAILURE or right == TestStatus.FAILURE:
240                 return TestStatus.FAILURE
241         elif left == TestStatus.SKIPPED:
242                 return right
243         else:
244                 return left
245
246 def parse_ok_not_ok_test_suite(lines: LineStream,
247                                test_suite: TestSuite,
248                                expected_suite_index: int) -> bool:
249         consume_non_diagnostic(lines)
250         if not lines:
251                 test_suite.status = TestStatus.TEST_CRASHED
252                 return False
253         line = lines.peek()
254         match = OK_NOT_OK_MODULE.match(line)
255         if match:
256                 lines.pop()
257                 if match.group(1) == 'ok':
258                         test_suite.status = TestStatus.SUCCESS
259                 else:
260                         test_suite.status = TestStatus.FAILURE
261                 skip_match = OK_NOT_OK_SKIP.match(line)
262                 if skip_match:
263                         test_suite.status = TestStatus.SKIPPED
264                 suite_index = int(match.group(2))
265                 if suite_index != expected_suite_index:
266                         print_with_timestamp(
267                                 red('[ERROR] ') + 'expected_suite_index ' +
268                                 str(expected_suite_index) + ', but got ' +
269                                 str(suite_index))
270                 return True
271         else:
272                 return False
273
274 def bubble_up_errors(status_list: Iterable[TestStatus]) -> TestStatus:
275         return reduce(max_status, status_list, TestStatus.SKIPPED)
276
277 def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
278         max_test_case_status = bubble_up_errors(x.status for x in test_suite.cases)
279         return max_status(max_test_case_status, test_suite.status)
280
281 def parse_test_suite(lines: LineStream, expected_suite_index: int) -> Optional[TestSuite]:
282         if not lines:
283                 return None
284         consume_non_diagnostic(lines)
285         test_suite = TestSuite()
286         test_suite.status = TestStatus.SUCCESS
287         name = parse_subtest_header(lines)
288         if not name:
289                 return None
290         test_suite.name = name
291         expected_test_case_num = parse_subtest_plan(lines)
292         if expected_test_case_num is None:
293                 return None
294         while expected_test_case_num > 0:
295                 test_case = parse_test_case(lines)
296                 if not test_case:
297                         break
298                 test_suite.cases.append(test_case)
299                 expected_test_case_num -= 1
300         if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
301                 test_suite.status = bubble_up_test_case_errors(test_suite)
302                 return test_suite
303         elif not lines:
304                 print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
305                 return test_suite
306         else:
307                 print(f'failed to parse end of suite "{name}", at line {lines.line_number()}: {lines.peek()}')
308                 return None
309
310 TAP_HEADER = re.compile(r'^TAP version 14$')
311
312 def parse_tap_header(lines: LineStream) -> bool:
313         consume_non_diagnostic(lines)
314         if TAP_HEADER.match(lines.peek()):
315                 lines.pop()
316                 return True
317         else:
318                 return False
319
320 TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')
321
322 def parse_test_plan(lines: LineStream) -> Optional[int]:
323         consume_non_diagnostic(lines)
324         match = TEST_PLAN.match(lines.peek())
325         if match:
326                 lines.pop()
327                 return int(match.group(1))
328         else:
329                 return None
330
331 def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
332         return bubble_up_errors(x.status for x in test_suites)
333
334 def parse_test_result(lines: LineStream) -> TestResult:
335         consume_non_diagnostic(lines)
336         if not lines or not parse_tap_header(lines):
337                 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
338         expected_test_suite_num = parse_test_plan(lines)
339         if expected_test_suite_num == 0:
340                 return TestResult(TestStatus.NO_TESTS, [], lines)
341         elif expected_test_suite_num is None:
342                 return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
343         test_suites = []
344         for i in range(1, expected_test_suite_num + 1):
345                 test_suite = parse_test_suite(lines, i)
346                 if test_suite:
347                         test_suites.append(test_suite)
348                 else:
349                         print_with_timestamp(
350                                 red('[ERROR] ') + ' expected ' +
351                                 str(expected_test_suite_num) +
352                                 ' test suites, but got ' + str(i - 2))
353                         break
354         test_suite = parse_test_suite(lines, -1)
355         if test_suite:
356                 print_with_timestamp(red('[ERROR] ') +
357                         'got unexpected test suite: ' + test_suite.name)
358         if test_suites:
359                 return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
360         else:
361                 return TestResult(TestStatus.NO_TESTS, [], lines)
362
363 class TestCounts:
364         passed: int
365         failed: int
366         crashed: int
367         skipped: int
368
369         def __init__(self):
370                 self.passed = 0
371                 self.failed = 0
372                 self.crashed = 0
373                 self.skipped = 0
374
375         def total(self) -> int:
376                 return self.passed + self.failed + self.crashed + self.skipped
377
378 def print_and_count_results(test_result: TestResult) -> TestCounts:
379         counts = TestCounts()
380         for test_suite in test_result.suites:
381                 if test_suite.status == TestStatus.SUCCESS:
382                         print_suite_divider(green('[PASSED] ') + test_suite.name)
383                 elif test_suite.status == TestStatus.SKIPPED:
384                         print_suite_divider(yellow('[SKIPPED] ') + test_suite.name)
385                 elif test_suite.status == TestStatus.TEST_CRASHED:
386                         print_suite_divider(red('[CRASHED] ' + test_suite.name))
387                 else:
388                         print_suite_divider(red('[FAILED] ') + test_suite.name)
389                 for test_case in test_suite.cases:
390                         if test_case.status == TestStatus.SUCCESS:
391                                 counts.passed += 1
392                                 print_with_timestamp(green('[PASSED] ') + test_case.name)
393                         elif test_case.status == TestStatus.SKIPPED:
394                                 counts.skipped += 1
395                                 print_with_timestamp(yellow('[SKIPPED] ') + test_case.name)
396                         elif test_case.status == TestStatus.TEST_CRASHED:
397                                 counts.crashed += 1
398                                 print_with_timestamp(red('[CRASHED] ' + test_case.name))
399                                 print_log(map(yellow, test_case.log))
400                                 print_with_timestamp('')
401                         else:
402                                 counts.failed += 1
403                                 print_with_timestamp(red('[FAILED] ') + test_case.name)
404                                 print_log(map(yellow, test_case.log))
405                                 print_with_timestamp('')
406         return counts
407
408 def parse_run_tests(kernel_output: Iterable[str]) -> TestResult:
409         counts = TestCounts()
410         lines = extract_tap_lines(kernel_output)
411         test_result = parse_test_result(lines)
412         if test_result.status == TestStatus.NO_TESTS:
413                 print(red('[ERROR] ') + yellow('no tests run!'))
414         elif test_result.status == TestStatus.FAILURE_TO_PARSE_TESTS:
415                 print(red('[ERROR] ') + yellow('could not parse test results!'))
416         else:
417                 counts = print_and_count_results(test_result)
418         print_with_timestamp(DIVIDER)
419         if test_result.status == TestStatus.SUCCESS:
420                 fmt = green
421         elif test_result.status == TestStatus.SKIPPED:
422                 fmt = yellow
423         else:
424                 fmt =red
425         print_with_timestamp(
426                 fmt('Testing complete. %d tests run. %d failed. %d crashed. %d skipped.' %
427                     (counts.total(), counts.failed, counts.crashed, counts.skipped)))
428         return test_result