Tests: python: use quoted annotations to support python <= 3.6
authorMichael Jeanson <mjeanson@efficios.com>
Mon, 27 Mar 2023 19:25:23 +0000 (15:25 -0400)
committerMichael Jeanson <mjeanson@efficios.com>
Tue, 28 Mar 2023 18:58:38 +0000 (14:58 -0400)
Python 3.0 introduced syntax for parameter and return type annotations,
as specified in PEP 484. Python 3.6 introduced support for variable type
annotations, as specified in PEP 526.

Upstream recommends using quoted annotations for librairies that need to
support older python versions.

See https://typing.readthedocs.io/en/latest/source/libraries.html#compatibility-with-older-python-versions

Change-Id: Ifc6017a7baa9f0589e85d32005ac9ead18c1c9fb
Signed-off-by: Michael Jeanson <mjeanson@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
tests/regression/tools/context/test_ust.py
tests/utils/lttngtest/environment.py
tests/utils/lttngtest/logger.py
tests/utils/lttngtest/lttng.py
tests/utils/lttngtest/lttngctl.py
tests/utils/lttngtest/tap_generator.py

index ab2e743e3570d94b89371514d9372ff9bf6c2b2e..b90291fa5b8130be0b8a423b3fb3bbbff03cc9c7 100755 (executable)
@@ -29,7 +29,8 @@ import lttngtest
 import bt2
 
 
-def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str:
+def context_trace_field_name(context_type):
+    # type: (Type[lttngtest.ContextType]) -> str
     if isinstance(context_type, lttngtest.VpidContextType):
         return "vpid"
     elif isinstance(context_type, lttngtest.VuidContextType):
@@ -46,8 +47,9 @@ def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str:
 
 
 def trace_stream_class_has_context_field_in_event_context(
-    trace_location: pathlib.Path, context_field_name: str
-) -> bool:
+    trace_location, context_field_name
+):
+    # type: (pathlib.Path, str) -> bool
     iterator = bt2.TraceCollectionMessageIterator(str(trace_location))
 
     # A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage.
