3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 # SPDX-License-Identifier: GPL-2.0-only
7 from concurrent
.futures
import process
8 from . import lttngctl
, logger
, environment
11 from typing
import Callable
, Optional
, Type
, Union
17 Implementation of the lttngctl interface based on the `lttng` command line client.
21 class Unsupported(lttngctl
.ControlException
):
22 def __init__(self
, msg
: str):
26 def _get_domain_option_name(domain
: lttngctl
.TracingDomain
) -> str:
27 if domain
== lttngctl
.TracingDomain
.User
:
29 elif domain
== lttngctl
.TracingDomain
.Kernel
:
31 elif domain
== lttngctl
.TracingDomain
.Log4j
:
33 elif domain
== lttngctl
.TracingDomain
.JUL
:
35 elif domain
== lttngctl
.TracingDomain
.Python
:
38 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
41 def _get_context_type_name(context
: lttngctl
.ContextType
) -> str:
42 if isinstance(context
, lttngctl
.VgidContextType
):
44 elif isinstance(context
, lttngctl
.VuidContextType
):
46 elif isinstance(context
, lttngctl
.VpidContextType
):
48 elif isinstance(context
, lttngctl
.JavaApplicationContextType
):
49 return "$app.{retriever}:{field}".format(
50 retriever
=context
.retriever_name
, field
=context
.field_name
54 "Context `{context_name}` is not supported by the LTTng client".format(
55 type(context
).__name
__
60 class _Channel(lttngctl
.Channel
):
63 client
: "LTTngClient",
65 domain
: lttngctl
.TracingDomain
,
68 self
._client
: LTTngClient
= client
69 self
._name
: str = name
70 self
._domain
: lttngctl
.TracingDomain
= domain
71 self
._session
: _Session
= session
73 def add_context(self
, context_type
: lttngctl
.ContextType
) -> None:
74 domain_option_name
= _get_domain_option_name(self
.domain
)
75 context_type_name
= _get_context_type_name(context_type
)
76 self
._client
._run
_cmd
(
77 "add-context --{domain_option_name} --type {context_type_name}".format(
78 domain_option_name
=domain_option_name
,
79 context_type_name
=context_type_name
,
83 def add_recording_rule(self
, rule
: Type
[lttngctl
.EventRule
]) -> None:
85 "enable-event --session {session_name} --channel {channel_name}".format(
86 session_name
=self
._session
.name
, channel_name
=self
.name
89 if isinstance(rule
, lttngctl
.TracepointEventRule
):
90 domain_option_name
= (
92 if isinstance(rule
, lttngctl
.UserTracepointEventRule
)
95 client_args
= client_args
+ " --{domain_option_name}".format(
96 domain_option_name
=domain_option_name
100 client_args
= client_args
+ " " + rule
.name_pattern
102 client_args
= client_args
+ " --all"
104 if rule
.filter_expression
:
105 client_args
= client_args
+ " " + rule
.filter_expression
107 if rule
.log_level_rule
:
108 if isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleAsSevereAs
):
109 client_args
= client_args
+ " --loglevel {log_level}".format(
110 log_level
=rule
.log_level_rule
.level
112 elif isinstance(rule
.log_level_rule
, lttngctl
.LogLevelRuleExactly
):
113 client_args
= client_args
+ " --loglevel-only {log_level}".format(
114 log_level
=rule
.log_level_rule
.level
118 "Unsupported log level rule type `{log_level_rule_type}`".format(
119 log_level_rule_type
=type(rule
.log_level_rule
).__name
__
123 if rule
.name_pattern_exclusions
:
124 client_args
= client_args
+ " --exclude "
125 for idx
, pattern
in enumerate(rule
.name_pattern_exclusions
):
127 client_args
= client_args
+ ","
128 client_args
= client_args
+ pattern
131 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
132 event_rule_type
=type(rule
).__name
__
136 self
._client
._run
_cmd
(client_args
)
139 def name(self
) -> str:
143 def domain(self
) -> lttngctl
.TracingDomain
:
148 class _ProcessAttribute(enum
.Enum
):
150 VPID
= "Virtual Process ID"
152 VUID
= "Virtual User ID"
154 VGID
= "Virtual Group ID"
157 return "<%s.%s>" % (self
.__class
__.__name
__, self
.name
)
160 def _get_process_attribute_option_name(attribute
: _ProcessAttribute
) -> str:
162 _ProcessAttribute
.PID
: "pid",
163 _ProcessAttribute
.VPID
: "vpid",
164 _ProcessAttribute
.UID
: "uid",
165 _ProcessAttribute
.VUID
: "vuid",
166 _ProcessAttribute
.GID
: "gid",
167 _ProcessAttribute
.VGID
: "vgid",
171 class _ProcessAttributeTracker(lttngctl
.ProcessAttributeTracker
):
174 client
: "LTTngClient",
175 attribute
: _ProcessAttribute
,
176 domain
: lttngctl
.TracingDomain
,
179 self
._client
: LTTngClient
= client
180 self
._tracked
_attribute
: _ProcessAttribute
= attribute
181 self
._domain
: lttngctl
.TracingDomain
= domain
182 self
._session
: "_Session" = session
183 if attribute
== _ProcessAttribute
.PID
or attribute
== _ProcessAttribute
.VPID
:
184 self
._allowed
_value
_types
: list[type] = [int, str]
186 self
._allowed
_value
_types
: list[type] = [int]
188 def _call_client(self
, cmd_name
: str, value
: Union
[int, str]) -> None:
189 if type(value
) not in self
._allowed
_value
_types
:
191 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
192 value_type
=type(value
).__name
__,
193 attribute_name
=self
._tracked
_attribute
.name
,
197 process_attribute_option_name
= _get_process_attribute_option_name(
198 self
._tracked
_attribute
200 domain_name
= _get_domain_option_name(self
._domain
)
201 self
._client
._run
_cmd
(
202 "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
204 session_name
=self
._session
.name
,
205 domain_name
=domain_name
,
206 tracked_attribute_name
=process_attribute_option_name
,
211 def track(self
, value
: Union
[int, str]) -> None:
212 self
._call
_client
("track", value
)
214 def untrack(self
, value
: Union
[int, str]) -> None:
215 self
._call
_client
("untrack", value
)
218 class _Session(lttngctl
.Session
):
221 client
: "LTTngClient",
223 output
: Optional
[Type
[lttngctl
.SessionOutputLocation
]],
225 self
._client
: LTTngClient
= client
226 self
._name
: str = name
227 self
._output
: Optional
[Type
[lttngctl
.SessionOutputLocation
]] = output
230 def name(self
) -> str:
234 self
, domain
: lttngctl
.TracingDomain
, channel_name
: Optional
[str] = None
235 ) -> lttngctl
.Channel
:
236 channel_name
= lttngctl
.Channel
._generate
_name
()
237 domain_option_name
= _get_domain_option_name(domain
)
238 self
._client
._run
_cmd
(
239 "enable-channel --{domain_name} {channel_name}".format(
240 domain_name
=domain_option_name
, channel_name
=channel_name
243 return _Channel(self
._client
, channel_name
, domain
, self
)
245 def add_context(self
, context_type
: lttngctl
.ContextType
) -> None:
249 def output(self
) -> Optional
[Type
[lttngctl
.SessionOutputLocation
]]:
252 def start(self
) -> None:
253 self
._client
._run
_cmd
("start {session_name}".format(session_name
=self
.name
))
255 def stop(self
) -> None:
256 self
._client
._run
_cmd
("stop {session_name}".format(session_name
=self
.name
))
258 def destroy(self
) -> None:
259 self
._client
._run
_cmd
("destroy {session_name}".format(session_name
=self
.name
))
262 def kernel_pid_process_attribute_tracker(
264 ) -> Type
[lttngctl
.ProcessIDProcessAttributeTracker
]:
265 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.PID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
268 def kernel_vpid_process_attribute_tracker(
270 ) -> Type
[lttngctl
.VirtualProcessIDProcessAttributeTracker
]:
271 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
274 def user_vpid_process_attribute_tracker(
276 ) -> Type
[lttngctl
.VirtualProcessIDProcessAttributeTracker
]:
277 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VPID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
280 def kernel_gid_process_attribute_tracker(
282 ) -> Type
[lttngctl
.GroupIDProcessAttributeTracker
]:
283 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.GID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
286 def kernel_vgid_process_attribute_tracker(
288 ) -> Type
[lttngctl
.VirtualGroupIDProcessAttributeTracker
]:
289 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
292 def user_vgid_process_attribute_tracker(
294 ) -> Type
[lttngctl
.VirtualGroupIDProcessAttributeTracker
]:
295 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VGID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
298 def kernel_uid_process_attribute_tracker(
300 ) -> Type
[lttngctl
.UserIDProcessAttributeTracker
]:
301 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.UID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
304 def kernel_vuid_process_attribute_tracker(
306 ) -> Type
[lttngctl
.VirtualUserIDProcessAttributeTracker
]:
307 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.Kernel
, self
) # type: ignore
310 def user_vuid_process_attribute_tracker(
312 ) -> Type
[lttngctl
.VirtualUserIDProcessAttributeTracker
]:
313 return _ProcessAttributeTracker(self
._client
, _ProcessAttribute
.VUID
, lttngctl
.TracingDomain
.User
, self
) # type: ignore
316 class LTTngClientError(lttngctl
.ControlException
):
317 def __init__(self
, command_args
: str, error_output
: str):
318 self
._command
_args
: str = command_args
319 self
._output
: str = error_output
322 class LTTngClient(logger
._Logger
, lttngctl
.Controller
):
324 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
329 test_environment
: environment
._Environment
,
330 log
: Optional
[Callable
[[str], None]],
332 logger
._Logger
.__init
__(self
, log
)
333 self
._environment
: environment
._Environment
= test_environment
335 def _run_cmd(self
, command_args
: str) -> None:
337 Invoke the `lttng` client with a set of arguments. The command is
338 executed in the context of the client's test environment.
340 args
: list[str] = [str(self
._environment
.lttng_client_path
)]
341 args
.extend(shlex
.split(command_args
))
343 self
._log
("lttng {command_args}".format(command_args
=command_args
))
345 client_env
: dict[str, str] = os
.environ
.copy()
346 client_env
["LTTNG_HOME"] = str(self
._environment
.lttng_home_location
)
348 process
= subprocess
.Popen(
349 args
, stdout
=subprocess
.PIPE
, stderr
=subprocess
.STDOUT
, env
=client_env
352 out
= process
.communicate()[0]
354 if process
.returncode
!= 0:
355 decoded_output
= out
.decode("utf-8")
356 for error_line
in decoded_output
.splitlines():
357 self
._log
(error_line
)
358 raise LTTngClientError(command_args
, decoded_output
)
362 name
: Optional
[str] = None,
363 output
: Optional
[lttngctl
.SessionOutputLocation
] = None,
364 ) -> lttngctl
.Session
:
365 name
= name
if name
else lttngctl
.Session
._generate
_name
()
367 if isinstance(output
, lttngctl
.LocalSessionOutputLocation
):
368 output_option
= "--output {output_path}".format(output_path
=output
.path
)
370 output_option
= "--no-output"
372 raise TypeError("LTTngClient only supports local or no output")
375 "create {session_name} {output_option}".format(
376 session_name
=name
, output_option
=output_option
379 return _Session(self
, name
, output
)