from . import lttngctl, logger, environment
-import pathlib
import os
-from typing import Callable, Optional, Type, Union
+from typing import Callable, Optional, Type, Union, Iterator
import shlex
import subprocess
import enum
super().__init__(msg)
+class ChannelNotFound(lttngctl.ControlException):
+ def __init__(self, msg):
+ # type: (str) -> None
+ super().__init__(msg)
+
+
def _get_domain_option_name(domain):
# type: (lttngctl.TracingDomain) -> str
- if domain == lttngctl.TracingDomain.User:
- return "userspace"
- elif domain == lttngctl.TracingDomain.Kernel:
- return "kernel"
- elif domain == lttngctl.TracingDomain.Log4j:
- return "log4j"
- elif domain == lttngctl.TracingDomain.JUL:
- return "jul"
- elif domain == lttngctl.TracingDomain.Python:
- return "python"
- else:
- raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
+ return {
+ lttngctl.TracingDomain.User: "userspace",
+ lttngctl.TracingDomain.Kernel: "kernel",
+ lttngctl.TracingDomain.Log4j: "log4j",
+ lttngctl.TracingDomain.Python: "python",
+ lttngctl.TracingDomain.JUL: "jul",
+ }[domain]
+
+
+def _get_domain_xml_mi_name(domain):
+ # type: (lttngctl.TracingDomain) -> str
+ return {
+ lttngctl.TracingDomain.User: "UST",
+ lttngctl.TracingDomain.Kernel: "KERNEL",
+ lttngctl.TracingDomain.Log4j: "LOG4J",
+ lttngctl.TracingDomain.Python: "PYTHON",
+ lttngctl.TracingDomain.JUL: "JUL",
+ }[domain]
def _get_context_type_name(context):
)
+def _get_log_level_argument_name(log_level):
+ # type: (lttngctl.LogLevel) -> str
+ if isinstance(log_level, lttngctl.UserLogLevel):
+ return {
+ lttngctl.UserLogLevel.EMERGENCY: "EMER",
+ lttngctl.UserLogLevel.ALERT: "ALERT",
+ lttngctl.UserLogLevel.CRITICAL: "CRIT",
+ lttngctl.UserLogLevel.ERROR: "ERR",
+ lttngctl.UserLogLevel.WARNING: "WARNING",
+ lttngctl.UserLogLevel.NOTICE: "NOTICE",
+ lttngctl.UserLogLevel.INFO: "INFO",
+ lttngctl.UserLogLevel.DEBUG_SYSTEM: "DEBUG_SYSTEM",
+ lttngctl.UserLogLevel.DEBUG_PROGRAM: "DEBUG_PROGRAM",
+ lttngctl.UserLogLevel.DEBUG_PROCESS: "DEBUG_PROCESS",
+ lttngctl.UserLogLevel.DEBUG_MODULE: "DEBUG_MODULE",
+ lttngctl.UserLogLevel.DEBUG_UNIT: "DEBUG_UNIT",
+ lttngctl.UserLogLevel.DEBUG_FUNCTION: "DEBUG_FUNCTION",
+ lttngctl.UserLogLevel.DEBUG_LINE: "DEBUG_LINE",
+ lttngctl.UserLogLevel.DEBUG: "DEBUG",
+ }[log_level]
+ elif isinstance(log_level, lttngctl.JULLogLevel):
+ return {
+ lttngctl.JULLogLevel.OFF: "OFF",
+ lttngctl.JULLogLevel.SEVERE: "SEVERE",
+ lttngctl.JULLogLevel.WARNING: "WARNING",
+ lttngctl.JULLogLevel.INFO: "INFO",
+ lttngctl.JULLogLevel.CONFIG: "CONFIG",
+ lttngctl.JULLogLevel.FINE: "FINE",
+ lttngctl.JULLogLevel.FINER: "FINER",
+ lttngctl.JULLogLevel.FINEST: "FINEST",
+ lttngctl.JULLogLevel.ALL: "ALL",
+ }[log_level]
+ elif isinstance(log_level, lttngctl.Log4jLogLevel):
+ return {
+ lttngctl.Log4jLogLevel.OFF: "OFF",
+ lttngctl.Log4jLogLevel.FATAL: "FATAL",
+ lttngctl.Log4jLogLevel.ERROR: "ERROR",
+ lttngctl.Log4jLogLevel.WARN: "WARN",
+ lttngctl.Log4jLogLevel.INFO: "INFO",
+ lttngctl.Log4jLogLevel.DEBUG: "DEBUG",
+ lttngctl.Log4jLogLevel.TRACE: "TRACE",
+ lttngctl.Log4jLogLevel.ALL: "ALL",
+ }[log_level]
+ elif isinstance(log_level, lttngctl.PythonLogLevel):
+ return {
+ lttngctl.PythonLogLevel.CRITICAL: "CRITICAL",
+ lttngctl.PythonLogLevel.ERROR: "ERROR",
+ lttngctl.PythonLogLevel.WARNING: "WARNING",
+ lttngctl.PythonLogLevel.INFO: "INFO",
+ lttngctl.PythonLogLevel.DEBUG: "DEBUG",
+ lttngctl.PythonLogLevel.NOTSET: "NOTSET",
+ }[log_level]
+
+ raise TypeError("Unknown log level type")
+
+
+def _get_log_level_from_mi_log_level_name(mi_log_level_name):
+ # type: (str) -> lttngctl.LogLevel
+ return {
+ "TRACE_EMERG": lttngctl.UserLogLevel.EMERGENCY,
+ "TRACE_ALERT": lttngctl.UserLogLevel.ALERT,
+ "TRACE_CRIT": lttngctl.UserLogLevel.CRITICAL,
+ "TRACE_ERR": lttngctl.UserLogLevel.ERROR,
+ "TRACE_WARNING": lttngctl.UserLogLevel.WARNING,
+ "TRACE_NOTICE": lttngctl.UserLogLevel.NOTICE,
+ "TRACE_INFO": lttngctl.UserLogLevel.INFO,
+ "TRACE_DEBUG_SYSTEM": lttngctl.UserLogLevel.DEBUG_SYSTEM,
+ "TRACE_DEBUG_PROGRAM": lttngctl.UserLogLevel.DEBUG_PROGRAM,
+ "TRACE_DEBUG_PROCESS": lttngctl.UserLogLevel.DEBUG_PROCESS,
+ "TRACE_DEBUG_MODULE": lttngctl.UserLogLevel.DEBUG_MODULE,
+ "TRACE_DEBUG_UNIT": lttngctl.UserLogLevel.DEBUG_UNIT,
+ "TRACE_DEBUG_FUNCTION": lttngctl.UserLogLevel.DEBUG_FUNCTION,
+ "TRACE_DEBUG_LINE": lttngctl.UserLogLevel.DEBUG_LINE,
+ "TRACE_DEBUG": lttngctl.UserLogLevel.DEBUG,
+ "JUL_OFF": lttngctl.JULLogLevel.OFF,
+ "JUL_SEVERE": lttngctl.JULLogLevel.SEVERE,
+ "JUL_WARNING": lttngctl.JULLogLevel.WARNING,
+ "JUL_INFO": lttngctl.JULLogLevel.INFO,
+ "JUL_CONFIG": lttngctl.JULLogLevel.CONFIG,
+ "JUL_FINE": lttngctl.JULLogLevel.FINE,
+ "JUL_FINER": lttngctl.JULLogLevel.FINER,
+ "JUL_FINEST": lttngctl.JULLogLevel.FINEST,
+ "JUL_ALL": lttngctl.JULLogLevel.ALL,
+ "LOG4J_OFF": lttngctl.Log4jLogLevel.OFF,
+ "LOG4J_FATAL": lttngctl.Log4jLogLevel.FATAL,
+ "LOG4J_ERROR": lttngctl.Log4jLogLevel.ERROR,
+ "LOG4J_WARN": lttngctl.Log4jLogLevel.WARN,
+ "LOG4J_INFO": lttngctl.Log4jLogLevel.INFO,
+ "LOG4J_DEBUG": lttngctl.Log4jLogLevel.DEBUG,
+ "LOG4J_TRACE": lttngctl.Log4jLogLevel.TRACE,
+ "LOG4J_ALL": lttngctl.Log4jLogLevel.ALL,
+ "PYTHON_CRITICAL": lttngctl.PythonLogLevel.CRITICAL,
+ "PYTHON_ERROR": lttngctl.PythonLogLevel.ERROR,
+ "PYTHON_WARNING": lttngctl.PythonLogLevel.WARNING,
+ "PYTHON_INFO": lttngctl.PythonLogLevel.INFO,
+ "PYTHON_DEBUG": lttngctl.PythonLogLevel.DEBUG,
+ "PYTHON_NOTSET": lttngctl.PythonLogLevel.NOTSET,
+ }[mi_log_level_name]
+
+
+def _get_tracepoint_event_rule_class_from_domain_type(domain_type):
+ # type: (lttngctl.TracingDomain) -> Type[lttngctl.UserTracepointEventRule] | Type[lttngctl.Log4jTracepointEventRule] | Type[lttngctl.JULTracepointEventRule] | Type[lttngctl.PythonTracepointEventRule] | Type[lttngctl.KernelTracepointEventRule]
+ return {
+ lttngctl.TracingDomain.User: lttngctl.UserTracepointEventRule,
+ lttngctl.TracingDomain.JUL: lttngctl.JULTracepointEventRule,
+ lttngctl.TracingDomain.Log4j: lttngctl.Log4jTracepointEventRule,
+ lttngctl.TracingDomain.Python: lttngctl.PythonTracepointEventRule,
+ lttngctl.TracingDomain.Kernel: lttngctl.KernelTracepointEventRule,
+ }[domain_type]
+
+
class _Channel(lttngctl.Channel):
def __init__(
self,
if rule.log_level_rule:
if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
client_args = client_args + " --loglevel {log_level}".format(
- log_level=rule.log_level_rule.level
+ log_level=_get_log_level_argument_name(
+ rule.log_level_rule.level
+ )
)
elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
client_args = client_args + " --loglevel-only {log_level}".format(
- log_level=rule.log_level_rule.level
+ log_level=_get_log_level_argument_name(
+ rule.log_level_rule.level
+ )
)
else:
raise Unsupported(
# type: () -> lttngctl.TracingDomain
return self._domain
+ @property
+ def recording_rules(self):
+ # type: () -> Iterator[lttngctl.EventRule]
+ list_session_xml = self._client._run_cmd(
+ "list '{session_name}'".format(session_name=self._session.name),
+ LTTngClient.CommandOutputFormat.MI_XML,
+ )
+
+ root = xml.etree.ElementTree.fromstring(list_session_xml)
+ command_output = LTTngClient._mi_get_in_element(root, "output")
+ sessions = LTTngClient._mi_get_in_element(command_output, "sessions")
+
+ # The channel's session is supposed to be the only session returned by the command
+ if len(sessions) != 1:
+ raise InvalidMI(
+ "Only one session expected when listing with an explicit session name"
+ )
+ session = sessions[0]
+
+ # Look for the channel's domain
+ target_domain = None
+ target_domain_mi_name = _get_domain_xml_mi_name(self.domain)
+ for domain in LTTngClient._mi_get_in_element(session, "domains"):
+ if (
+ LTTngClient._mi_get_in_element(domain, "type").text
+ == target_domain_mi_name
+ ):
+ target_domain = domain
+
+ if target_domain is None:
+ raise ChannelNotFound(
+ "Failed to find channel `{channel_name}`: no channel in target domain".format(
+ channel_name=self.name
+ )
+ )
+
+ target_channel = None
+ for channel in LTTngClient._mi_get_in_element(target_domain, "channels"):
+ if LTTngClient._mi_get_in_element(channel, "name").text == self.name:
+ target_channel = channel
+ break
+
+ if target_channel is None:
+ raise ChannelNotFound(
+ "Failed to find channel `{channel_name}`: no such channel in target domain".format(
+ channel_name=self.name
+ )
+ )
+
+ tracepoint_event_rule_class = None
+
+ for event in LTTngClient._mi_get_in_element(target_channel, "events"):
+ # Note that the "enabled" property is ignored as it is not exposed by
+ # the EventRule interface.
+ pattern = LTTngClient._mi_get_in_element(event, "name").text
+ type = LTTngClient._mi_get_in_element(event, "type").text
+
+ filter_expression = None
+ filter_expression_element = LTTngClient._mi_find_in_element(
+ event, "filter_expression"
+ )
+ if filter_expression_element:
+ filter_expression = filter_expression_element.text
+
+ exclusions = []
+ for exclusion in LTTngClient._mi_get_in_element(event, "exclusions"):
+ exclusions.append(exclusion.text)
+
+ exclusions = exclusions if len(exclusions) > 0 else None
+
+ if type != "TRACEPOINT":
+ raise Unsupported(
+ "Non-tracepoint event rules are not supported by this Controller implementation"
+ )
+
+ tracepoint_event_rule_class = (
+ _get_tracepoint_event_rule_class_from_domain_type(self.domain)
+ )
+ event_rule = None
+ if self.domain != lttngctl.TracingDomain.Kernel:
+ log_level_element = LTTngClient._mi_find_in_element(event, "loglevel")
+ log_level_type_element = LTTngClient._mi_find_in_element(
+ event, "loglevel_type"
+ )
+
+ log_level_rule = None
+ if log_level_element is not None and log_level_type_element is not None:
+ if log_level_element.text is None:
+ raise InvalidMI("`loglevel` element of event rule has no text")
+
+ if log_level_type_element.text == "RANGE":
+ log_level_rule = lttngctl.LogLevelRuleAsSevereAs(
+ _get_log_level_from_mi_log_level_name(
+ log_level_element.text
+ )
+ )
+ elif log_level_type_element.text == "SINGLE":
+ log_level_rule = lttngctl.LogLevelRuleExactly(
+ _get_log_level_from_mi_log_level_name(
+ log_level_element.text
+ )
+ )
+
+ yield tracepoint_event_rule_class(
+ pattern, filter_expression, log_level_rule, exclusions
+ )
+ else:
+ yield tracepoint_event_rule_class(pattern, filter_expression)
+
@enum.unique
class _ProcessAttribute(enum.Enum):
# type: () -> str
return self._name
- def add_channel(self, domain, channel_name=None):
- # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
+ def add_channel(
+ self,
+ domain,
+ channel_name=None,
+ buffer_sharing_policy=lttngctl.BufferSharingPolicy.PerUID,
+ ):
+ # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel
channel_name = lttngctl.Channel._generate_name()
domain_option_name = _get_domain_option_name(domain)
self._client._run_cmd(
- "enable-channel --session '{session_name}' --{domain_name} '{channel_name}'".format(
+ "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format(
session_name=self.name,
domain_name=domain_option_name,
channel_name=channel_name,
+ buffer_sharing_policy=(
+ "--buffers-uid"
+ if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
+ else "--buffers-pid"
+ ),
)
)
return _Channel(self._client, channel_name, domain, self)
# type: () -> None
self._client._run_cmd("stop '{session_name}'".format(session_name=self.name))
+ def clear(self):
+ # type: () -> None
+ self._client._run_cmd("clear '{session_name}'".format(session_name=self.name))
+
def destroy(self):
# type: () -> None
self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name))
+ def rotate(self, wait=True):
+ # type: (bool) -> None
+ self._client.rotate_session_by_name(self.name, wait)
+
@property
def is_active(self):
# type: () -> bool
)
root = xml.etree.ElementTree.fromstring(list_session_xml)
- command_output = LTTngClient._mi_find_in_element(root, "output")
- sessions = LTTngClient._mi_find_in_element(command_output, "sessions")
- session_mi = LTTngClient._mi_find_in_element(sessions, "session")
+ command_output = LTTngClient._mi_get_in_element(root, "output")
+ sessions = LTTngClient._mi_get_in_element(command_output, "sessions")
+ session_mi = LTTngClient._mi_get_in_element(sessions, "session")
- enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text
+ enabled_text = LTTngClient._mi_get_in_element(session_mi, "enabled").text
if enabled_text not in ["true", "false"]:
raise InvalidMI(
"Expected boolean value in element '{}': value='{}'".format(
decoded_output = out.decode("utf-8")
for error_line in decoded_output.splitlines():
self._log(error_line)
+
raise LTTngClientError(command_args, decoded_output)
else:
return out.decode("utf-8")
name = name if name else lttngctl.Session._generate_name()
if isinstance(output, lttngctl.LocalSessionOutputLocation):
- output_option = "--output {output_path}".format(output_path=output.path)
+ output_option = "--output '{output_path}'".format(output_path=output.path)
elif output is None:
output_option = "--no-output"
else:
# type: () -> None
self._run_cmd("destroy --all")
+ def rotate_session_by_name(self, name, wait=True):
+ self._run_cmd(
+ "rotate '{session_name}' {wait_option}".format(
+ session_name=name, wait_option="-n" if wait is False else ""
+ )
+ )
+
+ def schedule_size_based_rotation(self, name, size_bytes):
+ # type (str, int) -> None
+ self._run_cmd(
+ "enable-rotation --session '{session_name}' --size {size}".format(
+ session_name=name, size=size_bytes
+ )
+ )
+
+ def schedule_time_based_rotation(self, name, period_seconds):
+ # type (str, int) -> None
+ self._run_cmd(
+ "enable-rotation --session '{session_name}' --timer {period_seconds}s".format(
+ session_name=name, period_seconds=period_seconds
+ )
+ )
+
@staticmethod
def _mi_find_in_element(element, sub_element_name):
+ # type: (xml.etree.ElementTree.Element, str) -> Optional[xml.etree.ElementTree.Element]
+ return element.find(LTTngClient._namespaced_mi_element(sub_element_name))
+
+ @staticmethod
+ def _mi_get_in_element(element, sub_element_name):
# type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
- result = element.find(LTTngClient._namespaced_mi_element(sub_element_name))
+ result = LTTngClient._mi_find_in_element(element, sub_element_name)
if result is None:
raise InvalidMI(
"Failed to find element '{}' within command MI element '{}'".format(
)
root = xml.etree.ElementTree.fromstring(list_sessions_xml)
- command_output = self._mi_find_in_element(root, "output")
- sessions = self._mi_find_in_element(command_output, "sessions")
+ command_output = self._mi_get_in_element(root, "output")
+ sessions = self._mi_get_in_element(command_output, "sessions")
ctl_sessions = [] # type: list[lttngctl.Session]
for session_mi in sessions:
- name = self._mi_find_in_element(session_mi, "name").text
- path = self._mi_find_in_element(session_mi, "path").text
+ name = self._mi_get_in_element(session_mi, "name").text
+ path = self._mi_get_in_element(session_mi, "path").text
if name is None:
raise InvalidMI(