@@ -67,9 +69,8 @@ def trace_stream_class_has_context_field_in_event_context(
     return context_field_name in event_common_context_field_class
 
 
-def trace_events_have_context_value(
-    trace_location: pathlib.Path, context_field_name: str, value: Any
-) -> bool:
+def trace_events_have_context_value(trace_location, context_field_name, value):
+    # type: (pathlib.Path, str, Any) -> bool
     for msg in bt2.TraceCollectionMessageIterator(str(trace_location)):
         if type(msg) is not bt2._EventMessageConst:
             continue
@@ -80,12 +81,8 @@ def trace_events_have_context_value(
     return True
 
 
-def test_static_context(
-    tap: lttngtest.TapGenerator,
-    test_env: lttngtest._Environment,
-    context_type: lttngtest.ContextType,
-    context_value_retriever: Callable[[lttngtest.WaitTraceTestApplication], Any],
-) -> None:
+def test_static_context(tap, test_env, context_type, context_value_retriever):
+    # type: (lttngtest.TapGenerator, lttngtest._Environment, lttngtest.ContextType, Callable[[lttngtest.WaitTraceTestApplication], Any]) -> None
     tap.diagnostic(
         "Test presence and expected value of context `{context_name}`".format(
             context_name=type(context_type).__name__
index d9777810bdeb90fa7c4b7563598c43289304a8f6..e71e588df80dfdfcfd14290547cf73cad77c2716 100644 (file)
@@ -6,7 +6,7 @@
 #
 
 from types import FrameType
-from typing import Callable, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List
 import sys
 import pathlib
 import signal
@@ -23,14 +23,16 @@ import contextlib
 
 
 class TemporaryDirectory:
-    def __init__(self, prefix: str):
+    def __init__(self, prefix):
+        # type: (str) -> None
         self._directory_path = tempfile.mkdtemp(prefix=prefix)
 
     def __del__(self):
         shutil.rmtree(self._directory_path, ignore_errors=True)
 
     @property
-    def path(self) -> pathlib.Path:
+    def path(self):
+        # type: () -> pathlib.Path
         return pathlib.Path(self._directory_path)
 
 
@@ -49,9 +51,13 @@ class _SignalWaitQueue:
     """
 
     def __init__(self):
-        self._queue: queue.Queue = queue.Queue()
+        self._queue = queue.Queue()  # type: queue.Queue
 
-    def signal(self, signal_number, frame: Optional[FrameType]):
+    def signal(
+        self,
+        signal_number,
+        frame,  # type: Optional[FrameType]
+    ):
         self._queue.put_nowait(signal_number)
 
     def wait_for_signal(self):
@@ -67,18 +73,18 @@ class WaitTraceTestApplication:
 
     def __init__(
         self,
-        binary_path: pathlib.Path,
-        event_count: int,
-        environment: "Environment",
-        wait_time_between_events_us: int = 0,
+        binary_path,  # type: pathlib.Path
+        event_count,  # type: int
+        environment,  # type: Environment
+        wait_time_between_events_us=0,  # type: int
     ):
-        self._environment: Environment = environment
+        self._environment = environment  # type: Environment
         if event_count % 5:
             # The test application currently produces 5 different events per iteration.
             raise ValueError("event count must be a multiple of 5")
-        self._iteration_count: int = int(event_count / 5)
+        self._iteration_count = int(event_count / 5)  # type: int
         # File that the application will wait to see before tracing its events.
-        self._app_start_tracing_file_path: pathlib.Path = pathlib.Path(
+        self._app_start_tracing_file_path = pathlib.Path(
             tempfile.mktemp(
                 prefix="app_",
                 suffix="_start_tracing",
@@ -98,7 +104,7 @@ class WaitTraceTestApplication:
             prefix="app_",
             suffix="_ready",
             dir=self._compat_open_path(environment.lttng_home_location),
-        )
+        )  # type: str
 
         test_app_args = [str(binary_path)]
         test_app_args.extend(
@@ -112,10 +118,10 @@ class WaitTraceTestApplication:
             )
         )
 
-        self._process: subprocess.Popen = subprocess.Popen(
+        self._process = subprocess.Popen(
             test_app_args,
             env=test_app_env,
-        )
+        )  # type: subprocess.Popen
 
         # Wait for the application to create the file indicating it has fully
         # initialized. Make sure the app hasn't crashed in order to not wait
@@ -134,7 +140,8 @@ class WaitTraceTestApplication:
 
             time.sleep(0.1)
 
-    def trace(self) -> None:
+    def trace(self):
+        # type: () -> None
         if self._process.poll() is not None:
             # Application has unexepectedly returned.
             raise RuntimeError(
@@ -144,7 +151,8 @@ class WaitTraceTestApplication:
             )
         open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
 
-    def wait_for_exit(self) -> None:
+    def wait_for_exit(self):
+        # type: () -> None
         if self._process.wait() != 0:
             raise RuntimeError(
                 "Test application has exit with return code `{return_code}`".format(
@@ -154,12 +162,13 @@ class WaitTraceTestApplication:
         self._has_returned = True
 
     @property
-    def vpid(self) -> int:
+    def vpid(self):
+        # type: () -> int
         return self._process.pid
 
     @staticmethod
     def _compat_open_path(path):
-        # type: (pathlib.Path)
+        # type: (pathlib.Path) -> pathlib.Path | str
         """
         The builtin open() in python >= 3.6 expects a path-like object while
         prior versions expect a string or bytes object. Return the correct type
@@ -180,14 +189,18 @@ class WaitTraceTestApplication:
 
 class ProcessOutputConsumer(threading.Thread, logger._Logger):
     def __init__(
-        self, process: subprocess.Popen, name: str, log: Callable[[str], None]
+        self,
+        process,  # type: subprocess.Popen
+        name,  # type: str
+        log,  # type: Callable[[str], None]
     ):
         threading.Thread.__init__(self)
         self._prefix = name
         logger._Logger.__init__(self, log)
         self._process = process
 
-    def run(self) -> None:
+    def run(self):
+        # type: () -> None
         while self._process.poll() is None:
             assert self._process.stdout
             line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
@@ -198,7 +211,9 @@ class ProcessOutputConsumer(threading.Thread, logger._Logger):
 # Generate a temporary environment in which to execute a test.
 class _Environment(logger._Logger):
     def __init__(
-        self, with_sessiond: bool, log: Optional[Callable[[str], None]] = None
+        self,
+        with_sessiond,  # type: bool
+        log=None,  # type: Optional[Callable[[str], None]]
     ):
         super().__init__(log)
         signal.signal(signal.SIGTERM, self._handle_termination_signal)
@@ -206,26 +221,31 @@ class _Environment(logger._Logger):
 
         # Assumes the project's hierarchy to this file is:
         # tests/utils/python/this_file
-        self._project_root: pathlib.Path = pathlib.Path(__file__).absolute().parents[3]
-        self._lttng_home: Optional[TemporaryDirectory] = TemporaryDirectory(
+        self._project_root = (
+            pathlib.Path(__file__).absolute().parents[3]
+        )  # type: pathlib.Path
+        self._lttng_home = TemporaryDirectory(
             "lttng_test_env_home"
-        )
+        )  # type: Optional[TemporaryDirectory]
 
-        self._sessiond: Optional[subprocess.Popen[bytes]] = (
+        self._sessiond = (
             self._launch_lttng_sessiond() if with_sessiond else None
-        )
+        )  # type: Optional[subprocess.Popen[bytes]]
 
     @property
-    def lttng_home_location(self) -> pathlib.Path:
+    def lttng_home_location(self):
+        # type: () -> pathlib.Path
         if self._lttng_home is None:
             raise RuntimeError("Attempt to access LTTng home after clean-up")
         return self._lttng_home.path
 
     @property
-    def lttng_client_path(self) -> pathlib.Path:
+    def lttng_client_path(self):
+        # type: () -> pathlib.Path
         return self._project_root / "src" / "bin" / "lttng" / "lttng"
 
-    def create_temporary_directory(self, prefix: Optional[str] = None) -> pathlib.Path:
+    def create_temporary_directory(self, prefix=None):
+        # type: (Optional[str]) -> pathlib.Path
         # Simply return a path that is contained within LTTNG_HOME; it will
         # be destroyed when the temporary home goes out of scope.
         assert self._lttng_home
@@ -239,7 +259,8 @@ class _Environment(logger._Logger):
     # Unpack a list of environment variables from a string
     # such as "HELLO=is_it ME='/you/are/looking/for'"
     @staticmethod
-    def _unpack_env_vars(env_vars_string: str) -> List[Tuple[str, str]]:
+    def _unpack_env_vars(env_vars_string):
+        # type: (str) -> List[Tuple[str, str]]
         unpacked_vars = []
         for var in shlex.split(env_vars_string):
             equal_position = var.find("=")
@@ -258,7 +279,8 @@ class _Environment(logger._Logger):
 
         return unpacked_vars
 
-    def _launch_lttng_sessiond(self) -> Optional[subprocess.Popen]:
+    def _launch_lttng_sessiond(self):
+        # type: () -> Optional[subprocess.Popen]
         is_64bits_host = sys.maxsize > 2**32
 
         sessiond_path = (
@@ -315,9 +337,9 @@ class _Environment(logger._Logger):
         )
 
         if self._logging_function:
-            self._sessiond_output_consumer: Optional[
-                ProcessOutputConsumer
-            ] = ProcessOutputConsumer(process, "lttng-sessiond", self._logging_function)
+            self._sessiond_output_consumer = ProcessOutputConsumer(
+                process, "lttng-sessiond", self._logging_function
+            )  # type: Optional[ProcessOutputConsumer]
             self._sessiond_output_consumer.daemon = True
             self._sessiond_output_consumer.start()
 
@@ -327,9 +349,8 @@ class _Environment(logger._Logger):
 
         return process
 
-    def _handle_termination_signal(
-        self, signal_number: int, frame: Optional[FrameType]
-    ) -> None:
+    def _handle_termination_signal(self, signal_number, frame):
+        # type: (int, Optional[FrameType]) -> None
         self._log(
             "Killed by {signal_name} signal, cleaning-up".format(
                 signal_name=signal.strsignal(signal_number)
@@ -337,9 +358,8 @@ class _Environment(logger._Logger):
         )
         self._cleanup()
 
-    def launch_wait_trace_test_application(
-        self, event_count: int
-    ) -> WaitTraceTestApplication:
+    def launch_wait_trace_test_application(self, event_count):
+        # type: (int) -> WaitTraceTestApplication
         """
         Launch an application that will wait before tracing `event_count` events.
         """
@@ -355,7 +375,8 @@ class _Environment(logger._Logger):
         )
 
     # Clean-up managed processes
-    def _cleanup(self) -> None:
+    def _cleanup(self):
+        # type: () -> None
         if self._sessiond and self._sessiond.poll() is None:
             # The session daemon is alive; kill it.
             self._log(
@@ -380,7 +401,8 @@ class _Environment(logger._Logger):
 
 
 @contextlib.contextmanager
-def test_environment(with_sessiond: bool, log: Optional[Callable[[str], None]] = None):
+def test_environment(with_sessiond, log=None):
+    # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
     env = _Environment(with_sessiond, log)
     try:
         yield env
index 9f90ec06ab1a775b48a2023d22e411dad6b7eddb..b0363d1ab511027966a883b29c1313414dbcaa1b 100644 (file)
@@ -8,13 +8,16 @@ from typing import Callable, Optional
 
 
 class _Logger:
-    def __init__(self, log: Optional[Callable[[str], None]]):
-        self._logging_function: Optional[Callable[[str], None]] = log
+    def __init__(self, log):
+        # type: (Optional[Callable[[str], None]]) -> None
+        self._logging_function = log  # type: Optional[Callable[[str], None]]
 
-    def _log(self, msg: str) -> None:
+    def _log(self, msg):
+        # type: (str) -> None
         if self._logging_function:
             self._logging_function(msg)
 
     @property
-    def logger(self) -> Optional[Callable[[str], None]]:
+    def logger(self):
+        # type: () -> Optional[Callable[[str], None]]
         return self._logging_function
index 2679fd1a663b3f18a44df0c8d5d1845caf39f315..9ccea6b63f2b9325ef6ecd51362358f80a80ec60 100644 (file)
@@ -19,11 +19,13 @@ Implementation of the lttngctl interface based on the `lttng` command line clien
 
 
 class Unsupported(lttngctl.ControlException):
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str) -> None
         super().__init__(msg)
 
 
-def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
+def _get_domain_option_name(domain):
+    # type: (lttngctl.TracingDomain) -> str
     if domain == lttngctl.TracingDomain.User:
         return "userspace"
     elif domain == lttngctl.TracingDomain.Kernel:
@@ -38,7 +40,8 @@ def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
         raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
 
 
-def _get_context_type_name(context: lttngctl.ContextType) -> str:
+def _get_context_type_name(context):
+    # type: (lttngctl.ContextType) -> str
     if isinstance(context, lttngctl.VgidContextType):
         return "vgid"
     elif isinstance(context, lttngctl.VuidContextType):
@@ -60,17 +63,18 @@ def _get_context_type_name(context: lttngctl.ContextType) -> str:
 class _Channel(lttngctl.Channel):
     def __init__(
         self,
-        client: "LTTngClient",
-        name: str,
-        domain: lttngctl.TracingDomain,
-        session: "_Session",
+        client,  # type: LTTngClient
+        name,  # type: str
+        domain,  # type: lttngctl.TracingDomain
+        session,  # type: _Session
     ):
-        self._client: LTTngClient = client
-        self._name: str = name
-        self._domain: lttngctl.TracingDomain = domain
-        self._session: _Session = session
+        self._client = client  # type: LTTngClient
+        self._name = name  # type: str
+        self._domain = domain  # type: lttngctl.TracingDomain
+        self._session = session  # type: _Session
 
-    def add_context(self, context_type: lttngctl.ContextType) -> None:
+    def add_context(self, context_type):
+        # type: (lttngctl.ContextType) -> None
         domain_option_name = _get_domain_option_name(self.domain)
         context_type_name = _get_context_type_name(context_type)
         self._client._run_cmd(
@@ -80,7 +84,8 @@ class _Channel(lttngctl.Channel):
             )
         )
 
-    def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
+    def add_recording_rule(self, rule):
+        # type: (Type[lttngctl.EventRule]) -> None
         client_args = (
             "enable-event --session {session_name} --channel {channel_name}".format(
                 session_name=self._session.name, channel_name=self.name
@@ -136,11 +141,13 @@ class _Channel(lttngctl.Channel):
         self._client._run_cmd(client_args)
 
     @property
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         return self._name
 
     @property
-    def domain(self) -> lttngctl.TracingDomain:
+    def domain(self):
+        # type: () -> lttngctl.TracingDomain
         return self._domain
 
 
@@ -157,7 +164,8 @@ class _ProcessAttribute(enum.Enum):
         return "<%s.%s>" % (self.__class__.__name__, self.name)
 
 
-def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
+def _get_process_attribute_option_name(attribute):
+    # type: (_ProcessAttribute) -> str
     return {
         _ProcessAttribute.PID: "pid",
         _ProcessAttribute.VPID: "vpid",
@@ -171,21 +179,22 @@ def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
 class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
     def __init__(
         self,
-        client: "LTTngClient",
-        attribute: _ProcessAttribute,
-        domain: lttngctl.TracingDomain,
-        session: "_Session",
+        client,  # type: LTTngClient
+        attribute,  # type: _ProcessAttribute
+        domain,  # type: lttngctl.TracingDomain
+        session,  # type: _Session
     ):
-        self._client: LTTngClient = client
-        self._tracked_attribute: _ProcessAttribute = attribute
-        self._domain: lttngctl.TracingDomain = domain
-        self._session: "_Session" = session
+        self._client = client  # type: LTTngClient
+        self._tracked_attribute = attribute  # type: _ProcessAttribute
+        self._domain = domain  # type: lttngctl.TracingDomain
+        self._session = session  # type: _Session
         if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
-            self._allowed_value_types: list[type] = [int, str]
+            self._allowed_value_types = [int, str]  # type: list[type]
         else:
-            self._allowed_value_types: list[type] = [int]
+            self._allowed_value_types = [int]  # type: list[type]
 
-    def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
+    def _call_client(self, cmd_name, value):
+        # type: (str, Union[int, str]) -> None
         if type(value) not in self._allowed_value_types:
             raise TypeError(
                 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
@@ -208,31 +217,33 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
             )
         )
 
-    def track(self, value: Union[int, str]) -> None:
+    def track(self, value):
+        # type: (Union[int, str]) -> None
         self._call_client("track", value)
 
-    def untrack(self, value: Union[int, str]) -> None:
+    def untrack(self, value):
+        # type: (Union[int, str]) -> None
         self._call_client("untrack", value)
 
 
 class _Session(lttngctl.Session):
     def __init__(
         self,
-        client: "LTTngClient",
-        name: str,
-        output: Optional[Type[lttngctl.SessionOutputLocation]],
+        client,  # type: LTTngClient
+        name,  # type: str
+        output,  # type: Optional[lttngctl.SessionOutputLocation]
     ):
-        self._client: LTTngClient = client
-        self._name: str = name
-        self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output
+        self._client = client  # type: LTTngClient
+        self._name = name  # type: str
+        self._output = output  # type: Optional[lttngctl.SessionOutputLocation]
 
     @property
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         return self._name
 
-    def add_channel(
-        self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
-    ) -> lttngctl.Channel:
+    def add_channel(self, domain, channel_name=None):
+        # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
         channel_name = lttngctl.Channel._generate_name()
         domain_option_name = _get_domain_option_name(domain)
         self._client._run_cmd(
@@ -242,81 +253,81 @@ class _Session(lttngctl.Session):
         )
         return _Channel(self._client, channel_name, domain, self)
 
-    def add_context(self, context_type: lttngctl.ContextType) -> None:
+    def add_context(self, context_type):
+        # type: (lttngctl.ContextType) -> None
         pass
 
     @property
-    def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
-        return self._output
+    def output(self):
+        # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
+        return self._output  # type: ignore
 
-    def start(self) -> None:
+    def start(self):
+        # type: () -> None
         self._client._run_cmd("start {session_name}".format(session_name=self.name))
 
-    def stop(self) -> None:
+    def stop(self):
+        # type: () -> None
         self._client._run_cmd("stop {session_name}".format(session_name=self.name))
 
-    def destroy(self) -> None:
+    def destroy(self):
+        # type: () -> None
         self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
 
     @property
-    def kernel_pid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
+    def kernel_pid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+    def kernel_vpid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+    def user_vpid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self)  # type: ignore
 
     @property
-    def kernel_gid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
+    def kernel_gid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+    def kernel_vgid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+    def user_vgid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self)  # type: ignore
 
     @property
-    def kernel_uid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
+    def kernel_uid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+    def kernel_vuid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+    def user_vuid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self)  # type: ignore
 
 
 class LTTngClientError(lttngctl.ControlException):
-    def __init__(self, command_args: str, error_output: str):
-        self._command_args: str = command_args
-        self._output: str = error_output
+    def __init__(
+        self,
+        command_args,  # type: str
+        error_output,  # type: str
+    ):
+        self._command_args = command_args  # type: str
+        self._output = error_output  # type: str
 
 
 class LTTngClient(logger._Logger, lttngctl.Controller):
@@ -326,23 +337,24 @@ class LTTngClient(logger._Logger, lttngctl.Controller):
 
     def __init__(
         self,
-        test_environment: environment._Environment,
-        log: Optional[Callable[[str], None]],
+        test_environment,  # type: environment._Environment
+        log,  # type: Optional[Callable[[str], None]]
     ):
         logger._Logger.__init__(self, log)
-        self._environment: environment._Environment = test_environment
+        self._environment = test_environment  # type: environment._Environment
 
-    def _run_cmd(self, command_args: str) -> None:
+    def _run_cmd(self, command_args):
+        # type: (str) -> None
         """
         Invoke the `lttng` client with a set of arguments. The command is
         executed in the context of the client's test environment.
         """
-        args: list[str] = [str(self._environment.lttng_client_path)]
+        args = [str(self._environment.lttng_client_path)]  # type: list[str]
         args.extend(shlex.split(command_args))
 
         self._log("lttng {command_args}".format(command_args=command_args))
 
-        client_env: dict[str, str] = os.environ.copy()
+        client_env = os.environ.copy()  # type: dict[str, str]
         client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
 
         process = subprocess.Popen(
@@ -357,11 +369,8 @@ class LTTngClient(logger._Logger, lttngctl.Controller):
                 self._log(error_line)
             raise LTTngClientError(command_args, decoded_output)
 
-    def create_session(
-        self,
-        name: Optional[str] = None,
-        output: Optional[lttngctl.SessionOutputLocation] = None,
-    ) -> lttngctl.Session:
+    def create_session(self, name=None, output=None):
+        # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
         name = name if name else lttngctl.Session._generate_name()
 
         if isinstance(output, lttngctl.LocalSessionOutputLocation):
index 0301b6e093afd1748e28f49d9547a7dd6116a7fb..b034308f3685b3de30b135c8168addd414594aa7 100644 (file)
@@ -22,7 +22,8 @@ control functionality that is used by tests.
 """
 
 
-def _generate_random_string(length: int) -> str:
+def _generate_random_string(length):
+    # type: (int) -> str
     return "".join(
         random.choice(string.ascii_lowercase + string.digits) for _ in range(length)
     )
@@ -55,16 +56,22 @@ class VgidContextType(ContextType):
 class JavaApplicationContextType(ContextType):
     """A java application-specific context field is a piece of state which the application provides."""
 
-    def __init__(self, retriever_name: str, field_name: str):
-        self._retriever_name: str = retriever_name
-        self._field_name: str = field_name
+    def __init__(
+        self,
+        retriever_name,  # type: str
+        field_name,  # type: str
+    ):
+        self._retriever_name = retriever_name  # type: str
+        self._field_name = field_name  # type: str
 
     @property
-    def retriever_name(self) -> str:
+    def retriever_name(self):
+        # type: () -> str
         return self._retriever_name
 
     @property
-    def field_name(self) -> str:
+    def field_name(self):
+        # type: () -> str
         return self._field_name
 
 
@@ -93,60 +100,70 @@ class LogLevelRule:
 
 
 class LogLevelRuleAsSevereAs(LogLevelRule):
-    def __init__(self, level: int):
+    def __init__(self, level):
+        # type: (int)
         self._level = level
 
     @property
-    def level(self) -> int:
+    def level(self):
+        # type: () -> int
         return self._level
 
 
 class LogLevelRuleExactly(LogLevelRule):
-    def __init__(self, level: int):
+    def __init__(self, level):
+        # type: (int)
         self._level = level
 
     @property
-    def level(self) -> int:
+    def level(self):
+        # type: () -> int
         return self._level
 
 
 class TracepointEventRule(EventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
     ):
-        self._name_pattern: Optional[str] = name_pattern
-        self._filter_expression: Optional[str] = filter_expression
-        self._log_level_rule: Optional[LogLevelRule] = log_level_rule
-        self._name_pattern_exclusions: Optional[List[str]] = name_pattern_exclusions
+        self._name_pattern = name_pattern  # type: Optional[str]
+        self._filter_expression = filter_expression  # type: Optional[str]
+        self._log_level_rule = log_level_rule  # type: Optional[LogLevelRule]
+        self._name_pattern_exclusions = (
+            name_pattern_exclusions
+        )  # type: Optional[List[str]]
 
     @property
-    def name_pattern(self) -> Optional[str]:
+    def name_pattern(self):
+        # type: () -> Optional[str]
         return self._name_pattern
 
     @property
-    def filter_expression(self) -> Optional[str]:
+    def filter_expression(self):
+        # type: () -> Optional[str]
         return self._filter_expression
 
     @property
-    def log_level_rule(self) -> Optional[LogLevelRule]:
+    def log_level_rule(self):
+        # type: () -> Optional[LogLevelRule]
         return self._log_level_rule
 
     @property
-    def name_pattern_exclusions(self) -> Optional[List[str]]:
+    def name_pattern_exclusions(self):
+        # type: () -> Optional[List[str]]
         return self._name_pattern_exclusions
 
 
 class UserTracepointEventRule(TracepointEventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
     ):
         TracepointEventRule.__init__(**locals())
 
@@ -154,10 +171,10 @@ class UserTracepointEventRule(TracepointEventRule):
 class KernelTracepointEventRule(TracepointEventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
     ):
         TracepointEventRule.__init__(**locals())
 
@@ -169,25 +186,30 @@ class Channel(abc.ABC):
     """
 
     @staticmethod
-    def _generate_name() -> str:
+    def _generate_name():
+        # type: () -> str
         return "channel_{random_id}".format(random_id=_generate_random_string(8))
 
     @abc.abstractmethod
-    def add_context(self, context_type: ContextType) -> None:
+    def add_context(self, context_type):
+        # type: (ContextType) -> None
         pass
 
     @property
     @abc.abstractmethod
-    def domain(self) -> TracingDomain:
+    def domain(self):
+        # type: () -> TracingDomain
         pass
 
     @property
     @abc.abstractmethod
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         pass
 
     @abc.abstractmethod
-    def add_recording_rule(self, rule: Type[EventRule]) -> None:
+    def add_recording_rule(self, rule) -> None:
+        # type: (Type[EventRule]) -> None
         pass
 
 
@@ -196,11 +218,13 @@ class SessionOutputLocation(abc.ABC):
 
 
 class LocalSessionOutputLocation(SessionOutputLocation):
-    def __init__(self, trace_path: pathlib.Path):
+    def __init__(self, trace_path):
+        # type: (pathlib.Path)
         self._path = trace_path
 
     @property
-    def path(self) -> pathlib.Path:
+    def path(self):
+        # type: () -> pathlib.Path
         return self._path
 
 
@@ -226,167 +250,180 @@ class ProcessAttributeTracker(abc.ABC):
         def __repr__(self):
             return "<%s.%s>" % (self.__class__.__name__, self.name)
 
-    def __init__(self, policy: TrackingPolicy):
+    def __init__(self, policy):
+        # type: (TrackingPolicy)
         self._policy = policy
 
     @property
-    def tracking_policy(self) -> TrackingPolicy:
+    def tracking_policy(self):
+        # type: () -> TrackingPolicy
         return self._policy
 
 
 class ProcessIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, pid: int) -> None:
+    def track(self, pid):
+        # type: (int) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, pid: int) -> None:
+    def untrack(self, pid):
+        # type: (int) -> None
         pass
 
 
 class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vpid: int) -> None:
+    def track(self, vpid):
+        # type: (int) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, vpid: int) -> None:
+    def untrack(self, vpid):
+        # type: (int) -> None
         pass
 
 
 class UserIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, uid: Union[int, str]) -> None:
