GNU Linux-libre 5.13.14-gnu1
[releases.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses test results from a kernel dmesg log.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import re
10
11 from collections import namedtuple
12 from datetime import datetime
13 from enum import Enum, auto
14 from functools import reduce
15 from typing import Iterable, Iterator, List, Optional, Tuple
16
# Outcome of parsing one run: the aggregate status, the list of parsed suites
# and the list of lines that was handed to the parser (consumed lines are
# popped from it as parsing proceeds).
TestResult = namedtuple('TestResult', ('status', 'suites', 'log'))
18
class TestSuite:
        """Parsed result of one test suite: status, name and its test cases."""

        def __init__(self) -> None:
                self.name = ''
                self.cases = []  # type: List[TestCase]
                # A fresh suite counts as passing until a parser says otherwise.
                self.status = TestStatus.SUCCESS

        def __str__(self) -> str:
                fields = ','.join([str(self.status), self.name, str(self.cases)])
                return 'TestSuite(' + fields + ')'

        def __repr__(self) -> str:
                # repr and str intentionally produce the same text.
                return self.__str__()
30
class TestCase:
        """Parsed result of one test case: status, name and captured log lines."""

        def __init__(self) -> None:
                self.name = ''
                self.log = []  # type: List[str]
                # A fresh case counts as passing until a parser says otherwise.
                self.status = TestStatus.SUCCESS

        def __str__(self) -> str:
                fields = ','.join([str(self.status), self.name, str(self.log)])
                return 'TestCase(' + fields + ')'

        def __repr__(self) -> str:
                # repr and str intentionally produce the same text.
                return self.__str__()
42
class TestStatus(Enum):
        """Status of a test case, a test suite, or a whole parsed run."""

        # Result line said "ok".
        SUCCESS = auto()
        # Result line said "not ok".
        FAILURE = auto()
        # A "kunit test case crashed!" diagnostic was seen, or the output
        # ended before the expected result line.
        TEST_CRASHED = auto()
        # No TAP header was found, or no suite could be parsed.
        NO_TESTS = auto()
        # The TAP header was found but the top-level test plan was not usable.
        FAILURE_TO_PARSE_TESTS = auto()
49
# A line ending in "TAP version <n>" marks the start of KUnit output.
kunit_start_re = re.compile(r'TAP version [0-9]+$')
# Either of these kernel messages marks the end of the KUnit output.
kunit_end_re = re.compile('(List of all partitions:|'
                          'Kernel panic - not syncing: VFS:)')

def isolate_kunit_output(kernel_output) -> Iterator[str]:
        """Yield only the KUnit portion of a kernel log.

        Output starts at the line matching kunit_start_re and stops at the
        first line matching kunit_end_re (which is not yielded). Whatever
        precedes "TAP version" on the start line is treated as a fixed-width
        prefix, and that many characters are cut from every yielded line.
        Trailing whitespace is stripped from each line.
        """
        prefix_len = None  # stays None until a start line has been seen
        for raw in kernel_output:
                text = raw.rstrip()
                if kunit_start_re.search(text):
                        # Width of whatever precedes the TAP header on this line.
                        prefix_len = text.index('TAP version')
                elif kunit_end_re.search(text):
                        return
                if prefix_len is not None:
                        yield text[prefix_len:]
66
def raw_output(kernel_output) -> None:
        """Print each line of kernel_output with trailing whitespace removed."""
        for entry in kernel_output:
                stripped = entry.rstrip()
                print(stripped)
70
# Horizontal rule (60 '=' characters) printed between report sections.
DIVIDER = '=' * 60

# ANSI escape sequence appended after coloured text by red/yellow/green.
RESET = '\033[0;0m'
74
def red(text) -> str:
        """Wrap text in the '1;31' ANSI colour sequence followed by RESET."""
        return ''.join(('\033[1;31m', text, RESET))
77
def yellow(text) -> str:
        """Wrap text in the '1;33' ANSI colour sequence followed by RESET."""
        return ''.join(('\033[1;33m', text, RESET))
80
def green(text) -> str:
        """Wrap text in the '1;32' ANSI colour sequence followed by RESET."""
        return ''.join(('\033[1;32m', text, RESET))
83
def print_with_timestamp(message) -> None:
        """Print message prefixed with the current local time as [HH:MM:SS]."""
        stamp = datetime.now().strftime('%H:%M:%S')
        print('[%s] %s' % (stamp, message))
86
def format_suite_divider(message) -> str:
        """Return message framed by '========' on both sides."""
        return ''.join(('======== ', message, ' ========'))
89
def print_suite_divider(message) -> None:
        """Print the DIVIDER rule, then message as a framed suite heading."""
        print_with_timestamp(DIVIDER)
        heading = format_suite_divider(message)
        print_with_timestamp(heading)
93
def print_log(log) -> None:
        """Print every entry of log on its own timestamped line."""
        for entry in log:
                print_with_timestamp(entry)
97
# Lines the parser cares about: the TAP header, "ok"/"not ok" results, plan
# lines ("1..N") and "#" diagnostics; all but the header may be indented.
TAP_ENTRIES = re.compile(r'^(TAP|[\s]*ok|[\s]*not ok|[\s]*[0-9]+\.\.[0-9]+|[\s]*#).*$')

def consume_non_diagnostic(lines: List[str]) -> None:
        """Drop, in place, the leading lines that do not match TAP_ENTRIES.

        Stops at the first matching line, or empties the list if none matches.
        """
        skipped = 0
        for text in lines:
                if TAP_ENTRIES.match(text):
                        break
                skipped += 1
        del lines[:skipped]
103
def save_non_diagnostic(lines: List[str], test_case: TestCase) -> None:
        """Move the leading lines that do not match TAP_ENTRIES into test_case.log.

        Stops at the first matching line or when lines is exhausted.
        """
        while lines:
                if TAP_ENTRIES.match(lines[0]):
                        return
                test_case.log.append(lines.pop(0))
108
# Record type with the fields of an "ok"/"not ok" line.
OkNotOkResult = namedtuple('OkNotOkResult', ('is_ok', 'description', 'text'))

# Indented result line of a single test case.
# Group 1 is "ok" or "not ok"; group 2 is the description.
OK_NOT_OK_SUBTEST = re.compile(r'^[\s]+(ok|not ok) [0-9]+ - (.*)$')

# Unindented result line of a whole suite.
# Group 1 is "ok" or "not ok"; group 2 is the index; group 3 is the description.
OK_NOT_OK_MODULE = re.compile(r'^(ok|not ok) ([0-9]+) - (.*)$')
114
def parse_ok_not_ok_test_case(lines: List[str], test_case: TestCase) -> bool:
        """Consume the indented "ok"/"not ok" line that ends a test case.

        Leading non-TAP lines are first moved into test_case.log. Lines that
        are TAP entries but not a sub-test result are then discarded until a
        result line is found. On success the result line is appended to
        test_case.log and removed from lines, test_case.name is set, and the
        status is set from the result unless a crash was already recorded.

        Returns True when a result line was consumed, or when lines ran out
        before any scanning (the case is then marked TEST_CRASHED). Returns
        False when no result line could be found.
        """
        save_non_diagnostic(lines, test_case)
        if not lines:
                # Output ended before the result line: treat the case as crashed.
                test_case.status = TestStatus.TEST_CRASHED
                return True
        match = None
        while lines:
                # Peek rather than pop, so the matching line is still at the
                # head of the list when it is logged and removed below.
                match = OK_NOT_OK_SUBTEST.match(lines[0])
                if match:
                        break
                lines.pop(0)
        if not match:
                return False
        test_case.log.append(lines.pop(0))
        test_case.name = match.group(2)
        if test_case.status == TestStatus.TEST_CRASHED:
                # A crash diagnostic outranks whatever the result line says.
                return True
        if match.group(1) == 'ok':
                test_case.status = TestStatus.SUCCESS
        else:
                test_case.status = TestStatus.FAILURE
        return True
137
# Indented "# ..." diagnostic line belonging to a test case.
SUBTEST_DIAGNOSTIC = re.compile(r'^[\s]+# (.*)$')
# Diagnostic line announcing that a test case crashed.
DIAGNOSTIC_CRASH_MESSAGE = re.compile(r'^[\s]+# .*?: kunit test case crashed!$')

def parse_diagnostic(lines: List[str], test_case: TestCase) -> bool:
        """Consume one indented diagnostic line into test_case.log.

        Leading non-TAP lines are moved into the log first. If the diagnostic
        is the crash message, test_case.status becomes TEST_CRASHED. Returns
        True when a diagnostic line was consumed, False otherwise.
        """
        save_non_diagnostic(lines, test_case)
        if not lines or not SUBTEST_DIAGNOSTIC.match(lines[0]):
                return False
        text = lines.pop(0)
        test_case.log.append(text)
        if DIAGNOSTIC_CRASH_MESSAGE.match(text):
                test_case.status = TestStatus.TEST_CRASHED
        return True
155
def parse_test_case(lines: List[str]) -> Optional[TestCase]:
        """Parse one test case: its diagnostics followed by its result line.

        Returns the populated TestCase, or None when
        parse_ok_not_ok_test_case reports that no result line was found.
        """
        case = TestCase()
        save_non_diagnostic(lines, case)
        # Swallow every consecutive diagnostic line before the result line.
        more_diagnostics = True
        while more_diagnostics:
                more_diagnostics = parse_diagnostic(lines, case)
        found_result = parse_ok_not_ok_test_case(lines, case)
        return case if found_result else None
165
# Indented "# Subtest: <name>" line that opens a suite; group 1 is the name.
SUBTEST_HEADER = re.compile(r'^[\s]+# Subtest: (.*)$')

def parse_subtest_header(lines: List[str]) -> Optional[str]:
        """Consume a "# Subtest: <name>" line and return the name.

        Leading non-TAP lines are dropped first. Returns None, leaving lines
        otherwise untouched, when no lines remain or the next line is not a
        sub-test header.
        """
        consume_non_diagnostic(lines)
        header = SUBTEST_HEADER.match(lines[0]) if lines else None
        if not header:
                return None
        lines.pop(0)
        return header.group(1)
178
# Indented plan line ("    1..N") of a suite; group 1 is N.
SUBTEST_PLAN = re.compile(r'[\s]+[0-9]+\.\.([0-9]+)')

def parse_subtest_plan(lines: List[str]) -> Optional[int]:
        """Consume an indented plan line and return its test-case count.

        Leading non-TAP lines are dropped first. Returns None, leaving the
        next line in place, when that line is not a sub-test plan or when no
        lines remain.
        """
        consume_non_diagnostic(lines)
        if not lines:
                # Output truncated right after the suite header: report
                # "no plan" instead of raising IndexError on lines[0].
                return None
        match = SUBTEST_PLAN.match(lines[0])
        if match is None:
                return None
        lines.pop(0)
        return int(match.group(1))
189
def max_status(left: TestStatus, right: TestStatus) -> TestStatus:
        """Return the more severe of two statuses.

        TEST_CRASHED outranks FAILURE, which outranks everything else. When
        neither is present, the first non-SUCCESS argument wins (left before
        right); SUCCESS is returned only when both are SUCCESS.
        """
        for severe in (TestStatus.TEST_CRASHED, TestStatus.FAILURE):
                if left == severe or right == severe:
                        return severe
        if left != TestStatus.SUCCESS:
                return left
        if right != TestStatus.SUCCESS:
                return right
        return TestStatus.SUCCESS
201
def parse_ok_not_ok_test_suite(lines: List[str],
                               test_suite: TestSuite,
                               expected_suite_index: int) -> bool:
        """Consume the unindented "ok"/"not ok" line that ends a suite.

        Leading non-TAP lines are dropped first. If no lines remain the suite
        is marked TEST_CRASHED and False is returned. If the next line is a
        suite result, it is consumed, test_suite.status is set from it and
        True is returned; an index different from expected_suite_index is
        reported as an error but does not change the return value. Any other
        line is left in place and False is returned.
        """
        consume_non_diagnostic(lines)
        if not lines:
                test_suite.status = TestStatus.TEST_CRASHED
                return False
        result = OK_NOT_OK_MODULE.match(lines[0])
        if not result:
                return False
        lines.pop(0)
        passed = result.group(1) == 'ok'
        test_suite.status = TestStatus.SUCCESS if passed else TestStatus.FAILURE
        suite_index = int(result.group(2))
        if suite_index != expected_suite_index:
                complaint = ('expected_suite_index ' + str(expected_suite_index) +
                             ', but got ' + str(suite_index))
                print_with_timestamp(red('[ERROR] ') + complaint)
        return True
226
def bubble_up_errors(statuses: Iterable[TestStatus]) -> TestStatus:
        """Fold statuses with max_status, starting from SUCCESS."""
        worst = TestStatus.SUCCESS
        for status in statuses:
                worst = max_status(worst, status)
        return worst
229
def bubble_up_test_case_errors(test_suite: TestSuite) -> TestStatus:
        """Combine the statuses of a suite's cases with the suite's own status."""
        case_statuses = [case.status for case in test_suite.cases]
        worst_case = bubble_up_errors(case_statuses)
        return max_status(worst_case, test_suite.status)
233
def parse_test_suite(lines: List[str], expected_suite_index: int) -> Optional[TestSuite]:
        """Parse one suite: header, plan, test cases and closing result line.

        Returns the parsed TestSuite, with a status that also reflects its
        test cases, when the closing line was found. If the input runs out
        before the closing line, an error is printed and the partial suite is
        still returned. Returns None when lines is empty, when the header or
        plan is missing, or when the closing line cannot be parsed.
        """
        if not lines:
                return None
        consume_non_diagnostic(lines)
        test_suite = TestSuite()
        test_suite.status = TestStatus.SUCCESS
        name = parse_subtest_header(lines)
        if not name:
                return None
        test_suite.name = name
        expected_test_case_num = parse_subtest_plan(lines)
        if expected_test_case_num is None:
                return None
        # Parse at most as many cases as the plan announced; stop early if a
        # case cannot be parsed.
        for _ in range(expected_test_case_num):
                test_case = parse_test_case(lines)
                if not test_case:
                        break
                test_suite.cases.append(test_case)
        if parse_ok_not_ok_test_suite(lines, test_suite, expected_suite_index):
                test_suite.status = bubble_up_test_case_errors(test_suite)
                return test_suite
        if not lines:
                print_with_timestamp(red('[ERROR] ') + 'ran out of lines before end token')
                return test_suite
        # Separate the message from the offending line so the two stay readable.
        print('failed to parse end of suite: ' + lines[0])
        return None
262
# The only accepted header line: exactly "TAP version 14".
TAP_HEADER = re.compile(r'^TAP version 14$')

def parse_tap_header(lines: List[str]) -> bool:
        """Consume the "TAP version 14" header line.

        Leading non-TAP lines are dropped first. Returns True when the header
        was found and removed. Returns False, leaving the next line in place,
        when that line is something else or when no lines remain.
        """
        consume_non_diagnostic(lines)
        if not lines:
                # Nothing left to inspect; avoid IndexError on lines[0].
                return False
        if TAP_HEADER.match(lines[0]) is None:
                return False
        lines.pop(0)
        return True
272
# Unindented top-level plan line ("1..N"); group 1 is N.
TEST_PLAN = re.compile(r'[0-9]+\.\.([0-9]+)')

def parse_test_plan(lines: List[str]) -> Optional[int]:
        """Consume the top-level plan line and return its suite count.

        Leading non-TAP lines are dropped first. Returns None, leaving the
        next line in place, when that line is not a plan or when no lines
        remain.
        """
        consume_non_diagnostic(lines)
        if not lines:
                # Output ended right after the TAP header: report "no plan"
                # instead of raising IndexError on lines[0].
                return None
        match = TEST_PLAN.match(lines[0])
        if match is None:
                return None
        lines.pop(0)
        return int(match.group(1))
283
def bubble_up_suite_errors(test_suites: Iterable[TestSuite]) -> TestStatus:
        """Return the most severe status among the given suites."""
        suite_statuses = (suite.status for suite in test_suites)
        return bubble_up_errors(suite_statuses)
286
def parse_test_result(lines: List[str]) -> TestResult:
        """Parse a complete KUnit TAP stream into a TestResult.

        lines is consumed in place and also becomes the result's log field.
        The status is NO_TESTS when the TAP header is missing or no suite
        could be parsed, FAILURE_TO_PARSE_TESTS when the top-level plan is
        missing or announces zero suites, and otherwise the most severe
        status among the parsed suites.
        """
        consume_non_diagnostic(lines)
        if not lines or not parse_tap_header(lines):
                return TestResult(TestStatus.NO_TESTS, [], lines)
        expected_test_suite_num = parse_test_plan(lines)
        if not expected_test_suite_num:
                # Covers a missing plan (None) as well as a plan of zero suites.
                return TestResult(TestStatus.FAILURE_TO_PARSE_TESTS, [], lines)
        test_suites = []  # type: List[TestSuite]
        for i in range(1, expected_test_suite_num + 1):
                test_suite = parse_test_suite(lines, i)
                if not test_suite:
                        # Report how many suites were actually parsed before
                        # this one failed.
                        print_with_timestamp(
                                red('[ERROR] ') + ' expected ' +
                                str(expected_test_suite_num) +
                                ' test suites, but got ' + str(len(test_suites)))
                        break
                test_suites.append(test_suite)
        # Anything that still parses as a suite is beyond what the plan announced.
        test_suite = parse_test_suite(lines, -1)
        if test_suite:
                print_with_timestamp(red('[ERROR] ') +
                        'got unexpected test suite: ' + test_suite.name)
        if test_suites:
                return TestResult(bubble_up_suite_errors(test_suites), test_suites, lines)
        return TestResult(TestStatus.NO_TESTS, [], lines)
313
def print_and_count_results(test_result: TestResult) -> Tuple[int, int, int]:
        """Print a per-suite, per-case report and return the tallies.

        Each suite gets a coloured heading; each case gets a coloured line,
        and failed or crashed cases are followed by their log in yellow and
        an empty line. Returns (total, failed, crashed) test-case counts.
        """
        total_tests = failed_tests = crashed_tests = 0
        for suite in test_result.suites:
                if suite.status == TestStatus.SUCCESS:
                        heading = green('[PASSED] ') + suite.name
                elif suite.status == TestStatus.TEST_CRASHED:
                        heading = red('[CRASHED] ' + suite.name)
                else:
                        heading = red('[FAILED] ') + suite.name
                print_suite_divider(heading)
                for case in suite.cases:
                        total_tests += 1
                        if case.status == TestStatus.SUCCESS:
                                print_with_timestamp(green('[PASSED] ') + case.name)
                                continue
                        if case.status == TestStatus.TEST_CRASHED:
                                crashed_tests += 1
                                print_with_timestamp(red('[CRASHED] ' + case.name))
                        else:
                                failed_tests += 1
                                print_with_timestamp(red('[FAILED] ') + case.name)
                        # Non-passing cases also get their captured log dumped.
                        print_log(map(yellow, case.log))
                        print_with_timestamp('')
        return total_tests, failed_tests, crashed_tests
340
def parse_run_tests(kernel_output) -> TestResult:
        """Parse kernel output, print a human-readable report and return the result.

        The KUnit section is isolated from kernel_output and parsed. An error
        is printed for NO_TESTS and FAILURE_TO_PARSE_TESTS; otherwise the
        per-test report is printed. A summary line always follows, in green
        when the overall status is SUCCESS and in red otherwise.
        """
        kunit_lines = list(isolate_kunit_output(kernel_output))
        test_result = parse_test_result(kunit_lines)
        status = test_result.status
        total_tests, failed_tests, crashed_tests = 0, 0, 0
        if status == TestStatus.NO_TESTS:
                print(red('[ERROR] ') + yellow('no tests run!'))
        elif status == TestStatus.FAILURE_TO_PARSE_TESTS:
                print(red('[ERROR] ') + yellow('could not parse test results!'))
        else:
                total_tests, failed_tests, crashed_tests = print_and_count_results(test_result)
        print_with_timestamp(DIVIDER)
        if status == TestStatus.SUCCESS:
                fmt = green
        else:
                fmt = red
        summary = ('Testing complete. %d tests run. %d failed. %d crashed.' %
                   (total_tests, failed_tests, crashed_tests))
        print_with_timestamp(fmt(summary))
        return test_result