Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Parses KTAP test results from a kernel dmesg log and incrementally prints
4# results with reader-friendly format. Stores and returns test results in a
5# Test object.
6#
7# Copyright (C) 2019, Google LLC.
8# Author: Felix Guo <felixguoxiuping@gmail.com>
9# Author: Brendan Higgins <brendanhiggins@google.com>
10# Author: Rae Moar <rmoar@google.com>
11
12from __future__ import annotations
13from dataclasses import dataclass
14import re
15import textwrap
16
17from enum import Enum, auto
18from typing import Iterable, Iterator, List, Optional, Tuple
19
20from kunit_printer import Printer, stdout
21
class Test:
	"""
	Represents one test parsed from KTAP results. All KTAP results
	within a test log are stored in a main Test object as subtests.

	Attributes:
	status : TestStatus - status of the test
	name : str - name of the test
	expected_count : int - expected number of subtests (0 if single
		test case and None if unknown expected number of subtests)
	subtests : List[Test] - list of subtests
	log : List[str] - log of KTAP lines that correspond to the test
	counts : TestCounts - counts of the test statuses and errors of
		subtests or of the test itself if the test is a single
		test case.
	"""
	def __init__(self) -> None:
		"""Creates Test object with default attributes."""
		# Default to TEST_CRASHED: only a parsed result line (or
		# aggregation over subtests) upgrades the status.
		self.status = TestStatus.TEST_CRASHED
		self.name = ''
		self.expected_count = 0  # type: Optional[int]
		self.subtests = []  # type: List[Test]
		self.log = []  # type: List[str]
		self.counts = TestCounts()

	def __str__(self) -> str:
		"""Returns string representation of a Test class object."""
		attrs = (self.status, self.name, self.expected_count,
			 self.subtests, self.log, self.counts)
		return 'Test(' + ', '.join(str(a) for a in attrs) + ')'

	def __repr__(self) -> str:
		"""Returns string representation of a Test class object."""
		return str(self)

	def add_error(self, printer: Printer, error_message: str) -> None:
		"""Records an error that occurred while parsing this test."""
		self.counts.errors += 1
		printer.print_with_timestamp(stdout.red('[ERROR]') + f' Test: {self.name}: {error_message}')

	def ok_status(self) -> bool:
		"""Returns true if the status was ok, i.e. passed or skipped."""
		return self.status in (TestStatus.SUCCESS, TestStatus.SKIPPED)
65
class TestStatus(Enum):
	"""An enumeration class to represent the status of a test."""
	SUCCESS = auto()
	FAILURE = auto()
	SKIPPED = auto()
	TEST_CRASHED = auto()  # default status until a result line is parsed
	NO_TESTS = auto()  # no test cases were run
	FAILURE_TO_PARSE_TESTS = auto()  # no KTAP output could be found
