GNU Linux-libre 6.8.9-gnu
[releases.git] / tools / testing / kunit / kunit.py
1 #!/usr/bin/env python3
2 # SPDX-License-Identifier: GPL-2.0
3 #
4 # A thin wrapper on top of the KUnit Kernel
5 #
6 # Copyright (C) 2019, Google LLC.
7 # Author: Felix Guo <felixguoxiuping@gmail.com>
8 # Author: Brendan Higgins <brendanhiggins@google.com>
9
10 import argparse
11 import os
12 import re
13 import shlex
14 import sys
15 import time
16
17 assert sys.version_info >= (3, 7), "Python version is too old"
18
19 from dataclasses import dataclass
20 from enum import Enum, auto
21 from typing import Iterable, List, Optional, Sequence, Tuple
22
23 import kunit_json
24 import kunit_kernel
25 import kunit_parser
26 from kunit_printer import stdout
27
class KunitStatus(Enum):
        """Overall outcome of a kunit.py stage (configure, build, or test execution)."""
        SUCCESS = 1
        CONFIG_FAILURE = 2
        BUILD_FAILURE = 3
        TEST_FAILURE = 4
33
@dataclass
class KunitResult:
        """Outcome of one stage: its status and how long the stage took."""
        status: KunitStatus
        # Duration in seconds; the stages in this file compute it from
        # time.time() differences (listing tests reports 0.0).
        elapsed_time: float
38
@dataclass
class KunitConfigRequest:
        """Inputs for the configure stage."""
        # Build directory, taken from --build_dir.
        build_dir: str
        # Values of the repeatable --make_options flag, passed through to
        # build_reconfig()/build_kernel().
        make_options: Optional[List[str]]
43
@dataclass
class KunitBuildRequest(KunitConfigRequest):
        """Inputs for the build stage: the configure inputs plus a job count."""
        # Number of jobs handed to build_kernel(), taken from --jobs.
        jobs: int
47
@dataclass
class KunitParseRequest:
        """Inputs that control how kernel output is parsed and reported."""
        # When set, output is printed instead of parsed: 'kunit' prints only the
        # extracted TAP lines, any other truthy value prints the input unchanged.
        raw_output: Optional[str]
        # When set, parsed results are emitted as JSON: 'stdout' prints them,
        # any other value is used as the path of the file to write.
        json: Optional[str]
52
@dataclass
class KunitExecRequest(KunitParseRequest):
        """Inputs for the exec stage: the parse inputs plus how to run the kernel."""
        # Build directory passed to run_kernel().
        build_dir: str
        # Timeout passed to run_kernel(); --timeout documents it as seconds.
        timeout: int
        # Positional suite/test glob from the command line ('' when omitted).
        filter_glob: str
        # Attribute filter from --filter ('' when omitted).
        filter: str
        # Value of --filter_action (only 'skip' is accepted by the CLI).
        filter_action: Optional[str]
        # Values of the repeatable --kernel_args flag.
        kernel_args: Optional[List[str]]
        # 'suite' or 'test' to boot the kernel once per suite/test; otherwise unset.
        run_isolated: Optional[str]
        # When true, exec_tests() only prints the test names and returns.
        list_tests: bool
        # When true, exec_tests() only prints the tests with attributes and returns.
        list_tests_attr: bool
64
@dataclass
class KunitRequest(KunitExecRequest, KunitBuildRequest):
        """Inputs for the 'run' subcommand: the union of the build and exec requests.

        Adds no fields of its own; build_dir is declared by both parents.
        """
        pass
68
69
def get_kernel_root_path() -> str:
        """Returns the kernel source root that contains this script.

        The root is everything that precedes 'tools/testing/kunit' in the
        resolved path of this file (or of sys.argv[0] when __file__ is empty).

        Exits with status 1, after reporting the offending path on stderr, when
        the resolved path does not contain 'tools/testing/kunit' exactly once.
        """
        path = sys.argv[0] if not __file__ else __file__
        real_path = os.path.realpath(path)
        parts = real_path.split('tools/testing/kunit')
        if len(parts) != 2:
                # Previously this exited silently, which made a misplaced or
                # oddly-symlinked script very hard to diagnose.
                print(f"kunit.py: cannot determine the kernel root: expected "
                      f"'tools/testing/kunit' exactly once in {real_path!r}",
                      file=sys.stderr)
                sys.exit(1)
        return parts[0]
