9ccea6b63f2b9325ef6ecd51362358f80a80ec60
[lttng-tools.git] / tests / utils / lttngtest / lttng.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6
7 from concurrent.futures import process
8 from . import lttngctl, logger, environment
9 import pathlib
10 import os
11 from typing import Callable, Optional, Type, Union
12 import shlex
13 import subprocess
14 import enum
15
16 """
17 Implementation of the lttngctl interface based on the `lttng` command line client.
18 """
19
20
class Unsupported(lttngctl.ControlException):
    """
    Raised when a request cannot be expressed through the `lttng` client.
    """

    def __init__(self, msg):
        # type: (str) -> None
        super(Unsupported, self).__init__(msg)
25
26
def _get_domain_option_name(domain):
    # type: (lttngctl.TracingDomain) -> str
    """
    Return the name of the `lttng` command line option (without the leading
    `--`) that selects the given tracing domain.

    Raises Unsupported when the domain has no known option name.
    """
    if domain == lttngctl.TracingDomain.User:
        return "userspace"
    elif domain == lttngctl.TracingDomain.Kernel:
        return "kernel"
    elif domain == lttngctl.TracingDomain.Log4j:
        return "log4j"
    elif domain == lttngctl.TracingDomain.JUL:
        return "jul"
    elif domain == lttngctl.TracingDomain.Python:
        return "python"
    else:
        # Fall back on the object itself when it does not expose a `name`
        # so that building the error message can never fail on its own.
        raise Unsupported(
            "Domain `{domain_name}` is not supported by the LTTng client".format(
                domain_name=getattr(domain, "name", domain)
            )
        )
41
42
def _get_context_type_name(context):
    # type: (lttngctl.ContextType) -> str
    """
    Return the value to pass to the `--type` option of `lttng add-context`
    for the given context type.

    Raises Unsupported when the context type has no known equivalent.
    """
    if isinstance(context, lttngctl.VgidContextType):
        return "vgid"
    elif isinstance(context, lttngctl.VuidContextType):
        return "vuid"
    elif isinstance(context, lttngctl.VpidContextType):
        return "vpid"
    elif isinstance(context, lttngctl.JavaApplicationContextType):
        return "$app.{retriever}:{field}".format(
            retriever=context.retriever_name, field=context.field_name
        )
    else:
        # The placeholder is a named field: it has to be filled with a
        # keyword argument, otherwise `format()` raises KeyError.
        raise Unsupported(
            "Context `{context_name}` is not supported by the LTTng client".format(
                context_name=type(context).__name__
            )
        )
61
62
class _Channel(lttngctl.Channel):
    """
    Channel of a recording session, manipulated through the `lttng` client.
    """

    def __init__(
        self,
        client,  # type: LTTngClient
        name,  # type: str
        domain,  # type: lttngctl.TracingDomain
        session,  # type: _Session
    ):
        self._client = client  # type: LTTngClient
        self._name = name  # type: str
        self._domain = domain  # type: lttngctl.TracingDomain
        self._session = session  # type: _Session

    def add_context(self, context_type):
        # type: (lttngctl.ContextType) -> None
        """
        Add a context field to this channel using `lttng add-context`.

        Raises Unsupported when the channel's domain or the context type has
        no command line equivalent.
        """
        domain_option_name = _get_domain_option_name(self.domain)
        context_type_name = _get_context_type_name(context_type)
        # Name the session and the channel explicitly rather than relying on
        # the client's defaults.
        self._client._run_cmd(
            "add-context --session {session_name} --{domain_option_name} --channel {channel_name} --type {context_type_name}".format(
                session_name=shlex.quote(self._session.name),
                domain_option_name=domain_option_name,
                channel_name=shlex.quote(self.name),
                context_type_name=context_type_name,
            )
        )

    def add_recording_rule(self, rule):
        # type: (Type[lttngctl.EventRule]) -> None
        """
        Enable the events matched by `rule` in this channel using
        `lttng enable-event`.

        Only tracepoint event rules are handled; any other rule type, or an
        unknown log level rule type, raises Unsupported.
        """
        if not isinstance(rule, lttngctl.TracepointEventRule):
            raise Unsupported(
                "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
                    event_rule_type=type(rule).__name__
                )
            )

        # Every element is a shell-style token; values are quoted since the
        # resulting string is split again with `shlex.split()` by the client.
        client_args = [
            "enable-event",
            "--session",
            shlex.quote(self._session.name),
            "--channel",
            shlex.quote(self.name),
            "--userspace"
            if isinstance(rule, lttngctl.UserTracepointEventRule)
            else "--kernel",
        ]

        if rule.name_pattern:
            client_args.append(shlex.quote(rule.name_pattern))
        else:
            client_args.append("--all")

        if rule.filter_expression:
            # The expression has to be introduced by `--filter` and kept as a
            # single argument; a bare expression would be read as more event
            # names.
            client_args.extend(["--filter", shlex.quote(rule.filter_expression)])

        if rule.log_level_rule:
            if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
                log_level_option = "--loglevel"
            elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
                log_level_option = "--loglevel-only"
            else:
                raise Unsupported(
                    "Unsupported log level rule type `{log_level_rule_type}`".format(
                        log_level_rule_type=type(rule.log_level_rule).__name__
                    )
                )

            client_args.extend(
                [log_level_option, shlex.quote(str(rule.log_level_rule.level))]
            )

        if rule.name_pattern_exclusions:
            # Exclusions are passed as a single comma-separated argument.
            client_args.extend(
                ["--exclude", shlex.quote(",".join(rule.name_pattern_exclusions))]
            )

        self._client._run_cmd(" ".join(client_args))

    @property
    def name(self):
        # type: () -> str
        """Name of the channel."""
        return self._name

    @property
    def domain(self):
        # type: () -> lttngctl.TracingDomain
        """Tracing domain of the channel."""
        return self._domain
