Tests: lttngtest: add methods to control session rotations
[lttng-tools.git] / tests / utils / lttngtest / lttng.py
index 6829fa7076cad455569fa07b8b4eb3c2a002cf8d..6b86bbac56aec3cdf36f537cec81ecd1531eba0a 100644 (file)
@@ -4,7 +4,7 @@
 #
 # SPDX-License-Identifier: GPL-2.0-only
 
-from concurrent.futures import process
+
 from . import lttngctl, logger, environment
 import pathlib
 import os
@@ -12,6 +12,7 @@ from typing import Callable, Optional, Type, Union
 import shlex
 import subprocess
 import enum
+import xml.etree.ElementTree
 
 """
 Implementation of the lttngctl interface based on the `lttng` command line client.
@@ -19,11 +20,19 @@ Implementation of the lttngctl interface based on the `lttng` command line clien
 
 
 class Unsupported(lttngctl.ControlException):
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str) -> None
         super().__init__(msg)
 
 
-def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
+class InvalidMI(lttngctl.ControlException):
+    def __init__(self, msg):
+        # type: (str) -> None
+        super().__init__(msg)
+
+
+def _get_domain_option_name(domain):
+    # type: (lttngctl.TracingDomain) -> str
     if domain == lttngctl.TracingDomain.User:
         return "userspace"
     elif domain == lttngctl.TracingDomain.Kernel:
@@ -38,7 +47,8 @@ def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
         raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
 
 
-def _get_context_type_name(context: lttngctl.ContextType) -> str:
+def _get_context_type_name(context):
+    # type: (lttngctl.ContextType) -> str
     if isinstance(context, lttngctl.VgidContextType):
         return "vgid"
     elif isinstance(context, lttngctl.VuidContextType):
@@ -60,27 +70,30 @@ def _get_context_type_name(context: lttngctl.ContextType) -> str:
 class _Channel(lttngctl.Channel):
     def __init__(
         self,
-        client: "LTTngClient",
-        name: str,
-        domain: lttngctl.TracingDomain,
-        session: "_Session",
+        client,  # type: LTTngClient
+        name,  # type: str
+        domain,  # type: lttngctl.TracingDomain
+        session,  # type: _Session
     ):
-        self._client: LTTngClient = client
-        self._name: str = name
-        self._domain: lttngctl.TracingDomain = domain
-        self._session: _Session = session
+        self._client = client  # type: LTTngClient
+        self._name = name  # type: str
+        self._domain = domain  # type: lttngctl.TracingDomain
+        self._session = session  # type: _Session
 
-    def add_context(self, context_type: lttngctl.ContextType) -> None:
+    def add_context(self, context_type):
+        # type: (lttngctl.ContextType) -> None
         domain_option_name = _get_domain_option_name(self.domain)
         context_type_name = _get_context_type_name(context_type)
         self._client._run_cmd(
-            "add-context --{domain_option_name} --type {context_type_name}".format(
+            "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
                 domain_option_name=domain_option_name,
+                channel_name=self.name,
                 context_type_name=context_type_name,
             )
         )
 
-    def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
+    def add_recording_rule(self, rule):
+        # type: (Type[lttngctl.EventRule]) -> None
         client_args = (
             "enable-event --session {session_name} --channel {channel_name}".format(
                 session_name=self._session.name, channel_name=self.name
@@ -136,24 +149,31 @@ class _Channel(lttngctl.Channel):
         self._client._run_cmd(client_args)
 
     @property
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         return self._name
 
     @property
-    def domain(self) -> lttngctl.TracingDomain:
+    def domain(self):
+        # type: () -> lttngctl.TracingDomain
         return self._domain
 
 
+@enum.unique
 class _ProcessAttribute(enum.Enum):
-    PID = (enum.auto(),)
-    VPID = (enum.auto(),)
-    UID = (enum.auto(),)
-    VUID = (enum.auto(),)
-    GID = (enum.auto(),)
-    VGID = (enum.auto(),)
+    PID = "Process ID"
+    VPID = "Virtual Process ID"
+    UID = "User ID"
+    VUID = "Virtual User ID"
+    GID = "Group ID"
+    VGID = "Virtual Group ID"
+
+    def __repr__(self):
+        return "<%s.%s>" % (self.__class__.__name__, self.name)
 
 
-def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
+def _get_process_attribute_option_name(attribute):
+    # type: (_ProcessAttribute) -> str
     return {
         _ProcessAttribute.PID: "pid",
         _ProcessAttribute.VPID: "vpid",
@@ -167,21 +187,22 @@ def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
 class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
     def __init__(
         self,
-        client: "LTTngClient",
-        attribute: _ProcessAttribute,
-        domain: lttngctl.TracingDomain,
-        session: "_Session",
+        client,  # type: LTTngClient
+        attribute,  # type: _ProcessAttribute
+        domain,  # type: lttngctl.TracingDomain
+        session,  # type: _Session
     ):
-        self._client: LTTngClient = client
-        self._tracked_attribute: _ProcessAttribute = attribute
-        self._domain: lttngctl.TracingDomain = domain
-        self._session: "_Session" = session
+        self._client = client  # type: LTTngClient
+        self._tracked_attribute = attribute  # type: _ProcessAttribute
+        self._domain = domain  # type: lttngctl.TracingDomain
+        self._session = session  # type: _Session
         if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
-            self._allowed_value_types: list[type] = [int, str]
+            self._allowed_value_types = [int, str]  # type: list[type]
         else:
-            self._allowed_value_types: list[type] = [int]
+            self._allowed_value_types = [int]  # type: list[type]
 
-    def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
+    def _call_client(self, cmd_name, value):
+        # type: (str, Union[int, str]) -> None
         if type(value) not in self._allowed_value_types:
             raise TypeError(
                 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
@@ -195,7 +216,7 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
         )
         domain_name = _get_domain_option_name(self._domain)
         self._client._run_cmd(
-            "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
+            "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
                 cmd_name=cmd_name,
                 session_name=self._session.name,
                 domain_name=domain_name,
@@ -204,115 +225,154 @@ class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
             )
         )
 
-    def track(self, value: Union[int, str]) -> None:
+    def track(self, value):
+        # type: (Union[int, str]) -> None
         self._call_client("track", value)
 
-    def untrack(self, value: Union[int, str]) -> None:
+    def untrack(self, value):
+        # type: (Union[int, str]) -> None
         self._call_client("untrack", value)
 
 
 class _Session(lttngctl.Session):
     def __init__(
         self,
-        client: "LTTngClient",
-        name: str,
-        output: Optional[Type[lttngctl.SessionOutputLocation]],
+        client,  # type: LTTngClient
+        name,  # type: str
+        output,  # type: Optional[lttngctl.SessionOutputLocation]
     ):
-        self._client: LTTngClient = client
-        self._name: str = name
-        self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output
+        self._client = client  # type: LTTngClient
+        self._name = name  # type: str
+        self._output = output  # type: Optional[lttngctl.SessionOutputLocation]
 
     @property
-    def name(self) -> str:
+    def name(self):
+        # type: () -> str
         return self._name
 
     def add_channel(
-        self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
-    ) -> lttngctl.Channel:
+        self,
+        domain,
+        channel_name=None,
+        buffer_sharing_policy=lttngctl.BufferSharingPolicy.PerUID,
+    ):
+        # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel
         channel_name = lttngctl.Channel._generate_name()
         domain_option_name = _get_domain_option_name(domain)
         self._client._run_cmd(
-            "enable-channel --{domain_name} {channel_name}".format(
-                domain_name=domain_option_name, channel_name=channel_name
+            "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format(
+                session_name=self.name,
+                domain_name=domain_option_name,
+                channel_name=channel_name,
+                buffer_sharing_policy="--buffers-uid"
+                if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
+                else "--buffers-pid",
             )
         )
         return _Channel(self._client, channel_name, domain, self)
 
-    def add_context(self, context_type: lttngctl.ContextType) -> None:
+    def add_context(self, context_type):
+        # type: (lttngctl.ContextType) -> None
         pass
 
     @property
-    def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
-        return self._output
+    def output(self):
+        # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
+        return self._output  # type: ignore
 
-    def start(self) -> None:
-        self._client._run_cmd("start {session_name}".format(session_name=self.name))
+    def start(self):
+        # type: () -> None
+        self._client._run_cmd("start '{session_name}'".format(session_name=self.name))
 
-    def stop(self) -> None:
-        self._client._run_cmd("stop {session_name}".format(session_name=self.name))
+    def stop(self):
+        # type: () -> None
+        self._client._run_cmd("stop '{session_name}'".format(session_name=self.name))
 
-    def destroy(self) -> None:
-        self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
+    def destroy(self):
+        # type: () -> None
+        self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name))
+
+    def rotate(self, wait=True):
+        # type: (bool) -> None
+        self._client.rotate_session_by_name(self.name, wait)
 
     @property
-    def kernel_pid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
+    def is_active(self):
+        # type: () -> bool
+        list_session_xml = self._client._run_cmd(
+            "list '{session_name}'".format(session_name=self.name),
+            LTTngClient.CommandOutputFormat.MI_XML,
+        )
+
+        root = xml.etree.ElementTree.fromstring(list_session_xml)
+        command_output = LTTngClient._mi_find_in_element(root, "output")
+        sessions = LTTngClient._mi_find_in_element(command_output, "sessions")
+        session_mi = LTTngClient._mi_find_in_element(sessions, "session")
+
+        enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text
+        if enabled_text not in ["true", "false"]:
+            raise InvalidMI(
+                "Expected boolean value in element '{}': value='{}'".format(
+                    session_mi.tag, enabled_text
+                )
+            )
+
+        return enabled_text == "true"
+
+    @property
+    def kernel_pid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+    def kernel_vpid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
+    def user_vpid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self)  # type: ignore
 
     @property
-    def kernel_gid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
+    def kernel_gid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+    def kernel_vgid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
+    def user_vgid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self)  # type: ignore
 
     @property
-    def kernel_uid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
+    def kernel_uid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def kernel_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+    def kernel_vuid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self)  # type: ignore
 
     @property
-    def user_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
+    def user_vuid_process_attribute_tracker(self):
+        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
         return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self)  # type: ignore
 
 
 class LTTngClientError(lttngctl.ControlException):
-    def __init__(self, command_args: str, error_output: str):
-        self._command_args: str = command_args
-        self._output: str = error_output
+    def __init__(
+        self,
+        command_args,  # type: str
+        error_output,  # type: str
+    ):
+        self._command_args = command_args  # type: str
+        self._output = error_output  # type: str
 
 
 class LTTngClient(logger._Logger, lttngctl.Controller):
@@ -320,25 +380,40 @@ class LTTngClient(logger._Logger, lttngctl.Controller):
     Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
     """
 