76
def config_tests(linux: kunit_kernel.LinuxSourceTree,
                 request: KunitConfigRequest) -> KunitResult:
        """Runs the configure step and reports its status and duration.

        Calls linux.build_reconfig() with the request's build directory and make
        options; a falsy result is reported as KunitStatus.CONFIG_FAILURE.
        """
        stdout.print_with_timestamp('Configuring KUnit Kernel ...')

        started = time.time()
        configured = linux.build_reconfig(request.build_dir, request.make_options)
        elapsed = time.time() - started

        if configured:
                return KunitResult(KunitStatus.SUCCESS, elapsed)
        return KunitResult(KunitStatus.CONFIG_FAILURE, elapsed)
86
def build_tests(linux: kunit_kernel.LinuxSourceTree,
                request: KunitBuildRequest) -> KunitResult:
        """Runs the build step and reports its status and duration.

        Calls linux.build_kernel() with the request's job count, build directory
        and make options; a falsy result is reported as KunitStatus.BUILD_FAILURE.
        """
        stdout.print_with_timestamp('Building KUnit Kernel ...')

        started = time.time()
        built = linux.build_kernel(request.jobs, request.build_dir, request.make_options)
        elapsed = time.time() - started

        if built:
                return KunitResult(KunitStatus.SUCCESS, elapsed)
        return KunitResult(KunitStatus.BUILD_FAILURE, elapsed)
98
def config_and_build_tests(linux: kunit_kernel.LinuxSourceTree,
                           request: KunitBuildRequest) -> KunitResult:
        """Configures, then builds; returns the first failing result or the build result."""
        result = config_tests(linux, request)
        if result.status == KunitStatus.SUCCESS:
                # Only build when the configuration step went through.
                result = build_tests(linux, request)
        return result
106
def _list_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> List[str]:
        """Boots the kernel with kunit.action=list and returns the "<suite>.<test>" lines it prints.

        User-supplied kernel_args are appended after the list action; the
        request's timeout, filters and build_dir are forwarded to run_kernel().
        """
        args = ['kunit.action=list']

        if request.kernel_args:
                args.extend(request.kernel_args)

        output = linux.run_kernel(args=args,
                           timeout=request.timeout,
                           filter_glob=request.filter_glob,
                           filter=request.filter,
                           filter_action=request.filter_action,
                           build_dir=request.build_dir)
        lines = kunit_parser.extract_tap_lines(output)
        # Hack! Drop the dummy TAP version header that the executor prints out.
        lines.pop()

        # Filter out any extraneous non-test output that might have gotten mixed in.
        # NOTE(review): this iterates the raw `output`, not `lines`; `lines` is only
        # used for the pop() above. Presumably that relies on extract_tap_lines()
        # consuming `output` lazily -- confirm this is intended before changing it.
        return [l for l in output if re.match(r'^[^\s.]+\.[^\s.]+$', l)]