+    def track(self, uid):
+        # type: (Union[int, str]) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, uid: Union[int, str]) -> None:
+    def untrack(self, uid):
+        # type: (Union[int, str]) -> None
         pass
 
 
 class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vuid: Union[int, str]) -> None:
+    def track(self, vuid):
+        # type: (Union[int, str]) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, vuid: Union[int, str]) -> None:
+    def untrack(self, vuid):
+        # type: (Union[int, str]) -> None
         pass
 
 
 class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, gid: Union[int, str]) -> None:
+    def track(self, gid):
+        # type: (Union[int, str]) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, gid: Union[int, str]) -> None:
+    def untrack(self, gid):
+        # type: (Union[int, str]) -> None
         pass
 
 
 class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vgid: Union[int, str]) -> None:
+    def track(self, vgid):
+        # type: (Union[int, str]) -> None
         pass
 
     @abc.abstractmethod
-    def untrack(self, vgid: Union[int, str]) -> None:
+    def untrack(self, vgid):
+        # type: (Union[int, str]) -> None
         pass
 
 
 class Session(abc.ABC):
     @staticmethod
-    def _generate_name() -> str:
+    def _generate_name():
+        # type: () -> str
         return "session_{random_id}".format(random_id=_generate_random_string(8))
 
     @property
     @abc.abstractmethod
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         pass
 
     @property
     @abc.abstractmethod
