tests: Add mechanism to start relayd in python testing environment
[lttng-tools.git] / tests / utils / lttngtest / environment.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6 #
7
8 from types import FrameType
9 from typing import Callable, Iterator, Optional, Tuple, List, Generator
10 import sys
11 import pathlib
12 import signal
13 import subprocess
14 import shlex
15 import shutil
16 import os
17 import queue
18 import tempfile
19 from . import logger
20 import time
21 import threading
22 import contextlib
23
24
class TemporaryDirectory:
    """
    Temporary directory created on construction and recursively removed
    (ignoring errors) when the object is garbage collected.
    """

    def __init__(self, prefix):
        # type: (str) -> None
        self._directory_path = tempfile.mkdtemp(prefix=prefix)

    def __del__(self):
        # `__del__` also runs on an instance whose `__init__` raised (e.g. if
        # `mkdtemp` failed); there is nothing to remove in that case.
        directory_path = getattr(self, "_directory_path", None)
        if directory_path is not None:
            shutil.rmtree(directory_path, ignore_errors=True)

    @property
    def path(self):
        # type: () -> pathlib.Path
        """Path of the temporary directory."""
        return pathlib.Path(self._directory_path)
37
38
class _SignalWaitQueue:
    """
    Utility class useful to wait for a signal before proceeding.

    Simply register the `signal` method as the handler for the signal you are
    interested in and call `wait_for_signal` to wait for its reception.

    Registering a signal:
        signal.signal(signal.SIGWHATEVER, queue.signal)

    Waiting for the signal:
        queue.wait_for_signal()

    `intercept_signal` can also be used to install the handler only for the
    duration of a `with` block.
    """

    def __init__(self):
        # type: () -> None
        # Signal numbers recorded by `signal` and consumed by `wait_for_signal`.
        self._queue = queue.Queue()  # type: queue.Queue

    def signal(
        self,
        signal_number,  # type: int
        frame,  # type: Optional[FrameType]
    ):
        # type: (...) -> None
        """Signal handler: record the reception of `signal_number`."""
        self._queue.put_nowait(signal_number)

    def wait_for_signal(self):
        # type: () -> None
        """Block until a signal recorded by `signal` is available, consuming it."""
        self._queue.get(block=True)

    @contextlib.contextmanager
    def intercept_signal(self, signal_number):
        # type: (int) -> Generator[None, None, None]
        """
        Install `signal` as the handler of `signal_number` for the duration of
        the `with` block. The previous handler is restored on exit, including
        when the block raises (the exception is propagated unchanged).
        """
        original_handler = signal.getsignal(signal_number)
        signal.signal(signal_number, self.signal)
        try:
            yield
        finally:
            signal.signal(signal_number, original_handler)