125
def _list_tests_attr(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> Iterable[str]:
        """Boots the kernel with kunit.action=list_attr and returns the extracted TAP lines.

        User-supplied kernel_args are appended after the list_attr action; the
        request's timeout, filters and build_dir are forwarded to run_kernel().
        """
        args = ['kunit.action=list_attr']

        if request.kernel_args:
                args.extend(request.kernel_args)

        output = linux.run_kernel(args=args,
                           timeout=request.timeout,
                           filter_glob=request.filter_glob,
                           filter=request.filter,
                           filter_action=request.filter_action,
                           build_dir=request.build_dir)
        lines = kunit_parser.extract_tap_lines(output)
        # Hack! Drop the dummy TAP version header that the executor prints out.
        lines.pop()

        # The remaining extracted lines are returned as-is; no further filtering
        # of non-test output is applied here.
        return lines
144
145 def _suites_from_test_list(tests: List[str]) -> List[str]:
146         """Extracts all the suites from an ordered list of tests."""
147         suites = []  # type: List[str]
148         for t in tests:
149                 parts = t.split('.', maxsplit=2)
150                 if len(parts) != 2:
151                         raise ValueError(f'internal KUnit error, test name should be of the form "<suite>.<test>", got "{t}"')
152                 suite, _ = parts
153                 if not suites or suites[-1] != suite:
154                         suites.append(suite)
155         return suites
156
def exec_tests(linux: kunit_kernel.LinuxSourceTree, request: KunitExecRequest) -> KunitResult:
        """Runs the (already built) kernel and aggregates the parsed test results.

        With list_tests / list_tests_attr set, only prints the listing and
        returns SUCCESS with zero elapsed time. Otherwise the kernel is booted
        once per filter glob: a single boot with the user's glob by default, or
        one boot per test/suite when run_isolated is 'test'/'suite'.
        """
        if request.list_tests:
                for test_name in _list_tests(linux, request):
                        print(test_name.rstrip())
                return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0)
        if request.list_tests_attr:
                for attr_line in _list_tests_attr(linux, request):
                        print(attr_line.rstrip())
                return KunitResult(status=KunitStatus.SUCCESS, elapsed_time=0.0)

        globs = [request.filter_glob]
        if request.run_isolated:
                tests = _list_tests(linux, request)
                if request.run_isolated == 'test':
                        globs = tests
                elif request.run_isolated == 'suite':
                        globs = _suites_from_test_list(tests)
                        # Apply the test-part of the user's glob, if present.
                        if '.' in request.filter_glob:
                                test_glob = request.filter_glob.split('.', maxsplit=2)[1]
                                globs = [suite + '.' + test_glob for suite in globs]

        metadata = kunit_json.Metadata(arch=linux.arch(), build_dir=request.build_dir, def_config='kunit_defconfig')

        counts = kunit_parser.TestCounts()
        total_time = 0.0
        boots = len(globs)
        for boot, glob in enumerate(globs, start=1):
                stdout.print_with_timestamp(f'Starting KUnit Kernel ({boot}/{boots})...')

                started = time.time()
                kernel_output = linux.run_kernel(
                        args=request.kernel_args,
                        timeout=request.timeout,
                        filter_glob=glob,
                        filter=request.filter,
                        filter_action=request.filter_action,
                        build_dir=request.build_dir)

                _, parsed = parse_tests(request, metadata, kernel_output)
                # run_kernel() doesn't block on the kernel exiting; that only
                # happens once the last line of its output has been consumed.
                # The measured time therefore covers parsing + execution.
                total_time += time.time() - started

                counts.add_subtest_counts(parsed.counts)

        if boots == 1 and counts.crashed > 0:
                build_dir = request.build_dir
                print('The kernel seems to have crashed; you can decode the stack traces with:')
                print(f'$ scripts/decode_stacktrace.sh {build_dir}/vmlinux {build_dir} < '
                      f'{kunit_kernel.get_outfile_path(build_dir)} | tee {build_dir}/decoded.log | '
                      f'{sys.argv[0]} parse')

        overall = _map_to_overall_status(counts.get_status())
        return KunitResult(status=overall, elapsed_time=total_time)
213
def _map_to_overall_status(test_status: kunit_parser.TestStatus) -> KunitStatus:
        """Collapses a parser test status into SUCCESS (passed or skipped) or TEST_FAILURE."""
        passing = (kunit_parser.TestStatus.SUCCESS, kunit_parser.TestStatus.SKIPPED)
        return KunitStatus.SUCCESS if test_status in passing else KunitStatus.TEST_FAILURE
218
def parse_tests(request: KunitParseRequest, metadata: kunit_json.Metadata, input_data: Iterable[str]) -> Tuple[KunitResult, kunit_parser.Test]:
        """Parses (or just echoes) kernel output and returns (result, test).

        With request.raw_output set, the output is printed rather than parsed
        ('kunit' prints only the extracted TAP lines) and a placeholder passing
        test is returned. Otherwise the output is parsed and, when request.json
        is set, also emitted as JSON to stdout or to the named file.
        """
        started = time.time()

        if request.raw_output:
                # Treat unparsed results as one passing test.
                placeholder = kunit_parser.Test()
                placeholder.status = kunit_parser.TestStatus.SUCCESS
                placeholder.counts.passed = 1

                raw_lines: Iterable[str] = input_data
                if request.raw_output == 'kunit':
                        raw_lines = kunit_parser.extract_tap_lines(input_data)
                for raw_line in raw_lines:
                        print(raw_line.rstrip())
                return KunitResult(KunitStatus.SUCCESS, time.time() - started), placeholder

        # Actually parse the test results.
        test = kunit_parser.parse_run_tests(input_data)
        elapsed = time.time() - started

        if request.json:
                rendered = kunit_json.get_json_result(test=test, metadata=metadata)
                if request.json == 'stdout':
                        print(rendered)
                else:
                        with open(request.json, 'w') as json_file:
                                json_file.write(rendered)
                        stdout.print_with_timestamp(
                                f'Test results stored in {os.path.abspath(request.json)}')

        if test.status != kunit_parser.TestStatus.SUCCESS:
                outcome = KunitStatus.TEST_FAILURE
        else:
                outcome = KunitStatus.SUCCESS
        return KunitResult(outcome, elapsed), test
