From ce8470c9d039c563167d4fd061803e9eacf69ec3 Mon Sep 17 00:00:00 2001 From: Michael Jeanson Date: Mon, 27 Mar 2023 15:25:23 -0400 Subject: [PATCH] Tests: python: use quoted annotations to support python <= 3.6 MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit Python 3.0 introduced syntax for parameter and return type annotations, as specified in PEP 484. Python 3.6 introduced support for variable type annotations, as specified in PEP 526. Upstream recommends using quoted annotations for librairies that need to support older python versions. See https://typing.readthedocs.io/en/latest/source/libraries.html#compatibility-with-older-python-versions Change-Id: Ifc6017a7baa9f0589e85d32005ac9ead18c1c9fb Signed-off-by: Michael Jeanson Signed-off-by: Jérémie Galarneau --- tests/regression/tools/context/test_ust.py | 21 +- tests/utils/lttngtest/environment.py | 108 ++++++----- tests/utils/lttngtest/logger.py | 11 +- tests/utils/lttngtest/lttng.py | 181 +++++++++--------- tests/utils/lttngtest/lttngctl.py | 212 ++++++++++++--------- tests/utils/lttngtest/tap_generator.py | 79 +++++--- 6 files changed, 351 insertions(+), 261 deletions(-) diff --git a/tests/regression/tools/context/test_ust.py b/tests/regression/tools/context/test_ust.py index ab2e743e3..b90291fa5 100755 --- a/tests/regression/tools/context/test_ust.py +++ b/tests/regression/tools/context/test_ust.py @@ -29,7 +29,8 @@ import lttngtest import bt2 -def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str: +def context_trace_field_name(context_type): + # type: (Type[lttngtest.ContextType]) -> str if isinstance(context_type, lttngtest.VpidContextType): return "vpid" elif isinstance(context_type, lttngtest.VuidContextType): @@ -46,8 +47,9 @@ def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str: def trace_stream_class_has_context_field_in_event_context( - trace_location: pathlib.Path, context_field_name: str -) -> bool: + trace_location, context_field_name +): + # type: (pathlib.Path, str) -> bool iterator = bt2.TraceCollectionMessageIterator(str(trace_location)) # A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage. @@ -67,9 +69,8 @@ def trace_stream_class_has_context_field_in_event_context( return context_field_name in event_common_context_field_class -def trace_events_have_context_value( - trace_location: pathlib.Path, context_field_name: str, value: Any -) -> bool: +def trace_events_have_context_value(trace_location, context_field_name, value): + # type: (pathlib.Path, str, Any) -> bool for msg in bt2.TraceCollectionMessageIterator(str(trace_location)): if type(msg) is not bt2._EventMessageConst: continue @@ -80,12 +81,8 @@ def trace_events_have_context_value( return True -def test_static_context( - tap: lttngtest.TapGenerator, - test_env: lttngtest._Environment, - context_type: lttngtest.ContextType, - context_value_retriever: Callable[[lttngtest.WaitTraceTestApplication], Any], -) -> None: +def test_static_context(tap, test_env, context_type, context_value_retriever): + # type: (lttngtest.TapGenerator, lttngtest._Environment, lttngtest.ContextType, Callable[[lttngtest.WaitTraceTestApplication], Any]) -> None tap.diagnostic( "Test presence and expected value of context `{context_name}`".format( context_name=type(context_type).__name__ diff --git a/tests/utils/lttngtest/environment.py b/tests/utils/lttngtest/environment.py index d9777810b..e71e588df 100644 --- a/tests/utils/lttngtest/environment.py +++ b/tests/utils/lttngtest/environment.py @@ -6,7 +6,7 @@ # from types import FrameType -from typing import Callable, Optional, Tuple, List +from typing import Callable, Iterator, Optional, Tuple, List import sys import pathlib import signal @@ -23,14 +23,16 @@ import contextlib class TemporaryDirectory: - def __init__(self, prefix: str): + def __init__(self, prefix): + # type: (str) -> None self._directory_path = tempfile.mkdtemp(prefix=prefix) def __del__(self): shutil.rmtree(self._directory_path, ignore_errors=True) @property - def path(self) -> pathlib.Path: + def path(self): + # type: () -> pathlib.Path return pathlib.Path(self._directory_path) @@ -49,9 +51,13 @@ class _SignalWaitQueue: """ def __init__(self): - self._queue: queue.Queue = queue.Queue() + self._queue = queue.Queue() # type: queue.Queue - def signal(self, signal_number, frame: Optional[FrameType]): + def signal( + self, + signal_number, + frame, # type: Optional[FrameType] + ): self._queue.put_nowait(signal_number) def wait_for_signal(self): @@ -67,18 +73,18 @@ class WaitTraceTestApplication: def __init__( self, - binary_path: pathlib.Path, - event_count: int, - environment: "Environment", - wait_time_between_events_us: int = 0, + binary_path, # type: pathlib.Path + event_count, # type: int + environment, # type: Environment + wait_time_between_events_us=0, # type: int ): - self._environment: Environment = environment + self._environment = environment # type: Environment if event_count % 5: # The test application currently produces 5 different events per iteration. raise ValueError("event count must be a multiple of 5") - self._iteration_count: int = int(event_count / 5) + self._iteration_count = int(event_count / 5) # type: int # File that the application will wait to see before tracing its events. - self._app_start_tracing_file_path: pathlib.Path = pathlib.Path( + self._app_start_tracing_file_path = pathlib.Path( tempfile.mktemp( prefix="app_", suffix="_start_tracing", @@ -98,7 +104,7 @@ class WaitTraceTestApplication: prefix="app_", suffix="_ready", dir=self._compat_open_path(environment.lttng_home_location), - ) + ) # type: str test_app_args = [str(binary_path)] test_app_args.extend( @@ -112,10 +118,10 @@ class WaitTraceTestApplication: ) ) - self._process: subprocess.Popen = subprocess.Popen( + self._process = subprocess.Popen( test_app_args, env=test_app_env, - ) + ) # type: subprocess.Popen # Wait for the application to create the file indicating it has fully # initialized. Make sure the app hasn't crashed in order to not wait @@ -134,7 +140,8 @@ class WaitTraceTestApplication: time.sleep(0.1) - def trace(self) -> None: + def trace(self): + # type: () -> None if self._process.poll() is not None: # Application has unexepectedly returned. raise RuntimeError( @@ -144,7 +151,8 @@ class WaitTraceTestApplication: ) open(self._compat_open_path(self._app_start_tracing_file_path), mode="x") - def wait_for_exit(self) -> None: + def wait_for_exit(self): + # type: () -> None if self._process.wait() != 0: raise RuntimeError( "Test application has exit with return code `{return_code}`".format( @@ -154,12 +162,13 @@ class WaitTraceTestApplication: self._has_returned = True @property - def vpid(self) -> int: + def vpid(self): + # type: () -> int return self._process.pid @staticmethod def _compat_open_path(path): - # type: (pathlib.Path) + # type: (pathlib.Path) -> pathlib.Path | str """ The builtin open() in python >= 3.6 expects a path-like object while prior versions expect a string or bytes object. Return the correct type @@ -180,14 +189,18 @@ class WaitTraceTestApplication: class ProcessOutputConsumer(threading.Thread, logger._Logger): def __init__( - self, process: subprocess.Popen, name: str, log: Callable[[str], None] + self, + process, # type: subprocess.Popen + name, # type: str + log, # type: Callable[[str], None] ): threading.Thread.__init__(self) self._prefix = name logger._Logger.__init__(self, log) self._process = process - def run(self) -> None: + def run(self): + # type: () -> None while self._process.poll() is None: assert self._process.stdout line = self._process.stdout.readline().decode("utf-8").replace("\n", "") @@ -198,7 +211,9 @@ class ProcessOutputConsumer(threading.Thread, logger._Logger): # Generate a temporary environment in which to execute a test. class _Environment(logger._Logger): def __init__( - self, with_sessiond: bool, log: Optional[Callable[[str], None]] = None + self, + with_sessiond, # type: bool + log=None, # type: Optional[Callable[[str], None]] ): super().__init__(log) signal.signal(signal.SIGTERM, self._handle_termination_signal) @@ -206,26 +221,31 @@ class _Environment(logger._Logger): # Assumes the project's hierarchy to this file is: # tests/utils/python/this_file - self._project_root: pathlib.Path = pathlib.Path(__file__).absolute().parents[3] - self._lttng_home: Optional[TemporaryDirectory] = TemporaryDirectory( + self._project_root = ( + pathlib.Path(__file__).absolute().parents[3] + ) # type: pathlib.Path + self._lttng_home = TemporaryDirectory( "lttng_test_env_home" - ) + ) # type: Optional[TemporaryDirectory] - self._sessiond: Optional[subprocess.Popen[bytes]] = ( + self._sessiond = ( self._launch_lttng_sessiond() if with_sessiond else None - ) + ) # type: Optional[subprocess.Popen[bytes]] @property - def lttng_home_location(self) -> pathlib.Path: + def lttng_home_location(self): + # type: () -> pathlib.Path if self._lttng_home is None: raise RuntimeError("Attempt to access LTTng home after clean-up") return self._lttng_home.path @property - def lttng_client_path(self) -> pathlib.Path: + def lttng_client_path(self): + # type: () -> pathlib.Path return self._project_root / "src" / "bin" / "lttng" / "lttng" - def create_temporary_directory(self, prefix: Optional[str] = None) -> pathlib.Path: + def create_temporary_directory(self, prefix=None): + # type: (Optional[str]) -> pathlib.Path # Simply return a path that is contained within LTTNG_HOME; it will # be destroyed when the temporary home goes out of scope. assert self._lttng_home @@ -239,7 +259,8 @@ class _Environment(logger._Logger): # Unpack a list of environment variables from a string # such as "HELLO=is_it ME='/you/are/looking/for'" @staticmethod - def _unpack_env_vars(env_vars_string: str) -> List[Tuple[str, str]]: + def _unpack_env_vars(env_vars_string): + # type: (str) -> List[Tuple[str, str]] unpacked_vars = [] for var in shlex.split(env_vars_string): equal_position = var.find("=") @@ -258,7 +279,8 @@ class _Environment(logger._Logger): return unpacked_vars - def _launch_lttng_sessiond(self) -> Optional[subprocess.Popen]: + def _launch_lttng_sessiond(self): + # type: () -> Optional[subprocess.Popen] is_64bits_host = sys.maxsize > 2**32 sessiond_path = ( @@ -315,9 +337,9 @@ class _Environment(logger._Logger): ) if self._logging_function: - self._sessiond_output_consumer: Optional[ - ProcessOutputConsumer - ] = ProcessOutputConsumer(process, "lttng-sessiond", self._logging_function) + self._sessiond_output_consumer = ProcessOutputConsumer( + process, "lttng-sessiond", self._logging_function + ) # type: Optional[ProcessOutputConsumer] self._sessiond_output_consumer.daemon = True self._sessiond_output_consumer.start() @@ -327,9 +349,8 @@ class _Environment(logger._Logger): return process - def _handle_termination_signal( - self, signal_number: int, frame: Optional[FrameType] - ) -> None: + def _handle_termination_signal(self, signal_number, frame): + # type: (int, Optional[FrameType]) -> None self._log( "Killed by {signal_name} signal, cleaning-up".format( signal_name=signal.strsignal(signal_number) @@ -337,9 +358,8 @@ class _Environment(logger._Logger): ) self._cleanup() - def launch_wait_trace_test_application( - self, event_count: int - ) -> WaitTraceTestApplication: + def launch_wait_trace_test_application(self, event_count): + # type: (int) -> WaitTraceTestApplication """ Launch an application that will wait before tracing `event_count` events. """ @@ -355,7 +375,8 @@ class _Environment(logger._Logger): ) # Clean-up managed processes - def _cleanup(self) -> None: + def _cleanup(self): + # type: () -> None if self._sessiond and self._sessiond.poll() is None: # The session daemon is alive; kill it. self._log( @@ -380,7 +401,8 @@ class _Environment(logger._Logger): @contextlib.contextmanager -def test_environment(with_sessiond: bool, log: Optional[Callable[[str], None]] = None): +def test_environment(with_sessiond, log=None): + # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment] env = _Environment(with_sessiond, log) try: yield env diff --git a/tests/utils/lttngtest/logger.py b/tests/utils/lttngtest/logger.py index 9f90ec06a..b0363d1ab 100644 --- a/tests/utils/lttngtest/logger.py +++ b/tests/utils/lttngtest/logger.py @@ -8,13 +8,16 @@ from typing import Callable, Optional class _Logger: - def __init__(self, log: Optional[Callable[[str], None]]): - self._logging_function: Optional[Callable[[str], None]] = log + def __init__(self, log): + # type: (Optional[Callable[[str], None]]) -> None + self._logging_function = log # type: Optional[Callable[[str], None]] - def _log(self, msg: str) -> None: + def _log(self, msg): + # type: (str) -> None if self._logging_function: self._logging_function(msg) @property - def logger(self) -> Optional[Callable[[str], None]]: + def logger(self): + # type: () -> Optional[Callable[[str], None]] return self._logging_function diff --git a/tests/utils/lttngtest/lttng.py b/tests/utils/lttngtest/lttng.py index 2679fd1a6..9ccea6b63 100644 --- a/tests/utils/lttngtest/lttng.py +++ b/tests/utils/lttngtest/lttng.py @@ -19,11 +19,13 @@ Implementation of the lttngctl interface based on the `lttng` command line clien class Unsupported(lttngctl.ControlException): - def __init__(self, msg: str): + def __init__(self, msg): + # type: (str) -> None super().__init__(msg) -def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str: +def _get_domain_option_name(domain): + # type: (lttngctl.TracingDomain) -> str if domain == lttngctl.TracingDomain.User: return "userspace" elif domain == lttngctl.TracingDomain.Kernel: @@ -38,7 +40,8 @@ def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str: raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client") -def _get_context_type_name(context: lttngctl.ContextType) -> str: +def _get_context_type_name(context): + # type: (lttngctl.ContextType) -> str if isinstance(context, lttngctl.VgidContextType): return "vgid" elif isinstance(context, lttngctl.VuidContextType): @@ -60,17 +63,18 @@ def _get_context_type_name(context: lttngctl.ContextType) -> str: class _Channel(lttngctl.Channel): def __init__( self, - client: "LTTngClient", - name: str, - domain: lttngctl.TracingDomain, - session: "_Session", + client, # type: LTTngClient + name, # type: str + domain, # type: lttngctl.TracingDomain + session, # type: _Session ): - self._client: LTTngClient = client - self._name: str = name - self._domain: lttngctl.TracingDomain = domain - self._session: _Session = session + self._client = client # type: LTTngClient + self._name = name # type: str + self._domain = domain # type: lttngctl.TracingDomain + self._session = session # type: _Session - def add_context(self, context_type: lttngctl.ContextType) -> None: + def add_context(self, context_type): + # type: (lttngctl.ContextType) -> None domain_option_name = _get_domain_option_name(self.domain) context_type_name = _get_context_type_name(context_type) self._client._run_cmd( @@ -80,7 +84,8 @@ class _Channel(lttngctl.Channel): ) ) - def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None: + def add_recording_rule(self, rule): + # type: (Type[lttngctl.EventRule]) -> None client_args = ( "enable-event --session {session_name} --channel {channel_name}".format( session_name=self._session.name, channel_name=self.name @@ -136,11 +141,13 @@ class _Channel(lttngctl.Channel): self._client._run_cmd(client_args) @property - def name(self) -> str: + def name(self): + # type: () -> str return self._name @property - def domain(self) -> lttngctl.TracingDomain: + def domain(self): + # type: () -> lttngctl.TracingDomain return self._domain @@ -157,7 +164,8 @@ class _ProcessAttribute(enum.Enum): return "<%s.%s>" % (self.__class__.__name__, self.name) -def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str: +def _get_process_attribute_option_name(attribute): + # type: (_ProcessAttribute) -> str return { _ProcessAttribute.PID: "pid", _ProcessAttribute.VPID: "vpid", @@ -171,21 +179,22 @@ def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str: class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker): def __init__( self, - client: "LTTngClient", - attribute: _ProcessAttribute, - domain: lttngctl.TracingDomain, - session: "_Session", + client, # type: LTTngClient + attribute, # type: _ProcessAttribute + domain, # type: lttngctl.TracingDomain + session, # type: _Session ): - self._client: LTTngClient = client - self._tracked_attribute: _ProcessAttribute = attribute - self._domain: lttngctl.TracingDomain = domain - self._session: "_Session" = session + self._client = client # type: LTTngClient + self._tracked_attribute = attribute # type: _ProcessAttribute + self._domain = domain # type: lttngctl.TracingDomain + self._session = session # type: _Session if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID: - self._allowed_value_types: list[type] = [int, str] + self._allowed_value_types = [int, str] # type: list[type] else: - self._allowed_value_types: list[type] = [int] + self._allowed_value_types = [int] # type: list[type] - def _call_client(self, cmd_name: str, value: Union[int, str]) -> None: + def _call_client(self, cmd_name, value): + # type: (str, Union[int, str]) -> None if type(value) not in self._allowed_value_types: raise TypeError( "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format( @@ -208,31 +217,33 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker): ) ) - def track(self, value: Union[int, str]) -> None: + def track(self, value): + # type: (Union[int, str]) -> None self._call_client("track", value) - def untrack(self, value: Union[int, str]) -> None: + def untrack(self, value): + # type: (Union[int, str]) -> None self._call_client("untrack", value) class _Session(lttngctl.Session): def __init__( self, - client: "LTTngClient", - name: str, - output: Optional[Type[lttngctl.SessionOutputLocation]], + client, # type: LTTngClient + name, # type: str + output, # type: Optional[lttngctl.SessionOutputLocation] ): - self._client: LTTngClient = client - self._name: str = name - self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output + self._client = client # type: LTTngClient + self._name = name # type: str + self._output = output # type: Optional[lttngctl.SessionOutputLocation] @property - def name(self) -> str: + def name(self): + # type: () -> str return self._name - def add_channel( - self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None - ) -> lttngctl.Channel: + def add_channel(self, domain, channel_name=None): + # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel channel_name = lttngctl.Channel._generate_name() domain_option_name = _get_domain_option_name(domain) self._client._run_cmd( @@ -242,81 +253,81 @@ class _Session(lttngctl.Session): ) return _Channel(self._client, channel_name, domain, self) - def add_context(self, context_type: lttngctl.ContextType) -> None: + def add_context(self, context_type): + # type: (lttngctl.ContextType) -> None pass @property - def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]: - return self._output + def output(self): + # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]" + return self._output # type: ignore - def start(self) -> None: + def start(self): + # type: () -> None self._client._run_cmd("start {session_name}".format(session_name=self.name)) - def stop(self) -> None: + def stop(self): + # type: () -> None self._client._run_cmd("stop {session_name}".format(session_name=self.name)) - def destroy(self) -> None: + def destroy(self): + # type: () -> None self._client._run_cmd("destroy {session_name}".format(session_name=self.name)) @property - def kernel_pid_process_attribute_tracker( - self, - ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]: + def kernel_pid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vpid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]: + def kernel_vpid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vpid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]: + def user_vpid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore @property - def kernel_gid_process_attribute_tracker( - self, - ) -> Type[lttngctl.GroupIDProcessAttributeTracker]: + def kernel_gid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vgid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]: + def kernel_vgid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vgid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]: + def user_vgid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore @property - def kernel_uid_process_attribute_tracker( - self, - ) -> Type[lttngctl.UserIDProcessAttributeTracker]: + def kernel_uid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.UserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vuid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]: + def kernel_vuid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vuid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]: + def user_vuid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore class LTTngClientError(lttngctl.ControlException): - def __init__(self, command_args: str, error_output: str): - self._command_args: str = command_args - self._output: str = error_output + def __init__( + self, + command_args, # type: str + error_output, # type: str + ): + self._command_args = command_args # type: str + self._output = error_output # type: str class LTTngClient(logger._Logger, lttngctl.Controller): @@ -326,23 +337,24 @@ class LTTngClient(logger._Logger, lttngctl.Controller): def __init__( self, - test_environment: environment._Environment, - log: Optional[Callable[[str], None]], + test_environment, # type: environment._Environment + log, # type: Optional[Callable[[str], None]] ): logger._Logger.__init__(self, log) - self._environment: environment._Environment = test_environment + self._environment = test_environment # type: environment._Environment - def _run_cmd(self, command_args: str) -> None: + def _run_cmd(self, command_args): + # type: (str) -> None """ Invoke the `lttng` client with a set of arguments. The command is executed in the context of the client's test environment. """ - args: list[str] = [str(self._environment.lttng_client_path)] + args = [str(self._environment.lttng_client_path)] # type: list[str] args.extend(shlex.split(command_args)) self._log("lttng {command_args}".format(command_args=command_args)) - client_env: dict[str, str] = os.environ.copy() + client_env = os.environ.copy() # type: dict[str, str] client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location) process = subprocess.Popen( @@ -357,11 +369,8 @@ class LTTngClient(logger._Logger, lttngctl.Controller): self._log(error_line) raise LTTngClientError(command_args, decoded_output) - def create_session( - self, - name: Optional[str] = None, - output: Optional[lttngctl.SessionOutputLocation] = None, - ) -> lttngctl.Session: + def create_session(self, name=None, output=None): + # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session name = name if name else lttngctl.Session._generate_name() if isinstance(output, lttngctl.LocalSessionOutputLocation): diff --git a/tests/utils/lttngtest/lttngctl.py b/tests/utils/lttngtest/lttngctl.py index 0301b6e09..b034308f3 100644 --- a/tests/utils/lttngtest/lttngctl.py +++ b/tests/utils/lttngtest/lttngctl.py @@ -22,7 +22,8 @@ control functionality that is used by tests. """ -def _generate_random_string(length: int) -> str: +def _generate_random_string(length): + # type: (int) -> str return "".join( random.choice(string.ascii_lowercase + string.digits) for _ in range(length) ) @@ -55,16 +56,22 @@ class VgidContextType(ContextType): class JavaApplicationContextType(ContextType): """A java application-specific context field is a piece of state which the application provides.""" - def __init__(self, retriever_name: str, field_name: str): - self._retriever_name: str = retriever_name - self._field_name: str = field_name + def __init__( + self, + retriever_name, # type: str + field_name, # type: str + ): + self._retriever_name = retriever_name # type: str + self._field_name = field_name # type: str @property - def retriever_name(self) -> str: + def retriever_name(self): + # type: () -> str return self._retriever_name @property - def field_name(self) -> str: + def field_name(self): + # type: () -> str return self._field_name @@ -93,60 +100,70 @@ class LogLevelRule: class LogLevelRuleAsSevereAs(LogLevelRule): - def __init__(self, level: int): + def __init__(self, level): + # type: (int) self._level = level @property - def level(self) -> int: + def level(self): + # type: () -> int return self._level class LogLevelRuleExactly(LogLevelRule): - def __init__(self, level: int): + def __init__(self, level): + # type: (int) self._level = level @property - def level(self) -> int: + def level(self): + # type: () -> int return self._level class TracepointEventRule(EventRule): def __init__( self, - name_pattern: Optional[str] = None, - filter_expression: Optional[str] = None, - log_level_rule: Optional[LogLevelRule] = None, - name_pattern_exclusions: Optional[List[str]] = None, + name_pattern=None, # type: Optional[str] + filter_expression=None, # type: Optional[str] + log_level_rule=None, # type: Optional[LogLevelRule] + name_pattern_exclusions=None, # type: Optional[List[str]] ): - self._name_pattern: Optional[str] = name_pattern - self._filter_expression: Optional[str] = filter_expression - self._log_level_rule: Optional[LogLevelRule] = log_level_rule - self._name_pattern_exclusions: Optional[List[str]] = name_pattern_exclusions + self._name_pattern = name_pattern # type: Optional[str] + self._filter_expression = filter_expression # type: Optional[str] + self._log_level_rule = log_level_rule # type: Optional[LogLevelRule] + self._name_pattern_exclusions = ( + name_pattern_exclusions + ) # type: Optional[List[str]] @property - def name_pattern(self) -> Optional[str]: + def name_pattern(self): + # type: () -> Optional[str] return self._name_pattern @property - def filter_expression(self) -> Optional[str]: + def filter_expression(self): + # type: () -> Optional[str] return self._filter_expression @property - def log_level_rule(self) -> Optional[LogLevelRule]: + def log_level_rule(self): + # type: () -> Optional[LogLevelRule] return self._log_level_rule @property - def name_pattern_exclusions(self) -> Optional[List[str]]: + def name_pattern_exclusions(self): + # type: () -> Optional[List[str]] return self._name_pattern_exclusions class UserTracepointEventRule(TracepointEventRule): def __init__( self, - name_pattern: Optional[str] = None, - filter_expression: Optional[str] = None, - log_level_rule: Optional[LogLevelRule] = None, - name_pattern_exclusions: Optional[List[str]] = None, + name_pattern=None, # type: Optional[str] + filter_expression=None, # type: Optional[str] + log_level_rule=None, # type: Optional[LogLevelRule] + name_pattern_exclusions=None, # type: Optional[List[str]] ): TracepointEventRule.__init__(**locals()) @@ -154,10 +171,10 @@ class UserTracepointEventRule(TracepointEventRule): class KernelTracepointEventRule(TracepointEventRule): def __init__( self, - name_pattern: Optional[str] = None, - filter_expression: Optional[str] = None, - log_level_rule: Optional[LogLevelRule] = None, - name_pattern_exclusions: Optional[List[str]] = None, + name_pattern=None, # type: Optional[str] + filter_expression=None, # type: Optional[str] + log_level_rule=None, # type: Optional[LogLevelRule] + name_pattern_exclusions=None, # type: Optional[List[str]] ): TracepointEventRule.__init__(**locals()) @@ -169,25 +186,30 @@ class Channel(abc.ABC): """ @staticmethod - def _generate_name() -> str: + def _generate_name(): + # type: () -> str return "channel_{random_id}".format(random_id=_generate_random_string(8)) @abc.abstractmethod - def add_context(self, context_type: ContextType) -> None: + def add_context(self, context_type): + # type: (ContextType) -> None pass @property @abc.abstractmethod - def domain(self) -> TracingDomain: + def domain(self): + # type: () -> TracingDomain pass @property @abc.abstractmethod - def name(self) -> str: + def name(self): + # type: () -> str pass @abc.abstractmethod - def add_recording_rule(self, rule: Type[EventRule]) -> None: + def add_recording_rule(self, rule) -> None: + # type: (Type[EventRule]) -> None pass @@ -196,11 +218,13 @@ class SessionOutputLocation(abc.ABC): class LocalSessionOutputLocation(SessionOutputLocation): - def __init__(self, trace_path: pathlib.Path): + def __init__(self, trace_path): + # type: (pathlib.Path) self._path = trace_path @property - def path(self) -> pathlib.Path: + def path(self): + # type: () -> pathlib.Path return self._path @@ -226,167 +250,180 @@ class ProcessAttributeTracker(abc.ABC): def __repr__(self): return "<%s.%s>" % (self.__class__.__name__, self.name) - def __init__(self, policy: TrackingPolicy): + def __init__(self, policy): + # type: (TrackingPolicy) self._policy = policy @property - def tracking_policy(self) -> TrackingPolicy: + def tracking_policy(self): + # type: () -> TrackingPolicy return self._policy class ProcessIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, pid: int) -> None: + def track(self, pid): + # type: (int) -> None pass @abc.abstractmethod - def untrack(self, pid: int) -> None: + def untrack(self, pid): + # type: (int) -> None pass class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, vpid: int) -> None: + def track(self, vpid): + # type: (int) -> None pass @abc.abstractmethod - def untrack(self, vpid: int) -> None: + def untrack(self, vpid): + # type: (int) -> None pass class UserIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, uid: Union[int, str]) -> None: + def track(self, uid): + # type: (Union[int, str]) -> None pass @abc.abstractmethod - def untrack(self, uid: Union[int, str]) -> None: + def untrack(self, uid): + # type: (Union[int, str]) -> None pass class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, vuid: Union[int, str]) -> None: + def track(self, vuid): + # type: (Union[int, str]) -> None pass @abc.abstractmethod - def untrack(self, vuid: Union[int, str]) -> None: + def untrack(self, vuid): + # type: (Union[int, str]) -> None pass class GroupIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, gid: Union[int, str]) -> None: + def track(self, gid): + # type: (Union[int, str]) -> None pass @abc.abstractmethod - def untrack(self, gid: Union[int, str]) -> None: + def untrack(self, gid): + # type: (Union[int, str]) -> None pass class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker): @abc.abstractmethod - def track(self, vgid: Union[int, str]) -> None: + def track(self, vgid): + # type: (Union[int, str]) -> None pass @abc.abstractmethod - def untrack(self, vgid: Union[int, str]) -> None: + def untrack(self, vgid): + # type: (Union[int, str]) -> None pass class Session(abc.ABC): @staticmethod - def _generate_name() -> str: + def _generate_name(): + # type: () -> str return "session_{random_id}".format(random_id=_generate_random_string(8)) @property @abc.abstractmethod - def name(self) -> str: + def name(self): + # type: () -> str pass @property @abc.abstractmethod - def output(self) -> Optional[Type[SessionOutputLocation]]: + def output(self): + # type: () -> Optional[Type[SessionOutputLocation]] pass @abc.abstractmethod - def add_channel( - self, domain: TracingDomain, channel_name: Optional[str] = None - ) -> Channel: + def add_channel(self, domain, channel_name=None): + # type: (TracingDomain, Optional[str]) -> Channel """Add a channel with default attributes to the session.""" pass @abc.abstractmethod - def start(self) -> None: + def start(self): + # type: () -> None pass @abc.abstractmethod - def stop(self) -> None: + def stop(self): + # type: () -> None pass @abc.abstractmethod - def destroy(self) -> None: + def destroy(self): + # type: () -> None pass @abc.abstractproperty - def kernel_pid_process_attribute_tracker( - self, - ) -> Type[ProcessIDProcessAttributeTracker]: + def kernel_pid_process_attribute_tracker(self): + # type: () -> Type[ProcessIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def kernel_vpid_process_attribute_tracker( - self, - ) -> Type[VirtualProcessIDProcessAttributeTracker]: + def kernel_vpid_process_attribute_tracker(self): + # type: () -> Type[VirtualProcessIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty def user_vpid_process_attribute_tracker( self, ) -> Type[VirtualProcessIDProcessAttributeTracker]: + # type: () -> Type[VirtualProcessIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def kernel_gid_process_attribute_tracker( - self, - ) -> Type[GroupIDProcessAttributeTracker]: + def kernel_gid_process_attribute_tracker(self): + # type: () -> Type[GroupIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def kernel_vgid_process_attribute_tracker( - self, - ) -> Type[VirtualGroupIDProcessAttributeTracker]: + def kernel_vgid_process_attribute_tracker(self): + # type: () -> Type[VirtualGroupIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def user_vgid_process_attribute_tracker( - self, - ) -> Type[VirtualGroupIDProcessAttributeTracker]: + def user_vgid_process_attribute_tracker(self): + # type: () -> Type[VirtualGroupIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def kernel_uid_process_attribute_tracker( - self, - ) -> Type[UserIDProcessAttributeTracker]: + def kernel_uid_process_attribute_tracker(self): + # type: () -> Type[UserIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def kernel_vuid_process_attribute_tracker( - self, - ) -> Type[VirtualUserIDProcessAttributeTracker]: + def kernel_vuid_process_attribute_tracker(self): + # type: () -> Type[VirtualUserIDProcessAttributeTracker] raise NotImplementedError @abc.abstractproperty - def user_vuid_process_attribute_tracker( - self, - ) -> Type[VirtualUserIDProcessAttributeTracker]: + def user_vuid_process_attribute_tracker(self): + # type: () -> Type[VirtualUserIDProcessAttributeTracker] raise NotImplementedError class ControlException(RuntimeError): """Base type for exceptions thrown by a controller.""" - def __init__(self, msg: str): + def __init__(self, msg): + # type: (str) super().__init__(msg) @@ -398,9 +435,8 @@ class Controller(abc.ABC): """ @abc.abstractmethod - def create_session( - self, name: Optional[str] = None, output: Optional[SessionOutputLocation] = None - ) -> Session: + def create_session(self, name=None, output=None): + # type: (Optional[str], Optional[SessionOutputLocation]) -> Session """ Create a session with an output. Don't specify an output to create a session without an output. diff --git a/tests/utils/lttngtest/tap_generator.py b/tests/utils/lttngtest/tap_generator.py index c28e87d36..39c6bda90 100644 --- a/tests/utils/lttngtest/tap_generator.py +++ b/tests/utils/lttngtest/tap_generator.py @@ -7,57 +7,69 @@ import contextlib import sys -from typing import Optional +from typing import Iterator, Optional class InvalidTestPlan(RuntimeError): - def __init__(self, msg: str): + def __init__(self, msg): + # type: (str) -> None super().__init__(msg) class BailOut(RuntimeError): - def __init__(self, msg: str): + def __init__(self, msg): + # type: (str) -> None super().__init__(msg) class TestCase: - def __init__(self, tap_generator: "TapGenerator", description: str): - self._tap_generator = tap_generator - self._result: Optional[bool] = None - self._description = description + def __init__( + self, + tap_generator, # type: "TapGenerator" + description, # type: str + ): + self._tap_generator = tap_generator # type: "TapGenerator" + self._result = None # type: Optional[bool] + self._description = description # type: str @property - def result(self) -> Optional[bool]: + def result(self): + # type: () -> Optional[bool] return self._result @property - def description(self) -> str: + def description(self): + # type: () -> str return self._description - def _set_result(self, result: bool) -> None: + def _set_result(self, result): + # type: (bool) -> None if self._result is not None: raise RuntimeError("Can't set test case result twice") self._result = result self._tap_generator.test(result, self._description) - def success(self) -> None: + def success(self): + # type: () -> None self._set_result(True) - def fail(self) -> None: + def fail(self): + # type: () -> None self._set_result(False) # Produces a test execution report in the TAP format. class TapGenerator: - def __init__(self, total_test_count: int): + def __init__(self, total_test_count): + # type: (int) -> None if total_test_count <= 0: raise ValueError("Test count must be greater than zero") - self._total_test_count: int = total_test_count - self._last_test_case_id: int = 0 - self._printed_plan: bool = False - self._has_failure: bool = False + self._total_test_count = total_test_count # type: int + self._last_test_case_id = 0 # type: int + self._printed_plan = False # type: bool + self._has_failure = False # type: bool def __del__(self): if self.remaining_test_cases > 0: @@ -68,10 +80,12 @@ class TapGenerator: ) @property - def remaining_test_cases(self) -> int: + def remaining_test_cases(self): + # type: () -> int return self._total_test_count - self._last_test_case_id - def _print(self, msg: str) -> None: + def _print(self, msg): + # type: (str) -> None if not self._printed_plan: print( "1..{total_test_count}".format(total_test_count=self._total_test_count), @@ -81,7 +95,8 @@ class TapGenerator: print(msg, flush=True) - def skip_all(self, reason) -> None: + def skip_all(self, reason): + # type: (str) -> None if self._last_test_case_id != 0: raise RuntimeError("Can't skip all tests after running test cases") @@ -90,7 +105,8 @@ class TapGenerator: self._last_test_case_id = self._total_test_count - def skip(self, reason, skip_count: int = 1) -> None: + def skip(self, reason, skip_count=1): + # type: (str, int) -> None for i in range(skip_count): self._last_test_case_id = self._last_test_case_id + 1 self._print( @@ -99,12 +115,14 @@ class TapGenerator: ) ) - def bail_out(self, reason: str) -> None: + def bail_out(self, reason): + # type: (str) -> None self._print("Bail out! {reason}".format(reason=reason)) self._last_test_case_id = self._total_test_count raise BailOut(reason) - def test(self, result: bool, description: str) -> None: + def test(self, result, description): + # type: (bool, str) -> None if self._last_test_case_id == self._total_test_count: raise InvalidTestPlan("Executing too many tests") @@ -121,20 +139,24 @@ class TapGenerator: ) ) - def ok(self, description: str) -> None: + def ok(self, description): + # type: (str) -> None self.test(True, description) - def fail(self, description: str) -> None: + def fail(self, description): + # type: (str) -> None self.test(False, description) @property - def is_successful(self) -> bool: + def is_successful(self): + # type: () -> bool return ( self._last_test_case_id == self._total_test_count and not self._has_failure ) @contextlib.contextmanager - def case(self, description: str): + def case(self, description): + # type: (str) -> Iterator[TestCase] test_case = TestCase(self, description) try: yield test_case @@ -153,5 +175,6 @@ class TapGenerator: if test_case.result is None: test_case.success() - def diagnostic(self, msg) -> None: + def diagnostic(self, msg): + # type: (str) -> None print("# {msg}".format(msg=msg), file=sys.stderr, flush=True) -- 2.34.1