-    def output(self) -> Optional[Type[SessionOutputLocation]]:
+    def output(self):
+        # type: () -> Optional[Type[SessionOutputLocation]]
         pass
 
     @abc.abstractmethod
-    def add_channel(
-        self, domain: TracingDomain, channel_name: Optional[str] = None
-    ) -> Channel:
+    def add_channel(self, domain, channel_name=None):
+        # type: (TracingDomain, Optional[str]) -> Channel
         """Add a channel with default attributes to the session."""
         pass
 
     @abc.abstractmethod
-    def start(self) -> None:
+    def start(self):
+        # type: () -> None
         pass
 
     @abc.abstractmethod
-    def stop(self) -> None:
+    def stop(self):
+        # type: () -> None
         pass
 
     @abc.abstractmethod
-    def destroy(self) -> None:
+    def destroy(self):
+        # type: () -> None
         pass
 
     @abc.abstractproperty
-    def kernel_pid_process_attribute_tracker(
-        self,
-    ) -> Type[ProcessIDProcessAttributeTracker]:
+    def kernel_pid_process_attribute_tracker(self):
+        # type: () -> Type[ProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+    def kernel_vpid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
     def user_vpid_process_attribute_tracker(
         self,
     ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+        # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_gid_process_attribute_tracker(
-        self,
-    ) -> Type[GroupIDProcessAttributeTracker]:
+    def kernel_gid_process_attribute_tracker(self):
+        # type: () -> Type[GroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+    def kernel_vgid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def user_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+    def user_vgid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_uid_process_attribute_tracker(
-        self,
-    ) -> Type[UserIDProcessAttributeTracker]:
+    def kernel_uid_process_attribute_tracker(self):
+        # type: () -> Type[UserIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualUserIDProcessAttributeTracker]:
+    def kernel_vuid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualUserIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def user_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualUserIDProcessAttributeTracker]:
+    def user_vuid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualUserIDProcessAttributeTracker]
         raise NotImplementedError
 
 
 class ControlException(RuntimeError):
     """Base type for exceptions thrown by a controller."""
 
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str)
         super().__init__(msg)
 
 
