tests: Add mechanism to start relayd in python testing environment
[lttng-tools.git] / tests / utils / lttngtest / environment.py
index 42b0a61b9b35930d9474fa529390e9181382ff6f..e51f5eb66da533cc36c6dde3ec2d8e235c7636fd 100644 (file)
@@ -6,7 +6,7 @@
 #
 
 from types import FrameType
-from typing import Callable, Iterator, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List, Generator
 import sys
 import pathlib
 import signal
@@ -63,8 +63,21 @@ class _SignalWaitQueue:
     def wait_for_signal(self):
         self._queue.get(block=True)
 
-
-class WaitTraceTestApplication:
+    @contextlib.contextmanager
+    def intercept_signal(self, signal_number):
+        # type: (int) -> Generator[None, None, None]
+        original_handler = signal.getsignal(signal_number)
+        signal.signal(signal_number, self.signal)
+        try:
+            yield
+        except:
+            # Restore the original signal handler and forward the exception.
+            raise
+        finally:
+            signal.signal(signal_number, original_handler)
+
+
+class _WaitTraceTestApplication:
     """
     Create an application that waits before tracing. This allows a test to
     launch an application, get its PID, and get it to start tracing when it
@@ -77,20 +90,37 @@ class WaitTraceTestApplication:
         event_count,  # type: int
         environment,  # type: Environment
         wait_time_between_events_us=0,  # type: int
+        wait_before_exit=False,  # type: bool
+        wait_before_exit_file_path=None,  # type: Optional[pathlib.Path]
     ):
         self._environment = environment  # type: Environment
-        if event_count % 5:
-            # The test application currently produces 5 different events per iteration.
-            raise ValueError("event count must be a multiple of 5")
-        self._iteration_count = int(event_count / 5)  # type: int
+        self._iteration_count = event_count
         # File that the application will wait to see before tracing its events.
         self._app_start_tracing_file_path = pathlib.Path(
             tempfile.mktemp(
                 prefix="app_",
                 suffix="_start_tracing",
-                dir=self._compat_open_path(environment.lttng_home_location),
+                dir=self._compat_pathlike(environment.lttng_home_location),
+            )
+        )
+        # File that the application will create when all events have been emitted.
+        self._app_tracing_done_file_path = pathlib.Path(
+            tempfile.mktemp(
+                prefix="app_",
+                suffix="_done_tracing",
+                dir=self._compat_pathlike(environment.lttng_home_location),
             )
         )
+
+        if wait_before_exit and wait_before_exit_file_path is None:
+            wait_before_exit_file_path = pathlib.Path(
+                tempfile.mktemp(
+                    prefix="app_",
+                    suffix="_exit",
+                    dir=self._compat_pathlike(environment.lttng_home_location),
+                )
+            )
+
         self._has_returned = False
 
         test_app_env = os.environ.copy()
@@ -103,42 +133,50 @@ class WaitTraceTestApplication:
         app_ready_file_path = tempfile.mktemp(
             prefix="app_",
             suffix="_ready",
-            dir=self._compat_open_path(environment.lttng_home_location),
+            dir=self._compat_pathlike(environment.lttng_home_location),
         )  # type: str
 
         test_app_args = [str(binary_path)]
+        test_app_args.extend(["--iter", str(event_count)])
         test_app_args.extend(
-            shlex.split(
-                "--iter {iteration_count} --create-in-main {app_ready_file_path} --wait-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
-                    iteration_count=self._iteration_count,
-                    app_ready_file_path=app_ready_file_path,
-                    app_start_tracing_file_path=self._app_start_tracing_file_path,
-                    wait_time_between_events_us=wait_time_between_events_us,
-                )
-            )
+            ["--sync-application-in-main-touch", str(app_ready_file_path)]
+        )
+        test_app_args.extend(
+            ["--sync-before-first-event", str(self._app_start_tracing_file_path)]
+        )
+        test_app_args.extend(
+            ["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
         )
+        if wait_time_between_events_us != 0:
+            test_app_args.extend(["--wait", str(wait_time_between_events_us)])
 
         self._process = subprocess.Popen(
             test_app_args,
             env=test_app_env,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
         )  # type: subprocess.Popen
 
         # Wait for the application to create the file indicating it has fully
         # initialized. Make sure the app hasn't crashed in order to not wait
         # forever.
+        self._wait_for_file_to_be_created(pathlib.Path(app_ready_file_path))
+
+    def _wait_for_file_to_be_created(self, sync_file_path):
+        # type: (pathlib.Path) -> None
         while True:
-            if os.path.exists(app_ready_file_path):
+            if os.path.exists(self._compat_pathlike(sync_file_path)):
                 break
 
             if self._process.poll() is not None:
                 # Application has unexepectedly returned.
                 raise RuntimeError(
-                    "Test application has unexepectedly returned during its initialization with return code `{return_code}`".format(
-                        return_code=self._process.returncode
+                    "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`".format(
+                        sync_file=sync_file_path, return_code=self._process.returncode
                     )
                 )
 
-            time.sleep(0.1)
+            time.sleep(0.001)
 
     def trace(self):
         # type: () -> None
@@ -149,7 +187,11 @@ class WaitTraceTestApplication:
                     return_code=self._process.returncode
                 )
             )
-        open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
+        open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x")
+
+    def wait_for_tracing_done(self):
+        # type: () -> None
+        self._wait_for_file_to_be_created(self._app_tracing_done_file_path)
 
     def wait_for_exit(self):
         # type: () -> None
@@ -167,12 +209,13 @@ class WaitTraceTestApplication:
         return self._process.pid
 
     @staticmethod
-    def _compat_open_path(path):
+    def _compat_pathlike(path):
         # type: (pathlib.Path) -> pathlib.Path | str
         """