152
153
@enum.unique
class _ProcessAttribute(enum.Enum):
    """
    Process attributes that a tracker can follow; each value is a
    human-readable description of the attribute.
    """

    PID = "Process ID"
    VPID = "Virtual Process ID"
    UID = "User ID"
    VUID = "Virtual User ID"
    GID = "Group ID"
    VGID = "Virtual Group ID"

    def __repr__(self):
        # Compact form: omit the value, keep only `<Class.MEMBER>`.
        return "<{}.{}>".format(type(self).__name__, self.name)
165
166
def _get_process_attribute_option_name(attribute):
    # type: (_ProcessAttribute) -> str
    """
    Return the name of the `lttng` command line option (without the leading
    `--`) associated with a process attribute.

    Raises KeyError when the attribute is not a known _ProcessAttribute.
    """
    option_names = {
        _ProcessAttribute.PID: "pid",
        _ProcessAttribute.VPID: "vpid",
        _ProcessAttribute.UID: "uid",
        _ProcessAttribute.VUID: "vuid",
        _ProcessAttribute.GID: "gid",
        _ProcessAttribute.VGID: "vgid",
    }
    return option_names[attribute]
177
178
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
    """
    Process attribute tracker of a session, driven by the `track` and
    `untrack` commands of the `lttng` client.
    """

    def __init__(
        self,
        client,  # type: LTTngClient
        attribute,  # type: _ProcessAttribute
        domain,  # type: lttngctl.TracingDomain
        session,  # type: _Session
    ):
        self._client = client  # type: LTTngClient
        self._tracked_attribute = attribute  # type: _ProcessAttribute
        self._domain = domain  # type: lttngctl.TracingDomain
        self._session = session  # type: _Session

        # NOTE(review): strings are accepted for the PID/VPID attributes only,
        # while the `lttng track` documentation describes names for the
        # user/group attributes -- confirm this is the intended rule.
        is_process_id = attribute in (_ProcessAttribute.PID, _ProcessAttribute.VPID)
        self._allowed_value_types = (
            [int, str] if is_process_id else [int]
        )  # type: list[type]

    def _call_client(self, cmd_name, value):
        # type: (str, Union[int, str]) -> None
        """
        Run `cmd_name` (`track` or `untrack`) for `value` against this
        tracker's session, domain and attribute.

        Raises TypeError when the exact type of `value` is not one of the
        types allowed for the tracked attribute.
        """
        value_type = type(value)
        if value_type not in self._allowed_value_types:
            message = "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
                value_type=value_type.__name__,
                attribute_name=self._tracked_attribute.name,
            )
            raise TypeError(message)

        attribute_option = _get_process_attribute_option_name(self._tracked_attribute)
        domain_option = _get_domain_option_name(self._domain)

        command = "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
            cmd_name=cmd_name,
            session_name=self._session.name,
            domain_name=domain_option,
            tracked_attribute_name=attribute_option,
            value=value,
        )
        self._client._run_cmd(command)

    def track(self, value):
        # type: (Union[int, str]) -> None
        """Add `value` to the set of tracked attribute values."""
        self._call_client("track", value)

    def untrack(self, value):
        # type: (Union[int, str]) -> None
        """Remove `value` from the set of tracked attribute values."""
        self._call_client("untrack", value)