259
def run_tests(linux: kunit_kernel.LinuxSourceTree,
              request: KunitRequest) -> KunitResult:
        """Configures, builds and executes in one go, then prints a timing summary.

        Returns the first failing stage's result without running later stages;
        otherwise returns the exec result.
        """
        started = time.time()

        config_result = config_tests(linux, request)
        if config_result.status != KunitStatus.SUCCESS:
                return config_result

        build_result = build_tests(linux, request)
        if build_result.status != KunitStatus.SUCCESS:
                return build_result

        exec_result = exec_tests(linux, request)
        total = time.time() - started

        stdout.print_with_timestamp(
                f'Elapsed time: {total:.3f}s total, '
                f'{config_result.elapsed_time:.3f}s configuring, '
                f'{build_result.elapsed_time:.3f}s building, '
                f'{exec_result.elapsed_time:.3f}s running\n')
        return exec_result
284
285 # Problem:
286 # $ kunit.py run --json
287 # works as one would expect and prints the parsed test results as JSON.
288 # $ kunit.py run --json suite_name
289 # would *not* pass suite_name as the filter_glob and print as json.
290 # argparse will consider it to be another way of writing
291 # $ kunit.py run --json=suite_name
292 # i.e. it would run all tests, and dump the json to a `suite_name` file.
293 # So we hackily automatically rewrite --json => --json=stdout
# Flags that argparse would otherwise let swallow the following positional
# argument, mapped to the value a bare occurrence of the flag stands for.
pseudo_bool_flag_defaults = {
        '--json': 'stdout',
        '--raw_output': 'kunit',
}
def massage_argv(argv: Sequence[str]) -> Sequence[str]:
        """Returns argv as a list with each bare pseudo-bool flag rewritten to `flag=default`.

        Only arguments that are exactly a key of pseudo_bool_flag_defaults are
        rewritten; everything else (including `--json=file`) is passed through.
        """
        return [
                f'{arg}={pseudo_bool_flag_defaults[arg]}' if arg in pseudo_bool_flag_defaults else arg
                for arg in argv
        ]
304
def get_default_jobs() -> int:
        """Returns the default for --jobs: the number of CPUs this process may use.

        os.sched_getaffinity() is not available on every platform, so fall back
        to os.cpu_count(), and to 1 if even that cannot be determined.
        """
        if hasattr(os, 'sched_getaffinity'):
                return len(os.sched_getaffinity(0))
        return os.cpu_count() or 1