+    class CommandOutputFormat(enum.Enum):
+        MI_XML = 0
+        HUMAN = 1
+
+    _MI_NS = "{https://lttng.org/xml/ns/lttng-mi}"
+
     def __init__(
         self,
-        test_environment: environment._Environment,
-        log: Optional[Callable[[str], None]],
+        test_environment,  # type: environment._Environment
+        log,  # type: Optional[Callable[[str], None]]
     ):
         logger._Logger.__init__(self, log)
-        self._environment: environment._Environment = test_environment
+        self._environment = test_environment  # type: environment._Environment
 
-    def _run_cmd(self, command_args: str) -> None:
+    @staticmethod
+    def _namespaced_mi_element(property):
+        # type: (str) -> str
+        return LTTngClient._MI_NS + property
+
+    def _run_cmd(self, command_args, output_format=CommandOutputFormat.MI_XML):
+        # type: (str, CommandOutputFormat) -> str
         """
         Invoke the `lttng` client with a set of arguments. The command is
         executed in the context of the client's test environment.
         """
-        args: list[str] = [str(self._environment.lttng_client_path)]
+        args = [str(self._environment.lttng_client_path)]  # type: list[str]
+        if output_format == LTTngClient.CommandOutputFormat.MI_XML:
+            args.extend(["--mi", "xml"])
+
         args.extend(shlex.split(command_args))
 
         self._log("lttng {command_args}".format(command_args=command_args))
 
