Python 3.0 introduced syntax for parameter and return type annotations,
as specified in PEP 484. Python 3.6 introduced support for variable type
annotations, as specified in PEP 526.
Upstream recommends using quoted annotations for librairies that need to
support older python versions.
See https://typing.readthedocs.io/en/latest/source/libraries.html#compatibility-with-older-python-versions
Change-Id: Ifc6017a7baa9f0589e85d32005ac9ead18c1c9fb
Signed-off-by: Michael Jeanson <mjeanson@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
-def context_trace_field_name(context_type: Type[lttngtest.ContextType]) -> str:
+def context_trace_field_name(context_type):
+ # type: (Type[lttngtest.ContextType]) -> str
if isinstance(context_type, lttngtest.VpidContextType):
return "vpid"
elif isinstance(context_type, lttngtest.VuidContextType):
if isinstance(context_type, lttngtest.VpidContextType):
return "vpid"
elif isinstance(context_type, lttngtest.VuidContextType):
def trace_stream_class_has_context_field_in_event_context(
def trace_stream_class_has_context_field_in_event_context(
- trace_location: pathlib.Path, context_field_name: str
-) -> bool:
+ trace_location, context_field_name
+):
+ # type: (pathlib.Path, str) -> bool
iterator = bt2.TraceCollectionMessageIterator(str(trace_location))
# A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage.
iterator = bt2.TraceCollectionMessageIterator(str(trace_location))
# A bt2 message sequence is guaranteed to begin with a StreamBeginningMessage.
return context_field_name in event_common_context_field_class
return context_field_name in event_common_context_field_class
-def trace_events_have_context_value(
- trace_location: pathlib.Path, context_field_name: str, value: Any
-) -> bool:
+def trace_events_have_context_value(trace_location, context_field_name, value):
+ # type: (pathlib.Path, str, Any) -> bool
for msg in bt2.TraceCollectionMessageIterator(str(trace_location)):
if type(msg) is not bt2._EventMessageConst:
continue
for msg in bt2.TraceCollectionMessageIterator(str(trace_location)):
if type(msg) is not bt2._EventMessageConst:
continue
-def test_static_context(
- tap: lttngtest.TapGenerator,
- test_env: lttngtest._Environment,
- context_type: lttngtest.ContextType,
- context_value_retriever: Callable[[lttngtest.WaitTraceTestApplication], Any],
-) -> None:
+def test_static_context(tap, test_env, context_type, context_value_retriever):
+ # type: (lttngtest.TapGenerator, lttngtest._Environment, lttngtest.ContextType, Callable[[lttngtest.WaitTraceTestApplication], Any]) -> None
tap.diagnostic(
"Test presence and expected value of context `{context_name}`".format(
context_name=type(context_type).__name__
tap.diagnostic(
"Test presence and expected value of context `{context_name}`".format(
context_name=type(context_type).__name__
#
from types import FrameType
#
from types import FrameType
-from typing import Callable, Optional, Tuple, List
+from typing import Callable, Iterator, Optional, Tuple, List
import sys
import pathlib
import signal
import sys
import pathlib
import signal
class TemporaryDirectory:
class TemporaryDirectory:
- def __init__(self, prefix: str):
+ def __init__(self, prefix):
+ # type: (str) -> None
self._directory_path = tempfile.mkdtemp(prefix=prefix)
def __del__(self):
shutil.rmtree(self._directory_path, ignore_errors=True)
@property
self._directory_path = tempfile.mkdtemp(prefix=prefix)
def __del__(self):
shutil.rmtree(self._directory_path, ignore_errors=True)
@property
- def path(self) -> pathlib.Path:
+ def path(self):
+ # type: () -> pathlib.Path
return pathlib.Path(self._directory_path)
return pathlib.Path(self._directory_path)
- self._queue: queue.Queue = queue.Queue()
+ self._queue = queue.Queue() # type: queue.Queue
- def signal(self, signal_number, frame: Optional[FrameType]):
+ def signal(
+ self,
+ signal_number,
+ frame, # type: Optional[FrameType]
+ ):
self._queue.put_nowait(signal_number)
def wait_for_signal(self):
self._queue.put_nowait(signal_number)
def wait_for_signal(self):
- binary_path: pathlib.Path,
- event_count: int,
- environment: "Environment",
- wait_time_between_events_us: int = 0,
+ binary_path, # type: pathlib.Path
+ event_count, # type: int
+ environment, # type: Environment
+ wait_time_between_events_us=0, # type: int
- self._environment: Environment = environment
+ self._environment = environment # type: Environment
if event_count % 5:
# The test application currently produces 5 different events per iteration.
raise ValueError("event count must be a multiple of 5")
if event_count % 5:
# The test application currently produces 5 different events per iteration.
raise ValueError("event count must be a multiple of 5")
- self._iteration_count: int = int(event_count / 5)
+ self._iteration_count = int(event_count / 5) # type: int
# File that the application will wait to see before tracing its events.
# File that the application will wait to see before tracing its events.
- self._app_start_tracing_file_path: pathlib.Path = pathlib.Path(
+ self._app_start_tracing_file_path = pathlib.Path(
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
tempfile.mktemp(
prefix="app_",
suffix="_start_tracing",
prefix="app_",
suffix="_ready",
dir=self._compat_open_path(environment.lttng_home_location),
prefix="app_",
suffix="_ready",
dir=self._compat_open_path(environment.lttng_home_location),
test_app_args = [str(binary_path)]
test_app_args.extend(
test_app_args = [str(binary_path)]
test_app_args.extend(
- self._process: subprocess.Popen = subprocess.Popen(
+ self._process = subprocess.Popen(
test_app_args,
env=test_app_env,
test_app_args,
env=test_app_env,
+ ) # type: subprocess.Popen
# Wait for the application to create the file indicating it has fully
# initialized. Make sure the app hasn't crashed in order to not wait
# Wait for the application to create the file indicating it has fully
# initialized. Make sure the app hasn't crashed in order to not wait
- def trace(self) -> None:
+ def trace(self):
+ # type: () -> None
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
if self._process.poll() is not None:
# Application has unexepectedly returned.
raise RuntimeError(
)
open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
)
open(self._compat_open_path(self._app_start_tracing_file_path), mode="x")
- def wait_for_exit(self) -> None:
+ def wait_for_exit(self):
+ # type: () -> None
if self._process.wait() != 0:
raise RuntimeError(
"Test application has exit with return code `{return_code}`".format(
if self._process.wait() != 0:
raise RuntimeError(
"Test application has exit with return code `{return_code}`".format(
self._has_returned = True
@property
self._has_returned = True
@property
+ def vpid(self):
+ # type: () -> int
return self._process.pid
@staticmethod
def _compat_open_path(path):
return self._process.pid
@staticmethod
def _compat_open_path(path):
+ # type: (pathlib.Path) -> pathlib.Path | str
"""
The builtin open() in python >= 3.6 expects a path-like object while
prior versions expect a string or bytes object. Return the correct type
"""
The builtin open() in python >= 3.6 expects a path-like object while
prior versions expect a string or bytes object. Return the correct type
class ProcessOutputConsumer(threading.Thread, logger._Logger):
def __init__(
class ProcessOutputConsumer(threading.Thread, logger._Logger):
def __init__(
- self, process: subprocess.Popen, name: str, log: Callable[[str], None]
+ self,
+ process, # type: subprocess.Popen
+ name, # type: str
+ log, # type: Callable[[str], None]
):
threading.Thread.__init__(self)
self._prefix = name
logger._Logger.__init__(self, log)
self._process = process
):
threading.Thread.__init__(self)
self._prefix = name
logger._Logger.__init__(self, log)
self._process = process
+ def run(self):
+ # type: () -> None
while self._process.poll() is None:
assert self._process.stdout
line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
while self._process.poll() is None:
assert self._process.stdout
line = self._process.stdout.readline().decode("utf-8").replace("\n", "")
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
# Generate a temporary environment in which to execute a test.
class _Environment(logger._Logger):
def __init__(
- self, with_sessiond: bool, log: Optional[Callable[[str], None]] = None
+ self,
+ with_sessiond, # type: bool
+ log=None, # type: Optional[Callable[[str], None]]
):
super().__init__(log)
signal.signal(signal.SIGTERM, self._handle_termination_signal)
):
super().__init__(log)
signal.signal(signal.SIGTERM, self._handle_termination_signal)
# Assumes the project's hierarchy to this file is:
# tests/utils/python/this_file
# Assumes the project's hierarchy to this file is:
# tests/utils/python/this_file
- self._project_root: pathlib.Path = pathlib.Path(__file__).absolute().parents[3]
- self._lttng_home: Optional[TemporaryDirectory] = TemporaryDirectory(
+ self._project_root = (
+ pathlib.Path(__file__).absolute().parents[3]
+ ) # type: pathlib.Path
+ self._lttng_home = TemporaryDirectory(
+ ) # type: Optional[TemporaryDirectory]
- self._sessiond: Optional[subprocess.Popen[bytes]] = (
self._launch_lttng_sessiond() if with_sessiond else None
self._launch_lttng_sessiond() if with_sessiond else None
+ ) # type: Optional[subprocess.Popen[bytes]]
- def lttng_home_location(self) -> pathlib.Path:
+ def lttng_home_location(self):
+ # type: () -> pathlib.Path
if self._lttng_home is None:
raise RuntimeError("Attempt to access LTTng home after clean-up")
return self._lttng_home.path
@property
if self._lttng_home is None:
raise RuntimeError("Attempt to access LTTng home after clean-up")
return self._lttng_home.path
@property
- def lttng_client_path(self) -> pathlib.Path:
+ def lttng_client_path(self):
+ # type: () -> pathlib.Path
return self._project_root / "src" / "bin" / "lttng" / "lttng"
return self._project_root / "src" / "bin" / "lttng" / "lttng"
- def create_temporary_directory(self, prefix: Optional[str] = None) -> pathlib.Path:
+ def create_temporary_directory(self, prefix=None):
+ # type: (Optional[str]) -> pathlib.Path
# Simply return a path that is contained within LTTNG_HOME; it will
# be destroyed when the temporary home goes out of scope.
assert self._lttng_home
# Simply return a path that is contained within LTTNG_HOME; it will
# be destroyed when the temporary home goes out of scope.
assert self._lttng_home
# Unpack a list of environment variables from a string
# such as "HELLO=is_it ME='/you/are/looking/for'"
@staticmethod
# Unpack a list of environment variables from a string
# such as "HELLO=is_it ME='/you/are/looking/for'"
@staticmethod
- def _unpack_env_vars(env_vars_string: str) -> List[Tuple[str, str]]:
+ def _unpack_env_vars(env_vars_string):
+ # type: (str) -> List[Tuple[str, str]]
unpacked_vars = []
for var in shlex.split(env_vars_string):
equal_position = var.find("=")
unpacked_vars = []
for var in shlex.split(env_vars_string):
equal_position = var.find("=")
- def _launch_lttng_sessiond(self) -> Optional[subprocess.Popen]:
+ def _launch_lttng_sessiond(self):
+ # type: () -> Optional[subprocess.Popen]
is_64bits_host = sys.maxsize > 2**32
sessiond_path = (
is_64bits_host = sys.maxsize > 2**32
sessiond_path = (
)
if self._logging_function:
)
if self._logging_function:
- self._sessiond_output_consumer: Optional[
- ProcessOutputConsumer
- ] = ProcessOutputConsumer(process, "lttng-sessiond", self._logging_function)
+ self._sessiond_output_consumer = ProcessOutputConsumer(
+ process, "lttng-sessiond", self._logging_function
+ ) # type: Optional[ProcessOutputConsumer]
self._sessiond_output_consumer.daemon = True
self._sessiond_output_consumer.start()
self._sessiond_output_consumer.daemon = True
self._sessiond_output_consumer.start()
- def _handle_termination_signal(
- self, signal_number: int, frame: Optional[FrameType]
- ) -> None:
+ def _handle_termination_signal(self, signal_number, frame):
+ # type: (int, Optional[FrameType]) -> None
self._log(
"Killed by {signal_name} signal, cleaning-up".format(
signal_name=signal.strsignal(signal_number)
self._log(
"Killed by {signal_name} signal, cleaning-up".format(
signal_name=signal.strsignal(signal_number)
- def launch_wait_trace_test_application(
- self, event_count: int
- ) -> WaitTraceTestApplication:
+ def launch_wait_trace_test_application(self, event_count):
+ # type: (int) -> WaitTraceTestApplication
"""
Launch an application that will wait before tracing `event_count` events.
"""
"""
Launch an application that will wait before tracing `event_count` events.
"""
)
# Clean-up managed processes
)
# Clean-up managed processes
- def _cleanup(self) -> None:
+ def _cleanup(self):
+ # type: () -> None
if self._sessiond and self._sessiond.poll() is None:
# The session daemon is alive; kill it.
self._log(
if self._sessiond and self._sessiond.poll() is None:
# The session daemon is alive; kill it.
self._log(
@contextlib.contextmanager
@contextlib.contextmanager
-def test_environment(with_sessiond: bool, log: Optional[Callable[[str], None]] = None):
+def test_environment(with_sessiond, log=None):
+ # type: (bool, Optional[Callable[[str], None]]) -> Iterator[_Environment]
env = _Environment(with_sessiond, log)
try:
yield env
env = _Environment(with_sessiond, log)
try:
yield env
- def __init__(self, log: Optional[Callable[[str], None]]):
- self._logging_function: Optional[Callable[[str], None]] = log
+ def __init__(self, log):
+ # type: (Optional[Callable[[str], None]]) -> None
+ self._logging_function = log # type: Optional[Callable[[str], None]]
- def _log(self, msg: str) -> None:
+ def _log(self, msg):
+ # type: (str) -> None
if self._logging_function:
self._logging_function(msg)
@property
if self._logging_function:
self._logging_function(msg)
@property
- def logger(self) -> Optional[Callable[[str], None]]:
+ def logger(self):
+ # type: () -> Optional[Callable[[str], None]]
return self._logging_function
return self._logging_function
class Unsupported(lttngctl.ControlException):
class Unsupported(lttngctl.ControlException):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
-def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
+def _get_domain_option_name(domain):
+ # type: (lttngctl.TracingDomain) -> str
if domain == lttngctl.TracingDomain.User:
return "userspace"
elif domain == lttngctl.TracingDomain.Kernel:
if domain == lttngctl.TracingDomain.User:
return "userspace"
elif domain == lttngctl.TracingDomain.Kernel:
raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
-def _get_context_type_name(context: lttngctl.ContextType) -> str:
+def _get_context_type_name(context):
+ # type: (lttngctl.ContextType) -> str
if isinstance(context, lttngctl.VgidContextType):
return "vgid"
elif isinstance(context, lttngctl.VuidContextType):
if isinstance(context, lttngctl.VgidContextType):
return "vgid"
elif isinstance(context, lttngctl.VuidContextType):
class _Channel(lttngctl.Channel):
def __init__(
self,
class _Channel(lttngctl.Channel):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ name, # type: str
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
- self._client: LTTngClient = client
- self._name: str = name
- self._domain: lttngctl.TracingDomain = domain
- self._session: _Session = session
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
domain_option_name = _get_domain_option_name(self.domain)
context_type_name = _get_context_type_name(context_type)
self._client._run_cmd(
domain_option_name = _get_domain_option_name(self.domain)
context_type_name = _get_context_type_name(context_type)
self._client._run_cmd(
- def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
+ def add_recording_rule(self, rule):
+ # type: (Type[lttngctl.EventRule]) -> None
client_args = (
"enable-event --session {session_name} --channel {channel_name}".format(
session_name=self._session.name, channel_name=self.name
client_args = (
"enable-event --session {session_name} --channel {channel_name}".format(
session_name=self._session.name, channel_name=self.name
self._client._run_cmd(client_args)
@property
self._client._run_cmd(client_args)
@property
+ def name(self):
+ # type: () -> str
return self._name
@property
return self._name
@property
- def domain(self) -> lttngctl.TracingDomain:
+ def domain(self):
+ # type: () -> lttngctl.TracingDomain
return "<%s.%s>" % (self.__class__.__name__, self.name)
return "<%s.%s>" % (self.__class__.__name__, self.name)
-def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
+def _get_process_attribute_option_name(attribute):
+ # type: (_ProcessAttribute) -> str
return {
_ProcessAttribute.PID: "pid",
_ProcessAttribute.VPID: "vpid",
return {
_ProcessAttribute.PID: "pid",
_ProcessAttribute.VPID: "vpid",
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
def __init__(
self,
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
def __init__(
self,
- client: "LTTngClient",
- attribute: _ProcessAttribute,
- domain: lttngctl.TracingDomain,
- session: "_Session",
+ client, # type: LTTngClient
+ attribute, # type: _ProcessAttribute
+ domain, # type: lttngctl.TracingDomain
+ session, # type: _Session
- self._client: LTTngClient = client
- self._tracked_attribute: _ProcessAttribute = attribute
- self._domain: lttngctl.TracingDomain = domain
- self._session: "_Session" = session
+ self._client = client # type: LTTngClient
+ self._tracked_attribute = attribute # type: _ProcessAttribute
+ self._domain = domain # type: lttngctl.TracingDomain
+ self._session = session # type: _Session
if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
- self._allowed_value_types: list[type] = [int, str]
+ self._allowed_value_types = [int, str] # type: list[type]
- self._allowed_value_types: list[type] = [int]
+ self._allowed_value_types = [int] # type: list[type]
- def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
+ def _call_client(self, cmd_name, value):
+ # type: (str, Union[int, str]) -> None
if type(value) not in self._allowed_value_types:
raise TypeError(
"Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
if type(value) not in self._allowed_value_types:
raise TypeError(
"Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
- def track(self, value: Union[int, str]) -> None:
+ def track(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("track", value)
self._call_client("track", value)
- def untrack(self, value: Union[int, str]) -> None:
+ def untrack(self, value):
+ # type: (Union[int, str]) -> None
self._call_client("untrack", value)
class _Session(lttngctl.Session):
def __init__(
self,
self._call_client("untrack", value)
class _Session(lttngctl.Session):
def __init__(
self,
- client: "LTTngClient",
- name: str,
- output: Optional[Type[lttngctl.SessionOutputLocation]],
+ client, # type: LTTngClient
+ name, # type: str
+ output, # type: Optional[lttngctl.SessionOutputLocation]
- self._client: LTTngClient = client
- self._name: str = name
- self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output
+ self._client = client # type: LTTngClient
+ self._name = name # type: str
+ self._output = output # type: Optional[lttngctl.SessionOutputLocation]
+ def name(self):
+ # type: () -> str
- def add_channel(
- self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
- ) -> lttngctl.Channel:
+ def add_channel(self, domain, channel_name=None):
+ # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
)
return _Channel(self._client, channel_name, domain, self)
)
return _Channel(self._client, channel_name, domain, self)
- def add_context(self, context_type: lttngctl.ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (lttngctl.ContextType) -> None
- def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
- return self._output
+ def output(self):
+ # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
+ return self._output # type: ignore
- def start(self) -> None:
+ def start(self):
+ # type: () -> None
self._client._run_cmd("start {session_name}".format(session_name=self.name))
self._client._run_cmd("start {session_name}".format(session_name=self.name))
- def stop(self) -> None:
+ def stop(self):
+ # type: () -> None
self._client._run_cmd("stop {session_name}".format(session_name=self.name))
self._client._run_cmd("stop {session_name}".format(session_name=self.name))
- def destroy(self) -> None:
+ def destroy(self):
+ # type: () -> None
self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
@property
self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
@property
- def kernel_pid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
+ def kernel_pid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def kernel_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vpid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+ def user_vpid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_gid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
+ def kernel_gid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def kernel_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vgid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+ def user_vgid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
@property
- def kernel_uid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
+ def kernel_uid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def kernel_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def kernel_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
@property
- def user_vuid_process_attribute_tracker(
- self,
- ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+ def user_vuid_process_attribute_tracker(self):
+ # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
class LTTngClientError(lttngctl.ControlException):
return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
class LTTngClientError(lttngctl.ControlException):
- def __init__(self, command_args: str, error_output: str):
- self._command_args: str = command_args
- self._output: str = error_output
+ def __init__(
+ self,
+ command_args, # type: str
+ error_output, # type: str
+ ):
+ self._command_args = command_args # type: str
+ self._output = error_output # type: str
class LTTngClient(logger._Logger, lttngctl.Controller):
class LTTngClient(logger._Logger, lttngctl.Controller):
- test_environment: environment._Environment,
- log: Optional[Callable[[str], None]],
+ test_environment, # type: environment._Environment
+ log, # type: Optional[Callable[[str], None]]
):
logger._Logger.__init__(self, log)
):
logger._Logger.__init__(self, log)
- self._environment: environment._Environment = test_environment
+ self._environment = test_environment # type: environment._Environment
- def _run_cmd(self, command_args: str) -> None:
+ def _run_cmd(self, command_args):
+ # type: (str) -> None
"""
Invoke the `lttng` client with a set of arguments. The command is
executed in the context of the client's test environment.
"""
"""
Invoke the `lttng` client with a set of arguments. The command is
executed in the context of the client's test environment.
"""
- args: list[str] = [str(self._environment.lttng_client_path)]
+ args = [str(self._environment.lttng_client_path)] # type: list[str]
args.extend(shlex.split(command_args))
self._log("lttng {command_args}".format(command_args=command_args))
args.extend(shlex.split(command_args))
self._log("lttng {command_args}".format(command_args=command_args))
- client_env: dict[str, str] = os.environ.copy()
+ client_env = os.environ.copy() # type: dict[str, str]
client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
process = subprocess.Popen(
client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
process = subprocess.Popen(
self._log(error_line)
raise LTTngClientError(command_args, decoded_output)
self._log(error_line)
raise LTTngClientError(command_args, decoded_output)
- def create_session(
- self,
- name: Optional[str] = None,
- output: Optional[lttngctl.SessionOutputLocation] = None,
- ) -> lttngctl.Session:
+ def create_session(self, name=None, output=None):
+ # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
name = name if name else lttngctl.Session._generate_name()
if isinstance(output, lttngctl.LocalSessionOutputLocation):
name = name if name else lttngctl.Session._generate_name()
if isinstance(output, lttngctl.LocalSessionOutputLocation):
-def _generate_random_string(length: int) -> str:
+def _generate_random_string(length):
+ # type: (int) -> str
return "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(length)
)
return "".join(
random.choice(string.ascii_lowercase + string.digits) for _ in range(length)
)
class JavaApplicationContextType(ContextType):
"""A java application-specific context field is a piece of state which the application provides."""
class JavaApplicationContextType(ContextType):
"""A java application-specific context field is a piece of state which the application provides."""
- def __init__(self, retriever_name: str, field_name: str):
- self._retriever_name: str = retriever_name
- self._field_name: str = field_name
+ def __init__(
+ self,
+ retriever_name, # type: str
+ field_name, # type: str
+ ):
+ self._retriever_name = retriever_name # type: str
+ self._field_name = field_name # type: str
- def retriever_name(self) -> str:
+ def retriever_name(self):
+ # type: () -> str
return self._retriever_name
@property
return self._retriever_name
@property
- def field_name(self) -> str:
+ def field_name(self):
+ # type: () -> str
class LogLevelRuleAsSevereAs(LogLevelRule):
class LogLevelRuleAsSevereAs(LogLevelRule):
- def __init__(self, level: int):
+ def __init__(self, level):
+ # type: (int)
self._level = level
@property
self._level = level
@property
- def level(self) -> int:
+ def level(self):
+ # type: () -> int
return self._level
class LogLevelRuleExactly(LogLevelRule):
return self._level
class LogLevelRuleExactly(LogLevelRule):
- def __init__(self, level: int):
+ def __init__(self, level):
+ # type: (int)
self._level = level
@property
self._level = level
@property
- def level(self) -> int:
+ def level(self):
+ # type: () -> int
return self._level
class TracepointEventRule(EventRule):
def __init__(
self,
return self._level
class TracepointEventRule(EventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
- self._name_pattern: Optional[str] = name_pattern
- self._filter_expression: Optional[str] = filter_expression
- self._log_level_rule: Optional[LogLevelRule] = log_level_rule
- self._name_pattern_exclusions: Optional[List[str]] = name_pattern_exclusions
+ self._name_pattern = name_pattern # type: Optional[str]
+ self._filter_expression = filter_expression # type: Optional[str]
+ self._log_level_rule = log_level_rule # type: Optional[LogLevelRule]
+ self._name_pattern_exclusions = (
+ name_pattern_exclusions
+ ) # type: Optional[List[str]]
- def name_pattern(self) -> Optional[str]:
+ def name_pattern(self):
+ # type: () -> Optional[str]
return self._name_pattern
@property
return self._name_pattern
@property
- def filter_expression(self) -> Optional[str]:
+ def filter_expression(self):
+ # type: () -> Optional[str]
return self._filter_expression
@property
return self._filter_expression
@property
- def log_level_rule(self) -> Optional[LogLevelRule]:
+ def log_level_rule(self):
+ # type: () -> Optional[LogLevelRule]
return self._log_level_rule
@property
return self._log_level_rule
@property
- def name_pattern_exclusions(self) -> Optional[List[str]]:
+ def name_pattern_exclusions(self):
+ # type: () -> Optional[List[str]]
return self._name_pattern_exclusions
class UserTracepointEventRule(TracepointEventRule):
def __init__(
self,
return self._name_pattern_exclusions
class UserTracepointEventRule(TracepointEventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
):
TracepointEventRule.__init__(**locals())
):
TracepointEventRule.__init__(**locals())
class KernelTracepointEventRule(TracepointEventRule):
def __init__(
self,
class KernelTracepointEventRule(TracepointEventRule):
def __init__(
self,
- name_pattern: Optional[str] = None,
- filter_expression: Optional[str] = None,
- log_level_rule: Optional[LogLevelRule] = None,
- name_pattern_exclusions: Optional[List[str]] = None,
+ name_pattern=None, # type: Optional[str]
+ filter_expression=None, # type: Optional[str]
+ log_level_rule=None, # type: Optional[LogLevelRule]
+ name_pattern_exclusions=None, # type: Optional[List[str]]
):
TracepointEventRule.__init__(**locals())
):
TracepointEventRule.__init__(**locals())
- def _generate_name() -> str:
+ def _generate_name():
+ # type: () -> str
return "channel_{random_id}".format(random_id=_generate_random_string(8))
@abc.abstractmethod
return "channel_{random_id}".format(random_id=_generate_random_string(8))
@abc.abstractmethod
- def add_context(self, context_type: ContextType) -> None:
+ def add_context(self, context_type):
+ # type: (ContextType) -> None
pass
@property
@abc.abstractmethod
pass
@property
@abc.abstractmethod
- def domain(self) -> TracingDomain:
+ def domain(self):
+ # type: () -> TracingDomain
pass
@property
@abc.abstractmethod
pass
@property
@abc.abstractmethod
+ def name(self):
+ # type: () -> str
- def add_recording_rule(self, rule: Type[EventRule]) -> None:
+ def add_recording_rule(self, rule) -> None:
+ # type: (Type[EventRule]) -> None
class LocalSessionOutputLocation(SessionOutputLocation):
class LocalSessionOutputLocation(SessionOutputLocation):
- def __init__(self, trace_path: pathlib.Path):
+ def __init__(self, trace_path):
+ # type: (pathlib.Path)
self._path = trace_path
@property
self._path = trace_path
@property
- def path(self) -> pathlib.Path:
+ def path(self):
+ # type: () -> pathlib.Path
def __repr__(self):
return "<%s.%s>" % (self.__class__.__name__, self.name)
def __repr__(self):
return "<%s.%s>" % (self.__class__.__name__, self.name)
- def __init__(self, policy: TrackingPolicy):
+ def __init__(self, policy):
+ # type: (TrackingPolicy)
self._policy = policy
@property
self._policy = policy
@property
- def tracking_policy(self) -> TrackingPolicy:
+ def tracking_policy(self):
+ # type: () -> TrackingPolicy
return self._policy
class ProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
return self._policy
class ProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, pid: int) -> None:
+ def track(self, pid):
+ # type: (int) -> None
- def untrack(self, pid: int) -> None:
+ def untrack(self, pid):
+ # type: (int) -> None
pass
class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
pass
class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vpid: int) -> None:
+ def track(self, vpid):
+ # type: (int) -> None
- def untrack(self, vpid: int) -> None:
+ def untrack(self, vpid):
+ # type: (int) -> None
pass
class UserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
pass
class UserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, uid: Union[int, str]) -> None:
+ def track(self, uid):
+ # type: (Union[int, str]) -> None
- def untrack(self, uid: Union[int, str]) -> None:
+ def untrack(self, uid):
+ # type: (Union[int, str]) -> None
pass
class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
pass
class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vuid: Union[int, str]) -> None:
+ def track(self, vuid):
+ # type: (Union[int, str]) -> None
- def untrack(self, vuid: Union[int, str]) -> None:
+ def untrack(self, vuid):
+ # type: (Union[int, str]) -> None
pass
class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
pass
class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, gid: Union[int, str]) -> None:
+ def track(self, gid):
+ # type: (Union[int, str]) -> None
- def untrack(self, gid: Union[int, str]) -> None:
+ def untrack(self, gid):
+ # type: (Union[int, str]) -> None
pass
class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
pass
class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
@abc.abstractmethod
- def track(self, vgid: Union[int, str]) -> None:
+ def track(self, vgid):
+ # type: (Union[int, str]) -> None
- def untrack(self, vgid: Union[int, str]) -> None:
+ def untrack(self, vgid):
+ # type: (Union[int, str]) -> None
pass
class Session(abc.ABC):
@staticmethod
pass
class Session(abc.ABC):
@staticmethod
- def _generate_name() -> str:
+ def _generate_name():
+ # type: () -> str
return "session_{random_id}".format(random_id=_generate_random_string(8))
@property
@abc.abstractmethod
return "session_{random_id}".format(random_id=_generate_random_string(8))
@property
@abc.abstractmethod
+ def name(self):
+ # type: () -> str
pass
@property
@abc.abstractmethod
pass
@property
@abc.abstractmethod
- def output(self) -> Optional[Type[SessionOutputLocation]]:
+ def output(self):
+ # type: () -> Optional[Type[SessionOutputLocation]]
- def add_channel(
- self, domain: TracingDomain, channel_name: Optional[str] = None
- ) -> Channel:
+ def add_channel(self, domain, channel_name=None):
+ # type: (TracingDomain, Optional[str]) -> Channel
"""Add a channel with default attributes to the session."""
pass
@abc.abstractmethod
"""Add a channel with default attributes to the session."""
pass
@abc.abstractmethod
- def start(self) -> None:
+ def start(self):
+ # type: () -> None
- def stop(self) -> None:
+ def stop(self):
+ # type: () -> None
- def destroy(self) -> None:
+ def destroy(self):
+ # type: () -> None
pass
@abc.abstractproperty
pass
@abc.abstractproperty
- def kernel_pid_process_attribute_tracker(
- self,
- ) -> Type[ProcessIDProcessAttributeTracker]:
+ def kernel_pid_process_attribute_tracker(self):
+ # type: () -> Type[ProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def kernel_vpid_process_attribute_tracker(
- self,
- ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+ def kernel_vpid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
def user_vpid_process_attribute_tracker(
self,
) -> Type[VirtualProcessIDProcessAttributeTracker]:
raise NotImplementedError
@abc.abstractproperty
def user_vpid_process_attribute_tracker(
self,
) -> Type[VirtualProcessIDProcessAttributeTracker]:
+ # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def kernel_gid_process_attribute_tracker(
- self,
- ) -> Type[GroupIDProcessAttributeTracker]:
+ def kernel_gid_process_attribute_tracker(self):
+ # type: () -> Type[GroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def kernel_vgid_process_attribute_tracker(
- self,
- ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+ def kernel_vgid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def user_vgid_process_attribute_tracker(
- self,
- ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+ def user_vgid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def kernel_uid_process_attribute_tracker(
- self,
- ) -> Type[UserIDProcessAttributeTracker]:
+ def kernel_uid_process_attribute_tracker(self):
+ # type: () -> Type[UserIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def kernel_vuid_process_attribute_tracker(
- self,
- ) -> Type[VirtualUserIDProcessAttributeTracker]:
+ def kernel_vuid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualUserIDProcessAttributeTracker]
raise NotImplementedError
@abc.abstractproperty
raise NotImplementedError
@abc.abstractproperty
- def user_vuid_process_attribute_tracker(
- self,
- ) -> Type[VirtualUserIDProcessAttributeTracker]:
+ def user_vuid_process_attribute_tracker(self):
+ # type: () -> Type[VirtualUserIDProcessAttributeTracker]
raise NotImplementedError
class ControlException(RuntimeError):
"""Base type for exceptions thrown by a controller."""
raise NotImplementedError
class ControlException(RuntimeError):
"""Base type for exceptions thrown by a controller."""
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str)
- def create_session(
- self, name: Optional[str] = None, output: Optional[SessionOutputLocation] = None
- ) -> Session:
+ def create_session(self, name=None, output=None):
+ # type: (Optional[str], Optional[SessionOutputLocation]) -> Session
"""
Create a session with an output. Don't specify an output
to create a session without an output.
"""
Create a session with an output. Don't specify an output
to create a session without an output.
import contextlib
import sys
import contextlib
import sys
-from typing import Optional
+from typing import Iterator, Optional
class InvalidTestPlan(RuntimeError):
class InvalidTestPlan(RuntimeError):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
class BailOut(RuntimeError):
super().__init__(msg)
class BailOut(RuntimeError):
- def __init__(self, msg: str):
+ def __init__(self, msg):
+ # type: (str) -> None
super().__init__(msg)
class TestCase:
super().__init__(msg)
class TestCase:
- def __init__(self, tap_generator: "TapGenerator", description: str):
- self._tap_generator = tap_generator
- self._result: Optional[bool] = None
- self._description = description
+ def __init__(
+ self,
+ tap_generator, # type: "TapGenerator"
+ description, # type: str
+ ):
+ self._tap_generator = tap_generator # type: "TapGenerator"
+ self._result = None # type: Optional[bool]
+ self._description = description # type: str
- def result(self) -> Optional[bool]:
+ def result(self):
+ # type: () -> Optional[bool]
return self._result
@property
return self._result
@property
- def description(self) -> str:
+ def description(self):
+ # type: () -> str
- def _set_result(self, result: bool) -> None:
+ def _set_result(self, result):
+ # type: (bool) -> None
if self._result is not None:
raise RuntimeError("Can't set test case result twice")
self._result = result
self._tap_generator.test(result, self._description)
if self._result is not None:
raise RuntimeError("Can't set test case result twice")
self._result = result
self._tap_generator.test(result, self._description)
- def success(self) -> None:
+ def success(self):
+ # type: () -> None
- def fail(self) -> None:
+ def fail(self):
+ # type: () -> None
self._set_result(False)
# Produces a test execution report in the TAP format.
class TapGenerator:
self._set_result(False)
# Produces a test execution report in the TAP format.
class TapGenerator:
- def __init__(self, total_test_count: int):
+ def __init__(self, total_test_count):
+ # type: (int) -> None
if total_test_count <= 0:
raise ValueError("Test count must be greater than zero")
if total_test_count <= 0:
raise ValueError("Test count must be greater than zero")
- self._total_test_count: int = total_test_count
- self._last_test_case_id: int = 0
- self._printed_plan: bool = False
- self._has_failure: bool = False
+ self._total_test_count = total_test_count # type: int
+ self._last_test_case_id = 0 # type: int
+ self._printed_plan = False # type: bool
+ self._has_failure = False # type: bool
def __del__(self):
if self.remaining_test_cases > 0:
def __del__(self):
if self.remaining_test_cases > 0:
- def remaining_test_cases(self) -> int:
+ def remaining_test_cases(self):
+ # type: () -> int
return self._total_test_count - self._last_test_case_id
return self._total_test_count - self._last_test_case_id
- def _print(self, msg: str) -> None:
+ def _print(self, msg):
+ # type: (str) -> None
if not self._printed_plan:
print(
"1..{total_test_count}".format(total_test_count=self._total_test_count),
if not self._printed_plan:
print(
"1..{total_test_count}".format(total_test_count=self._total_test_count),
- def skip_all(self, reason) -> None:
+ def skip_all(self, reason):
+ # type: (str) -> None
if self._last_test_case_id != 0:
raise RuntimeError("Can't skip all tests after running test cases")
if self._last_test_case_id != 0:
raise RuntimeError("Can't skip all tests after running test cases")
self._last_test_case_id = self._total_test_count
self._last_test_case_id = self._total_test_count
- def skip(self, reason, skip_count: int = 1) -> None:
+ def skip(self, reason, skip_count=1):
+ # type: (str, int) -> None
for i in range(skip_count):
self._last_test_case_id = self._last_test_case_id + 1
self._print(
for i in range(skip_count):
self._last_test_case_id = self._last_test_case_id + 1
self._print(
- def bail_out(self, reason: str) -> None:
+ def bail_out(self, reason):
+ # type: (str) -> None
self._print("Bail out! {reason}".format(reason=reason))
self._last_test_case_id = self._total_test_count
raise BailOut(reason)
self._print("Bail out! {reason}".format(reason=reason))
self._last_test_case_id = self._total_test_count
raise BailOut(reason)
- def test(self, result: bool, description: str) -> None:
+ def test(self, result, description):
+ # type: (bool, str) -> None
if self._last_test_case_id == self._total_test_count:
raise InvalidTestPlan("Executing too many tests")
if self._last_test_case_id == self._total_test_count:
raise InvalidTestPlan("Executing too many tests")
- def ok(self, description: str) -> None:
+ def ok(self, description):
+ # type: (str) -> None
self.test(True, description)
self.test(True, description)
- def fail(self, description: str) -> None:
+ def fail(self, description):
+ # type: (str) -> None
self.test(False, description)
@property
self.test(False, description)
@property
- def is_successful(self) -> bool:
+ def is_successful(self):
+ # type: () -> bool
return (
self._last_test_case_id == self._total_test_count and not self._has_failure
)
@contextlib.contextmanager
return (
self._last_test_case_id == self._total_test_count and not self._has_failure
)
@contextlib.contextmanager
- def case(self, description: str):
+ def case(self, description):
+ # type: (str) -> Iterator[TestCase]
test_case = TestCase(self, description)
try:
yield test_case
test_case = TestCase(self, description)
try:
yield test_case
if test_case.result is None:
test_case.success()
if test_case.result is None:
test_case.success()
- def diagnostic(self, msg) -> None:
+ def diagnostic(self, msg):
+ # type: (str) -> None
print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)
print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)