307
def add_common_opts(parser: argparse.ArgumentParser) -> None:
        """Registers the options shared by the run/config/build/exec subcommands.

        Adds --build_dir, --make_options, --alltests, --kunitconfig,
        --kconfig_add, --arch, --cross_compile, --qemu_config and --qemu_args.
        """
        parser.add_argument('--build_dir',
                            help='As in the make command, it specifies the build '
                            'directory.',
                            type=str, default='.kunit', metavar='DIR')
        parser.add_argument('--make_options',
                            help='X=Y make option, can be repeated.',
                            action='append', metavar='X=Y')
        parser.add_argument('--alltests',
                            help='Run all KUnit tests via tools/testing/kunit/configs/all_tests.config',
                            action='store_true')
        parser.add_argument('--kunitconfig',
                            help='Path to Kconfig fragment that enables KUnit tests.'
                            ' If given a directory, (e.g. lib/kunit), "/.kunitconfig" '
                            'will get automatically appended. If repeated, the files '
                            'are blindly concatenated, which might not work in all cases.',
                            action='append', metavar='PATHS')
        parser.add_argument('--kconfig_add',
                            help='Additional Kconfig options to append to the '
                            '.kunitconfig, e.g. CONFIG_KASAN=y. Can be repeated.',
                            action='append', metavar='CONFIG_X=Y')

        parser.add_argument('--arch',
                            help=('Specifies the architecture to run tests under. '
                                  'The architecture specified here must match the '
                                  'string passed to the ARCH make param, '
                                  'e.g. i386, x86_64, arm, um, etc. Non-UML '
                                  'architectures run on QEMU.'),
                            type=str, default='um', metavar='ARCH')

        parser.add_argument('--cross_compile',
                            help=('Sets make\'s CROSS_COMPILE variable; it should '
                                  'be set to a toolchain path prefix (the prefix '
                                  'of gcc and other tools in your toolchain, for '
                                  'example `sparc64-linux-gnu-` if you have the '
                                  'sparc toolchain installed on your system, or '
                                  '`$HOME/toolchains/microblaze/gcc-9.2.0-nolibc/microblaze-linux/bin/microblaze-linux-` '
                                  'if you have downloaded the microblaze toolchain '
                                  'from the 0-day website to a directory in your '
                                  'home directory called `toolchains`).'),
                            metavar='PREFIX')

        parser.add_argument('--qemu_config',
                            help=('Takes a path to a file containing '
                                  'a QemuArchParams object.'),
                            type=str, metavar='FILE')

        # Each value may hold several QEMU arguments in one string; they are
        # split by the consumer of this option, not here.
        parser.add_argument('--qemu_args',
                            help='Additional QEMU arguments, e.g. "-smp 8"',
                            action='append', metavar='')
358
def add_build_opts(parser: argparse.ArgumentParser) -> None:
        """Registers the build-only option --jobs, defaulting to get_default_jobs()."""
        jobs_help = ('As in the make command, "Specifies  the number of '
                     'jobs (commands) to run simultaneously."')
        parser.add_argument('--jobs', help=jobs_help, type=int,
                            default=get_default_jobs(), metavar='N')
364
def add_exec_opts(parser: argparse.ArgumentParser) -> None:
        """Registers the options that control how the built kernel is executed.

        Adds --timeout, the optional positional filter_glob, --filter,
        --filter_action, --kernel_args, --run_isolated, --list_tests and
        --list_tests_attr.
        """
        parser.add_argument('--timeout',
                            help='maximum number of seconds to allow for all tests '
                            'to run. This does not include time taken to build the '
                            'tests.',
                            type=int,
                            default=300,
                            metavar='SECONDS')
        parser.add_argument('filter_glob',
                            help='Filter which KUnit test suites/tests run at '
                            'boot-time, e.g. list* or list*.*del_test',
                            type=str,
                            nargs='?',
                            default='',
                            metavar='filter_glob')
        parser.add_argument('--filter',
                            help='Filter KUnit tests with attributes, '
                            'e.g. module=example or speed>slow',
                            type=str,
                            default='')
        parser.add_argument('--filter_action',
                            help='If set to skip, filtered tests will be skipped, '
                            'e.g. --filter_action=skip. Otherwise they will not run.',
                            type=str,
                            choices=['skip'])
        parser.add_argument('--kernel_args',
                            help='Kernel command-line parameters. May be repeated',
                            action='append', metavar='')
        parser.add_argument('--run_isolated', help='If set, boot the kernel for each '
                            'individual suite/test. This can be useful for debugging '
                            'a non-hermetic test, one that might pass/fail based on '
                            'what ran before it.',
                            type=str,
                            choices=['suite', 'test'])
        parser.add_argument('--list_tests', help='If set, list all tests that will be '
                            'run.',
                            action='store_true')
        parser.add_argument('--list_tests_attr', help='If set, list all tests and test '
                            'attributes.',
                            action='store_true')
405
def add_parse_opts(parser: argparse.ArgumentParser) -> None:
        """Registers the output-handling options --raw_output and --json.

        Both take an optional value: a bare --raw_output parses as 'all' and a
        bare --json parses as 'stdout'; both default to None when absent.
        """
        raw_output_help = ("If set don't parse output from kernel. "
                           'By default, filters to just KUnit output. Use '
                           '--raw_output=all to show everything')
        json_help = ('Prints parsed test results as JSON to stdout or a file if '
                     'a filename is specified. Does nothing if --raw_output is set.')

        parser.add_argument('--raw_output', help=raw_output_help, type=str,
                            nargs='?', const='all', default=None,
                            choices=['all', 'kunit'])
        parser.add_argument('--json', help=json_help, type=str,
                            nargs='?', const='stdout', default=None,
                            metavar='FILE')