-        The builtin open() in python >= 3.6 expects a path-like object while
-        prior versions expect a string or bytes object. Return the correct type
-        based on the presence of the "__fspath__" attribute specified in PEP-519.
+        The builtin open() and many methods of the 'os' library in Python >= 3.6
+        expect a path-like object while prior versions expect a string or
+        bytes object. Return the correct type based on the presence of the
+        "__fspath__" attribute specified in PEP-519.
         """
         if hasattr(path, "__fspath__"):
             return path
@@ -187,7 +230,84 @@ class WaitTraceTestApplication:
             self._process.wait()
 
 
-class TraceTestApplication:
+class WaitTraceTestApplicationGroup:
+    def __init__(
+        self,
+        environment,  # type: Environment
+        application_count,  # type: int
+        event_count,  # type: int
+        wait_time_between_events_us=0,  # type: int
+        wait_before_exit=False,  # type: bool
+    ):
+        self._wait_before_exit_file_path = (
+            pathlib.Path(
+                tempfile.mktemp(
+                    prefix="app_group_",
+                    suffix="_exit",
+                    dir=_WaitTraceTestApplication._compat_pathlike(
+                        environment.lttng_home_location
+                    ),
+                )
+            )
+            if wait_before_exit
+            else None
+        )
+
+        self._apps = []
+        self._consumers = []
+        for i in range(application_count):
+            new_app = environment.launch_wait_trace_test_application(
+                event_count,
+                wait_time_between_events_us,
+                wait_before_exit,
+                self._wait_before_exit_file_path,
+            )
+
+            # Attach an output consumer to log the application's error output (if any).
+            if environment._logging_function:
+                app_output_consumer = ProcessOutputConsumer(
+                    new_app._process,
+                    "app-{}".format(str(new_app.vpid)),
+                    environment._logging_function,
+                )  # type: Optional[ProcessOutputConsumer]
+                app_output_consumer.daemon = True
+                app_output_consumer.start()
+                self._consumers.append(app_output_consumer)
+
+            self._apps.append(new_app)
+
+    def trace(self):
+        # type: () -> None
+        for app in self._apps:
+            app.trace()
+
+    def exit(
+        self, wait_for_apps=False  # type: bool
+    ):
+        if self._wait_before_exit_file_path is None:
+            raise RuntimeError(
+                "Can't call exit on an application group created with `wait_before_exit=False`"
+            )
+
+        # Wait for apps to have produced all of their events so that we can
+        # cause the death of all apps to happen within a short time span.
+        for app in self._apps:
+            app.wait_for_tracing_done()
+
+        open(
+            _WaitTraceTestApplication._compat_pathlike(
+                self._wait_before_exit_file_path
+            ),
+            mode="x",
+        )
+        # Performed in two passes to allow tests to stress the unregistration of many applications.
+        # Waiting for each app to exit turn-by-turn would defeat the purpose here.
+        if wait_for_apps:
+            for app in self._apps:
+                app.wait_for_exit()
+
+
+class _TraceTestApplication:
     """
     Create an application that emits events as soon as it is launched. In most
     scenarios, it is preferable to use a WaitTraceTestApplication.