-        client_env: dict[str, str] = os.environ.copy()
+        client_env = os.environ.copy()  # type: dict[str, str]
         client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
 
         process = subprocess.Popen(
@@ -352,12 +427,11 @@ class LTTngClient(logger._Logger, lttngctl.Controller):
             for error_line in decoded_output.splitlines():
                 self._log(error_line)
             raise LTTngClientError(command_args, decoded_output)
+        else:
+            return out.decode("utf-8")
 
-    def create_session(
-        self,
-        name: Optional[str] = None,
-        output: Optional[lttngctl.SessionOutputLocation] = None,
-    ) -> lttngctl.Session:
+    def create_session(self, name=None, output=None):
+        # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
         name = name if name else lttngctl.Session._generate_name()
 
         if isinstance(output, lttngctl.LocalSessionOutputLocation):
@@ -368,8 +442,115 @@ class LTTngClient(logger._Logger, lttngctl.Controller):
             raise TypeError("LTTngClient only supports local or no output")
 
         self._run_cmd(
-            "create {session_name} {output_option}".format(
+            "create '{session_name}' {output_option}".format(
                 session_name=name, output_option=output_option
             )
         )
         return _Session(self, name, output)
+
+    def start_session_by_name(self, name):
+        # type: (str) -> None
+        self._run_cmd("start '{session_name}'".format(session_name=name))
+
+    def start_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        self._run_cmd("start --glob '{pattern}'".format(pattern=pattern))
+
+    def start_sessions_all(self):
+        # type: () -> None
+        self._run_cmd("start --all")
+
+    def stop_session_by_name(self, name):
+        # type: (str) -> None
+        self._run_cmd("stop '{session_name}'".format(session_name=name))
+
+    def stop_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        self._run_cmd("stop --glob '{pattern}'".format(pattern=pattern))
+
+    def stop_sessions_all(self):
+        # type: () -> None
+        self._run_cmd("stop --all")
+
+    def destroy_session_by_name(self, name):
+        # type: (str) -> None
+        self._run_cmd("destroy '{session_name}'".format(session_name=name))
+
+    def destroy_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        self._run_cmd("destroy --glob '{pattern}'".format(pattern=pattern))
+
+    def destroy_sessions_all(self):
+        # type: () -> None
+        self._run_cmd("destroy --all")
+
+    def rotate_session_by_name(self, name, wait=True):
+        self._run_cmd(
+            "rotate '{session_name}' {wait_option}".format(
+                session_name=name, wait_option="-n" if wait is False else ""
+            )
+        )
+
+    def schedule_size_based_rotation(self, name, size_bytes):
+        # type (str, int) -> None
+        self._run_cmd(
+            "enable-rotation --session '{session_name}' --size {size}".format(
+                session_name=name, size=size_bytes
+            )
+        )
+
+    def schedule_time_based_rotation(self, name, period_seconds):
+        # type (str, int) -> None
+        self._run_cmd(
+            "enable-rotation --session '{session_name}' --timer {period_seconds}s".format(
+                session_name=name, period_seconds=period_seconds
+            )
+        )
+
+    @staticmethod
+    def _mi_find_in_element(element, sub_element_name):
+        # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
+        result = element.find(LTTngClient._namespaced_mi_element(sub_element_name))
+        if result is None:
+            raise InvalidMI(
+                "Failed to find element '{}' within command MI element '{}'".format(
+                    element.tag, sub_element_name
+                )
+            )
+
+        return result
+
+    def list_sessions(self):
+        # type () -> List[Session]
+        list_sessions_xml = self._run_cmd(
+            "list", LTTngClient.CommandOutputFormat.MI_XML
+        )
+
+        root = xml.etree.ElementTree.fromstring(list_sessions_xml)
+        command_output = self._mi_find_in_element(root, "output")
+        sessions = self._mi_find_in_element(command_output, "sessions")
+
+        ctl_sessions = []  # type: list[lttngctl.Session]
+
+        for session_mi in sessions:
+            name = self._mi_find_in_element(session_mi, "name").text
+            path = self._mi_find_in_element(session_mi, "path").text
+
+            if name is None:
+                raise InvalidMI(
+                    "Invalid empty 'name' element in '{}'".format(session_mi.tag)
+                )
+            if path is None:
+                raise InvalidMI(
+                    "Invalid empty 'path' element in '{}'".format(session_mi.tag)
+                )
+            if not path.startswith("/"):
+                raise Unsupported(
+                    "{} does not support non-local session outputs".format(type(self))
+                )
+
+            ctl_sessions.append(
+                _Session(self, name, lttngctl.LocalSessionOutputLocation(path))
+            )
+
+        return ctl_sessions
This page took 0.028884 seconds and 4 git commands to generate.