78
79
class _WaitTraceTestApplication:
    """
    Create an application that waits before tracing. This allows a test to
    launch an application, get its PID, and get it to start tracing when it
    has completed its setup.

    When `wait_before_exit` is set, the application is also given
    `--sync-before-exit <file>` so that it only exits once that file has been
    created (see `WaitTraceTestApplicationGroup.exit`).
    """

    def __init__(
        self,
        binary_path,  # type: pathlib.Path
        event_count,  # type: int
        environment,  # type: Environment
        wait_time_between_events_us=0,  # type: int
        wait_before_exit=False,  # type: bool
        wait_before_exit_file_path=None,  # type: Optional[pathlib.Path]
    ):
        # Set first so that `__del__` stays safe if the launch below fails.
        self._has_returned = False
        self._environment = environment  # type: Environment
        self._iteration_count = event_count

        # All synchronization files live in the environment's LTTNG_HOME.
        home_dir = self._compat_pathlike(environment.lttng_home_location)

        # File that the application will wait to see before tracing its events.
        self._app_start_tracing_file_path = pathlib.Path(
            tempfile.mktemp(prefix="app_", suffix="_start_tracing", dir=home_dir)
        )
        # File that the application will create when all events have been emitted.
        self._app_tracing_done_file_path = pathlib.Path(
            tempfile.mktemp(prefix="app_", suffix="_done_tracing", dir=home_dir)
        )

        if wait_before_exit and wait_before_exit_file_path is None:
            wait_before_exit_file_path = pathlib.Path(
                tempfile.mktemp(prefix="app_", suffix="_exit", dir=home_dir)
            )
        # File that the application waits to see before exiting; None when
        # `wait_before_exit` is False.
        self._wait_before_exit_file_path = (
            wait_before_exit_file_path if wait_before_exit else None
        )  # type: Optional[pathlib.Path]

        test_app_env = os.environ.copy()
        test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
        # Make sure the app is blocked until it is properly registered to
        # the session daemon.
        test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"

        # File that the application will create to indicate it has completed its initialization.
        app_ready_file_path = tempfile.mktemp(
            prefix="app_", suffix="_ready", dir=home_dir
        )  # type: str

        test_app_args = [str(binary_path)]
        test_app_args.extend(["--iter", str(event_count)])
        test_app_args.extend(
            ["--sync-application-in-main-touch", str(app_ready_file_path)]
        )
        test_app_args.extend(
            ["--sync-before-first-event", str(self._app_start_tracing_file_path)]
        )
        test_app_args.extend(
            ["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
        )
        if self._wait_before_exit_file_path is not None:
            # Without this option the application exits as soon as it has
            # emitted its events, silently ignoring `wait_before_exit`.
            test_app_args.extend(
                ["--sync-before-exit", str(self._wait_before_exit_file_path)]
            )
        if wait_time_between_events_us != 0:
            test_app_args.extend(["--wait", str(wait_time_between_events_us)])

        # NOTE(review): stdout/stderr go to a pipe; if nothing drains it (e.g.
        # no ProcessOutputConsumer is attached), a verbose application could
        # block once the pipe buffer is full.
        self._process = subprocess.Popen(
            test_app_args,
            env=test_app_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )  # type: subprocess.Popen

        # Wait for the application to create the file indicating it has fully
        # initialized. Make sure the app hasn't crashed in order to not wait
        # forever.
        self._wait_for_file_to_be_created(pathlib.Path(app_ready_file_path))

    def _wait_for_file_to_be_created(self, sync_file_path):
        # type: (pathlib.Path) -> None
        """
        Poll (every millisecond) until `sync_file_path` exists.

        Raises RuntimeError if the application exits without having created it.
        """
        path = self._compat_pathlike(sync_file_path)
        while not os.path.exists(path):
            if self._process.poll() is not None:
                # The application may have created the file right before
                # exiting (between the check above and `poll()`).
                if os.path.exists(path):
                    break

                # Application has unexpectedly returned.
                raise RuntimeError(
                    "Test application has unexpectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`".format(
                        sync_file=sync_file_path, return_code=self._process.returncode
                    )
                )

            time.sleep(0.001)

    def trace(self):
        # type: () -> None
        """
        Release the application so that it starts emitting its events.

        Raises RuntimeError if the application has already exited.
        """
        if self._process.poll() is not None:
            # Application has unexpectedly returned.
            raise RuntimeError(
                "Test application has unexpectedly returned before tracing with return code `{return_code}`".format(
                    return_code=self._process.returncode
                )
            )
        # Creating the file (exclusively) is the signal; the handle itself is
        # not needed, so close it immediately instead of leaking it.
        with open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x"):
            pass

    def wait_for_tracing_done(self):
        # type: () -> None
        """Wait until the application reports that all events were emitted."""
        self._wait_for_file_to_be_created(self._app_tracing_done_file_path)

    def wait_for_exit(self):
        # type: () -> None
        """
        Wait for the application to exit.

        Raises RuntimeError if it exited with a non-zero return code.
        """
        return_code = self._process.wait()
        # The process has been reaped: `__del__` must not try to kill it.
        self._has_returned = True
        if return_code != 0:
            raise RuntimeError(
                "Test application has exited with return code `{return_code}`".format(
                    return_code=return_code
                )
            )

    @property
    def vpid(self):
        # type: () -> int
        """PID of the application process."""
        return self._process.pid

    @staticmethod
    def _compat_pathlike(path):
        # type: (pathlib.Path) -> pathlib.Path | str
        """
        The builtin open() and many methods of the 'os' library in Python >= 3.6
        expect a path-like object while prior versions expect a string or
        bytes object. Return the correct type based on the presence of the
        "__fspath__" attribute specified in PEP-519.
        """
        if hasattr(path, "__fspath__"):
            return path
        else:
            return str(path)

    def __del__(self):
        # `_process` is missing if `__init__` failed before the launch.
        process = getattr(self, "_process", None)
        if process is not None and not getattr(self, "_has_returned", False):
            # This is potentially racy if the pid has been recycled. However,
            # we can't use pidfd_open since it is only available in python >= 3.9.
            process.kill()
            process.wait()
231
232
class WaitTraceTestApplicationGroup:
    """
    Group of wait-trace test applications launched from the same environment
    that can be told to start tracing, and to exit, together.
    """

    def __init__(
        self,
        environment,  # type: Environment
        application_count,  # type: int
        event_count,  # type: int
        wait_time_between_events_us=0,  # type: int
        wait_before_exit=False,  # type: bool
    ):
        # Single file shared by every application of the group; creating it
        # in `exit` releases them all. None when `wait_before_exit` is False.
        self._wait_before_exit_file_path = (
            pathlib.Path(
                tempfile.mktemp(
                    prefix="app_group_",
                    suffix="_exit",
                    dir=_WaitTraceTestApplication._compat_pathlike(
                        environment.lttng_home_location
                    ),
                )
            )
            if wait_before_exit
            else None
        )  # type: Optional[pathlib.Path]

        self._apps = []  # type: List[_WaitTraceTestApplication]
        self._consumers = []  # type: List[ProcessOutputConsumer]
        for _ in range(application_count):
            new_app = environment.launch_wait_trace_test_application(
                event_count,
                wait_time_between_events_us,
                wait_before_exit,
                self._wait_before_exit_file_path,
            )

            # Attach an output consumer to log the application's output (if any).
            if environment._logging_function:
                app_output_consumer = ProcessOutputConsumer(
                    new_app._process,
                    "app-{}".format(str(new_app.vpid)),
                    environment._logging_function,
                )
                app_output_consumer.daemon = True
                app_output_consumer.start()
                self._consumers.append(app_output_consumer)

            self._apps.append(new_app)

    def trace(self):
        # type: () -> None
        """Release every application of the group so that it starts tracing."""
        for app in self._apps:
            app.trace()

    def exit(
        self, wait_for_apps=False  # type: bool
    ):
        """
        Make every application of the group exit.

        Waits for all applications to have emitted their events, then creates
        the shared exit file. When `wait_for_apps` is True, also waits for
        every application to exit.

        Raises RuntimeError if the group was created with `wait_before_exit=False`.
        """
        if self._wait_before_exit_file_path is None:
            raise RuntimeError(
                "Can't call exit on an application group created with `wait_before_exit=False`"
            )

        # Wait for apps to have produced all of their events so that we can
        # cause the death of all apps to happen within a short time span.
        for app in self._apps:
            app.wait_for_tracing_done()

        # Creating the file (exclusively) is the signal; close the handle
        # right away instead of leaking it. `open()` accepts a `str` path on
        # every Python 3 version.
        with open(str(self._wait_before_exit_file_path), mode="x"):
            pass

        # Performed in two passes to allow tests to stress the unregistration of many applications.
        # Waiting for each app to exit turn-by-turn would defeat the purpose here.
        if wait_for_apps:
            for app in self._apps:
                app.wait_for_exit()
308
309
class _TraceTestApplication:
    """
    Create an application that emits events as soon as it is launched. In most
    scenarios, it is preferable to use a WaitTraceTestApplication.
    """

    def __init__(self, binary_path, environment):
        # type: (pathlib.Path, Environment) -> None
        # Set first so that `__del__` stays safe if the launch below fails.
        self._has_returned = False
        self._environment = environment  # type: Environment

        test_app_env = os.environ.copy()
        test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
        # Make sure the app is blocked until it is properly registered to
        # the session daemon.
        test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"

        test_app_args = [str(binary_path)]

        self._process = subprocess.Popen(
            test_app_args, env=test_app_env
        )  # type: subprocess.Popen

    def wait_for_exit(self):
        # type: () -> None
        """
        Wait for the application to exit.

        Raises RuntimeError if it exited with a non-zero return code.
        """
        return_code = self._process.wait()
        # The process has been reaped: `__del__` must not try to kill it.
        self._has_returned = True
        if return_code != 0:
            raise RuntimeError(
                "Test application has exited with return code `{return_code}`".format(
                    return_code=return_code
                )
            )

    def __del__(self):
        # `_process` is missing if `__init__` failed before the launch.
        process = getattr(self, "_process", None)
        if process is not None and not getattr(self, "_has_returned", False):
            # This is potentially racy if the pid has been recycled. However,
            # we can't use pidfd_open since it is only available in python >= 3.9.
            process.kill()
            process.wait()
349
350
class ProcessOutputConsumer(threading.Thread, logger._Logger):
    """
    Thread forwarding each line read from a process' `stdout` pipe to a
    logging function, prefixed with `name`.
    """

    def __init__(
        self,
        process,  # type: subprocess.Popen
        name,  # type: str
        log,  # type: Callable[[str], None]
    ):
        # type: (...) -> None
        threading.Thread.__init__(self)
        self._prefix = name
        logger._Logger.__init__(self, log)
        self._process = process

    def run(self):
        # type: () -> None
        stdout = self._process.stdout
        # The process must have been launched with `stdout=subprocess.PIPE`.
        assert stdout
        # Read until EOF instead of stopping as soon as `poll()` reports the
        # process as exited: the latter drops whatever output is still
        # buffered in the pipe (often the most interesting lines, e.g. a
        # crash message). EOF is reached once every writer of the pipe
        # (the process and any child that inherited it) has closed it.
        for raw_line in iter(stdout.readline, b""):
            # Undecodable bytes must not kill the thread.
            line = raw_line.decode("utf-8", errors="replace").replace("\n", "")
            if line:
                self._log("{prefix}: {line}".format(prefix=self._prefix, line=line))
370
371
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
    """
    Temporary environment in which to execute a test.

    It owns a private LTTNG_HOME directory and, on request, an lttng-relayd
    and an lttng-sessiond launched from the build tree with that home.
    `_cleanup` stops the daemons and releases the home directory.
    """

    def __init__(
        self,
        with_sessiond,  # type: bool
        log=None,  # type: Optional[Callable[[str], None]]
        with_relayd=False,  # type: bool
    ):
        super().__init__(log)

        # Initialize every attribute read by `_cleanup` before launching
        # anything: the launch helpers store their output consumers in these
        # attributes (they must not be reset afterwards), and `_cleanup` may
        # run from a signal handler or from `__del__` if this constructor
        # fails part-way.
        self._relayd = None  # type: Optional[subprocess.Popen[bytes]]
        self._relayd_output_consumer = None  # type: Optional[ProcessOutputConsumer]
        self._sessiond = None  # type: Optional[subprocess.Popen[bytes]]
        self._sessiond_output_consumer = None  # type: Optional[ProcessOutputConsumer]
        self._lttng_home = None  # type: Optional[TemporaryDirectory]

        signal.signal(signal.SIGTERM, self._handle_termination_signal)
        signal.signal(signal.SIGINT, self._handle_termination_signal)

        # Assumes the project's hierarchy to this file is:
        #   tests/utils/lttngtest/this_file
        self._project_root = (
            pathlib.Path(__file__).absolute().parents[3]
        )  # type: pathlib.Path
        self._lttng_home = TemporaryDirectory("lttng_test_env_home")

        if with_relayd:
            self._relayd = self._launch_lttng_relayd()
        if with_sessiond:
            self._sessiond = self._launch_lttng_sessiond()

    @property
    def lttng_home_location(self):
        # type: () -> pathlib.Path
        """Path of the temporary LTTNG_HOME; raises RuntimeError after clean-up."""
        if self._lttng_home is None:
            raise RuntimeError("Attempt to access LTTng home after clean-up")
        return self._lttng_home.path

    @property
    def lttng_client_path(self):
        # type: () -> pathlib.Path
        """Path of the `lttng` client binary in the build tree."""
        return self._project_root / "src" / "bin" / "lttng" / "lttng"

    @property
    def lttng_relayd_control_port(self):
        # type: () -> int
        return 5400

    @property
    def lttng_relayd_data_port(self):
        # type: () -> int
        return 5401

    @property
    def lttng_relayd_live_port(self):
        # type: () -> int
        return 5402

    def create_temporary_directory(self, prefix=None):
        # type: (Optional[str]) -> pathlib.Path
        """Create a new directory inside the temporary LTTNG_HOME."""
        # Simply return a path that is contained within LTTNG_HOME; it will
        # be destroyed when the temporary home goes out of scope.
        assert self._lttng_home
        return pathlib.Path(
            tempfile.mkdtemp(
                prefix="tmp" if prefix is None else prefix,
                dir=str(self._lttng_home.path),
            )
        )

    @staticmethod
    def _unpack_env_vars(env_vars_string):
        # type: (str) -> List[Tuple[str, str]]
        """
        Unpack a list of environment variables from a string such as
        "HELLO=is_it ME='/you/are/looking/for'".

        Raises ValueError for an entry that has no '=' or an empty value.
        """
        unpacked_vars = []
        for var in shlex.split(env_vars_string):
            # Split on the first equal sign.
            var_name, separator, var_value = var.partition("=")
            # Must have an equal sign and not end with an equal sign
            if not separator or not var_value:
                raise ValueError("Invalid environment variable: `{}`".format(var))

            # Unquote any paths
            var_value = var_value.replace("'", "").replace('"', "")
            unpacked_vars.append((var_name, var_value))

        return unpacked_vars

    def _launch_lttng_relayd(self):
        # type: () -> Optional[subprocess.Popen]
        """
        Launch lttng-relayd with the temporary LTTNG_HOME, listening on the
        control/data/live ports above. Returns None if LTTNG_TEST_NO_RELAYD=1.
        """
        relayd_path = (
            self._project_root / "src" / "bin" / "lttng-relayd" / "lttng-relayd"
        )
        if os.environ.get("LTTNG_TEST_NO_RELAYD", "0") == "1":
            # Run without a relay daemon; the user may be running one
            # under gdb, for example.
            return None

        relayd_env_vars = os.environ.get("LTTNG_RELAYD_ENV_VARS")
        relayd_env = os.environ.copy()
        if relayd_env_vars:
            self._log("Additional lttng-relayd environment variables:")
            for name, value in self._unpack_env_vars(relayd_env_vars):
                self._log("{}={}".format(name, value))
                relayd_env[name] = value

        assert self._lttng_home is not None
        relayd_env["LTTNG_HOME"] = str(self._lttng_home.path)
        self._log(
            "Launching relayd with LTTNG_HOME='{}'".format(str(self._lttng_home.path))
        )
        process = subprocess.Popen(
            [
                str(relayd_path),
                "-C",
                "tcp://0.0.0.0:{}".format(self.lttng_relayd_control_port),
                "-D",
                "tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
                "-L",
                "tcp://localhost:{}".format(self.lttng_relayd_live_port),
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            env=relayd_env,
        )

        if self._logging_function:
            self._relayd_output_consumer = ProcessOutputConsumer(
                process, "lttng-relayd", self._logging_function
            )
            self._relayd_output_consumer.daemon = True
            self._relayd_output_consumer.start()

        return process

    def _launch_lttng_sessiond(self):
        # type: () -> Optional[subprocess.Popen]
        """
        Launch lttng-sessiond with the temporary LTTNG_HOME and wait until it
        signals (SIGUSR1, requested with --sig-parent) that it is ready.
        Returns None if TEST_NO_SESSIOND=1.
        """
        is_64bits_host = sys.maxsize > 2**32

        sessiond_path = (
            self._project_root / "src" / "bin" / "lttng-sessiond" / "lttng-sessiond"
        )
        consumerd_path_option_name = "--consumerd{bitness}-path".format(
            bitness="64" if is_64bits_host else "32"
        )
        consumerd_path = (
            self._project_root / "src" / "bin" / "lttng-consumerd" / "lttng-consumerd"
        )

        if os.environ.get("TEST_NO_SESSIOND") == "1":
            # Run test without a session daemon; the user probably
            # intends to run one under gdb for example.
            return None

        # Setup the session daemon's environment
        sessiond_env_vars = os.environ.get("LTTNG_SESSIOND_ENV_VARS")
        sessiond_env = os.environ.copy()
        if sessiond_env_vars:
            self._log("Additional lttng-sessiond environment variables:")
            additional_vars = self._unpack_env_vars(sessiond_env_vars)
            for var_name, var_value in additional_vars:
                self._log("  {name}={value}".format(name=var_name, value=var_value))
                sessiond_env[var_name] = var_value

        sessiond_env["LTTNG_SESSION_CONFIG_XSD_PATH"] = str(
            self._project_root / "src" / "common"
        )

        assert self._lttng_home is not None
        sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)

        wait_queue = _SignalWaitQueue()
        with wait_queue.intercept_signal(signal.SIGUSR1):
            self._log(
                "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
                    home_dir=str(self._lttng_home.path)
                )
            )
            process = subprocess.Popen(
                [
                    str(sessiond_path),
                    consumerd_path_option_name,
                    str(consumerd_path),
                    "--sig-parent",
                ],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                env=sessiond_env,
            )

            if self._logging_function:
                self._sessiond_output_consumer = ProcessOutputConsumer(
                    process, "lttng-sessiond", self._logging_function
                )
                self._sessiond_output_consumer.daemon = True
                self._sessiond_output_consumer.start()

            # Wait for SIGUSR1, indicating the sessiond is ready to proceed.
            # NOTE(review): this blocks forever if the session daemon dies
            # before sending the signal.
            wait_queue.wait_for_signal()

        return process

    def _handle_termination_signal(self, signal_number, frame):
        # type: (int, Optional[FrameType]) -> None
        """SIGTERM/SIGINT handler: log the signal and clean up the environment."""
        try:
            signal_name = signal.strsignal(signal_number)
        except AttributeError:
            # `signal.strsignal` is only available in Python >= 3.8.
            signal_name = str(signal_number)
        self._log(
            "Killed by {signal_name} signal, cleaning-up".format(
                signal_name=signal_name
            )
        )
        self._cleanup()

    def launch_wait_trace_test_application(
        self,
        event_count,  # type: int
        wait_time_between_events_us=0,
        wait_before_exit=False,
        wait_before_exit_file_path=None,
    ):
        # type: (int, int, bool, Optional[pathlib.Path]) -> _WaitTraceTestApplication
        """
        Launch an application that will wait before tracing `event_count` events.
        """
        return _WaitTraceTestApplication(
            self._project_root
            / "tests"
            / "utils"
            / "testapp"
            / "gen-ust-events"
            / "gen-ust-events",
            event_count,
            self,
            wait_time_between_events_us,
            wait_before_exit,
            wait_before_exit_file_path,
        )

    def launch_trace_test_constructor_application(self):
        # type: () -> _TraceTestApplication
        """
        Launch an application that will trace from within constructors.
        """
        return _TraceTestApplication(
            self._project_root
            / "tests"
            / "utils"
            / "testapp"
            / "gen-ust-events-constructor"
            / "gen-ust-events-constructor",
            self,
        )

    def _cleanup(self):
        # type: () -> None
        """
        Clean-up managed processes: terminate the daemons that are still
        running, reap them, join their output consumers, then release the
        temporary LTTng home. Calling it more than once is harmless.
        """
        if self._sessiond is not None:
            sessiond_was_alive = self._sessiond.poll() is None
            if sessiond_was_alive:
                # The session daemon is alive; kill it.
                self._log(
                    "Killing session daemon (pid = {sessiond_pid})".format(
                        sessiond_pid=self._sessiond.pid
                    )
                )
                self._sessiond.terminate()
                self._sessiond.wait()

            # Let the consumer thread finish forwarding the daemon's output.
            if self._sessiond_output_consumer is not None:
                self._sessiond_output_consumer.join()
                self._sessiond_output_consumer = None

            if sessiond_was_alive:
                self._log("Session daemon killed")
            self._sessiond = None

        if self._relayd is not None:
            relayd_was_alive = self._relayd.poll() is None
            if relayd_was_alive:
                self._relayd.terminate()
                self._relayd.wait()

            if self._relayd_output_consumer is not None:
                self._relayd_output_consumer.join()
                self._relayd_output_consumer = None

            if relayd_was_alive:
                self._log("Relayd killed")
            self._relayd = None

        self._lttng_home = None

    def __del__(self):
        self._cleanup()
661
662
@contextlib.contextmanager
def test_environment(with_sessiond, log=None, with_relayd=False):
    # type: (bool, Optional[Callable[[str], None]], bool) -> Iterator[_Environment]
    """
    Context manager yielding a freshly created `_Environment`.

    Whatever the `with` block does (including raising), the environment's
    `_cleanup` is invoked when the block is left.
    """
    environment = _Environment(
        with_sessiond=with_sessiond, log=log, with_relayd=with_relayd
    )
    try:
        yield environment
    finally:
        environment._cleanup()
This page took 0.060584 seconds and 4 git commands to generate.