@@ -255,6 +375,7 @@ class _Environment(logger._Logger):
         self,
         with_sessiond,  # type: bool
         log=None,  # type: Optional[Callable[[str], None]]
+        with_relayd=False,  # type: bool
     ):
         super().__init__(log)
         signal.signal(signal.SIGTERM, self._handle_termination_signal)
@@ -269,6 +390,11 @@ class _Environment(logger._Logger):
             "lttng_test_env_home"
         )  # type: Optional[TemporaryDirectory]
 
+        self._relayd = (
+            self._launch_lttng_relayd() if with_relayd else None
+        )  # type: Optional[subprocess.Popen[bytes]]
+        self._relayd_output_consumer = None
+
         self._sessiond = (
             self._launch_lttng_sessiond() if with_sessiond else None
         )  # type: Optional[subprocess.Popen[bytes]]
@@ -285,6 +411,21 @@ class _Environment(logger._Logger):
         # type: () -> pathlib.Path
         return self._project_root / "src" / "bin" / "lttng" / "lttng"
 
+    @property
+    def lttng_relayd_control_port(self):
+        # type: () -> int
+        return 5400
+
+    @property
+    def lttng_relayd_data_port(self):
+        # type: () -> int
+        return 5401
+
+    @property
+    def lttng_relayd_live_port(self):
+        # type: () -> int
+        return 5402
+
     def create_temporary_directory(self, prefix=None):
         # type: (Optional[str]) -> pathlib.Path
         # Simply return a path that is contained within LTTNG_HOME; it will
@@ -320,6 +461,53 @@ class _Environment(logger._Logger):
 
         return unpacked_vars
 
+    def _launch_lttng_relayd(self):
+        # type: () -> Optional[subprocess.Popen]
+        relayd_path = (
+            self._project_root / "src" / "bin" / "lttng-relayd" / "lttng-relayd"
+        )
+        if os.environ.get("LTTNG_TEST_NO_RELAYD", "0") == "1":
+            # Run without a relay daemon; the user may be running one
+            # under gdb, for example.
+            return None
+
+        relayd_env_vars = os.environ.get("LTTNG_RELAYD_ENV_VARS")
+        relayd_env = os.environ.copy()
+        if relayd_env_vars:
+            self._log("Additional lttng-relayd environment variables:")
+            for name, value in self._unpack_env_vars(relayd_env_vars):
+                self._log("{}={}".format(name, value))
+                relayd_env[name] = value
+
+        assert self._lttng_home is not None
+        relayd_env["LTTNG_HOME"] = str(self._lttng_home.path)
+        self._log(
+            "Launching relayd with LTTNG_HOME='${}'".format(str(self._lttng_home.path))
+        )
+        process = subprocess.Popen(
+            [
+                str(relayd_path),
+                "-C",
+                "tcp://0.0.0.0:{}".format(self.lttng_relayd_control_port),
+                "-D",
+                "tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
+                "-L",
+                "tcp://localhost:{}".format(self.lttng_relayd_live_port),
+            ],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT,
+            env=relayd_env,
+        )
+
+        if self._logging_function:
+            self._relayd_output_consumer = ProcessOutputConsumer(
+                process, "lttng-relayd", self._logging_function
+            )
+            self._relayd_output_consumer.daemon = True
+            self._relayd_output_consumer.start()
+
+        return process
+
     def _launch_lttng_sessiond(self):
         # type: () -> Optional[subprocess.Popen]
         is_64bits_host = sys.maxsize > 2**32
@@ -358,35 +546,33 @@ class _Environment(logger._Logger):
         sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)
 
         wait_queue = _SignalWaitQueue()