74
@dataclass
class TestCounts:
	"""
	Tallies how many test cases within a Test passed, failed, crashed,
	or were skipped, plus how many parse errors were recorded.
	"""
	passed: int = 0
	failed: int = 0
	crashed: int = 0
	skipped: int = 0
	errors: int = 0

	def __str__(self) -> str:
		"""Returns the string representation of a TestCounts object."""
		pairs = (('passed', self.passed), ('failed', self.failed),
			 ('crashed', self.crashed), ('skipped', self.skipped),
			 ('errors', self.errors))
		# Only non-zero counts appear in the summary.
		details = ', '.join(f'{label}: {count}'
				    for label, count in pairs if count > 0)
		return f'Ran {self.total()} tests: {details}'

	def total(self) -> int:
		"""Returns the total number of test cases within a test
		object, where a test case is a test with no subtests.
		Errors are not test cases and are excluded.
		"""
		return self.passed + self.failed + self.crashed + self.skipped

	def add_subtest_counts(self, counts: TestCounts) -> None:
		"""
		Folds another TestCounts object's tallies into this one.
		Used to add the counts of a subtest to the parent test.

		Parameters:
		counts - a different TestCounts object whose counts
			will be added to the counts of the TestCounts object
		"""
		self.passed += counts.passed
		self.failed += counts.failed
		self.crashed += counts.crashed
		self.skipped += counts.skipped
		self.errors += counts.errors

	def get_status(self) -> TestStatus:
		"""Returns the aggregated status of a Test using test
		counts.
		"""
		if self.total() == 0:
			return TestStatus.NO_TESTS
		if self.crashed:
			# Crashes take priority over every other status.
			return TestStatus.TEST_CRASHED
		if self.failed:
			return TestStatus.FAILURE
		if self.passed:
			# No failures or crashes, looks good!
			return TestStatus.SUCCESS
		# Only skipped tests remain.
		return TestStatus.SKIPPED

	def add_status(self, status: TestStatus) -> None:
		"""Increments the count for `status`."""
		if status == TestStatus.SUCCESS:
			self.passed += 1
		elif status == TestStatus.FAILURE:
			self.failed += 1
		elif status == TestStatus.SKIPPED:
			self.skipped += 1
		elif status != TestStatus.NO_TESTS:
			# TEST_CRASHED and FAILURE_TO_PARSE_TESTS both count
			# as crashed; NO_TESTS is not counted at all.
			self.crashed += 1
145
class LineStream:
	"""
	Wraps an iterator of (line number, text) pairs taken from kernel
	output and exposes a lazy peek()/pop() interface over it.
	"""
	_lines: Iterator[Tuple[int, str]]
	_next: Tuple[int, str]
	_need_next: bool
	_done: bool

	def __init__(self, lines: Iterator[Tuple[int, str]]):
		"""Creates a new LineStream that wraps the given iterator."""
		self._lines = lines
		self._next = (0, '')
		self._need_next = True
		self._done = False

	def _get_next(self) -> None:
		"""Fetches the next line from the iterator, if one is needed."""
		if not self._need_next:
			return
		self._need_next = False
		try:
			self._next = next(self._lines)
		except StopIteration:
			# Keep the previous line in _next so that error
			# messages can still report the last line seen.
			self._done = True

	def peek(self) -> str:
		"""Returns the current line, without advancing the LineStream.
		"""
		self._get_next()
		return self._next[1]

	def pop(self) -> str:
		"""Returns the current line and advances the LineStream to
		the next line.
		"""
		line = self.peek()
		if self._done:
			raise ValueError(f'LineStream: going past EOF, last line was {line}')
		self._need_next = True
		return line

	def __bool__(self) -> bool:
		"""Returns True if stream has more lines."""
		self._get_next()
		return not self._done

	# Only used by kunit_tool_test.py.
	def __iter__(self) -> Iterator[str]:
		"""Drains the stream, yielding each remaining line in order."""
		while self:
			yield self.pop()

	def line_number(self) -> int:
		"""Returns the line number of the current line."""
		self._get_next()
		return self._next[0]
208
# Parsing helper methods:

# Banner lines that begin a KTAP or TAP report; group 1 is the version.
KTAP_START = re.compile(r'\s*KTAP version ([0-9]+)$')
TAP_START = re.compile(r'\s*TAP version ([0-9]+)$')
# Kernel messages that mark the end of any KTAP output.
KTAP_END = re.compile(r'\s*(List of all partitions:|'
	'Kernel panic - not syncing: VFS:|reboot: System halted)')
