Linux kernel mirror (for testing)
git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git
kernel
os
linux
1# SPDX-License-Identifier: GPL-2.0
2#
3# Runs UML kernel, collects output, and handles errors.
4#
5# Copyright (C) 2019, Google LLC.
6# Author: Felix Guo <felixguoxiuping@gmail.com>
7# Author: Brendan Higgins <brendanhiggins@google.com>
8
9import importlib.abc
10import importlib.util
11import logging
12import subprocess
13import os
14import shlex
15import shutil
16import signal
17import sys
18import threading
19from typing import Iterator, List, Optional, Tuple, Any
20from types import FrameType
21
22import kunit_config
23import qemu_config
24
25KCONFIG_PATH = '.config'
26KUNITCONFIG_PATH = '.kunitconfig'
27OLD_KUNITCONFIG_PATH = 'last_used_kunitconfig'
28DEFAULT_KUNITCONFIG_PATH = 'tools/testing/kunit/configs/default.config'
29ALL_TESTS_CONFIG_PATH = 'tools/testing/kunit/configs/all_tests.config'
30UML_KCONFIG_PATH = 'tools/testing/kunit/configs/arch_uml.config'
31OUTFILE_PATH = 'test.log'
32ABS_TOOL_PATH = os.path.abspath(os.path.dirname(__file__))
33QEMU_CONFIGS_DIR = os.path.join(ABS_TOOL_PATH, 'qemu_configs')
34
class ConfigError(Exception):
    """Raised when configuring the Linux kernel fails.

    Carries a human-readable message (for example the captured output of a
    failed 'make' invocation) as its only argument.
    """
    pass
37
38
class BuildError(Exception):
    """Raised when building the Linux kernel fails.

    Carries a human-readable message (for example the stderr of a failed
    'make' invocation) as its only argument.
    """
    pass