@@ -398,9 +435,8 @@ class Controller(abc.ABC):
     """
 
     @abc.abstractmethod
-    def create_session(
-        self, name: Optional[str] = None, output: Optional[SessionOutputLocation] = None
-    ) -> Session:
+    def create_session(self, name=None, output=None):
+        # type: (Optional[str], Optional[SessionOutputLocation]) -> Session
         """
         Create a session with an output. Don't specify an output
         to create a session without an output.
index c28e87d365a64609f2e9b8874fd769cad3000c30..39c6bda903093c60222bdb8d3a7b3f067d3d7b43 100644 (file)
@@ -7,57 +7,69 @@
 
 import contextlib
 import sys
-from typing import Optional
+from typing import Iterator, Optional
 
 
 class InvalidTestPlan(RuntimeError):
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str) -> None
         super().__init__(msg)
 
 
 class BailOut(RuntimeError):
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str) -> None
         super().__init__(msg)
 
 
 class TestCase:
-    def __init__(self, tap_generator: "TapGenerator", description: str):
-        self._tap_generator = tap_generator
-        self._result: Optional[bool] = None
-        self._description = description
+    def __init__(
+        self,
+        tap_generator,  # type: "TapGenerator"
+        description,  # type: str
+    ):
+        self._tap_generator = tap_generator  # type: "TapGenerator"
+        self._result = None  # type: Optional[bool]
+        self._description = description  # type: str
 
     @property