# Errors reported by the KUnit executor before any tests run.
EXECUTOR_ERROR = re.compile(r'\s*kunit executor: (.*)$')
216
def extract_tap_lines(kernel_output: Iterable[str]) -> LineStream:
	"""Extracts KTAP lines from the kernel output."""
	def isolate_ktap_output(kernel_output: Iterable[str]) \
			-> Iterator[Tuple[int, str]]:
		# Number of characters preceding the version banner; the same
		# prefix is stripped from every following KTAP line.
		prefix_len = 0
		started = False
		for line_num, raw in enumerate(kernel_output, start=1):
			line = raw.rstrip()  # remove trailing \n
			if started:
				if KTAP_END.search(line):
					# stop extracting KTAP lines
					break
				# remove the prefix, if any.
				yield line_num, line[prefix_len:]
				continue
			banner = None
			if KTAP_START.search(line):
				banner = 'KTAP version'
			elif TAP_START.search(line):
				banner = 'TAP version'
			if banner is not None:
				# start extracting KTAP lines and set prefix
				# to number of characters before version line
				prefix_len = len(line.split(banner)[0])
				started = True
				yield line_num, line[prefix_len:]
			elif EXECUTOR_ERROR.search(line):
				yield line_num, line
	return LineStream(lines=isolate_ktap_output(kernel_output))
249
# KTAP and TAP version numbers this parser accepts without complaint.
KTAP_VERSIONS = [1]
TAP_VERSIONS = [13, 14]
252
def check_version(version_num: int, accepted_versions: List[int],
		version_type: str, test: Test, printer: Printer) -> None:
	"""
	Adds error to test object if version number is too high or too
	low.

	Parameters:
	version_num - The inputted version number from the parsed KTAP or TAP
		header line
	accepted_versions - List of accepted KTAP or TAP versions
	version_type - 'KTAP' or 'TAP' depending on the type of
		version line.
	test - Test object for current test being parsed
	printer - Printer object to output error
	"""
	if version_num < min(accepted_versions):
		test.add_error(printer, f'{version_type} version lower than expected!')
	elif version_num > max(accepted_versions):
		# Fixed typo in user-visible message: 'higer' -> 'higher'.
		test.add_error(printer, f'{version_type} version higher than expected!')
272
def parse_ktap_header(lines: LineStream, test: Test, printer: Printer) -> bool:
	"""
	Parses a KTAP/TAP header line, validating its version number.
	Leaves the stream untouched and returns False when the current line
	is not a header line.

	Accepted formats:
	- 'KTAP version [version number]'
	- 'TAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	printer - Printer object to output results

	Return:
	True if successfully parsed KTAP/TAP header line
	"""
	line = lines.peek()
	match = KTAP_START.match(line)
	if match:
		check_version(int(match.group(1)), KTAP_VERSIONS, 'KTAP', test, printer)
		lines.pop()
		return True
	match = TAP_START.match(line)
	if match:
		check_version(int(match.group(1)), TAP_VERSIONS, 'TAP', test, printer)
		lines.pop()
		return True
	return False
302
# Subtest header line, e.g. '# Subtest: name'; group 1 is the test name.
TEST_HEADER = re.compile(r'^\s*# Subtest: (.*)$')
304
def parse_test_header(lines: LineStream, test: Test) -> bool:
	"""
	Parses a subtest header line and records the test name on the test
	object. Leaves the stream untouched and returns False when the
	current line is not a test header.

	Accepted format:
	- '# Subtest: [test name]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test header line
	"""
	header = TEST_HEADER.match(lines.peek())
	if header is None:
		return False
	test.name = header.group(1)
	lines.pop()
	return True
326
# Test plan line, e.g. '1..4'; group 1 is the expected number of subtests.
TEST_PLAN = re.compile(r'^\s*1\.\.([0-9]+)')
328
def parse_test_plan(lines: LineStream, test: Test) -> bool:
	"""
	Parses a test plan line and stores the expected number of subtests
	on the test object. When no valid plan is present, sets
	expected_count to None and returns False without consuming a line.

	Accepted format:
	- '1..[number of subtests]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if successfully parsed test plan line
	"""
	plan = TEST_PLAN.match(lines.peek())
	if plan is None:
		test.expected_count = None
		return False
	test.expected_count = int(plan.group(1))
	lines.pop()
	return True
