GNU Linux-libre 6.8.9-gnu
[releases.git] / tools / testing / kunit / kunit_kernel.py
1 # SPDX-License-Identifier: GPL-2.0
2 #
3 # Runs UML kernel, collects output, and handles errors.
4 #
5 # Copyright (C) 2019, Google LLC.
6 # Author: Felix Guo <felixguoxiuping@gmail.com>
7 # Author: Brendan Higgins <brendanhiggins@google.com>
8
9 import importlib.abc
10 import importlib.util
11 import logging
12 import subprocess
13 import os
14 import shlex
15 import shutil
16 import signal
17 import threading
18 from typing import Iterator, List, Optional, Tuple
19 from types import FrameType
20
21 import kunit_config
22 import qemu_config
23
# File names that the get_*_path() helpers below join onto the build directory.
KCONFIG_PATH = '.config'
KUNITCONFIG_PATH = '.kunitconfig'
OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
# Config files referenced by relative path.
# NOTE(review): presumably relative to the root of the kernel source tree, i.e.
# the tool is expected to run with that as its working directory -- confirm.
DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
# Name of the file inside the build directory that run_kernel() tees kernel output into.
OUTFILE_PATH = 'test.log'
# Absolute directory containing this file, and the QEMU config directory inside it.
ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
33
class ConfigError(Exception):
        """Raised when configuring the Linux kernel fails."""
36
37
class BuildError(Exception):
        """Raised when building the Linux kernel fails."""