-        signal.signal(signal.SIGUSR1, wait_queue.signal)
-
-        self._log(
-            "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
-                home_dir=str(self._lttng_home.path)
+        with wait_queue.intercept_signal(signal.SIGUSR1):
+            self._log(
+                "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
+                    home_dir=str(self._lttng_home.path)
+                )
+            )
+            process = subprocess.Popen(
+                [
+                    str(sessiond_path),
+                    consumerd_path_option_name,
+                    str(consumerd_path),
+                    "--sig-parent",
+                ],
+                stdout=subprocess.PIPE,
+                stderr=subprocess.STDOUT,
+                env=sessiond_env,
             )
-        )
-        process = subprocess.Popen(
-            [
-                str(sessiond_path),
-                consumerd_path_option_name,
-                str(consumerd_path),
-                "--sig-parent",
-            ],
-            stdout=subprocess.PIPE,
-            stderr=subprocess.STDOUT,
-            env=sessiond_env,
-        )
 
-        if self._logging_function:
-            self._sessiond_output_consumer = ProcessOutputConsumer(
-                process, "lttng-sessiond", self._logging_function
-            )  # type: Optional[ProcessOutputConsumer]
-            self._sessiond_output_consumer.daemon = True
-            self._sessiond_output_consumer.start()
+            if self._logging_function:
+                self._sessiond_output_consumer = ProcessOutputConsumer(
+                    process, "lttng-sessiond", self._logging_function
+                )  # type: Optional[ProcessOutputConsumer]
+                self._sessiond_output_consumer.daemon = True
+                self._sessiond_output_consumer.start()
 
-        # Wait for SIGUSR1, indicating the sessiond is ready to proceed
-        wait_queue.wait_for_signal()
-        signal.signal(signal.SIGUSR1, wait_queue.signal)
+            # Wait for SIGUSR1, indicating the sessiond is ready to proceed
+            wait_queue.wait_for_signal()
 
         return process
 
@@ -399,20 +585,29 @@ class _Environment(logger._Logger):
         )
         self._cleanup()
 
-    def launch_wait_trace_test_application(self, event_count):
-        # type: (int) -> WaitTraceTestApplication
+    def launch_wait_trace_test_application(
+        self,
+        event_count,  # type: int
+        wait_time_between_events_us=0,
+        wait_before_exit=False,
+        wait_before_exit_file_path=None,
+    ):
+        # type: (int, int, bool, Optional[pathlib.Path]) -> _WaitTraceTestApplication
         """
         Launch an application that will wait before tracing `event_count` events.
         """
-        return WaitTraceTestApplication(
+        return _WaitTraceTestApplication(
             self._project_root
             / "tests"
             / "utils"
             / "testapp"
-            / "gen-ust-nevents"
-            / "gen-ust-nevents",
+            / "gen-ust-events"
+            / "gen-ust-events",
             event_count,
             self,
+            wait_time_between_events_us,
+            wait_before_exit,
+            wait_before_exit_file_path,
         )
 
     def launch_trace_test_constructor_application(self):
@@ -420,7 +615,7 @@ class _Environment(logger._Logger):
         """
         Launch an application that will trace from within constructors.
         """
-        return TraceTestApplication(
+        return _TraceTestApplication(
             self._project_root
             / "tests"
             / "utils"
@@ -450,6 +645,15 @@ class _Environment(logger._Logger):
             self._log("Session daemon killed")
             self._sessiond = None
 
+        if self._relayd and self._relayd.poll() is None:
+            self._relayd.terminate()
+            self._relayd.wait()
+            if self._relayd_output_consumer:
+                self._relayd_output_consumer.join()
+                self._relayd_output_consumer = None
+            self._log("Relayd killed")
+            self._relayd = None
+
         self._lttng_home = None
 
     def __del__(self):
@@ -457,9 +661,9 @@ class _Environment(logger._Logger):
 
 
 @contextlib.contextmanager
-def test_environment(with_sessiond, log=None):
-    # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
-    env = _Environment(with_sessiond, log)
+def test_environment(with_sessiond, log=None, with_relayd=False):
+    # type: (bool, Optional[Callable[[str], None]], bool) -> Iterator[_Environment]
+    env = _Environment(with_sessiond, log, with_relayd)
     try:
         yield env
     finally:
This page took 0.028256 seconds and 4 git commands to generate.