227
228
class _Session(lttngctl.Session):
    """
    Recording session controlled through the `lttng` client.
    """

    def __init__(
        self,
        client,  # type: LTTngClient
        name,  # type: str
        output,  # type: Optional[lttngctl.SessionOutputLocation]
    ):
        self._client = client  # type: LTTngClient
        self._name = name  # type: str
        self._output = output  # type: Optional[lttngctl.SessionOutputLocation]

    @property
    def name(self):
        # type: () -> str
        """Name of the session."""
        return self._name

    def add_channel(self, domain, channel_name=None):
        # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
        """
        Create a channel in this session with `lttng enable-channel` and
        return it.

        A name is generated when `channel_name` is not provided. Raises
        Unsupported when the domain has no command line equivalent.
        """
        # Honor the caller's name; only generate one when none was given.
        channel_name = (
            channel_name if channel_name else lttngctl.Channel._generate_name()
        )
        domain_option_name = _get_domain_option_name(domain)
        # Name the session explicitly rather than relying on the client's
        # notion of a current session.
        self._client._run_cmd(
            "enable-channel --session {session_name} --{domain_name} {channel_name}".format(
                session_name=shlex.quote(self.name),
                domain_name=domain_option_name,
                channel_name=shlex.quote(channel_name),
            )
        )
        return _Channel(self._client, channel_name, domain, self)

    def add_context(self, context_type):
        # type: (lttngctl.ContextType) -> None
        """
        Currently a no-op: nothing is sent to the client.
        """
        pass

    @property
    def output(self):
        # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
        """Output location given at creation, or None."""
        return self._output  # type: ignore

    def start(self):
        # type: () -> None
        """Start the session with `lttng start`."""
        self._client._run_cmd(
            "start {session_name}".format(session_name=shlex.quote(self.name))
        )

    def stop(self):
        # type: () -> None
        """Stop the session with `lttng stop`."""
        self._client._run_cmd(
            "stop {session_name}".format(session_name=shlex.quote(self.name))
        )

    def destroy(self):
        # type: () -> None
        """Destroy the session with `lttng destroy`."""
        self._client._run_cmd(
            "destroy {session_name}".format(session_name=shlex.quote(self.name))
        )

    def _make_tracker(self, attribute, domain):
        # type: (_ProcessAttribute, lttngctl.TracingDomain) -> _ProcessAttributeTracker
        """Build a tracker of `attribute` in `domain` bound to this session."""
        return _ProcessAttributeTracker(self._client, attribute, domain, self)

    # Each property below returns a new tracker object on every access.

    @property
    def kernel_pid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.PID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vpid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vpid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VPID, lttngctl.TracingDomain.User)  # type: ignore

    @property
    def kernel_gid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.GID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vgid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vgid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VGID, lttngctl.TracingDomain.User)  # type: ignore

    @property
    def kernel_uid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.UID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def kernel_vuid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel)  # type: ignore

    @property
    def user_vuid_process_attribute_tracker(self):
        # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
        return self._make_tracker(_ProcessAttribute.VUID, lttngctl.TracingDomain.User)  # type: ignore
321
322
class LTTngClientError(lttngctl.ControlException):
    """
    Raised when an invocation of the `lttng` client fails.

    Keeps the arguments of the failed command and the output it produced.
    """

    def __init__(
        self,
        command_args,  # type: str
        error_output,  # type: str
    ):
        # Initialize the base exception with a message so that the error is
        # meaningful when printed or logged instead of being empty.
        super().__init__(
            "`lttng {command_args}` failed: {error_output}".format(
                command_args=command_args, error_output=error_output
            )
        )
        self._command_args = command_args  # type: str
        self._output = error_output  # type: str
331
332
class LTTngClient(logger._Logger, lttngctl.Controller):
    """
    Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
    """

    def __init__(
        self,
        test_environment,  # type: environment._Environment
        log,  # type: Optional[Callable[[str], None]]
    ):
        logger._Logger.__init__(self, log)
        self._environment = test_environment  # type: environment._Environment

    def _run_cmd(self, command_args):
        # type: (str) -> None
        """
        Invoke the `lttng` client with a set of arguments. The command is
        executed in the context of the client's test environment.

        `command_args` is split with shell-like rules, so arguments that
        contain whitespace must be quoted by the caller.

        Raises LTTngClientError when the client exits with a non-zero status.
        """
        args = [str(self._environment.lttng_client_path)]  # type: list[str]
        args.extend(shlex.split(command_args))

        self._log("lttng {command_args}".format(command_args=command_args))

        client_env = os.environ.copy()  # type: dict[str, str]
        client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)

        # stderr is merged into stdout so that a single stream holds
        # everything the client printed.
        client_process = subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=client_env
        )

        out = client_process.communicate()[0]

        if client_process.returncode != 0:
            # Never let undecodable output hide the actual client failure.
            decoded_output = out.decode("utf-8", errors="replace")
            for error_line in decoded_output.splitlines():
                self._log(error_line)
            raise LTTngClientError(command_args, decoded_output)

    def create_session(self, name=None, output=None):
        # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
        """
        Create a session with `lttng create` and return it.

        A name is generated when `name` is not provided. `output` is either a
        local output location or None (no output); any other kind of output
        raises TypeError.
        """
        name = name if name else lttngctl.Session._generate_name()

        if isinstance(output, lttngctl.LocalSessionOutputLocation):
            # Quote the path: the command string is split on whitespace.
            output_option = "--output {output_path}".format(
                output_path=shlex.quote(str(output.path))
            )
        elif output is None:
            output_option = "--no-output"
        else:
            raise TypeError("LTTngClient only supports local or no output")

        self._run_cmd(
            "create {session_name} {output_option}".format(
                session_name=shlex.quote(name), output_option=output_option
            )
        )
        return _Session(self, name, output)
387 )
388 return _Session(self, name, output)
This page took 0.03602 seconds and 3 git commands to generate.