#
from types import FrameType
-from typing import Callable, Iterator, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List, Generator
import sys
import pathlib
import signal
def wait_for_signal(self):
self._queue.get(block=True)
-
-class WaitTraceTestApplication:
+ @contextlib.contextmanager
+ def intercept_signal(self, signal_number):
+ # type: (int) -> Generator[None, None, None]
+ original_handler = signal.getsignal(signal_number)
+ signal.signal(signal_number, self.signal)
+ try:
+ yield
+ except:
+ # Restore the original signal handler and forward the exception.
+ raise
+ finally:
+ signal.signal(signal_number, original_handler)
+
+
+class _WaitTraceTestApplication:
"""
Create an application that waits before tracing. This allows a test to
launch an application, get its PID, and get it to start tracing when it
event_count, # type: int
environment, # type: Environment
wait_time_between_events_us=0, # type: int
+ wait_before_exit=False, # type: bool
+ wait_before_exit_file_path=None, # type: Optional[pathlib.Path]
):
self._environment = environment # type: Environment
- if event_count % 5:
- # The test application currently produces 5 different events per iteration.
- raise ValueError("event count must be a multiple of 5")
- self._iteration_count = int(event_count / 5) # type: int
+ self._iteration_count = event_count
# File that the application will wait to see before tracing its events.
self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
- dir=self._compat_open_path(environment.lttng_home_location),
+ dir=self._compat_pathlike(environment.lttng_home_location),
)
)
+ # File that the application will create when all events have been emitted.
+ self._app_tracing_done_file_path = pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_",
+ suffix="_done_tracing",
+ dir=self._compat_pathlike(environment.lttng_home_location),
+ )
+ )
+
+ if wait_before_exit and wait_before_exit_file_path is None:
+ wait_before_exit_file_path = pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_",
+ suffix="_exit",
+ dir=self._compat_pathlike(environment.lttng_home_location),
+ )
+ )
+
self._has_returned = False
test_app_env = os.environ.copy()
app_ready_file_path = tempfile.mktemp(
prefix="app_",
suffix="_ready",
- dir=self._compat_open_path(environment.lttng_home_location),
+ dir=self._compat_pathlike(environment.lttng_home_location),
) # type: str
test_app_args = [str(binary_path)]
+ test_app_args.extend(["--iter", str(event_count)])
test_app_args.extend(
- shlex.split(
- "--iter {iteration_count} --create-in-main {app_ready_file_path} --wait-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
- iteration_count=self._iteration_count,
- app_ready_file_path=app_ready_file_path,
- app_start_tracing_file_path=self._app_start_tracing_file_path,
- wait_time_between_events_us=wait_time_between_events_us,
- )
- )
+ ["--sync-application-in-main-touch", str(app_ready_file_path)]
+ )
+ test_app_args.extend(
+ ["--sync-before-first-event", str(self._app_start_tracing_file_path)]
)
+ test_app_args.extend(
+ ["--sync-before-exit-touch", str(self._app_tracing_done_file_path)]
+ )
+ if wait_time_between_events_us != 0:
+ test_app_args.extend(["--wait", str(wait_time_between_events_us)])
self._process = subprocess.Popen(
test_app_args,
env=test_app_env,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
# initialized. Make sure the app hasn't crashed in order to not wait
# forever.
+ self._wait_for_file_to_be_created(pathlib.Path(app_ready_file_path))
+
+ def _wait_for_file_to_be_created(self, sync_file_path):
+ # type: (pathlib.Path) -> None
while True:
- if os.path.exists(app_ready_file_path):
+ if os.path.exists(self._compat_pathlike(sync_file_path)):
break
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
- "Test application has unexepectedly returned during its initialization with return code `{return_code}`".format(
- return_code=self._process.returncode
+ "Test application has unexepectedly returned while waiting for synchronization file to be created: sync_file=`{sync_file}`, return_code=`{return_code}`".format(
+ sync_file=sync_file_path, return_code=self._process.returncode
)
)
- time.sleep(0.1)
+ time.sleep(0.001)
def trace(self):
# type: () -> None
return_code=self._process.returncode
)
)
- open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
+ open(self._compat_pathlike(self._app_start_tracing_file_path), mode="x")
+
+ def wait_for_tracing_done(self):
+ # type: () -> None
+ self._wait_for_file_to_be_created(self._app_tracing_done_file_path)
def wait_for_exit(self):
# type: () -> None
return self._process.pid
@staticmethod
- def _compat_open_path(path):
+ def _compat_pathlike(path):
# type: (pathlib.Path) -> pathlib.Path | str
"""
- The builtin open() in python >= 3.6 expects a path-like object while
- prior versions expect a string or bytes object. Return the correct type
- based on the presence of the "__fspath__" attribute specified in PEP-519.
+ The builtin open() and many methods of the 'os' library in Python >= 3.6
+ expect a path-like object while prior versions expect a string or
+ bytes object. Return the correct type based on the presence of the
+ "__fspath__" attribute specified in PEP-519.
"""
if hasattr(path, "__fspath__"):
return path
self._process.wait()
-class TraceTestApplication:
+class WaitTraceTestApplicationGroup:
+ def __init__(
+ self,
+ environment, # type: Environment
+ application_count, # type: int
+ event_count, # type: int
+ wait_time_between_events_us=0, # type: int
+ wait_before_exit=False, # type: bool
+ ):
+ self._wait_before_exit_file_path = (
+ pathlib.Path(
+ tempfile.mktemp(
+ prefix="app_group_",
+ suffix="_exit",
+ dir=_WaitTraceTestApplication._compat_pathlike(
+ environment.lttng_home_location
+ ),
+ )
+ )
+ if wait_before_exit
+ else None
+ )
+
+ self._apps = []
+ self._consumers = []
+ for i in range(application_count):
+ new_app = environment.launch_wait_trace_test_application(
+ event_count,
+ wait_time_between_events_us,
+ wait_before_exit,
+ self._wait_before_exit_file_path,
+ )
+
+ # Attach an output consumer to log the application's error output (if any).
+ if environment._logging_function:
+ app_output_consumer = ProcessOutputConsumer(
+ new_app._process,
+ "app-{}".format(str(new_app.vpid)),
+ environment._logging_function,
+ ) # type: Optional[ProcessOutputConsumer]
+ app_output_consumer.daemon = True
+ app_output_consumer.start()
+ self._consumers.append(app_output_consumer)
+
+ self._apps.append(new_app)
+
+ def trace(self):
+ # type: () -> None
+ for app in self._apps:
+ app.trace()
+
+ def exit(
+ self, wait_for_apps=False # type: bool
+ ):
+ if self._wait_before_exit_file_path is None:
+ raise RuntimeError(
+ "Can't call exit on an application group created with `wait_before_exit=False`"
+ )
+
+ # Wait for apps to have produced all of their events so that we can
+ # cause the death of all apps to happen within a short time span.
+ for app in self._apps:
+ app.wait_for_tracing_done()
+
+ open(
+ _WaitTraceTestApplication._compat_pathlike(
+ self._wait_before_exit_file_path
+ ),
+ mode="x",
+ )
+ # Performed in two passes to allow tests to stress the unregistration of many applications.
+ # Waiting for each app to exit turn-by-turn would defeat the purpose here.
+ if wait_for_apps:
+ for app in self._apps:
+ app.wait_for_exit()
+
+
+class _TraceTestApplication:
"""
- Create an application to trace.
+ Create an application that emits events as soon as it is launched. In most
+ scenarios, it is preferable to use a WaitTraceTestApplication.
"""
- def __init__(self, binary_path: pathlib.Path, environment: "Environment"):
- self._environment: Environment = environment
+ def __init__(self, binary_path, environment):
+ # type: (pathlib.Path, Environment)
+ self._environment = environment # type: Environment
self._has_returned = False
test_app_env = os.environ.copy()
test_app_args = [str(binary_path)]
- self._process: subprocess.Popen = subprocess.Popen(
+ self._process = subprocess.Popen(
test_app_args, env=test_app_env
- )
+ ) # type: subprocess.Popen
- def wait_for_exit(self) -> None:
+ def wait_for_exit(self):
+ # type: () -> None
if self._process.wait() != 0:
raise RuntimeError(
"Test application has exit with return code `{return_code}`".format(
self,
with_sessiond, # type: bool
log=None, # type: Optional[Callable[[str], None]]
+ with_relayd=False, # type: bool
):
super().__init__(log)
signal.signal(signal.SIGTERM, self._handle_termination_signal)
"lttng_test_env_home"
) # type: Optional[TemporaryDirectory]
+ self._relayd = (
+ self._launch_lttng_relayd() if with_relayd else None
+ ) # type: Optional[subprocess.Popen[bytes]]
+ self._relayd_output_consumer = None
+
self._sessiond = (
self._launch_lttng_sessiond() if with_sessiond else None
) # type: Optional[subprocess.Popen[bytes]]
# type: () -> pathlib.Path
return self._project_root / "src" / "bin" / "lttng" / "lttng"
+ @property
+ def lttng_relayd_control_port(self):
+ # type: () -> int
+ return 5400
+
+ @property
+ def lttng_relayd_data_port(self):
+ # type: () -> int
+ return 5401
+
+ @property
+ def lttng_relayd_live_port(self):
+ # type: () -> int
+ return 5402
+
def create_temporary_directory(self, prefix=None):
# type: (Optional[str]) -> pathlib.Path
# Simply return a path that is contained within LTTNG_HOME; it will
return unpacked_vars
+ def _launch_lttng_relayd(self):
+ # type: () -> Optional[subprocess.Popen]
+ relayd_path = (
+ self._project_root / "src" / "bin" / "lttng-relayd" / "lttng-relayd"
+ )
+ if os.environ.get("LTTNG_TEST_NO_RELAYD", "0") == "1":
+ # Run without a relay daemon; the user may be running one
+ # under gdb, for example.
+ return None
+
+ relayd_env_vars = os.environ.get("LTTNG_RELAYD_ENV_VARS")
+ relayd_env = os.environ.copy()
+ if relayd_env_vars:
+ self._log("Additional lttng-relayd environment variables:")
+ for name, value in self._unpack_env_vars(relayd_env_vars):
+ self._log("{}={}".format(name, value))
+ relayd_env[name] = value
+
+ assert self._lttng_home is not None
+ relayd_env["LTTNG_HOME"] = str(self._lttng_home.path)
+ self._log(
+ "Launching relayd with LTTNG_HOME='${}'".format(str(self._lttng_home.path))
+ )
+ process = subprocess.Popen(
+ [
+ str(relayd_path),
+ "-C",
+ "tcp://0.0.0.0:{}".format(self.lttng_relayd_control_port),
+ "-D",
+ "tcp://0.0.0.0:{}".format(self.lttng_relayd_data_port),
+ "-L",
+ "tcp://localhost:{}".format(self.lttng_relayd_live_port),
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=relayd_env,
+ )
+
+ if self._logging_function:
+ self._relayd_output_consumer = ProcessOutputConsumer(
+ process, "lttng-relayd", self._logging_function
+ )
+ self._relayd_output_consumer.daemon = True
+ self._relayd_output_consumer.start()
+
+ return process
+
def _launch_lttng_sessiond(self):
# type: () -> Optional[subprocess.Popen]
is_64bits_host = sys.maxsize > 2**32
sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)
wait_queue = _SignalWaitQueue()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
-
- self._log(
- "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
- home_dir=str(self._lttng_home.path)
+ with wait_queue.intercept_signal(signal.SIGUSR1):
+ self._log(
+ "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
+ home_dir=str(self._lttng_home.path)
+ )
+ )
+ process = subprocess.Popen(
+ [
+ str(sessiond_path),
+ consumerd_path_option_name,
+ str(consumerd_path),
+ "--sig-parent",
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=sessiond_env,
)
- )
- process = subprocess.Popen(
- [
- str(sessiond_path),
- consumerd_path_option_name,
- str(consumerd_path),
- "--sig-parent",
- ],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- env=sessiond_env,
- )
- if self._logging_function:
- self._sessiond_output_consumer = ProcessOutputConsumer(
- process, "lttng-sessiond", self._logging_function
- ) # type: Optional[ProcessOutputConsumer]
- self._sessiond_output_consumer.daemon = True
- self._sessiond_output_consumer.start()
+ if self._logging_function:
+ self._sessiond_output_consumer = ProcessOutputConsumer(
+ process, "lttng-sessiond", self._logging_function
+ ) # type: Optional[ProcessOutputConsumer]
+ self._sessiond_output_consumer.daemon = True
+ self._sessiond_output_consumer.start()
- # Wait for SIGUSR1, indicating the sessiond is ready to proceed
- wait_queue.wait_for_signal()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
+ # Wait for SIGUSR1, indicating the sessiond is ready to proceed
+ wait_queue.wait_for_signal()
return process
)
self._cleanup()
- def launch_wait_trace_test_application(self, event_count):
- # type: (int) -> WaitTraceTestApplication
+ def launch_wait_trace_test_application(
+ self,
+ event_count, # type: int
+ wait_time_between_events_us=0,
+ wait_before_exit=False,
+ wait_before_exit_file_path=None,
+ ):
+ # type: (int, int, bool, Optional[pathlib.Path]) -> _WaitTraceTestApplication
"""
Launch an application that will wait before tracing `event_count` events.
"""
- return WaitTraceTestApplication(
+ return _WaitTraceTestApplication(
self._project_root
/ "tests"
/ "utils"
/ "testapp"
- / "gen-ust-nevents"
- / "gen-ust-nevents",
+ / "gen-ust-events"
+ / "gen-ust-events",
event_count,
self,
+ wait_time_between_events_us,
+ wait_before_exit,
+ wait_before_exit_file_path,
)
- def launch_trace_test_constructor_application(
- self
- ) -> TraceTestApplication:
+ def launch_trace_test_constructor_application(self):
+ # type () -> TraceTestApplication
"""
Launch an application that will trace from within constructors.
"""
- return TraceTestApplication(
+ return _TraceTestApplication(
self._project_root
/ "tests"
/ "utils"
self._log("Session daemon killed")
self._sessiond = None
+ if self._relayd and self._relayd.poll() is None:
+ self._relayd.terminate()
+ self._relayd.wait()
+ if self._relayd_output_consumer:
+ self._relayd_output_consumer.join()
+ self._relayd_output_consumer = None
+ self._log("Relayd killed")
+ self._relayd = None
+
self._lttng_home = None
def __del__(self):
@contextlib.contextmanager
-def test_environment(with_sessiond, log=None):
- # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
- env = _Environment(with_sessiond, log)
+def test_environment(with_sessiond, log=None, with_relayd=False):
+ # type: (bool, Optional[Callable[[str], None]], bool) -> Iterator[_Environment]
+ env = _Environment(with_sessiond, log, with_relayd)
try:
yield env
finally: