2b9dec5adafaba0772fd371740500db06a3d18ab
[lttng-tools.git] / tests / utils / lttngtest / lttng.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6
7
8 from . import lttngctl, logger, environment
9 import pathlib
10 import os
11 from typing import Callable, Optional, Type, Union
12 import shlex
13 import subprocess
14 import enum
15 import xml.etree.ElementTree
16
17 """
18 Implementation of the lttngctl interface based on the `lttng` command line client.
19 """
20
21
22 class Unsupported(lttngctl.ControlException):
23 def __init__(self, msg):
24 # type: (str) -> None
25 super().__init__(msg)
26
27
28 class InvalidMI(lttngctl.ControlException):
29 def __init__(self, msg):
30 # type: (str) -> None
31 super().__init__(msg)
32
33
34 def _get_domain_option_name(domain):
35 # type: (lttngctl.TracingDomain) -> str
36 if domain == lttngctl.TracingDomain.User:
37 return "userspace"
38 elif domain == lttngctl.TracingDomain.Kernel:
39 return "kernel"
40 elif domain == lttngctl.TracingDomain.Log4j:
41 return "log4j"
42 elif domain == lttngctl.TracingDomain.JUL:
43 return "jul"
44 elif domain == lttngctl.TracingDomain.Python:
45 return "python"
46 else:
47 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
48
49
50 def _get_context_type_name(context):
51 # type: (lttngctl.ContextType) -> str
52 if isinstance(context, lttngctl.VgidContextType):
53 return "vgid"
54 elif isinstance(context, lttngctl.VuidContextType):
55 return "vuid"
56 elif isinstance(context, lttngctl.VpidContextType):
57 return "vpid"
58 elif isinstance(context, lttngctl.JavaApplicationContextType):
59 return "$app.{retriever}:{field}".format(
60 retriever=context.retriever_name, field=context.field_name
61 )
62 else:
63 raise Unsupported(
64 "Context `{context_name}` is not supported by the LTTng client".format(
65 type(context).__name__
66 )
67 )
68
69
70 class _Channel(lttngctl.Channel):
71 def __init__(
72 self,
73 client, # type: LTTngClient
74 name, # type: str
75 domain, # type: lttngctl.TracingDomain
76 session, # type: _Session
77 ):
78 self._client = client # type: LTTngClient
79 self._name = name # type: str
80 self._domain = domain # type: lttngctl.TracingDomain
81 self._session = session # type: _Session
82
83 def add_context(self, context_type):
84 # type: (lttngctl.ContextType) -> None
85 domain_option_name = _get_domain_option_name(self.domain)
86 context_type_name = _get_context_type_name(context_type)
87 self._client._run_cmd(
88 "add-context --{domain_option_name} --channel '{channel_name}' --type {context_type_name}".format(
89 domain_option_name=domain_option_name,
90 channel_name=self.name,
91 context_type_name=context_type_name,
92 )
93 )
94
95 def add_recording_rule(self, rule):
96 # type: (Type[lttngctl.EventRule]) -> None
97 client_args = (
98 "enable-event --session {session_name} --channel {channel_name}".format(
99 session_name=self._session.name, channel_name=self.name
100 )
101 )
102 if isinstance(rule, lttngctl.TracepointEventRule):
103 domain_option_name = (
104 "userspace"
105 if isinstance(rule, lttngctl.UserTracepointEventRule)
106 else "kernel"
107 )
108 client_args = client_args + " --{domain_option_name}".format(
109 domain_option_name=domain_option_name
110 )
111
112 if rule.name_pattern:
113 client_args = client_args + " " + rule.name_pattern
114 else:
115 client_args = client_args + " --all"
116
117 if rule.filter_expression:
118 client_args = client_args + " " + rule.filter_expression
119
120 if rule.log_level_rule:
121 if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
122 client_args = client_args + " --loglevel {log_level}".format(
123 log_level=rule.log_level_rule.level
124 )
125 elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
126 client_args = client_args + " --loglevel-only {log_level}".format(
127 log_level=rule.log_level_rule.level
128 )
129 else:
130 raise Unsupported(
131 "Unsupported log level rule type `{log_level_rule_type}`".format(
132 log_level_rule_type=type(rule.log_level_rule).__name__
133 )
134 )
135
136 if rule.name_pattern_exclusions:
137 client_args = client_args + " --exclude "
138 for idx, pattern in enumerate(rule.name_pattern_exclusions):
139 if idx != 0:
140 client_args = client_args + ","
141 client_args = client_args + pattern
142 else:
143 raise Unsupported(
144 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
145 event_rule_type=type(rule).__name__
146 )
147 )
148
149 self._client._run_cmd(client_args)
150
151 @property
152 def name(self):
153 # type: () -> str
154 return self._name
155
156 @property
157 def domain(self):
158 # type: () -> lttngctl.TracingDomain
159 return self._domain
160
161
162 @enum.unique
163 class _ProcessAttribute(enum.Enum):
164 PID = "Process ID"
165 VPID = "Virtual Process ID"
166 UID = "User ID"
167 VUID = "Virtual User ID"
168 GID = "Group ID"
169 VGID = "Virtual Group ID"
170
171 def __repr__(self):
172 return "<%s.%s>" % (self.__class__.__name__, self.name)
173
174
175 def _get_process_attribute_option_name(attribute):
176 # type: (_ProcessAttribute) -> str
177 return {
178 _ProcessAttribute.PID: "pid",
179 _ProcessAttribute.VPID: "vpid",
180 _ProcessAttribute.UID: "uid",
181 _ProcessAttribute.VUID: "vuid",
182 _ProcessAttribute.GID: "gid",
183 _ProcessAttribute.VGID: "vgid",
184 }[attribute]
185
186
187 class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
188 def __init__(
189 self,
190 client, # type: LTTngClient
191 attribute, # type: _ProcessAttribute
192 domain, # type: lttngctl.TracingDomain
193 session, # type: _Session
194 ):
195 self._client = client # type: LTTngClient
196 self._tracked_attribute = attribute # type: _ProcessAttribute
197 self._domain = domain # type: lttngctl.TracingDomain
198 self._session = session # type: _Session
199 if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
200 self._allowed_value_types = [int, str] # type: list[type]
201 else:
202 self._allowed_value_types = [int] # type: list[type]
203
204 def _call_client(self, cmd_name, value):
205 # type: (str, Union[int, str]) -> None
206 if type(value) not in self._allowed_value_types:
207 raise TypeError(
208 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
209 value_type=type(value).__name__,
210 attribute_name=self._tracked_attribute.name,
211 )
212 )
213
214 process_attribute_option_name = _get_process_attribute_option_name(
215 self._tracked_attribute
216 )
217 domain_name = _get_domain_option_name(self._domain)
218 self._client._run_cmd(
219 "{cmd_name} --session '{session_name}' --{domain_name} --{tracked_attribute_name} {value}".format(
220 cmd_name=cmd_name,
221 session_name=self._session.name,
222 domain_name=domain_name,
223 tracked_attribute_name=process_attribute_option_name,
224 value=value,
225 )
226 )
227
228 def track(self, value):
229 # type: (Union[int, str]) -> None
230 self._call_client("track", value)
231
232 def untrack(self, value):
233 # type: (Union[int, str]) -> None
234 self._call_client("untrack", value)
235
236
237 class _Session(lttngctl.Session):
238 def __init__(
239 self,
240 client, # type: LTTngClient
241 name, # type: str
242 output, # type: Optional[lttngctl.SessionOutputLocation]
243 ):
244 self._client = client # type: LTTngClient
245 self._name = name # type: str
246 self._output = output # type: Optional[lttngctl.SessionOutputLocation]
247
248 @property
249 def name(self):
250 # type: () -> str
251 return self._name
252
253 def add_channel(
254 self,
255 domain,
256 channel_name=None,
257 buffer_sharing_policy=lttngctl.BufferSharingPolicy.PerUID,
258 ):
259 # type: (lttngctl.TracingDomain, Optional[str], lttngctl.BufferSharingPolicy) -> lttngctl.Channel
260 channel_name = lttngctl.Channel._generate_name()
261 domain_option_name = _get_domain_option_name(domain)
262 self._client._run_cmd(
263 "enable-channel --session '{session_name}' --{domain_name} '{channel_name}' {buffer_sharing_policy}".format(
264 session_name=self.name,
265 domain_name=domain_option_name,
266 channel_name=channel_name,
267 buffer_sharing_policy="--buffers-uid"
268 if buffer_sharing_policy == lttngctl.BufferSharingPolicy.PerUID
269 else "--buffers-pid",
270 )
271 )
272 return _Channel(self._client, channel_name, domain, self)
273
274 def add_context(self, context_type):
275 # type: (lttngctl.ContextType) -> None
276 pass
277
278 @property
279 def output(self):
280 # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
281 return self._output # type: ignore
282
283 def start(self):
284 # type: () -> None
285 self._client._run_cmd("start '{session_name}'".format(session_name=self.name))
286
287 def stop(self):
288 # type: () -> None
289 self._client._run_cmd("stop '{session_name}'".format(session_name=self.name))
290
291 def destroy(self):
292 # type: () -> None
293 self._client._run_cmd("destroy '{session_name}'".format(session_name=self.name))
294
295 @property
296 def is_active(self):
297 # type: () -> bool
298 list_session_xml = self._client._run_cmd(
299 "list '{session_name}'".format(session_name=self.name),
300 LTTngClient.CommandOutputFormat.MI_XML,
301 )
302
303 root = xml.etree.ElementTree.fromstring(list_session_xml)
304 command_output = LTTngClient._mi_find_in_element(root, "output")
305 sessions = LTTngClient._mi_find_in_element(command_output, "sessions")
306 session_mi = LTTngClient._mi_find_in_element(sessions, "session")
307
308 enabled_text = LTTngClient._mi_find_in_element(session_mi, "enabled").text
309 if enabled_text not in ["true", "false"]:
310 raise InvalidMI(
311 "Expected boolean value in element '{}': value='{}'".format(
312 session_mi.tag, enabled_text
313 )
314 )
315
316 return enabled_text == "true"
317
318 @property
319 def kernel_pid_process_attribute_tracker(self):
320 # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
321 return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
322
323 @property
324 def kernel_vpid_process_attribute_tracker(self):
325 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
326 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
327
328 @property
329 def user_vpid_process_attribute_tracker(self):
330 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
331 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
332
333 @property
334 def kernel_gid_process_attribute_tracker(self):
335 # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
336 return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
337
338 @property
339 def kernel_vgid_process_attribute_tracker(self):
340 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
341 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
342
343 @property
344 def user_vgid_process_attribute_tracker(self):
345 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
346 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
347
348 @property
349 def kernel_uid_process_attribute_tracker(self):
350 # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
351 return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
352
353 @property
354 def kernel_vuid_process_attribute_tracker(self):
355 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
356 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
357
358 @property
359 def user_vuid_process_attribute_tracker(self):
360 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
361 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
362
363
364 class LTTngClientError(lttngctl.ControlException):
365 def __init__(
366 self,
367 command_args, # type: str
368 error_output, # type: str
369 ):
370 self._command_args = command_args # type: str
371 self._output = error_output # type: str
372
373
374 class LTTngClient(logger._Logger, lttngctl.Controller):
375 """
376 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
377 """
378
379 class CommandOutputFormat(enum.Enum):
380 MI_XML = 0
381 HUMAN = 1
382
383 _MI_NS = "{https://lttng.org/xml/ns/lttng-mi}"
384
385 def __init__(
386 self,
387 test_environment, # type: environment._Environment
388 log, # type: Optional[Callable[[str], None]]
389 ):
390 logger._Logger.__init__(self, log)
391 self._environment = test_environment # type: environment._Environment
392
393 @staticmethod
394 def _namespaced_mi_element(property):
395 # type: (str) -> str
396 return LTTngClient._MI_NS + property
397
398 def _run_cmd(self, command_args, output_format=CommandOutputFormat.MI_XML):
399 # type: (str, CommandOutputFormat) -> str
400 """
401 Invoke the `lttng` client with a set of arguments. The command is
402 executed in the context of the client's test environment.
403 """
404 args = [str(self._environment.lttng_client_path)] # type: list[str]
405 if output_format == LTTngClient.CommandOutputFormat.MI_XML:
406 args.extend(["--mi", "xml"])
407
408 args.extend(shlex.split(command_args))
409
410 self._log("lttng {command_args}".format(command_args=command_args))
411
412 client_env = os.environ.copy() # type: dict[str, str]
413 client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
414
415 process = subprocess.Popen(
416 args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=client_env
417 )
418
419 out = process.communicate()[0]
420
421 if process.returncode != 0:
422 decoded_output = out.decode("utf-8")
423 for error_line in decoded_output.splitlines():
424 self._log(error_line)
425 raise LTTngClientError(command_args, decoded_output)
426 else:
427 return out.decode("utf-8")
428
429 def create_session(self, name=None, output=None):
430 # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
431 name = name if name else lttngctl.Session._generate_name()
432
433 if isinstance(output, lttngctl.LocalSessionOutputLocation):
434 output_option = "--output {output_path}".format(output_path=output.path)
435 elif output is None:
436 output_option = "--no-output"
437 else:
438 raise TypeError("LTTngClient only supports local or no output")
439
440 self._run_cmd(
441 "create '{session_name}' {output_option}".format(
442 session_name=name, output_option=output_option
443 )
444 )
445 return _Session(self, name, output)
446
447 def start_session_by_name(self, name):
448 # type: (str) -> None
449 self._run_cmd("start '{session_name}'".format(session_name=name))
450
451 def start_session_by_glob_pattern(self, pattern):
452 # type: (str) -> None
453 self._run_cmd("start --glob '{pattern}'".format(pattern=pattern))
454
455 def start_sessions_all(self):
456 # type: () -> None
457 self._run_cmd("start --all")
458
459 def stop_session_by_name(self, name):
460 # type: (str) -> None
461 self._run_cmd("stop '{session_name}'".format(session_name=name))
462
463 def stop_session_by_glob_pattern(self, pattern):
464 # type: (str) -> None
465 self._run_cmd("stop --glob '{pattern}'".format(pattern=pattern))
466
467 def stop_sessions_all(self):
468 # type: () -> None
469 self._run_cmd("stop --all")
470
471 def destroy_session_by_name(self, name):
472 # type: (str) -> None
473 self._run_cmd("destroy '{session_name}'".format(session_name=name))
474
475 def destroy_session_by_glob_pattern(self, pattern):
476 # type: (str) -> None
477 self._run_cmd("destroy --glob '{pattern}'".format(pattern=pattern))
478
479 def destroy_sessions_all(self):
480 # type: () -> None
481 self._run_cmd("destroy --all")
482
483 @staticmethod
484 def _mi_find_in_element(element, sub_element_name):
485 # type: (xml.etree.ElementTree.Element, str) -> xml.etree.ElementTree.Element
486 result = element.find(LTTngClient._namespaced_mi_element(sub_element_name))
487 if result is None:
488 raise InvalidMI(
489 "Failed to find element '{}' within command MI element '{}'".format(
490 element.tag, sub_element_name
491 )
492 )
493
494 return result
495
496 def list_sessions(self):
497 # type () -> List[Session]
498 list_sessions_xml = self._run_cmd(
499 "list", LTTngClient.CommandOutputFormat.MI_XML
500 )
501
502 root = xml.etree.ElementTree.fromstring(list_sessions_xml)
503 command_output = self._mi_find_in_element(root, "output")
504 sessions = self._mi_find_in_element(command_output, "sessions")
505
506 ctl_sessions = [] # type: list[lttngctl.Session]
507
508 for session_mi in sessions:
509 name = self._mi_find_in_element(session_mi, "name").text
510 path = self._mi_find_in_element(session_mi, "path").text
511
512 if name is None:
513 raise InvalidMI(
514 "Invalid empty 'name' element in '{}'".format(session_mi.tag)
515 )
516 if path is None:
517 raise InvalidMI(
518 "Invalid empty 'path' element in '{}'".format(session_mi.tag)
519 )
520 if not path.startswith("/"):
521 raise Unsupported(
522 "{} does not support non-local session outputs".format(type(self))
523 )
524
525 ctl_sessions.append(
526 _Session(self, name, lttngctl.LocalSessionOutputLocation(path))
527 )
528
529 return ctl_sessions
This page took 0.040741 seconds and 3 git commands to generate.