40
41
class LinuxSourceTreeOperations:
        """An abstraction over command line operations performed on a source tree.

        Subclasses provide the architecture-specific parts by overriding
        make_arch_config() and start().
        """

        def __init__(self, linux_arch: str, cross_compile: Optional[str]):
                """Stores the values later passed to make as ARCH= and CROSS_COMPILE=."""
                self._linux_arch = linux_arch
                self._cross_compile = cross_compile

        def make_mrproper(self) -> None:
                """Runs 'make mrproper'.

                Raises:
                        ConfigError: if make could not be executed, or exited non-zero
                                (the message is then make's combined stdout/stderr).
                """
                try:
                        subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
                except OSError as e:
                        raise ConfigError('Could not call make command: ' + str(e)) from e
                except subprocess.CalledProcessError as e:
                        # Never let undecodable bytes in make's output mask the real error.
                        raise ConfigError(e.output.decode(errors='backslashreplace')) from e

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns the config to build with; the base class adds nothing to it."""
                return base_kunitconfig

        def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
                """Runs 'make olddefconfig' with O=build_dir and prints the command used.

                Raises:
                        ConfigError: if make could not be executed, or exited non-zero
                                (the message is then make's combined stdout/stderr).
                """
                command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
                if self._cross_compile:
                        command += ['CROSS_COMPILE=' + self._cross_compile]
                if make_options:
                        command.extend(make_options)
                print('Populating config with:\n$', ' '.join(command))
                try:
                        subprocess.check_output(command, stderr=subprocess.STDOUT)
                except OSError as e:
                        raise ConfigError('Could not call make command: ' + str(e)) from e
                except subprocess.CalledProcessError as e:
                        raise ConfigError(e.output.decode(errors='backslashreplace')) from e

        def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
                """Builds the kernel with O=build_dir using the given number of jobs.

                make's stdout is discarded. Its stderr is captured and printed when the
                build succeeds but still wrote something there.

                Raises:
                        BuildError: if make could not be executed, or exited non-zero
                                (the message is then make's stderr).
                """
                command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
                if make_options:
                        command.extend(make_options)
                if self._cross_compile:
                        command += ['CROSS_COMPILE=' + self._cross_compile]
                print('Building with:\n$', ' '.join(command))
                # Popen() itself only fails with OSError (e.g. make not found); a
                # non-zero exit status is detected via returncode below.
                try:
                        proc = subprocess.Popen(command,
                                                stderr=subprocess.PIPE,
                                                stdout=subprocess.DEVNULL)
                except OSError as e:
                        raise BuildError('Could not call execute make: ' + str(e)) from e
                _, stderr = proc.communicate()
                if proc.returncode != 0:
                        raise BuildError(stderr.decode(errors='backslashreplace'))
                if stderr:  # likely only due to build warnings
                        print(stderr.decode(errors='backslashreplace'))

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Starts the built kernel; must be overridden by subclasses.

                Raises:
                        NotImplementedError: always (a subclass of RuntimeError, so
                                callers catching RuntimeError still work).
                """
                raise NotImplementedError('not implemented!')
97
98
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
        """Source tree operations for kernels that are run under QEMU."""

        def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
                """Copies the fields used later out of the given QemuArchParams."""
                super().__init__(linux_arch=qemu_arch_params.linux_arch,
                                 cross_compile=cross_compile)
                # Every test run appends kunit_shutdown=reboot to the kernel command line.
                self._kernel_command_line = qemu_arch_params.kernel_command_line + ' kunit_shutdown=reboot'
                self._kernel_path = qemu_arch_params.kernel_path
                self._qemu_arch = qemu_arch_params.qemu_arch
                self._serial = qemu_arch_params.serial
                self._extra_qemu_params = qemu_arch_params.extra_qemu_params
                self._kconfig = qemu_arch_params.kconfig

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns the arch's own kconfig with base_kunitconfig merged into it."""
                arch_kconfig = kunit_config.parse_from_string(self._kconfig)
                arch_kconfig.merge_in_entries(base_kunitconfig)
                return arch_kconfig

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Launches QEMU on the kernel image in build_dir and returns the process.

                The process is started with text-mode pipes for stdin and stdout, and
                stderr is redirected into stdout.
                """
                append_value = ' '.join(params + [self._kernel_command_line])
                fixed_args = [
                        'qemu-system-' + self._qemu_arch,
                        '-nodefaults',
                        '-m', '1024',
                        '-kernel', os.path.join(build_dir, self._kernel_path),
                        '-append', append_value,
                        '-no-reboot',
                        '-nographic',
                        '-serial', self._serial,
                ]
                qemu_command = fixed_args + self._extra_qemu_params
                # Note: shlex.join() does what we want, but requires python 3.8+.
                quoted = [shlex.quote(arg) for arg in qemu_command]
                print('Running tests with:\n$', ' '.join(quoted))
                return subprocess.Popen(
                        qemu_command,
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        stderr=subprocess.STDOUT,
                        text=True,
                        errors='backslashreplace')
133
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
        """Source tree operations for User-Mode Linux kernels (ARCH=um)."""

        def __init__(self, cross_compile: Optional[str]=None):
                """Fixes the architecture to 'um'; cross_compile is passed through."""
                super().__init__(linux_arch='um', cross_compile=cross_compile)

        def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
                """Returns the config parsed from UML_KCONFIG_PATH with base_kunitconfig merged in."""
                kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
                kconfig.merge_in_entries(base_kunitconfig)
                return kconfig

        def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
                """Runs the Linux UML binary. Must be named 'linux'.

                The process is started with text-mode pipes for stdin and stdout, and
                stderr is redirected into stdout.
                """
                linux_bin = os.path.join(build_dir, 'linux')
                # Build a fresh argument list instead of extending params in place, so
                # the caller's list is left untouched (as the QEMU variant also does).
                command = [linux_bin] + params + ['mem=1G', 'console=tty', 'kunit_shutdown=halt']
                return subprocess.Popen(command,
                                           stdin=subprocess.PIPE,
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.STDOUT,
                                           text=True, errors='backslashreplace')
154
def get_kconfig_path(build_dir: str) -> str:
        """Returns the path of the KCONFIG_PATH file inside build_dir."""
        kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
        return kconfig_path
157
def get_kunitconfig_path(build_dir: str) -> str:
        """Returns the path of the KUNITCONFIG_PATH file inside build_dir."""
        kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
        return kunitconfig_path
160
def get_old_kunitconfig_path(build_dir: str) -> str:
        """Returns the path of the OLD_KUNITCONFIG_PATH file inside build_dir."""
        old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
        return old_kunitconfig_path
163
def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
        """Parses the kunitconfig(s) to use for build_dir.

        Without explicit paths, the .kunitconfig inside build_dir is parsed; if it
        does not exist yet it is first copied from DEFAULT_KUNITCONFIG_PATH.
        Otherwise every given path is parsed and merged in order, where a
        directory stands for the .kunitconfig file inside it.

        Raises:
                ConfigError: if a given path does not exist, or if
                        conflicting_options() reports a clash between the configs.
        """
        if not kunitconfig_paths:
                default_path = get_kunitconfig_path(build_dir)
                if not os.path.exists(default_path):
                        shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
                return kunit_config.parse_file(default_path)

        result = kunit_config.Kconfig()
        for given_path in kunitconfig_paths:
                if os.path.isdir(given_path):
                        config_file = os.path.join(given_path, KUNITCONFIG_PATH)
                else:
                        config_file = given_path
                if not os.path.exists(config_file):
                        raise ConfigError(f'Specified kunitconfig ({config_file}) does not exist')

                parsed = kunit_config.parse_file(config_file)
                conflicts = result.conflicting_options(parsed)
                if conflicts:
                        details = '\n\n'.join(
                                [f'{a}\n  vs from {config_file}\n{b}' for a, b in conflicts])
                        raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
                result.merge_in_entries(parsed)
        return result
187
def get_outfile_path(build_dir: str) -> str:
        """Returns the path of the OUTFILE_PATH file inside build_dir."""
        outfile_path = os.path.join(build_dir, OUTFILE_PATH)
        return outfile_path
190
def _default_qemu_config_path(arch: str) -> str:
        """Returns the path of '<arch>.py' inside QEMU_CONFIGS_DIR.

        Raises:
                ConfigError: if no such file exists; the message lists the names of
                        the .py files that are present.
        """
        candidate = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
        if not os.path.isfile(candidate):
                known_archs = sorted(
                        name[:-3] for name in os.listdir(QEMU_CONFIGS_DIR) if name.endswith('.py'))
                raise ConfigError(arch + ' is not a valid arch, options are ' + str(known_archs))
        return candidate
198
def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
        """Loads the QEMU config module at config_path and wraps it in an ops object.

        Returns:
                The linux_arch named by the config's QEMU_ARCH, together with a
                LinuxSourceTreeOperationsQemu built from it. Any extra_qemu_args are
                appended to the config's extra_qemu_params first.

        Raises:
                ValueError: if the loaded module does not define QEMU_ARCH.
        """
        # The module name/path has very little to do with where the actual file
        # exists (learned through experimentation; not found anywhere in the
        # Python documentation).
        #
        # Basically, the real location of the config being loaded is ignored
        # entirely: Python is simply told that the module lives in
        # QEMU_CONFIGS_DIR for import purposes, wherever the file actually is.
        configs_dir_name = os.path.basename(QEMU_CONFIGS_DIR)
        module_name = '.' + os.path.join(configs_dir_name, os.path.basename(config_path))
        module_spec = importlib.util.spec_from_file_location(module_name, config_path)
        assert module_spec is not None
        loaded = importlib.util.module_from_spec(module_spec)
        # See https://github.com/python/typeshed/pull/2626 for context.
        assert isinstance(module_spec.loader, importlib.abc.Loader)
        module_spec.loader.exec_module(loaded)

        if not hasattr(loaded, 'QEMU_ARCH'):
                raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)

        arch_params: qemu_config.QemuArchParams = loaded.QEMU_ARCH
        if extra_qemu_args:
                arch_params.extra_qemu_params.extend(extra_qemu_args)
        qemu_ops = LinuxSourceTreeOperationsQemu(arch_params, cross_compile=cross_compile)
        return arch_params.linux_arch, qemu_ops
225
class LinuxSourceTree:
        """Represents a Linux kernel source tree with KUnit tests."""

        def __init__(
              self,
              build_dir: str,
              kunitconfig_paths: Optional[List[str]]=None,
              kconfig_add: Optional[List[str]]=None,
              arch: Optional[str]=None,
              cross_compile: Optional[str]=None,
              qemu_config_path: Optional[str]=None,
              extra_qemu_args: Optional[List[str]]=None) -> None:
                """Selects the per-architecture operations and loads the kunitconfig.

                An explicit qemu_config_path takes precedence over arch. Otherwise
                arch defaults to 'um', and any other arch is run through its default
                QEMU config. Entries in kconfig_add are merged into the parsed
                kunitconfig.

                Note: this installs self.signal_handler as the SIGINT handler.
                """
                signal.signal(signal.SIGINT, self.signal_handler)
                if qemu_config_path:
                        self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
                else:
                        self._arch = 'um' if arch is None else arch
                        if self._arch == 'um':
                                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
                        else:
                                qemu_config_path = _default_qemu_config_path(self._arch)
                                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

                self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
                if kconfig_add:
                        kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
                        self._kconfig.merge_in_entries(kconfig)

        def arch(self) -> str:
                """Returns the Linux architecture name this tree is configured for."""
                return self._arch

        def clean(self) -> bool:
                """Runs 'make mrproper'; logs the error and returns False on failure."""
                try:
                        self._ops.make_mrproper()
                except ConfigError as e:
                        logging.error(e)
                        return False
                return True

        def validate_config(self, build_dir: str) -> bool:
                """Checks that the .config in build_dir contains every requested option.

                Returns True if so. Otherwise logs the missing entries and returns False.
                """
                kconfig_path = get_kconfig_path(build_dir)
                validated_kconfig = kunit_config.parse_file(kconfig_path)
                if self._kconfig.is_subset_of(validated_kconfig):
                        return True
                missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
                # Sort the rendered entries so the message is stable from run to run
                # (set iteration order is not).
                message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                          'This is probably due to unsatisfied dependencies.\n' \
                          'Missing: ' + ', '.join(sorted(str(e) for e in missing))
                if self._arch == 'um':
                        message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                                   'on a different architecture with something like "--arch=x86_64".'
                logging.error(message)
                return False

        def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Writes a fresh .config into build_dir and validates it.

                On success the config that was used is also saved to the
                'last used kunitconfig' file so later runs can detect changes.
                Returns False (after logging) if configuring or validation fails.
                """
                kconfig_path = get_kconfig_path(build_dir)
                if build_dir and not os.path.exists(build_dir):
                        # makedirs() also copes with nested build dirs, and exist_ok
                        # tolerates the directory appearing after the check above.
                        os.makedirs(build_dir, exist_ok=True)
                try:
                        self._kconfig = self._ops.make_arch_config(self._kconfig)
                        self._kconfig.write_to_file(kconfig_path)
                        self._ops.make_olddefconfig(build_dir, make_options)
                except ConfigError as e:
                        logging.error(e)
                        return False
                if not self.validate_config(build_dir):
                        return False

                old_path = get_old_kunitconfig_path(build_dir)
                if os.path.exists(old_path):
                        os.remove(old_path)  # write_to_file appends to the file
                self._kconfig.write_to_file(old_path)
                return True

        def _kunitconfig_changed(self, build_dir: str) -> bool:
                """Returns True if the saved last-used kunitconfig is missing or differs."""
                old_path = get_old_kunitconfig_path(build_dir)
                if not os.path.exists(old_path):
                        return True

                old_kconfig = kunit_config.parse_file(old_path)
                return old_kconfig != self._kconfig

        def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Creates a new .config if it is not a subset of the .kunitconfig."""
                kconfig_path = get_kconfig_path(build_dir)
                if not os.path.exists(kconfig_path):
                        print('Generating .config ...')
                        return self.build_config(build_dir, make_options)

                existing_kconfig = kunit_config.parse_file(kconfig_path)
                self._kconfig = self._ops.make_arch_config(self._kconfig)

                if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
                        return True
                print('Regenerating .config ...')
                os.remove(kconfig_path)
                return self.build_config(build_dir, make_options)

        def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
                """Runs olddefconfig and the build, then validates the resulting .config.

                Returns False (after logging) if configuring or building fails.
                """
                try:
                        self._ops.make_olddefconfig(build_dir, make_options)
                        self._ops.make(jobs, build_dir, make_options)
                except (ConfigError, BuildError) as e:
                        logging.error(e)
                        return False
                return self.validate_config(build_dir)

        def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
                """Starts the kernel and yields its output line by line.

                Every line is also written to the output file in build_dir. The
                kunit.filter_glob / kunit.filter / kunit.filter_action parameters are
                added for the corresponding non-empty arguments, and kunit.enable=1
                is always added. If timeout (seconds) expires first, the process is
                terminated.
                """
                # Work on a copy: the caller's list must not accumulate our kunit.*
                # parameters when it is reused for another run.
                args = list(args) if args else []
                if filter_glob:
                        args.append('kunit.filter_glob=' + filter_glob)
                if filter:
                        args.append('kunit.filter="' + filter + '"')
                if filter_action:
                        args.append('kunit.filter_action=' + filter_action)
                args.append('kunit.enable=1')

                process = self._ops.start(args, build_dir)
                assert process.stdout is not None  # tell mypy it's set

                # Enforce the timeout in a background thread.
                def _wait_proc() -> None:
                        try:
                                process.wait(timeout=timeout)
                        except Exception as e:
                                # Deliberately broad: whatever went wrong while waiting,
                                # make sure the kernel process does not outlive us.
                                print(e)
                                process.terminate()
                                process.wait()
                waiter = threading.Thread(target=_wait_proc)
                waiter.start()

                # The with-statement closes the log file even if the cleanup below raises.
                with open(get_outfile_path(build_dir), 'w') as output:
                        try:
                                # Tee the output to the file and to our caller in real time.
                                for line in process.stdout:
                                        output.write(line)
                                        yield line
                        # This runs even if our caller doesn't consume every line.
                        finally:
                                # Flush any leftover output to the file
                                output.write(process.stdout.read())
                                process.stdout.close()

                                waiter.join()
                                subprocess.call(['stty', 'sane'])

        def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
                """SIGINT handler: logs the interruption and resets the terminal via 'stty sane'."""
                logging.error('Build interruption occurred. Cleaning console.')
                subprocess.call(['stty', 'sane'])