GNU Linux-libre 6.8.9-gnu
[releases.git] / tools / testing / kunit / kunit_parser.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Parses KTAP test results from a kernel dmesg log and incrementally prints
4 # results with reader-friendly format. Stores and returns test results in a
5 # Test object.
6 #
7 # Copyright (C) 2019, Google LLC.
8 # Author: Felix Guo <felixguoxiuping@gmail.com>
9 # Author: Brendan Higgins <brendanhiggins@google.com>
10 # Author: Rae Moar <rmoar@google.com>
11
12 from __future__ import annotations
13 from dataclasses import dataclass
14 import re
15 import textwrap
16
17 from enum import Enum, auto
18 from typing import Iterable, Iterator, List, Optional, Tuple
19
20 from kunit_printer import stdout
21
22 class Test:
23         """
24         A class to represent a test parsed from KTAP results. All KTAP
25         results within a test log are stored in a main Test object as
26         subtests.
27
28         Attributes:
29         status : TestStatus - status of the test
30         name : str - name of the test
31         expected_count : int - expected number of subtests (0 if single
32                 test case and None if unknown expected number of subtests)
33         subtests : List[Test] - list of subtests
34         log : List[str] - log of KTAP lines that correspond to the test
35         counts : TestCounts - counts of the test statuses and errors of
36                 subtests or of the test itself if the test is a single
37                 test case.
38         """
39         def __init__(self) -> None:
40                 """Creates Test object with default attributes."""
41                 self.status = TestStatus.TEST_CRASHED
42                 self.name = ''
43                 self.expected_count = 0  # type: Optional[int]
44                 self.subtests = []  # type: List[Test]
45                 self.log = []  # type: List[str]
46                 self.counts = TestCounts()
47
48         def __str__(self) -> str:
49                 """Returns string representation of a Test class object."""
50                 return (f'Test({self.status}, {self.name}, {self.expected_count}, '
51                         f'{self.subtests}, {self.log}, {self.counts})')
52
53         def __repr__(self) -> str:
54                 """Returns string representation of a Test class object."""
55                 return str(self)
56
57         def add_error(self, error_message: str) -> None:
58                 """Records an error that occurred while parsing this test."""
59                 self.counts.errors += 1
60                 stdout.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')
61
62         def ok_status(self) -> bool:
63                 """Returns true if the status was ok, i.e. passed or skipped."""
64                 return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
66 class TestStatus(Enum):
67         """An enumeration class to represent the status of a test."""
68         SUCCESS = auto()
69         FAILURE = auto()
70         SKIPPED = auto()
71         TEST_CRASHED = auto()
72         NO_TESTS = auto()
73         FAILURE_TO_PARSE_TESTS = auto()
74
75 @dataclass
76 class TestCounts:
77         """
78         Tracks the counts of statuses of all test cases and any errors within
79         a Test.
80         """
81         passed: int = 0
82         failed: int = 0
83         crashed: int = 0
84         skipped: int = 0
85         errors: int = 0
86
87         def __str__(self) -> str:
88                 """Returns the string representation of a TestCounts object."""
89                 statuses = [('passed', self.passed), ('failed', self.failed),
90                         ('crashed', self.crashed), ('skipped', self.skipped),
91                         ('errors', self.errors)]
92                 return f'Ran {self.total()} tests: ' + \
93                         ', '.join(f'{s}: {n}' for s, n in statuses if n > 0)
94
95         def total(self) -> int:
96                 """Returns the total number of test cases within a test
97                 object, where a test case is a test with no subtests.
98                 """
99                 return (self.passed + self.failed + self.crashed +
100                         self.skipped)
101
102         def add_subtest_counts(self, counts: TestCounts) -> None:
103                 """
104                 Adds the counts of another TestCounts object to the current
105                 TestCounts object. Used to add the counts of a subtest to the
106                 parent test.
107
108                 Parameters:
109                 counts - a different TestCounts object whose counts
110                         will be added to the counts of the TestCounts object
111                 """
112                 self.passed += counts.passed
113                 self.failed += counts.failed
114                 self.crashed += counts.crashed
115                 self.skipped += counts.skipped
116                 self.errors += counts.errors
117
118         def get_status(self) -> TestStatus:
119                 """Returns the aggregated status of a Test using test
120                 counts.
121                 """
122                 if self.total() == 0:
123                         return TestStatus.NO_TESTS
124                 if self.crashed:
125                         # Crashes should take priority.
126                         return TestStatus.TEST_CRASHED
127                 if self.failed:
128                         return TestStatus.FAILURE
129                 if self.passed:
130                         # No failures or crashes, looks good!
131                         return TestStatus.SUCCESS
132                 # We have only skipped tests.
133                 return TestStatus.SKIPPED
134
135         def add_status(self, status: TestStatus) -> None:
136                 """Increments the count for `status`."""
137                 if status == TestStatus.SUCCESS:
138                         self.passed += 1
139                 elif status == TestStatus.FAILURE:
140                         self.failed += 1
141                 elif status == TestStatus.SKIPPED:
142                         self.skipped += 1
143                 elif status != TestStatus.NO_TESTS:
144                         self.crashed += 1
145
146 class LineStream:
147         """
148         A class to represent the lines of kernel output.
149         Provides a lazy peek()/pop() interface over an iterator of
150         (line#, text).
151         """
152         _lines: Iterator[Tuple[int, str]]
153         _next: Tuple[int, str]
154         _need_next: bool
155         _done: bool
156
157         def __init__(self, lines: Iterator[Tuple[int, str]]):
158                 """Creates a new LineStream that wraps the given iterator."""
159                 self._lines = lines
160                 self._done = False
161                 self._need_next = True
162                 self._next = (0, '')
163
164         def _get_next(self) -> None:
165                 """Advances the LineSteam to the next line, if necessary."""
166                 if not self._need_next:
167                         return
168                 try:
169                         self._next = next(self._lines)
170                 except StopIteration:
171                         self._done = True
172                 finally:
173                         self._need_next = False
174
175         def peek(self) -> str:
176                 """Returns the current line, without advancing the LineStream.
177                 """
178                 self._get_next()
179                 return self._next[1]
180
181         def pop(self) -> str:
182                 """Returns the current line and advances the LineStream to
183                 the next line.
184                 """
185                 s = self.peek()
186                 if self._done:
187                         raise ValueError(f'LineStream: going past EOF, last line was {s}')
188                 self._need_next = True
189                 return s
190
191         def __bool__(self) -> bool:
192                 """Returns True if stream has more lines."""
193                 self._get_next()
194                 return not self._done
195
196         # Only used by kunit_tool_test.py.
197         def __iter__(self) -> Iterator[str]:
198                 """Empties all lines stored in LineStream object into
199                 Iterator object and returns the Iterator object.
200                 """
201                 while bool(self):
202                         yield self.pop()
203
204         def line_number(self) -> int:
205                 """Returns the line number of the current line."""
206                 self._get_next()
207                 return self._next[0]
208
209 # Parsing helper methods:
210
211 KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
212 TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
213 KTAP_END = re.compile(r'\s*(List of all partitions:|'
214         'Kernel panic - not syncing: VFS:|reboot: System halted)')
215 EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
217 def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
218         """Extracts KTAP lines from the kernel output."""
219         def isolate_ktap_output(kernel_output: Iterable[str]) \
220                         -> Iterator[Tuple[int, str]]:
221                 line_num = 0
222                 started = False
223                 for line in kernel_output:
224                         line_num += 1
225                         line = line.rstrip()  # remove trailing \n
226                         if not started and KTAP_START.search(line):
227                                 # start extracting KTAP lines and set prefix
228                                 # to number of characters before version line
229                                 prefix_len = len(
230                                         line.split('KTAP version')[0])
231                                 started = True
232                                 yield line_num, line[prefix_len:]
233                         elif not started and TAP_START.search(line):
234                                 # start extracting KTAP lines and set prefix
235                                 # to number of characters before version line
236                                 prefix_len = len(line.split('TAP version')[0])
237                                 started = True
238                                 yield line_num, line[prefix_len:]
239                         elif started and KTAP_END.search(line):
240                                 # stop extracting KTAP lines
241                                 break
242                         elif started:
243                                 # remove the prefix, if any.
244                                 line = line[prefix_len:]
245                                 yield line_num, line
246                         elif EXECUTOR_ERROR.search(line):
247                                 yield line_num, line
248         return LineStream(lines=isolate_ktap_output(kernel_output))
249
250 KTAP_VERSIONS = [1]
251 TAP_VERSIONS = [13, 14]
252
253 def check_version(version_num: int, accepted_versions: List[int],
254                         version_type: str, test: Test) -> None:
255         """
256         Adds error to test object if version number is too high or too
257         low.
258
259         Parameters:
260         version_num - The inputted version number from the parsed KTAP or TAP
261                 header line
262         accepted_version - List of accepted KTAP or TAP versions
263         version_type - 'KTAP' or 'TAP' depending on the type of
264                 version line.
265         test - Test object for current test being parsed
266         """
267         if version_num < min(accepted_versions):
268                 test.add_error(f'{version_type} version lower than expected!')
269         elif version_num > max(accepted_versions):
270                 test.add_error(f'{version_type} version higer than expected!')
271
272 def parse_ktap_header(lines: LineStream, test: Test) -> bool:
273         """
274         Parses KTAP/TAP header line and checks version number.
275         Returns False if fails to parse KTAP/TAP header line.
276
277         Accepted formats:
278         - 'KTAP version [version number]'
279         - 'TAP version [version number]'
280
281         Parameters:
282         lines - LineStream of KTAP output to parse
283         test - Test object for current test being parsed
284
285         Return:
286         True if successfully parsed KTAP/TAP header line
287         """
288         ktap_match = KTAP_START.match(lines.peek())
289         tap_match = TAP_START.match(lines.peek())
290         if ktap_match:
291                 version_num = int(ktap_match.group(1))
292                 check_version(version_num, KTAP_VERSIONS, 'KTAP', test)
293         elif tap_match:
294                 version_num = int(tap_match.group(1))
295                 check_version(version_num, TAP_VERSIONS, 'TAP', test)
296         else:
297                 return False
298         lines.pop()
299         return True
300
301 TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
302
303 def parse_test_header(lines: LineStream, test: Test) -> bool:
304         """
305         Parses test header and stores test name in test object.
306         Returns False if fails to parse test header line.
307
308         Accepted format:
309         - '# Subtest: [test name]'
310
311         Parameters:
312         lines - LineStream of KTAP output to parse
313         test - Test object for current test being parsed
314
315         Return:
316         True if successfully parsed test header line
317         """
318         match = TEST_HEADER.match(lines.peek())
319         if not match:
320                 return False
321         test.name = match.group(1)
322         lines.pop()
323         return True
324
325 TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
326
327 def parse_test_plan(lines: LineStream, test: Test) -> bool:
328         """
329         Parses test plan line and stores the expected number of subtests in
330         test object. Reports an error if expected count is 0.
331         Returns False and sets expected_count to None if there is no valid test
332         plan.
333
334         Accepted format:
335         - '1..[number of subtests]'
336
337         Parameters:
338         lines - LineStream of KTAP output to parse
339         test - Test object for current test being parsed
340
341         Return:
342         True if successfully parsed test plan line
343         """
344         match = TEST_PLAN.match(lines.peek())
345         if not match:
346                 test.expected_count = None
347                 return False
348         expected_count = int(match.group(1))
349         test.expected_count = expected_count
350         lines.pop()
351         return True
352
353 TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?([^#]*)( # .*)?$')
354
355 TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) (- )?(.*) # SKIP(.*)$')
356
357 def peek_test_name_match(lines: LineStream, test: Test) -> bool:
358         """
359         Matches current line with the format of a test result line and checks
360         if the name matches the name of the current test.
361         Returns False if fails to match format or name.
362
363         Accepted format:
364         - '[ok|not ok] [test number] [-] [test name] [optional skip
365                 directive]'
366
367         Parameters:
368         lines - LineStream of KTAP output to parse
369         test - Test object for current test being parsed
370
371         Return:
372         True if matched a test result line and the name matching the
373                 expected test name
374         """
375         line = lines.peek()
376         match = TEST_RESULT.match(line)
377         if not match:
378                 return False
379         name = match.group(4)
380         return name == test.name
381
382 def parse_test_result(lines: LineStream, test: Test,
383                         expected_num: int) -> bool:
384         """
385         Parses test result line and stores the status and name in the test
386         object. Reports an error if the test number does not match expected
387         test number.
388         Returns False if fails to parse test result line.
389
390         Note that the SKIP directive is the only direction that causes a
391         change in status.
392
393         Accepted format:
394         - '[ok|not ok] [test number] [-] [test name] [optional skip
395                 directive]'
396
397         Parameters:
398         lines - LineStream of KTAP output to parse
399         test - Test object for current test being parsed
400         expected_num - expected test number for current test
401
402         Return:
403         True if successfully parsed a test result line.
404         """
405         line = lines.peek()
406         match = TEST_RESULT.match(line)
407         skip_match = TEST_RESULT_SKIP.match(line)
408
409         # Check if line matches test result line format
410         if not match:
411                 return False
412         lines.pop()
413
414         # Set name of test object
415         if skip_match:
416                 test.name = skip_match.group(4)
417         else:
418                 test.name = match.group(4)
419
420         # Check test num
421         num = int(match.group(2))
422         if num != expected_num:
423                 test.add_error(f'Expected test number {expected_num} but found {num}')
424
425         # Set status of test object
426         status = match.group(1)
427         if skip_match:
428                 test.status = TestStatus.SKIPPED
429         elif status == 'ok':
430                 test.status = TestStatus.SUCCESS
431         else:
432                 test.status = TestStatus.FAILURE
433         return True
434
435 def parse_diagnostic(lines: LineStream) -> List[str]:
436         """
437         Parse lines that do not match the format of a test result line or
438         test header line and returns them in list.
439
440         Line formats that are not parsed:
441         - '# Subtest: [test name]'
442         - '[ok|not ok] [test number] [-] [test name] [optional skip
443                 directive]'
444         - 'KTAP version [version number]'
445
446         Parameters:
447         lines - LineStream of KTAP output to parse
448
449         Return:
450         Log of diagnostic lines
451         """
452         log = []  # type: List[str]
453         non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
454         while lines and not any(re.match(lines.peek())
455                         for re in non_diagnostic_lines):
456                 log.append(lines.pop())
457         return log
458
459
460 # Printing helper methods:
461
462 DIVIDER = '=' * 60
463
464 def format_test_divider(message: str, len_message: int) -> str:
465         """
466         Returns string with message centered in fixed width divider.
467
468         Example:
469         '===================== message example ====================='
470
471         Parameters:
472         message - message to be centered in divider line
473         len_message - length of the message to be printed such that
474                 any characters of the color codes are not counted
475
476         Return:
477         String containing message centered in fixed width divider
478         """
479         default_count = 3  # default number of dashes
480         len_1 = default_count
481         len_2 = default_count
482         difference = len(DIVIDER) - len_message - 2  # 2 spaces added
483         if difference > 0:
484                 # calculate number of dashes for each side of the divider
485                 len_1 = int(difference / 2)
486                 len_2 = difference - len_1
487         return ('=' * len_1) + f' {message} ' + ('=' * len_2)
488
489 def print_test_header(test: Test) -> None:
490         """
491         Prints test header with test name and optionally the expected number
492         of subtests.
493
494         Example:
495         '=================== example (2 subtests) ==================='
496
497         Parameters:
498         test - Test object representing current test being printed
499         """
500         message = test.name
501         if message != "":
502                 # Add a leading space before the subtest counts only if a test name
503                 # is provided using a "# Subtest" header line.
504                 message += " "
505         if test.expected_count:
506                 if test.expected_count == 1:
507                         message += '(1 subtest)'
508                 else:
509                         message += f'({test.expected_count} subtests)'
510         stdout.print_with_timestamp(format_test_divider(message, len(message)))
511
512 def print_log(log: Iterable[str]) -> None:
513         """Prints all strings in saved log for test in yellow."""
514         formatted = textwrap.dedent('\n'.join(log))
515         for line in formatted.splitlines():
516                 stdout.print_with_timestamp(stdout.yellow(line))
517
518 def format_test_result(test: Test) -> str:
519         """
520         Returns string with formatted test result with colored status and test
521         name.
522
523         Example:
524         '[PASSED] example'
525
526         Parameters:
527         test - Test object representing current test being printed
528
529         Return:
530         String containing formatted test result
531         """
532         if test.status == TestStatus.SUCCESS:
533                 return stdout.green('[PASSED] ') + test.name
534         if test.status == TestStatus.SKIPPED:
535                 return stdout.yellow('[SKIPPED] ') + test.name
536         if test.status == TestStatus.NO_TESTS:
537                 return stdout.yellow('[NO TESTS RUN] ') + test.name
538         if test.status == TestStatus.TEST_CRASHED:
539                 print_log(test.log)
540                 return stdout.red('[CRASHED] ') + test.name
541         print_log(test.log)
542         return stdout.red('[FAILED] ') + test.name
543
544 def print_test_result(test: Test) -> None:
545         """
546         Prints result line with status of test.
547
548         Example:
549         '[PASSED] example'
550
551         Parameters:
552         test - Test object representing current test being printed
553         """
554         stdout.print_with_timestamp(format_test_result(test))
555
556 def print_test_footer(test: Test) -> None:
557         """
558         Prints test footer with status of test.
559
560         Example:
561         '===================== [PASSED] example ====================='
562
563         Parameters:
564         test - Test object representing current test being printed
565         """
566         message = format_test_result(test)
567         stdout.print_with_timestamp(format_test_divider(message,
568                 len(message) - stdout.color_len()))
569
570
571
572 def _summarize_failed_tests(test: Test) -> str:
573         """Tries to summarize all the failing subtests in `test`."""
574
575         def failed_names(test: Test, parent_name: str) -> List[str]:
576                 # Note: we use 'main' internally for the top-level test.
577                 if not parent_name or parent_name == 'main':
578                         full_name = test.name
579                 else:
580                         full_name = parent_name + '.' + test.name
581
582                 if not test.subtests:  # this is a leaf node
583                         return [full_name]
584
585                 # If all the children failed, just say this subtest failed.
586                 # Don't summarize it down "the top-level test failed", though.
587                 failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
588                 if parent_name and len(failed_subtests) ==  len(test.subtests):
589                         return [full_name]
590
591                 all_failures = []  # type: List[str]
592                 for t in failed_subtests:
593                         all_failures.extend(failed_names(t, full_name))
594                 return all_failures
595
596         failures = failed_names(test, '')
597         # If there are too many failures, printing them out will just be noisy.
598         if len(failures) > 10:  # this is an arbitrary limit
599                 return ''
600
601         return 'Failures: ' + ', '.join(failures)
602
603
604 def print_summary_line(test: Test) -> None:
605         """
606         Prints summary line of test object. Color of line is dependent on
607         status of test. Color is green if test passes, yellow if test is
608         skipped, and red if the test fails or crashes. Summary line contains
609         counts of the statuses of the tests subtests or the test itself if it
610         has no subtests.
611
612         Example:
613         "Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
614         Errors: 0"
615
616         test - Test object representing current test being printed
617         """
618         if test.status == TestStatus.SUCCESS:
619                 color = stdout.green
620         elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
621                 color = stdout.yellow
622         else:
623                 color = stdout.red
624         stdout.print_with_timestamp(color(f'Testing complete. {test.counts}'))
625
626         # Summarize failures that might have gone off-screen since we had a lot
627         # of tests (arbitrarily defined as >=100 for now).
628         if test.ok_status() or test.counts.total() < 100:
629                 return
630         summarized = _summarize_failed_tests(test)
631         if not summarized:
632                 return
633         stdout.print_with_timestamp(color(summarized))
634
635 # Other methods:
636
637 def bubble_up_test_results(test: Test) -> None:
638         """
639         If the test has subtests, add the test counts of the subtests to the
640         test and check if any of the tests crashed and if so set the test
641         status to crashed. Otherwise if the test has no subtests add the
642         status of the test to the test counts.
643
644         Parameters:
645         test - Test object for current test being parsed
646         """
647         subtests = test.subtests
648         counts = test.counts
649         status = test.status
650         for t in subtests:
651                 counts.add_subtest_counts(t.counts)
652         if counts.total() == 0:
653                 counts.add_status(status)
654         elif test.counts.get_status() == TestStatus.TEST_CRASHED:
655                 test.status = TestStatus.TEST_CRASHED
656
657 def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool) -> Test:
658         """
659         Finds next test to parse in LineStream, creates new Test object,
660         parses any subtests of the test, populates Test object with all
661         information (status, name) about the test and the Test objects for
662         any subtests, and then returns the Test object. The method accepts
663         three formats of tests:
664
665         Accepted test formats:
666
667         - Main KTAP/TAP header
668
669         Example:
670
671         KTAP version 1
672         1..4
673         [subtests]
674
675         - Subtest header (must include either the KTAP version line or
676           "# Subtest" header line)
677
678         Example (preferred format with both KTAP version line and
679         "# Subtest" line):
680
681         KTAP version 1
682         # Subtest: name
683         1..3
684         [subtests]
685         ok 1 name
686
687         Example (only "# Subtest" line):
688
689         # Subtest: name
690         1..3
691         [subtests]
692         ok 1 name
693
694         Example (only KTAP version line, compliant with KTAP v1 spec):
695
696         KTAP version 1
697         1..3
698         [subtests]
699         ok 1 name
700
701         - Test result line
702
703         Example:
704
705         ok 1 - test
706
707         Parameters:
708         lines - LineStream of KTAP output to parse
709         expected_num - expected test number for test to be parsed
710         log - list of strings containing any preceding diagnostic lines
711                 corresponding to the current test
712         is_subtest - boolean indicating whether test is a subtest
713
714         Return:
715         Test object populated with characteristics and any subtests
716         """
717         test = Test()
718         test.log.extend(log)
719
720         # Parse any errors prior to parsing tests
721         err_log = parse_diagnostic(lines)
722         test.log.extend(err_log)
723
724         if not is_subtest:
725                 # If parsing the main/top-level test, parse KTAP version line and
726                 # test plan
727                 test.name = "main"
728                 ktap_line = parse_ktap_header(lines, test)
729                 test.log.extend(parse_diagnostic(lines))
730                 parse_test_plan(lines, test)
731                 parent_test = True
732         else:
733                 # If not the main test, attempt to parse a test header containing
734                 # the KTAP version line and/or subtest header line
735                 ktap_line = parse_ktap_header(lines, test)
736                 subtest_line = parse_test_header(lines, test)
737                 parent_test = (ktap_line or subtest_line)
738                 if parent_test:
739                         # If KTAP version line and/or subtest header is found, attempt
740                         # to parse test plan and print test header
741                         test.log.extend(parse_diagnostic(lines))
742                         parse_test_plan(lines, test)
743                         print_test_header(test)
744         expected_count = test.expected_count
745         subtests = []
746         test_num = 1
747         while parent_test and (expected_count is None or test_num <= expected_count):
748                 # Loop to parse any subtests.
749                 # Break after parsing expected number of tests or
750                 # if expected number of tests is unknown break when test
751                 # result line with matching name to subtest header is found
752                 # or no more lines in stream.
753                 sub_log = parse_diagnostic(lines)
754                 sub_test = Test()
755                 if not lines or (peek_test_name_match(lines, test) and
756                                 is_subtest):
757                         if expected_count and test_num <= expected_count:
758                                 # If parser reaches end of test before
759                                 # parsing expected number of subtests, print
760                                 # crashed subtest and record error
761                                 test.add_error('missing expected subtest!')
762                                 sub_test.log.extend(sub_log)
763                                 test.counts.add_status(
764                                         TestStatus.TEST_CRASHED)
765                                 print_test_result(sub_test)
766                         else:
767                                 test.log.extend(sub_log)
768                                 break
769                 else:
770                         sub_test = parse_test(lines, test_num, sub_log, True)
771                 subtests.append(sub_test)
772                 test_num += 1
773         test.subtests = subtests
774         if is_subtest:
775                 # If not main test, look for test result line
776                 test.log.extend(parse_diagnostic(lines))
777                 if test.name != "" and not peek_test_name_match(lines, test):
778                         test.add_error('missing subtest result line!')
779                 else:
780                         parse_test_result(lines, test, expected_num)
781
782         # Check for there being no subtests within parent test
783         if parent_test and len(subtests) == 0:
784                 # Don't override a bad status if this test had one reported.
785                 # Assumption: no subtests means CRASHED is from Test.__init__()
786                 if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
787                         print_log(test.log)
788                         test.status = TestStatus.NO_TESTS
789                         test.add_error('0 tests run!')
790
791         # Add statuses to TestCounts attribute in Test object
792         bubble_up_test_results(test)
793         if parent_test and is_subtest:
794                 # If test has subtests and is not the main test object, print
795                 # footer.
796                 print_test_footer(test)
797         elif is_subtest:
798                 print_test_result(test)
799         return test
800
801 def parse_run_tests(kernel_output: Iterable[str]) -> Test:
802         """
803         Using kernel output, extract KTAP lines, parse the lines for test
804         results and print condensed test results and summary line.
805
806         Parameters:
807         kernel_output - Iterable object contains lines of kernel output
808
809         Return:
810         Test - the main test object with all subtests.
811         """
812         stdout.print_with_timestamp(DIVIDER)
813         lines = extract_tap_lines(kernel_output)
814         test = Test()
815         if not lines:
816                 test.name = '<missing>'
817                 test.add_error('Could not find any KTAP output. Did any KUnit tests run?')
818                 test.status = TestStatus.FAILURE_TO_PARSE_TESTS
819         else:
820                 test = parse_test(lines, 0, [], False)
821                 if test.status != TestStatus.NO_TESTS:
822                         test.status = test.counts.get_status()
823         stdout.print_with_timestamp(DIVIDER)
824         print_summary_line(test)
825         return test