-    def result(self) -> Optional[bool]:
+    def result(self):
+        # type: () -> Optional[bool]
         return self._result
 
     @property
-    def description(self) -> str:
+    def description(self):
+        # type: () -> str
         return self._description
 
-    def _set_result(self, result: bool) -> None:
+    def _set_result(self, result):
+        # type: (bool) -> None
         if self._result is not None:
             raise RuntimeError("Can't set test case result twice")
 
         self._result = result
         self._tap_generator.test(result, self._description)
 
-    def success(self) -> None:
+    def success(self):
+        # type: () -> None
         self._set_result(True)
 
-    def fail(self) -> None:
+    def fail(self):
+        # type: () -> None
         self._set_result(False)
 
 
 # Produces a test execution report in the TAP format.
 class TapGenerator:
-    def __init__(self, total_test_count: int):
+    def __init__(self, total_test_count):
+        # type: (int) -> None
         if total_test_count <= 0:
             raise ValueError("Test count must be greater than zero")
 
-        self._total_test_count: int = total_test_count
-        self._last_test_case_id: int = 0
-        self._printed_plan: bool = False
-        self._has_failure: bool = False
+        self._total_test_count = total_test_count  # type: int
+        self._last_test_case_id = 0  # type: int
+        self._printed_plan = False  # type: bool
+        self._has_failure = False  # type: bool
 
     def __del__(self):
         if self.remaining_test_cases > 0:
@@ -68,10 +80,12 @@ class TapGenerator:
             )
 
     @property
-    def remaining_test_cases(self) -> int:
+    def remaining_test_cases(self):
+        # type: () -> int
         return self._total_test_count - self._last_test_case_id
 
-    def _print(self, msg: str) -> None:
+    def _print(self, msg):
+        # type: (str) -> None
         if not self._printed_plan:
             print(
                 "1..{total_test_count}".format(total_test_count=self._total_test_count),
@@ -81,7 +95,8 @@ class TapGenerator:
 
         print(msg, flush=True)
 
-    def skip_all(self, reason) -> None:
+    def skip_all(self, reason):
+        # type: (str) -> None
         if self._last_test_case_id != 0:
             raise RuntimeError("Can't skip all tests after running test cases")
 
@@ -90,7 +105,8 @@ class TapGenerator:
 
         self._last_test_case_id = self._total_test_count
 
-    def skip(self, reason, skip_count: int = 1) -> None:
+    def skip(self, reason, skip_count=1):
+        # type: (str, int) -> None
         for i in range(skip_count):
             self._last_test_case_id = self._last_test_case_id + 1
             self._print(
@@ -99,12 +115,14 @@ class TapGenerator:
                 )
             )
 
-    def bail_out(self, reason: str) -> None:
+    def bail_out(self, reason):
+        # type: (str) -> None
         self._print("Bail out! {reason}".format(reason=reason))
         self._last_test_case_id = self._total_test_count
         raise BailOut(reason)
 
-    def test(self, result: bool, description: str) -> None:
+    def test(self, result, description):
+        # type: (bool, str) -> None
         if self._last_test_case_id == self._total_test_count:
             raise InvalidTestPlan("Executing too many tests")
 
@@ -121,20 +139,24 @@ class TapGenerator:
             )
         )
 
-    def ok(self, description: str) -> None:
+    def ok(self, description):
+        # type: (str) -> None
         self.test(True, description)
 
-    def fail(self, description: str) -> None:
+    def fail(self, description):
+        # type: (str) -> None
         self.test(False, description)
 
     @property
-    def is_successful(self) -> bool:
+    def is_successful(self):
+        # type: () -> bool
         return (
             self._last_test_case_id == self._total_test_count and not self._has_failure
         )
 
     @contextlib.contextmanager
-    def case(self, description: str):
+    def case(self, description):
+        # type: (str) -> Iterator[TestCase]
         test_case = TestCase(self, description)
         try:
             yield test_case
@@ -153,5 +175,6 @@ class TapGenerator:
             if test_case.result is None:
                 test_case.success()
 
-    def diagnostic(self, msg) -> None:
+    def diagnostic(self, msg):
+        # type: (str) -> None
         print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)
This page took 0.041636 seconds and 4 git commands to generate.