354
# Test result line, e.g. 'ok 1 - name # comment'.
# Groups: 1 = ok/not ok, 2 = test number, 4 = test name.
TEST_RESULT = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?([^#]*)( # .*)?$')

# Test result line carrying a '# SKIP' directive; group 5 is the text
# after the directive.
TEST_RESULT_SKIP = re.compile(r'^\s*(ok|not ok) ([0-9]+) ?(- )?(.*) # SKIP ?(.*)$')
358
def peek_test_name_match(lines: LineStream, test: Test) -> bool:
	"""
	Checks whether the current line is a test result line whose name
	matches the current test's name, without consuming the line.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed

	Return:
	True if matched a test result line and the name matching the
	expected test name
	"""
	match = TEST_RESULT.match(lines.peek())
	if match is None:
		return False
	found_name = match.group(4)
	return bool(found_name) and found_name == test.name
385
def parse_test_result(lines: LineStream, test: Test,
		expected_num: int, printer: Printer) -> bool:
	"""
	Parses a test result line, recording status and name on the test
	object, and reporting an error when the test number is not the one
	expected. Returns False without consuming a line if the current
	line is not a test result line.

	Note that the SKIP directive is the only directive that causes a
	change in status.

	Accepted format:
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'

	Parameters:
	lines - LineStream of KTAP output to parse
	test - Test object for current test being parsed
	expected_num - expected test number for current test
	printer - Printer object to output results

	Return:
	True if successfully parsed a test result line.
	"""
	line = lines.peek()
	match = TEST_RESULT.match(line)
	if not match:
		# Not a test result line at all.
		return False
	skip_match = TEST_RESULT_SKIP.match(line)
	lines.pop()

	# Record the test name; SKIP lines carry it in different groups.
	if skip_match:
		test.name = skip_match.group(4) or skip_match.group(5)
	else:
		test.name = match.group(4)

	# Flag out-of-order or unexpected test numbers.
	found_num = int(match.group(2))
	if found_num != expected_num:
		test.add_error(printer, f'Expected test number {expected_num} but found {found_num}')

	# Determine status: a SKIP directive wins, then ok/not ok.
	if skip_match:
		test.status = TestStatus.SKIPPED
	elif match.group(1) == 'ok':
		test.status = TestStatus.SUCCESS
	else:
		test.status = TestStatus.FAILURE
	return True
439
def parse_diagnostic(lines: LineStream) -> List[str]:
	"""
	Parse lines that do not match the format of a test result line or
	test header line and returns them in list.

	Line formats that are not parsed:
	- '# Subtest: [test name]'
	- '[ok|not ok] [test number] [-] [test name] [optional skip
		directive]'
	- 'KTAP version [version number]'

	Parameters:
	lines - LineStream of KTAP output to parse

	Return:
	Log of diagnostic lines
	"""
	log = []  # type: List[str]
	non_diagnostic_lines = [TEST_RESULT, TEST_HEADER, KTAP_START, TAP_START, TEST_PLAN]
	# The loop variable was previously named `re`, shadowing the `re`
	# module used throughout this file; renamed to avoid that.
	while lines and not any(pattern.match(lines.peek())
			for pattern in non_diagnostic_lines):
		log.append(lines.pop())
	return log
463
464
# Printing helper methods:

# Fixed-width divider line used to frame test output sections.
DIVIDER = '=' * 60
468
def format_test_divider(message: str, len_message: int) -> str:
	"""
	Returns string with message centered in fixed width divider.

	Example:
	'===================== message example ====================='

	Parameters:
	message - message to be centered in divider line
	len_message - length of the message to be printed such that
		any characters of the color codes are not counted

	Return:
	String containing message centered in fixed width divider
	"""
	# Room left over for '=' characters after the message and the two
	# surrounding spaces.
	spare = len(DIVIDER) - len_message - 2
	if spare > 0:
		left = spare // 2
		right = spare - left
	else:
		# Message is too wide: fall back to three '=' on each side.
		left = right = 3
	return '=' * left + f' {message} ' + '=' * right
493
def print_test_header(test: Test, printer: Printer) -> None:
	"""
	Prints test header with test name and optionally the expected number
	of subtests.

	Example:
	'=================== example (2 subtests) ==================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	message = test.name
	if message:
		# Add a leading space before the subtest counts only if a test name
		# is provided using a "# Subtest" header line.
		message += " "
	count = test.expected_count
	if count:
		message += '(1 subtest)' if count == 1 else f'({count} subtests)'
	printer.print_with_timestamp(format_test_divider(message, len(message)))
517
def print_log(log: Iterable[str], printer: Printer) -> None:
	"""Prints all strings in saved log for test in yellow."""
	dedented = textwrap.dedent('\n'.join(log))
	for entry in dedented.splitlines():
		printer.print_with_timestamp(printer.yellow(entry))
523
def format_test_result(test: Test, printer: Printer) -> str:
	"""
	Returns string with formatted test result with colored status and test
	name. For failed or crashed tests, the saved log is printed first.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results

	Return:
	String containing formatted test result
	"""
	if test.status == TestStatus.SUCCESS:
		return printer.green('[PASSED] ') + test.name
	if test.status == TestStatus.SKIPPED:
		return printer.yellow('[SKIPPED] ') + test.name
	if test.status == TestStatus.NO_TESTS:
		return printer.yellow('[NO TESTS RUN] ') + test.name
	if test.status == TestStatus.TEST_CRASHED:
		print_log(test.log, printer)
		# Use the passed-in printer for coloring, consistent with
		# every other branch (previously used the global stdout).
		return printer.red('[CRASHED] ') + test.name
	print_log(test.log, printer)
	return printer.red('[FAILED] ') + test.name
550
def print_test_result(test: Test, printer: Printer) -> None:
	"""
	Prints result line with status of test.

	Example:
	'[PASSED] example'

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object
	"""
	result = format_test_result(test, printer)
	printer.print_with_timestamp(result)
563
def print_test_footer(test: Test, printer: Printer) -> None:
	"""
	Prints test footer with status of test.

	Example:
	'===================== [PASSED] example ====================='

	Parameters:
	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	result = format_test_result(test, printer)
	# Exclude invisible color-code characters from the width math.
	visible_len = len(result) - printer.color_len()
	printer.print_with_timestamp(format_test_divider(result, visible_len))
578
def print_test(test: Test, failed_only: bool, printer: Printer) -> None:
	"""
	Prints Test object to given printer. For a child test, the result line is
	printed. For a parent test, the test header, all child test results, and
	the test footer are all printed. If failed_only is true, only failed/crashed
	tests will be printed.

	Parameters:
	test - Test object to print
	failed_only - True if only failed/crashed tests should be printed.
	printer - Printer object to output results
	"""
	if test.name == "main":
		# Top-level test: frame all children between dividers.
		printer.print_with_timestamp(DIVIDER)
		for child in test.subtests:
			print_test(child, failed_only, printer)
		printer.print_with_timestamp(DIVIDER)
		return
	suppress = failed_only and test.ok_status()
	if test.subtests:
		if not suppress:
			print_test_header(test, printer)
			for child in test.subtests:
				print_test(child, failed_only, printer)
			print_test_footer(test, printer)
	elif not suppress:
		print_test_result(test, printer)
605
def _summarize_failed_tests(test: Test) -> str:
	"""Tries to summarize all the failing subtests in `test`."""

	def failed_names(test: Test, parent_name: str) -> List[str]:
		# Note: we use 'main' internally for the top-level test.
		if parent_name and parent_name != 'main':
			full_name = parent_name + '.' + test.name
		else:
			full_name = test.name

		if not test.subtests:  # this is a leaf node
			return [full_name]

		# If all the children failed, just say this subtest failed.
		# Don't summarize it down "the top-level test failed", though.
		failed_subtests = [sub for sub in test.subtests if not sub.ok_status()]
		if parent_name and len(failed_subtests) == len(test.subtests):
			return [full_name]

		names = []  # type: List[str]
		for sub in failed_subtests:
			names.extend(failed_names(sub, full_name))
		return names

	failures = failed_names(test, '')
	# If there are too many failures, printing them out will just be noisy.
	if len(failures) > 10:  # this is an arbitrary limit
		return ''

	return 'Failures: ' + ', '.join(failures)
636
637
def print_summary_line(test: Test, printer: Printer) -> None:
	"""
	Prints summary line of test object. Color of line is dependent on
	status of test: green if the test passed, yellow if skipped or no
	tests ran, and red otherwise. The summary line contains counts of
	the statuses of the test's subtests, or of the test itself if it
	has no subtests.

	Example:
	"Testing complete. Passed: 2, Failed: 0, Crashed: 0, Skipped: 0,
	Errors: 0"

	test - Test object representing current test being printed
	printer - Printer object to output results
	"""
	if test.status == TestStatus.SUCCESS:
		color = stdout.green
	elif test.status in (TestStatus.SKIPPED, TestStatus.NO_TESTS):
		color = stdout.yellow
	else:
		color = stdout.red
	printer.print_with_timestamp(color(f'Testing complete. {test.counts}'))

	# Summarize failures that might have gone off-screen since we had a lot
	# of tests (arbitrarily defined as >=100 for now).
	if test.ok_status() or test.counts.total() < 100:
		return
	summarized = _summarize_failed_tests(test)
	if summarized:
		printer.print_with_timestamp(color(summarized))
669
670# Other methods:
671
def bubble_up_test_results(test: Test) -> None:
	"""
	If the test has subtests, fold the subtests' counts into the test's
	counts and mark the test crashed if any subtest crashed. Otherwise,
	for a test with no subtests, record the test's own status in its
	counts.

	Parameters:
	test - Test object for current test being parsed
	"""
	counts = test.counts
	original_status = test.status
	for sub in test.subtests:
		counts.add_subtest_counts(sub.counts)
	if counts.total() == 0:
		# Leaf test case: count its own status.
		counts.add_status(original_status)
	elif counts.get_status() == TestStatus.TEST_CRASHED:
		test.status = TestStatus.TEST_CRASHED

	if original_status == TestStatus.FAILURE and counts.get_status() == TestStatus.SUCCESS:
		# Parent reported failure although subtests all passed:
		# record the failure so it is not lost in aggregation.
		counts.add_status(original_status)
694
def parse_test(lines: LineStream, expected_num: int, log: List[str], is_subtest: bool, printer: Printer) -> Test:
	"""
	Finds next test to parse in LineStream, creates new Test object,
	parses any subtests of the test, populates Test object with all
	information (status, name) about the test and the Test objects for
	any subtests, and then returns the Test object. The method accepts
	three formats of tests:

	Accepted test formats:

	- Main KTAP/TAP header

	Example:

	KTAP version 1
	1..4
	[subtests]

	- Subtest header (must include either the KTAP version line or
	"# Subtest" header line)

	Example (preferred format with both KTAP version line and
	"# Subtest" line):

	KTAP version 1
	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only "# Subtest" line):

	# Subtest: name
	1..3
	[subtests]
	ok 1 name

	Example (only KTAP version line, compliant with KTAP v1 spec):

	KTAP version 1
	1..3
	[subtests]
	ok 1 name

	- Test result line

	Example:

	ok 1 - test

	Parameters:
	lines - LineStream of KTAP output to parse
	expected_num - expected test number for test to be parsed
	log - list of strings containing any preceding diagnostic lines
		corresponding to the current test
	is_subtest - boolean indicating whether test is a subtest
	printer - Printer object to output results

	Return:
	Test object populated with characteristics and any subtests
	"""
	test = Test()
	test.log.extend(log)

	# Parse any errors prior to parsing tests
	err_log = parse_diagnostic(lines)
	test.log.extend(err_log)

	if not is_subtest:
		# If parsing the main/top-level test, parse KTAP version line and
		# test plan
		test.name = "main"
		parse_ktap_header(lines, test, printer)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		parent_test = True
	else:
		# If not the main test, attempt to parse a test header containing
		# the KTAP version line and/or subtest header line
		ktap_line = parse_ktap_header(lines, test, printer)
		subtest_line = parse_test_header(lines, test)
		test.log.extend(parse_diagnostic(lines))
		parse_test_plan(lines, test)
		# A subtest only acts as a parent (i.e. contains nested
		# tests) if it announced itself with a version or subtest
		# header line.
		parent_test = (ktap_line or subtest_line)
		if parent_test:
			print_test_header(test, printer)
	# expected_count is None when there was no test plan; 0 skips the
	# subtest loop entirely.
	expected_count = test.expected_count
	subtests = []  # type: List[Test]
	test_num = 1
	while parent_test and (expected_count is None or test_num <= expected_count):
		# Loop to parse any subtests.
		# Break after parsing expected number of tests or
		# if expected number of tests is unknown break when test
		# result line with matching name to subtest header is found
		# or no more lines in stream.
		sub_log = parse_diagnostic(lines)
		sub_test = Test()
		if not lines or (peek_test_name_match(lines, test) and
				is_subtest):
			if expected_count and test_num <= expected_count:
				# If parser reaches end of test before
				# parsing expected number of subtests, print
				# crashed subtest and record error
				test.add_error(printer, 'missing expected subtest!')
				sub_test.log.extend(sub_log)
				test.counts.add_status(
					TestStatus.TEST_CRASHED)
				print_test_result(sub_test, printer)
			else:
				test.log.extend(sub_log)
				break
		else:
			sub_test = parse_test(lines, test_num, sub_log, True, printer)
		subtests.append(sub_test)
		test_num += 1
	test.subtests = subtests
	if is_subtest:
		# If not main test, look for test result line
		test.log.extend(parse_diagnostic(lines))
		if test.name != "" and not peek_test_name_match(lines, test):
			test.add_error(printer, 'missing subtest result line!')
		elif not lines:
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, 'No more test results!')
		else:
			parse_test_result(lines, test, expected_num, printer)

	# Check for there being no subtests within parent test
	if parent_test and len(subtests) == 0:
		# Don't override a bad status if this test had one reported.
		# Assumption: no subtests means CRASHED is from Test.__init__()
		if test.status in (TestStatus.TEST_CRASHED, TestStatus.SUCCESS):
			print_log(test.log, printer)
			test.status = TestStatus.NO_TESTS
			test.add_error(printer, '0 tests run!')

	# Add statuses to TestCounts attribute in Test object
	bubble_up_test_results(test)
	if parent_test and is_subtest:
		# If test has subtests and is not the main test object, print
		# footer.
		print_test_footer(test, printer)
	elif is_subtest:
		print_test_result(test, printer)
	return test
842
def parse_run_tests(kernel_output: Iterable[str], printer: Printer) -> Test:
	"""
	Using kernel output, extract KTAP lines, parse the lines for test
	results and print condensed test results and summary line.

	Parameters:
	kernel_output - Iterable object contains lines of kernel output
	printer - Printer object to output results

	Return:
	Test - the main test object with all subtests.
	"""
	printer.print_with_timestamp(DIVIDER)
	lines = extract_tap_lines(kernel_output)
	if lines:
		test = parse_test(lines, 0, [], False, printer)
		if test.status != TestStatus.NO_TESTS:
			test.status = test.counts.get_status()
	else:
		# No KTAP output at all: report a synthetic failed test.
		test = Test()
		test.name = '<missing>'
		test.add_error(printer, 'Could not find any KTAP output. Did any KUnit tests run?')
		test.status = TestStatus.FAILURE_TO_PARSE_TESTS
	printer.print_with_timestamp(DIVIDER)
	return test