#
from types import FrameType
-from typing import Callable, Iterator, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List, Generator
import sys
import pathlib
import signal
def wait_for_signal(self):
self._queue.get(block=True)
+ @contextlib.contextmanager
+ def intercept_signal(self, signal_number):
+ # type: (int) -> Generator[None, None, None]
+ original_handler = signal.getsignal(signal_number)
+ signal.signal(signal_number, self.signal)
+ try:
+ yield
+ except:
+ # Restore the original signal handler and forward the exception.
+ raise
+ finally:
+ signal.signal(signal_number, original_handler)
+
class WaitTraceTestApplication:
"""
wait_time_between_events_us=0, # type: int
):
self._environment = environment # type: Environment
- if event_count % 5:
- # The test application currently produces 5 different events per iteration.
- raise ValueError("event count must be a multiple of 5")
- self._iteration_count = int(event_count / 5) # type: int
+ self._iteration_count = event_count
# File that the application will wait to see before tracing its events.
self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
# File that the application will create to indicate it has completed its initialization.
- app_ready_file_path: str = tempfile.mktemp(
+ app_ready_file_path = tempfile.mktemp(
prefix="app_",
suffix="_ready",
dir=self._compat_open_path(environment.lttng_home_location),
test_app_args = [str(binary_path)]
test_app_args.extend(
shlex.split(
- "--iter {iteration_count} --create-in-main {app_ready_file_path} --wait-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
+ "--iter {iteration_count} --sync-application-in-main-touch {app_ready_file_path} --sync-before-first-event {app_start_tracing_file_path} --wait {wait_time_between_events_us}".format(
iteration_count=self._iteration_count,
app_ready_file_path=app_ready_file_path,
app_start_tracing_file_path=self._app_start_tracing_file_path,
self._process.wait()
+class TraceTestApplication:
+ """
+ Create an application that emits events as soon as it is launched. In most
+ scenarios, it is preferable to use a WaitTraceTestApplication.
+ """
+
+ def __init__(self, binary_path, environment):
+ # type: (pathlib.Path, Environment)
+ self._environment = environment # type: Environment
+ self._has_returned = False
+
+ test_app_env = os.environ.copy()
+ test_app_env["LTTNG_HOME"] = str(environment.lttng_home_location)
+ # Make sure the app is blocked until it is properly registered to
+ # the session daemon.
+ test_app_env["LTTNG_UST_REGISTER_TIMEOUT"] = "-1"
+
+ test_app_args = [str(binary_path)]
+
+ self._process = subprocess.Popen(
+ test_app_args, env=test_app_env
+ ) # type: subprocess.Popen
+
+ def wait_for_exit(self):
+ # type: () -> None
+ if self._process.wait() != 0:
+ raise RuntimeError(
+ "Test application has exit with return code `{return_code}`".format(
+ return_code=self._process.returncode
+ )
+ )
+ self._has_returned = True
+
+ def __del__(self):
+ if not self._has_returned:
+ # This is potentially racy if the pid has been recycled. However,
+ # we can't use pidfd_open since it is only available in python >= 3.9.
+ self._process.kill()
+ self._process.wait()
+
+
class ProcessOutputConsumer(threading.Thread, logger._Logger):
def __init__(
self,
sessiond_env["LTTNG_HOME"] = str(self._lttng_home.path)
wait_queue = _SignalWaitQueue()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
-
- self._log(
- "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
- home_dir=str(self._lttng_home.path)
+ with wait_queue.intercept_signal(signal.SIGUSR1):
+ self._log(
+ "Launching session daemon with LTTNG_HOME=`{home_dir}`".format(
+ home_dir=str(self._lttng_home.path)
+ )
+ )
+ process = subprocess.Popen(
+ [
+ str(sessiond_path),
+ consumerd_path_option_name,
+ str(consumerd_path),
+ "--sig-parent",
+ ],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.STDOUT,
+ env=sessiond_env,
)
- )
- process = subprocess.Popen(
- [
- str(sessiond_path),
- consumerd_path_option_name,
- str(consumerd_path),
- "--sig-parent",
- ],
- stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT,
- env=sessiond_env,
- )
- if self._logging_function:
- self._sessiond_output_consumer = ProcessOutputConsumer(
- process, "lttng-sessiond", self._logging_function
- ) # type: Optional[ProcessOutputConsumer]
- self._sessiond_output_consumer.daemon = True
- self._sessiond_output_consumer.start()
+ if self._logging_function:
+ self._sessiond_output_consumer = ProcessOutputConsumer(
+ process, "lttng-sessiond", self._logging_function
+ ) # type: Optional[ProcessOutputConsumer]
+ self._sessiond_output_consumer.daemon = True
+ self._sessiond_output_consumer.start()
- # Wait for SIGUSR1, indicating the sessiond is ready to proceed
- wait_queue.wait_for_signal()
- signal.signal(signal.SIGUSR1, wait_queue.signal)
+ # Wait for SIGUSR1, indicating the sessiond is ready to proceed
+ wait_queue.wait_for_signal()
return process
/ "tests"
/ "utils"
/ "testapp"
- / "gen-ust-nevents"
- / "gen-ust-nevents",
+ / "gen-ust-events"
+ / "gen-ust-events",
event_count,
self,
)
+ def launch_trace_test_constructor_application(self):
+ # type () -> TraceTestApplication
+ """
+ Launch an application that will trace from within constructors.
+ """
+ return TraceTestApplication(
+ self._project_root
+ / "tests"
+ / "utils"
+ / "testapp"
+ / "gen-ust-events-constructor"
+ / "gen-ust-events-constructor",
+ self,
+ )
+
# Clean-up managed processes
def _cleanup(self):
# type: () -> None