416
417
def tree_from_args(cli_args: argparse.Namespace) -> kunit_kernel.LinuxSourceTree:
        """Builds the LinuxSourceTree described by the parsed command-line arguments."""
        # Allow users to specify multiple arguments in one string, e.g. '-smp 8'
        qemu_args: List[str] = [
                token
                for arg in (cli_args.qemu_args or [])
                for token in shlex.split(arg)
        ]

        kunitconfigs = cli_args.kunitconfig or []
        if cli_args.alltests:
                # Prepend so user-specified options take prio if we ever allow
                # --kunitconfig options to have differing options.
                kunitconfigs = [kunit_kernel.ALL_TESTS_CONFIG_PATH] + kunitconfigs

        return kunit_kernel.LinuxSourceTree(
                cli_args.build_dir,
                kunitconfig_paths=kunitconfigs,
                kconfig_add=cli_args.kconfig_add,
                arch=cli_args.arch,
                cross_compile=cli_args.cross_compile,
                qemu_config_path=cli_args.qemu_config,
                extra_qemu_args=qemu_args)
439
440
def run_handler(cli_args: argparse.Namespace) -> None:
        """Handles the 'run' subcommand: configure, build and execute the tests.

        Creates the build directory when it is missing and exits with status 1
        when any stage does not succeed.
        """
        if not os.path.exists(cli_args.build_dir):
                # makedirs() also creates missing parents, so a nested
                # --build_dir (e.g. out/kunit) no longer fails with
                # FileNotFoundError; exist_ok covers a concurrent creation.
                os.makedirs(cli_args.build_dir, exist_ok=True)

        linux = tree_from_args(cli_args)
        request = KunitRequest(build_dir=cli_args.build_dir,
                               make_options=cli_args.make_options,
                               jobs=cli_args.jobs,
                               raw_output=cli_args.raw_output,
                               json=cli_args.json,
                               timeout=cli_args.timeout,
                               filter_glob=cli_args.filter_glob,
                               filter=cli_args.filter,
                               filter_action=cli_args.filter_action,
                               kernel_args=cli_args.kernel_args,
                               run_isolated=cli_args.run_isolated,
                               list_tests=cli_args.list_tests,
                               list_tests_attr=cli_args.list_tests_attr)
        result = run_tests(linux, request)
        if result.status != KunitStatus.SUCCESS:
                sys.exit(1)
462
463
def config_handler(cli_args: argparse.Namespace) -> None:
        """Handles the 'config' subcommand: configure only, then print the elapsed time.

        Creates the build directory when one is given and missing; exits with
        status 1 when configuration does not succeed.
        """
        if cli_args.build_dir and not os.path.exists(cli_args.build_dir):
                # makedirs() also creates missing parents, so a nested
                # --build_dir (e.g. out/kunit) no longer fails with
                # FileNotFoundError; exist_ok covers a concurrent creation.
                os.makedirs(cli_args.build_dir, exist_ok=True)

        linux = tree_from_args(cli_args)
        request = KunitConfigRequest(build_dir=cli_args.build_dir,
                                     make_options=cli_args.make_options)
        result = config_tests(linux, request)
        stdout.print_with_timestamp((
                'Elapsed time: %.3fs\n') % (
                        result.elapsed_time))
        if result.status != KunitStatus.SUCCESS:
                sys.exit(1)
478
479
def build_handler(cli_args: argparse.Namespace) -> None:
        """Handles the 'build' subcommand: configure and build, then print the elapsed time.

        Exits with status 1 when the returned result is not a success.
        """
        source_tree = tree_from_args(cli_args)
        build_request = KunitBuildRequest(
                build_dir=cli_args.build_dir,
                make_options=cli_args.make_options,
                jobs=cli_args.jobs)
        outcome = config_and_build_tests(source_tree, build_request)
        stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
        if outcome.status != KunitStatus.SUCCESS:
                sys.exit(1)
