X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=tests%2Futils%2Flttngtest%2Flttng.py;h=6b86bbac56aec3cdf36f537cec81ecd1531eba0a;hb=a8cac44b677e440a552921794b51699edf4cf846;hp=6829fa7076cad455569fa07b8b4eb3c2a002cf8d;hpb=ef945e4deb2e24b7d7ce05c3289a29a22c6b427d;p=lttng-tools.git diff --git a/tests/utils/lttngtest/lttng.py b/tests/utils/lttngtest/lttng.py index 6829fa707..6b86bbac5 100644 --- a/tests/utils/lttngtest/lttng.py +++ b/tests/utils/lttngtest/lttng.py @@ -4,7 +4,7 @@ # # SPDX-License-Identifier: GPL-2.0-only -from concurrent.futures import process + from . import lttngctl, logger, environment import pathlib import os @@ -12,6 +12,7 @@ from typing import Callable, Optional, Type, Union import shlex import subprocess import enum +import xml.etree.ElementTree """ Implementation of the lttngctl interface based on the `lttng` command line client. @@ -19,11 +20,19 @@ Implementation of the lttngctl interface based on the `lttng` command line clien class Unsupported(lttngctl.ControlException): - def __init__(self, msg: str): + def __init__(self, msg): + # type: (str) -> None super().__init__(msg) -def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str: +class InvalidMI(lttngctl.ControlException): + def __init__(self, msg): + # type: (str) -> None + super().__init__(msg) + + +def _get_domain_option_name(domain): + # type: (lttngctl.TracingDomain) -> str if domain == lttngctl.TracingDomain.User: return "userspace" elif domain == lttngctl.TracingDomain.Kernel: @@ -38,7 +47,8 @@ def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str: raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client") -def _get_context_type_name(context: lttngctl.ContextType) -> str: +def _get_context_type_name(context): + # type: (lttngctl.ContextType) -> str if isinstance(context, lttngctl.VgidContextType): return "vgid" elif isinstance(context, lttngctl.VuidContextType): @@ -60,27 +70,30 @@ def _get_context_type_name(context: lttngctl.ContextType) -> str: class _Channel(lttngctl.Channel): def __init__( self, - client: "LTTngClient", - name: str, - domain: lttngctl.TracingDomain, - session: "_Session", + client, # type: LTTngClient + name, # type: str + domain, # type: lttngctl.TracingDomain + session, # type: _Session ): - self._client: LTTngClient = client - self._name: str = name - self._domain: lttngctl.TracingDomain = domain - self._session: _Session = session + self._client = client # type: LTTngClient + self._name = name # type: str + self._domain = domain # type: lttngctl.TracingDomain + self._session = session # type: _Session - def add_context(self, context_type: lttngctl.ContextType) -> None: + def add_context(self, context_type): + # type: (lttngctl.ContextType) -> None domain_option_name = _get_domain_option_name(self.domain) context_type_name = _get_context_type_name(context_type) self._client._run_cmd( - "add-context --{domain_option_name} --type {context_type_name}".format( + "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format( domain_option_name=domain_option_name, + channel_name=self.name, context_type_name=context_type_name, ) ) - def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None: + def add_recording_rule(self, rule): + # type: (Type[lttngctl.EventRule]) -> None client_args = ( "enable-event --session {session_name} --channel {channel_name}".format( session_name=self._session.name, channel_name=self.name @@ -136,24 +149,31 @@ class _Channel(lttngctl.Channel): self._client._run_cmd(client_args) @property - def name(self) -> str: + def name(self): + # type: () -> str return self._name @property - def domain(self) -> lttngctl.TracingDomain: + def domain(self): + # type: () -> lttngctl.TracingDomain return self._domain +@enum.unique class _ProcessAttribute(enum.Enum): - PID = (enum.auto(),) - VPID = (enum.auto(),) - UID = (enum.auto(),) - VUID = (enum.auto(),) - GID = (enum.auto(),) - VGID = (enum.auto(),) + PID = "Process ID" + VPID = "Virtual Process ID" + UID = "User ID" + VUID = "Virtual User ID" + GID = "Group ID" + VGID = "Virtual Group ID" + + def __repr__(self): + return "<%s.%s>" % (self.__class__.__name__, self.name) -def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str: +def _get_process_attribute_option_name(attribute): + # type: (_ProcessAttribute) -> str return { _ProcessAttribute.PID: "pid", _ProcessAttribute.VPID: "vpid", @@ -167,21 +187,22 @@ def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str: class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker): def __init__( self, - client: "LTTngClient", - attribute: _ProcessAttribute, - domain: lttngctl.TracingDomain, - session: "_Session", + client, # type: LTTngClient + attribute, # type: _ProcessAttribute + domain, # type: lttngctl.TracingDomain + session, # type: _Session ): - self._client: LTTngClient = client - self._tracked_attribute: _ProcessAttribute = attribute - self._domain: lttngctl.TracingDomain = domain - self._session: "_Session" = session + self._client = client # type: LTTngClient + self._tracked_attribute = attribute # type: _ProcessAttribute + self._domain = domain # type: lttngctl.TracingDomain + self._session = session # type: _Session if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID: - self._allowed_value_types: list[type] = [int, str] + self._allowed_value_types = [int, str] # type: list[type] else: - self._allowed_value_types: list[type] = [int] + self._allowed_value_types = [int] # type: list[type] - def _call_client(self, cmd_name: str, value: Union[int, str]) -> None: + def _call_client(self, cmd_name, value): + # type: (str, Union[int, str]) -> None if type(value) not in self._allowed_value_types: raise TypeError( "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format( @@ -195,7 +216,7 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker): ) domain_name = _get_domain_option_name(self._domain) self._client._run_cmd( - "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format( + "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format( cmd_name=cmd_name, session_name=self._session.name, domain_name=domain_name, @@ -204,115 +225,154 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker): ) ) - def track(self, value: Union[int, str]) -> None: + def track(self, value): + # type: (Union[int, str]) -> None self._call_client("track", value) - def untrack(self, value: Union[int, str]) -> None: + def untrack(self, value): + # type: (Union[int, str]) -> None self._call_client("untrack", value) class _Session(lttngctl.Session): def __init__( self, - client: "LTTngClient", - name: str, - output: Optional[Type[lttngctl.SessionOutputLocation]], + client, # type: LTTngClient + name, # type: str + output, # type: Optional[lttngctl.SessionOutputLocation] ): - self._client: LTTngClient = client - self._name: str = name - self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output + self._client = client # type: LTTngClient + self._name = name # type: str + self._output = output # type: Optional[lttngctl.SessionOutputLocation] @property - def name(self) -> str: + def name(self): + # type: () -> str return self._name def add_channel( - self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None - ) -> lttngctl.Channel: + self, + domain, + channel_name=None, + buffer_sharing_policy=lttngctl.BufferSharingPolicy.PerUID, + ): + # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel channel_name = lttngctl.Channel._generate_name() domain_option_name = _get_domain_option_name(domain) self._client._run_cmd( - "enable-channel --{domain_name} {channel_name}".format( - domain_name=domain_option_name, channel_name=channel_name + "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format( + session_name=self.name, + domain_name=domain_option_name, + channel_name=channel_name, + buffer_sharing_policy="--buffers-uid" + if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID + else "--buffers-pid", ) ) return _Channel(self._client, channel_name, domain, self) - def add_context(self, context_type: lttngctl.ContextType) -> None: + def add_context(self, context_type): + # type: (lttngctl.ContextType) -> None pass @property - def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]: - return self._output + def output(self): + # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]" + return self._output # type: ignore - def start(self) -> None: - self._client._run_cmd("start {session_name}".format(session_name=self.name)) + def start(self): + # type: () -> None + self._client._run_cmd("start '{session_name}'".format(session_name=self.name)) - def stop(self) -> None: - self._client._run_cmd("stop {session_name}".format(session_name=self.name)) + def stop(self): + # type: () -> None + self._client._run_cmd("stop '{session_name}'".format(session_name=self.name)) - def destroy(self) -> None: - self._client._run_cmd("destroy {session_name}".format(session_name=self.name)) + def destroy(self): + # type: () -> None + self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name)) + + def rotate(self, wait=True): + # type: (bool) -> None + self._client.rotate_session_by_name(self.name, wait) @property - def kernel_pid_process_attribute_tracker( - self, - ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]: + def is_active(self): + # type: () -> bool + list_session_xml = self._client._run_cmd( + "list '{session_name}'".format(session_name=self.name), + LTTngClient.CommandOutputFormat.MI_XML, + ) + + root = xml.etree.ElementTree.fromstring(list_session_xml) + command_output = LTTngClient._mi_find_in_element(root, "output") + sessions = LTTngClient._mi_find_in_element(command_output, "sessions") + session_mi = LTTngClient._mi_find_in_element(sessions, "session") + + enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text + if enabled_text not in ["true", "false"]: + raise InvalidMI( + "Expected boolean value in element '{}': value='{}'".format( + session_mi.tag, enabled_text + ) + ) + + return enabled_text == "true" + + @property + def kernel_pid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vpid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]: + def kernel_vpid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vpid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]: + def user_vpid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore @property - def kernel_gid_process_attribute_tracker( - self, - ) -> Type[lttngctl.GroupIDProcessAttributeTracker]: + def kernel_gid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vgid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]: + def kernel_vgid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vgid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]: + def user_vgid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore @property - def kernel_uid_process_attribute_tracker( - self, - ) -> Type[lttngctl.UserIDProcessAttributeTracker]: + def kernel_uid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.UserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def kernel_vuid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]: + def kernel_vuid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore @property - def user_vuid_process_attribute_tracker( - self, - ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]: + def user_vuid_process_attribute_tracker(self): + # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker] return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore class LTTngClientError(lttngctl.ControlException): - def __init__(self, command_args: str, error_output: str): - self._command_args: str = command_args - self._output: str = error_output + def __init__( + self, + command_args, # type: str + error_output, # type: str + ): + self._command_args = command_args # type: str + self._output = error_output # type: str class LTTngClient(logger._Logger, lttngctl.Controller): @@ -320,25 +380,40 @@ class LTTngClient(logger._Logger, lttngctl.Controller): Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end. """ + class CommandOutputFormat(enum.Enum): + MI_XML = 0 + HUMAN = 1 + + _MI_NS = "{https://lttng.org/xml/ns/lttng-mi}" + def __init__( self, - test_environment: environment._Environment, - log: Optional[Callable[[str], None]], + test_environment, # type: environment._Environment + log, # type: Optional[Callable[[str], None]] ): logger._Logger.__init__(self, log) - self._environment: environment._Environment = test_environment + self._environment = test_environment # type: environment._Environment - def _run_cmd(self, command_args: str) -> None: + @staticmethod + def _namespaced_mi_element(property): + # type: (str) -> str + return LTTngClient._MI_NS + property + + def _run_cmd(self, command_args, output_format=CommandOutputFormat.MI_XML): + # type: (str, CommandOutputFormat) -> str """ Invoke the `lttng` client with a set of arguments. The command is executed in the context of the client's test environment. """ - args: list[str] = [str(self._environment.lttng_client_path)] + args = [str(self._environment.lttng_client_path)] # type: list[str] + if output_format == LTTngClient.CommandOutputFormat.MI_XML: + args.extend(["--mi", "xml"]) + args.extend(shlex.split(command_args)) self._log("lttng {command_args}".format(command_args=command_args)) - client_env: dict[str, str] = os.environ.copy() + client_env = os.environ.copy() # type: dict[str, str] client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location) process = subprocess.Popen( @@ -352,12 +427,11 @@ class LTTngClient(logger._Logger, lttngctl.Controller): for error_line in decoded_output.splitlines(): self._log(error_line) raise LTTngClientError(command_args, decoded_output) + else: + return out.decode("utf-8") - def create_session( - self, - name: Optional[str] = None, - output: Optional[lttngctl.SessionOutputLocation] = None, - ) -> lttngctl.Session: + def create_session(self, name=None, output=None): + # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session name = name if name else lttngctl.Session._generate_name() if isinstance(output, lttngctl.LocalSessionOutputLocation): @@ -368,8 +442,115 @@ class LTTngClient(logger._Logger, lttngctl.Controller): raise TypeError("LTTngClient only supports local or no output") self._run_cmd( - "create {session_name} {output_option}".format( + "create '{session_name}' {output_option}".format( session_name=name, output_option=output_option ) ) return _Session(self, name, output) + + def start_session_by_name(self, name): + # type: (str) -> None + self._run_cmd("start '{session_name}'".format(session_name=name)) + + def start_session_by_glob_pattern(self, pattern): + # type: (str) -> None + self._run_cmd("start --glob '{pattern}'".format(pattern=pattern)) + + def start_sessions_all(self): + # type: () -> None + self._run_cmd("start --all") + + def stop_session_by_name(self, name): + # type: (str) -> None + self._run_cmd("stop '{session_name}'".format(session_name=name)) + + def stop_session_by_glob_pattern(self, pattern): + # type: (str) -> None + self._run_cmd("stop --glob '{pattern}'".format(pattern=pattern)) + + def stop_sessions_all(self): + # type: () -> None + self._run_cmd("stop --all") + + def destroy_session_by_name(self, name): + # type: (str) -> None + self._run_cmd("destroy '{session_name}'".format(session_name=name)) + + def destroy_session_by_glob_pattern(self, pattern): + # type: (str) -> None + self._run_cmd("destroy --glob '{pattern}'".format(pattern=pattern)) + + def destroy_sessions_all(self): + # type: () -> None + self._run_cmd("destroy --all") + + def rotate_session_by_name(self, name, wait=True): + self._run_cmd( + "rotate '{session_name}' {wait_option}".format( + session_name=name, wait_option="-n" if wait is False else "" + ) + ) + + def schedule_size_based_rotation(self, name, size_bytes): + # type (str, int) -> None + self._run_cmd( + "enable-rotation --session '{session_name}' --size {size}".format( + session_name=name, size=size_bytes + ) + ) + + def schedule_time_based_rotation(self, name, period_seconds): + # type (str, int) -> None + self._run_cmd( + "enable-rotation --session '{session_name}' --timer {period_seconds}s".format( + session_name=name, period_seconds=period_seconds + ) + ) + + @staticmethod + def _mi_find_in_element(element, sub_element_name): + # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element + result = element.find(LTTngClient._namespaced_mi_element(sub_element_name)) + if result is None: + raise InvalidMI( + "Failed to find element '{}' within command MI element '{}'".format( + element.tag, sub_element_name + ) + ) + + return result + + def list_sessions(self): + # type () -> List[Session] + list_sessions_xml = self._run_cmd( + "list", LTTngClient.CommandOutputFormat.MI_XML + ) + + root = xml.etree.ElementTree.fromstring(list_sessions_xml) + command_output = self._mi_find_in_element(root, "output") + sessions = self._mi_find_in_element(command_output, "sessions") + + ctl_sessions = [] # type: list[lttngctl.Session] + + for session_mi in sessions: + name = self._mi_find_in_element(session_mi, "name").text + path = self._mi_find_in_element(session_mi, "path").text + + if name is None: + raise InvalidMI( + "Invalid empty 'name' element in '{}'".format(session_mi.tag) + ) + if path is None: + raise InvalidMI( + "Invalid empty 'path' element in '{}'".format(session_mi.tag) + ) + if not path.startswith("/"): + raise Unsupported( + "{} does not support non-local session outputs".format(type(self)) + ) + + ctl_sessions.append( + _Session(self, name, lttngctl.LocalSessionOutputLocation(path)) + ) + + return ctl_sessions