41
42
class LinuxSourceTreeOperations:
    """An abstraction over command line operations performed on a source tree.

    This base class implements the 'make' invocations shared by every
    architecture. Subclasses override make_arch_config() and start() to
    supply the architecture-specific pieces.
    """

    def __init__(self, linux_arch: str, cross_compile: Optional[str]):
        """Remember the ARCH= value and the optional CROSS_COMPILE= prefix."""
        self._linux_arch = linux_arch
        self._cross_compile = cross_compile

    def make_mrproper(self) -> None:
        """Run 'make mrproper'.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero; in the
                latter case the message is make's combined stdout/stderr.
        """
        try:
            subprocess.check_output(['make', 'mrproper'], stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the Kconfig to build with; the base class adds nothing."""
        return base_kunitconfig

    def make_olddefconfig(self, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Run 'make olddefconfig' for this architecture in build_dir.

        Args:
            build_dir: passed to make as O=<build_dir>.
            make_options: extra arguments appended to the make command line.

        Raises:
            ConfigError: if make cannot be executed or exits non-zero.
        """
        command = ['make', 'ARCH=' + self._linux_arch, 'O=' + build_dir, 'olddefconfig']
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        if make_options:
            command.extend(make_options)
        print('Populating config with:\n$', ' '.join(command))
        try:
            subprocess.check_output(command, stderr=subprocess.STDOUT)
        except OSError as e:
            raise ConfigError('Could not call make command: ' + str(e)) from e
        except subprocess.CalledProcessError as e:
            raise ConfigError(e.output.decode()) from e

    def make(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> None:
        """Build the kernel, compile_commands.json and the gdb scripts.

        Args:
            jobs: value passed to make as --jobs=<jobs>.
            build_dir: passed to make as O=<build_dir>.
            make_options: extra arguments appended to the make command line.

        Raises:
            BuildError: if make cannot be executed or exits non-zero; in the
                latter case the message is make's stderr.
        """
        command = ['make', 'all', 'compile_commands.json', 'scripts_gdb',
                   'ARCH=' + self._linux_arch, 'O=' + build_dir, '--jobs=' + str(jobs)]
        if make_options:
            command.extend(make_options)
        if self._cross_compile:
            command += ['CROSS_COMPILE=' + self._cross_compile]
        print('Building with:\n$', ' '.join(command))
        # Popen only reports failure to launch (OSError); a non-zero exit is
        # detected from returncode below, so CalledProcessError cannot occur.
        try:
            proc = subprocess.Popen(command,
                                    stderr=subprocess.PIPE,
                                    stdout=subprocess.DEVNULL)
        except OSError as e:
            raise BuildError('Could not call execute make: ' + str(e)) from e
        _, stderr = proc.communicate()
        if proc.returncode != 0:
            raise BuildError(stderr.decode())
        if stderr:  # likely only due to build warnings
            print(stderr.decode())

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Start the built kernel; must be overridden by subclasses.

        Raises:
            NotImplementedError: always (a RuntimeError subclass, so callers
                catching RuntimeError keep working).
        """
        raise NotImplementedError('not implemented!')
99
100
class LinuxSourceTreeOperationsQemu(LinuxSourceTreeOperations):
    """Source tree operations for kernels that are booted with QEMU."""

    def __init__(self, qemu_arch_params: qemu_config.QemuArchParams, cross_compile: Optional[str]):
        """Copy the settings needed to build and boot from qemu_arch_params."""
        super().__init__(linux_arch=qemu_arch_params.linux_arch,
                         cross_compile=cross_compile)
        self._kconfig = qemu_arch_params.kconfig
        self._qemu_arch = qemu_arch_params.qemu_arch
        self._kernel_path = qemu_arch_params.kernel_path
        # Only supply a kunit_shutdown= default when the config did not pick one.
        cmdline = qemu_arch_params.kernel_command_line
        if 'kunit_shutdown=' not in cmdline:
            cmdline += ' kunit_shutdown=reboot'
        self._kernel_command_line = cmdline
        self._extra_qemu_params = qemu_arch_params.extra_qemu_params
        self._serial = qemu_arch_params.serial

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the architecture's Kconfig with base_kunitconfig merged in."""
        arch_kconfig = kunit_config.parse_from_string(self._kconfig)
        arch_kconfig.merge_in_entries(base_kunitconfig)
        return arch_kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Launch qemu-system-<arch> on the built kernel and return the process.

        The kernel command line is params followed by the configured command
        line. stdout and stderr of the child are merged into one text pipe.
        """
        kernel_image = os.path.join(build_dir, self._kernel_path)
        kernel_cmdline = ' '.join(params + [self._kernel_command_line])
        base_command = [
            'qemu-system-' + self._qemu_arch,
            '-nodefaults',
            '-m', '1024',
            '-kernel', kernel_image,
            '-append', kernel_cmdline,
            '-no-reboot',
            '-nographic',
            '-accel', 'kvm',
            '-accel', 'hvf',
            '-accel', 'tcg',
            '-serial', self._serial,
        ]
        qemu_command = base_command + self._extra_qemu_params
        # Note: shlex.join() does what we want, but requires python 3.8+.
        quoted_command = ' '.join(map(shlex.quote, qemu_command))
        print('Running tests with:\n$', quoted_command)
        return subprocess.Popen(qemu_command,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')
140
class LinuxSourceTreeOperationsUml(LinuxSourceTreeOperations):
    """Source tree operations for UML kernels (ARCH=um), run as a host binary."""

    def __init__(self, cross_compile: Optional[str]=None):
        """Fix the architecture to 'um'; cross_compile is passed through."""
        super().__init__(linux_arch='um', cross_compile=cross_compile)

    def make_arch_config(self, base_kunitconfig: kunit_config.Kconfig) -> kunit_config.Kconfig:
        """Return the UML Kconfig fragment with base_kunitconfig merged in."""
        kconfig = kunit_config.parse_file(UML_KCONFIG_PATH)
        kconfig.merge_in_entries(base_kunitconfig)
        return kconfig

    def start(self, params: List[str], build_dir: str) -> subprocess.Popen:
        """Runs the Linux UML binary. Must be named 'linux'.

        Args:
            params: kernel command line arguments; not modified.
            build_dir: directory containing the 'linux' binary.

        Returns:
            The started process, with stdout and stderr merged into one text
            pipe.
        """
        linux_bin = os.path.join(build_dir, 'linux')
        # Build a new list instead of extending params in place, so a caller
        # that reuses its list does not accumulate these flags on every run.
        uml_params = [*params, 'mem=1G', 'console=tty', 'kunit_shutdown=halt']
        print('Running tests with:\n$', linux_bin, ' '.join(shlex.quote(arg) for arg in uml_params))
        return subprocess.Popen([linux_bin] + uml_params,
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.STDOUT,
                                text=True, errors='backslashreplace')
162
def get_kconfig_path(build_dir: str) -> str:
    """Return the location of the kernel '.config' file inside build_dir."""
    kconfig_path = os.path.join(build_dir, KCONFIG_PATH)
    return kconfig_path
165
def get_kunitconfig_path(build_dir: str) -> str:
    """Return the location of the '.kunitconfig' file inside build_dir."""
    kunitconfig_path = os.path.join(build_dir, KUNITCONFIG_PATH)
    return kunitconfig_path
168
def get_old_kunitconfig_path(build_dir: str) -> str:
    """Return the location of the 'last_used_kunitconfig' file inside build_dir."""
    old_kunitconfig_path = os.path.join(build_dir, OLD_KUNITCONFIG_PATH)
    return old_kunitconfig_path
171
def get_parsed_kunitconfig(build_dir: str,
                           kunitconfig_paths: Optional[List[str]]=None) -> kunit_config.Kconfig:
    """Parse the kunitconfig(s) to use for a build.

    With no explicit paths, the '.kunitconfig' in build_dir is used; if it
    does not exist yet it is first created as a copy of the default config.

    With explicit paths, every file is parsed and merged into one Kconfig.
    A path naming a directory refers to the '.kunitconfig' inside it.

    Raises:
        ConfigError: if a given path does not exist, or if a file sets an
            option to a value that conflicts with the files merged before it.
    """
    if kunitconfig_paths:
        combined = kunit_config.Kconfig()
        for entry in kunitconfig_paths:
            path = os.path.join(entry, KUNITCONFIG_PATH) if os.path.isdir(entry) else entry
            if not os.path.exists(path):
                raise ConfigError(f'Specified kunitconfig ({path}) does not exist')

            parsed = kunit_config.parse_file(path)
            conflicts = combined.conflicting_options(parsed)
            if conflicts:
                details = '\n\n'.join(f'{a}\n vs from {path}\n{b}' for a, b in conflicts)
                raise ConfigError(f'Multiple values specified for {len(conflicts)} options in kunitconfig:\n{details}')
            combined.merge_in_entries(parsed)
        return combined

    # No paths given: fall back to (and, if needed, create) the build dir's own file.
    default_path = get_kunitconfig_path(build_dir)
    if not os.path.exists(default_path):
        shutil.copyfile(DEFAULT_KUNITCONFIG_PATH, default_path)
    return kunit_config.parse_file(default_path)
195
def get_outfile_path(build_dir: str) -> str:
    """Return the location of the 'test.log' output file inside build_dir."""
    outfile_path = os.path.join(build_dir, OUTFILE_PATH)
    return outfile_path
198
def _default_qemu_config_path(arch: str) -> str:
    """Return the path of the bundled QEMU config for arch.

    The special arch 'help' prints 'um' followed by every bundled config
    name, one per line, and then exits the program.

    Raises:
        ConfigError: if there is no '<arch>.py' in the QEMU configs directory.
        SystemExit: if arch is 'help' (after printing the list).
    """
    config_path = os.path.join(QEMU_CONFIGS_DIR, arch + '.py')
    if os.path.isfile(config_path):
        return config_path

    # os.listdir() returns entries in arbitrary order; sort once so the help
    # listing and the error message present the options identically.
    options = sorted(f[:-3] for f in os.listdir(QEMU_CONFIGS_DIR) if f.endswith('.py'))

    if arch == 'help':
        print('um')
        for option in options:
            print(option)
        sys.exit()

    raise ConfigError(arch + ' is not a valid arch, options are ' + str(options))
213
def _get_qemu_ops(config_path: str,
                  extra_qemu_args: Optional[List[str]],
                  cross_compile: Optional[str]) -> Tuple[str, LinuxSourceTreeOperations]:
    """Load a QEMU config file and build the matching operations object.

    The file at config_path is executed as a Python module and must define
    a module-level QEMU_ARCH. Any extra_qemu_args are appended to that
    config's extra_qemu_params.

    Returns:
        A (linux_arch, operations) tuple.

    Raises:
        ValueError: if the loaded module has no QEMU_ARCH attribute.
    """
    # The module name given to importlib does not have to match where the
    # file actually lives. We ignore the real location of the config and
    # present it to Python as if it sat in QEMU_CONFIGS_DIR, wherever the
    # file itself is.
    configs_dir_name = os.path.basename(QEMU_CONFIGS_DIR)
    config_file_name = os.path.basename(config_path)
    module_name = '.' + os.path.join(configs_dir_name, config_file_name)

    module_spec = importlib.util.spec_from_file_location(module_name, config_path)
    assert module_spec is not None
    config_module = importlib.util.module_from_spec(module_spec)
    # See https://github.com/python/typeshed/pull/2626 for context.
    assert isinstance(module_spec.loader, importlib.abc.Loader)
    module_spec.loader.exec_module(config_module)

    if not hasattr(config_module, 'QEMU_ARCH'):
        raise ValueError('qemu_config module missing "QEMU_ARCH": ' + config_path)

    arch_params: qemu_config.QemuArchParams = config_module.QEMU_ARCH
    if extra_qemu_args:
        arch_params.extra_qemu_params.extend(extra_qemu_args)

    linux_arch = arch_params.linux_arch
    qemu_ops = LinuxSourceTreeOperationsQemu(arch_params, cross_compile=cross_compile)
    return linux_arch, qemu_ops
240
class LinuxSourceTree:
    """Represents a Linux kernel source tree with KUnit tests.

    Ties together the parsed kunitconfig, the architecture-specific
    operations object and the currently running kernel process (if any).
    Constructing an instance installs a SIGINT handler for the process.
    """

    def __init__(
          self,
          build_dir: str,
          kunitconfig_paths: Optional[List[str]]=None,
          kconfig_add: Optional[List[str]]=None,
          arch: Optional[str]=None,
          cross_compile: Optional[str]=None,
          qemu_config_path: Optional[str]=None,
          extra_qemu_args: Optional[List[str]]=None) -> None:
        """Pick the operations backend and parse the kunitconfig.

        Args:
            build_dir: build directory holding the kunitconfig.
            kunitconfig_paths: explicit kunitconfig files/directories to merge.
            kconfig_add: extra Kconfig lines merged on top of the kunitconfig.
            arch: architecture name; defaults to 'um' when None.
            cross_compile: CROSS_COMPILE prefix handed to the backend.
            qemu_config_path: explicit QEMU config file; takes precedence
                over arch when given.
            extra_qemu_args: extra arguments for the QEMU command line.
        """
        # The SIGINT handler reads self._process, so the attribute has to
        # exist before the handler is installed; otherwise a Ctrl-C arriving
        # while the rest of __init__ is still running raises AttributeError.
        self._process : Optional[subprocess.Popen[Any]] = None
        signal.signal(signal.SIGINT, self.signal_handler)
        if qemu_config_path:
            self._arch, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)
        else:
            self._arch = 'um' if arch is None else arch
            if self._arch == 'um':
                self._ops = LinuxSourceTreeOperationsUml(cross_compile=cross_compile)
            else:
                qemu_config_path = _default_qemu_config_path(self._arch)
                _, self._ops = _get_qemu_ops(qemu_config_path, extra_qemu_args, cross_compile)

        self._kconfig = get_parsed_kunitconfig(build_dir, kunitconfig_paths)
        if kconfig_add:
            kconfig = kunit_config.parse_from_string('\n'.join(kconfig_add))
            self._kconfig.merge_in_entries(kconfig)

    def arch(self) -> str:
        """Return the architecture name this tree was set up for."""
        return self._arch

    def clean(self) -> bool:
        """Run 'make mrproper'; log the error and return False on failure."""
        try:
            self._ops.make_mrproper()
        except ConfigError as e:
            logging.error(e)
            return False
        return True

    def validate_config(self, build_dir: str) -> bool:
        """Check that every requested option ended up in the generated .config.

        Logs the missing entries and returns False when the requested
        kunitconfig is not a subset of the .config in build_dir.
        """
        kconfig_path = get_kconfig_path(build_dir)
        validated_kconfig = kunit_config.parse_file(kconfig_path)
        if self._kconfig.is_subset_of(validated_kconfig):
            return True
        missing = set(self._kconfig.as_entries()) - set(validated_kconfig.as_entries())
        message = 'Not all Kconfig options selected in kunitconfig were in the generated .config.\n' \
                  'This is probably due to unsatisfied dependencies.\n' \
                  'Missing: ' + ', '.join(str(e) for e in missing)
        if self._arch == 'um':
            message += '\nNote: many Kconfig options aren\'t available on UML. You can try running ' \
                       'on a different architecture with something like "--arch=x86_64".'
        logging.error(message)
        return False

    def build_config(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Write a fresh .config, run olddefconfig and validate the result.

        On success the kunitconfig that was used is also saved next to the
        .config so later runs can detect changes.

        Returns:
            True on success; False if configuring or validation failed
            (the error is logged).
        """
        kconfig_path = get_kconfig_path(build_dir)
        if build_dir and not os.path.exists(build_dir):
            os.mkdir(build_dir)
        try:
            self._kconfig = self._ops.make_arch_config(self._kconfig)
            self._kconfig.write_to_file(kconfig_path)
            self._ops.make_olddefconfig(build_dir, make_options)
        except ConfigError as e:
            logging.error(e)
            return False
        if not self.validate_config(build_dir):
            return False

        old_path = get_old_kunitconfig_path(build_dir)
        if os.path.exists(old_path):
            os.remove(old_path)  # write_to_file appends to the file
        self._kconfig.write_to_file(old_path)
        return True

    def _kunitconfig_changed(self, build_dir: str) -> bool:
        """Return True if the saved kunitconfig is missing or differs from ours."""
        old_path = get_old_kunitconfig_path(build_dir)
        if not os.path.exists(old_path):
            return True

        old_kconfig = kunit_config.parse_file(old_path)
        return old_kconfig != self._kconfig

    def build_reconfig(self, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Create or regenerate the .config only when it is needed.

        The existing .config is kept when the requested kunitconfig is a
        subset of it and matches the kunitconfig saved by the last run;
        otherwise the .config is removed and rebuilt via build_config().
        """
        kconfig_path = get_kconfig_path(build_dir)
        if not os.path.exists(kconfig_path):
            print('Generating .config ...')
            return self.build_config(build_dir, make_options)

        existing_kconfig = kunit_config.parse_file(kconfig_path)
        self._kconfig = self._ops.make_arch_config(self._kconfig)

        if self._kconfig.is_subset_of(existing_kconfig) and not self._kunitconfig_changed(build_dir):
            return True
        print('Regenerating .config ...')
        os.remove(kconfig_path)
        return self.build_config(build_dir, make_options)

    def build_kernel(self, jobs: int, build_dir: str, make_options: Optional[List[str]]) -> bool:
        """Run olddefconfig and the build, then validate the .config.

        Returns:
            False if configuring or building failed (the error is logged),
            otherwise the result of validate_config().
        """
        try:
            self._ops.make_olddefconfig(build_dir, make_options)
            self._ops.make(jobs, build_dir, make_options)
        except (ConfigError, BuildError) as e:
            logging.error(e)
            return False
        return self.validate_config(build_dir)

    def _restore_terminal_if_tty(self) -> None:
        """Run 'stty sane', but only when stdin is attached to a terminal."""
        # stty requires a controlling terminal; skip headless runs.
        if sys.stdin is None or not sys.stdin.isatty():
            return
        subprocess.call(['stty', 'sane'])

    def run_kernel(self, args: Optional[List[str]]=None, build_dir: str='', filter_glob: str='', filter: str='', filter_action: Optional[str]=None, timeout: Optional[int]=None) -> Iterator[str]:
        """Boot the kernel and yield its output line by line.

        Every line is also written to the output log file in build_dir.
        When the generator finishes or is closed early, the remaining output
        is drained into the log file and the process is reaped.

        Args:
            args: extra kernel command line arguments; not modified.
            build_dir: build directory holding the kernel and the log file.
            filter_glob: value for kunit.filter_glob=, if non-empty.
            filter: value for kunit.filter=, if non-empty.
            filter_action: value for kunit.filter_action=, if set.
            timeout: seconds to wait before the kernel is terminated;
                None waits indefinitely.
        """
        # Copy to avoid mutating the caller-supplied list. exec_tests() reuses
        # the same args across repeated run_kernel() calls (e.g. --run_isolated),
        # so appending to the original would accumulate stale flags on each call.
        args = list(args) if args else []
        if filter_glob:
            args.append('kunit.filter_glob=' + filter_glob)
        if filter:
            args.append('kunit.filter="' + filter + '"')
        if filter_action:
            args.append('kunit.filter_action=' + filter_action)
        args.append('kunit.enable=1')

        # Keep a local handle: self._process is shared with the SIGINT
        # handler and is cleared during cleanup, so the waiter thread and the
        # read loop must not depend on it staying set.
        process = self._ops.start(args, build_dir)
        self._process = process
        assert process.stdout is not None  # tell mypy it's set

        # Enforce the timeout in a background thread.
        def _wait_proc() -> None:
            try:
                process.wait(timeout=timeout)
            except Exception as e:
                print(e)
                process.terminate()
                process.wait()
        waiter = threading.Thread(target=_wait_proc)
        waiter.start()

        output = open(get_outfile_path(build_dir), 'w')
        try:
            # Tee the output to the file and to our caller in real time.
            for line in process.stdout:
                output.write(line)
                yield line
        # This runs even if our caller doesn't consume every line.
        finally:
            # Flush any leftover output to the file
            output.write(process.stdout.read())
            process.stdout.close()
            output.close()

            # Reap the process before forgetting it, so the SIGINT handler
            # can still reach it for as long as it may be running.
            waiter.join()
            self._process = None
            self._restore_terminal_if_tty()

    def signal_handler(self, unused_sig: int, unused_frame: Optional[FrameType]) -> None:
        """SIGINT handler: stop the running kernel, if any, and fix the terminal."""
        logging.error('Build interruption occurred. Cleaning console.')
        if self._process:
            self._process.terminate()
            self._process.wait()
        self._restore_terminal_if_tty()