491
492
def exec_handler(cli_args: argparse.Namespace) -> None:
        """Handles the 'exec' subcommand: run an already-built kernel and report the time.

        Exits with status 1 when the tests do not succeed.
        """
        source_tree = tree_from_args(cli_args)
        request = KunitExecRequest(
                raw_output=cli_args.raw_output,
                build_dir=cli_args.build_dir,
                json=cli_args.json,
                timeout=cli_args.timeout,
                filter_glob=cli_args.filter_glob,
                filter=cli_args.filter,
                filter_action=cli_args.filter_action,
                kernel_args=cli_args.kernel_args,
                run_isolated=cli_args.run_isolated,
                list_tests=cli_args.list_tests,
                list_tests_attr=cli_args.list_tests_attr)
        outcome = exec_tests(source_tree, request)
        stdout.print_with_timestamp(f'Elapsed time: {outcome.elapsed_time:.3f}s\n')
        if outcome.status != KunitStatus.SUCCESS:
                sys.exit(1)
511
512
def parse_handler(cli_args: argparse.Namespace) -> None:
        """Handles the 'parse' subcommand: parse results from a file or from stdin.

        Undecodable input bytes are rendered with backslash escapes rather than
        raising. Exits with status 1 when the parsed result is not a success.
        """
        kunit_output: Iterable[str]
        if cli_args.file is not None:
                with open(cli_args.file, 'r', errors='backslashreplace') as results_file:
                        kunit_output = results_file.read().splitlines()
        else:
                sys.stdin.reconfigure(errors='backslashreplace')  # type: ignore
                kunit_output = sys.stdin

        # We know nothing about how the result was created!
        metadata = kunit_json.Metadata()
        request = KunitParseRequest(raw_output=cli_args.raw_output, json=cli_args.json)
        outcome, _ = parse_tests(request, metadata, kunit_output)
        if outcome.status != KunitStatus.SUCCESS:
                sys.exit(1)
527
528
# Maps each subcommand name accepted on the command line to the function that
# handles it; main() looks up the parsed subcommand here.
subcommand_handlers_map = {
        'run': run_handler,
        'config': config_handler,
        'build': build_handler,
        'exec': exec_handler,
        'parse': parse_handler
}
536
537
def main(argv: Sequence[str]) -> None:
        """Parses argv, changes into the kernel root and dispatches to the subcommand handler.

        argv is the argument list without the program name. When no known
        subcommand is given, the top-level help is printed instead.
        """
        parser = argparse.ArgumentParser(
                        description='Helps writing and running KUnit tests.')
        subparser = parser.add_subparsers(dest='subcommand')

        # The 'run' command will config, build, exec, and parse in one go.
        run_parser = subparser.add_parser('run', help='Runs KUnit tests.')
        add_common_opts(run_parser)
        add_build_opts(run_parser)
        add_exec_opts(run_parser)
        add_parse_opts(run_parser)

        config_parser = subparser.add_parser('config',
                                             help='Ensures that .config contains all of '
                                             'the options in .kunitconfig')
        add_common_opts(config_parser)

        build_parser = subparser.add_parser('build', help='Builds a kernel with KUnit tests')
        add_common_opts(build_parser)
        add_build_opts(build_parser)

        exec_parser = subparser.add_parser('exec', help='Run a kernel with KUnit tests')
        add_common_opts(exec_parser)
        add_exec_opts(exec_parser)
        add_parse_opts(exec_parser)

        # The 'parse' option is special, as it doesn't need the kernel source
        # (therefore there is no need for a build_dir, hence no add_common_opts)
        # and the '--file' argument is not relevant to 'run', so isn't in
        # add_parse_opts()
        parse_parser = subparser.add_parser('parse',
                                            help='Parses KUnit results from a file, '
                                            'and parses formatted results.')
        add_parse_opts(parse_parser)
        parse_parser.add_argument('file',
                                  help='Specifies the file to read results from.',
                                  type=str, nargs='?', metavar='input_file')

        cli_args = parser.parse_args(massage_argv(argv))

        # Resolve the kernel root once rather than once for the check and again
        # for the chdir.
        kernel_root = get_kernel_root_path()
        if kernel_root:
                os.chdir(kernel_root)

        subcommand_handler = subcommand_handlers_map.get(cli_args.subcommand, None)

        if subcommand_handler is None:
                parser.print_help()
                return

        subcommand_handler(cli_args)
588
589
# Script entry point: pass everything after the program name to main().
if __name__ == '__main__':
        main(sys.argv[1:])