Tests: python: enum.auto() introduced in python 3.6
[lttng-tools.git] / tests / utils / lttngtest / lttng.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6
7 from concurrent.futures import process
8 from . import lttngctl, logger, environment
9 import pathlib
10 import os
11 from typing import Callable, Optional, Type, Union
12 import shlex
13 import subprocess
14 import enum
15
16 """
17 Implementation of the lttngctl interface based on the `lttng` command line client.
18 """
19
20
class Unsupported(lttngctl.ControlException):
    """
    Raised when a requested operation or object type cannot be expressed
    through the `lttng` command line client.
    """

    def __init__(self, msg: str):
        # Forward the human-readable description to the base control exception.
        super(Unsupported, self).__init__(msg)
24
25
def _get_domain_option_name(domain: lttngctl.TracingDomain) -> str:
    """
    Return the name of the `lttng` client option (without the leading `--`)
    that selects the tracing domain `domain`.

    Raises `Unsupported` when the domain has no matching client option.
    """
    if domain == lttngctl.TracingDomain.User:
        return "userspace"
    if domain == lttngctl.TracingDomain.Kernel:
        return "kernel"
    if domain == lttngctl.TracingDomain.Log4j:
        return "log4j"
    if domain == lttngctl.TracingDomain.JUL:
        return "jul"
    if domain == lttngctl.TracingDomain.Python:
        return "python"

    # The placeholder must actually be substituted, otherwise the message
    # shows the literal `{domain_name}` text.
    raise Unsupported(
        "Domain `{domain_name}` is not supported by the LTTng client".format(
            domain_name=str(domain)
        )
    )
39
40
def _get_context_type_name(context: lttngctl.ContextType) -> str:
    """
    Return the value to pass to the `--type` option of the client's
    `add-context` command for the context `context`.

    Raises `Unsupported` when the context type is not one of the handled
    types.
    """
    if isinstance(context, lttngctl.VgidContextType):
        return "vgid"
    if isinstance(context, lttngctl.VuidContextType):
        return "vuid"
    if isinstance(context, lttngctl.VpidContextType):
        return "vpid"
    if isinstance(context, lttngctl.JavaApplicationContextType):
        return "$app.{retriever}:{field}".format(
            retriever=context.retriever_name, field=context.field_name
        )

    # The placeholder is named, so the value must be passed by keyword;
    # passing it positionally raises KeyError instead of Unsupported.
    raise Unsupported(
        "Context `{context_name}` is not supported by the LTTng client".format(
            context_name=type(context).__name__
        )
    )
58
59
class _Channel(lttngctl.Channel):
    """
    Channel of a tracing session that is controlled through the `lttng`
    command line client.
    """

    def __init__(
        self,
        client: "LTTngClient",
        name: str,
        domain: lttngctl.TracingDomain,
        session: "_Session",
    ):
        self._client: LTTngClient = client
        self._name: str = name
        self._domain: lttngctl.TracingDomain = domain
        self._session: _Session = session

    def add_context(self, context_type: lttngctl.ContextType) -> None:
        """
        Add the context `context_type` to this channel using the client's
        `add-context` command.

        Raises `Unsupported` if the channel's domain or the context type
        cannot be expressed with the client.
        """
        domain_option_name = _get_domain_option_name(self.domain)
        context_type_name = _get_context_type_name(context_type)
        # Explicitly target this channel of this session; without those
        # options the command does not designate the channel at all.
        self._client._run_cmd(
            "add-context --session {session_name} --channel {channel_name} --{domain_option_name} --type {context_type_name}".format(
                session_name=self._session.name,
                channel_name=self.name,
                domain_option_name=domain_option_name,
                context_type_name=context_type_name,
            )
        )

    def add_recording_rule(self, rule: Type[lttngctl.EventRule]) -> None:
        """
        Enable the events matched by `rule` in this channel using the
        client's `enable-event` command.

        Raises `Unsupported` if the rule, or its log level rule, is of a type
        that is not handled here.
        """
        if not isinstance(rule, lttngctl.TracepointEventRule):
            raise Unsupported(
                "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
                    event_rule_type=type(rule).__name__
                )
            )

        client_args = [
            "enable-event --session {session_name} --channel {channel_name}".format(
                session_name=self._session.name, channel_name=self.name
            )
        ]

        domain_option_name = (
            "userspace"
            if isinstance(rule, lttngctl.UserTracepointEventRule)
            else "kernel"
        )
        client_args.append(
            "--{domain_option_name}".format(domain_option_name=domain_option_name)
        )

        # Without a name pattern, every event of the domain is enabled.
        client_args.append(rule.name_pattern if rule.name_pattern else "--all")

        if rule.filter_expression:
            # The expression has to be introduced by `--filter`, and quoted
            # since the command string is later split by `shlex.split()`;
            # otherwise it is parsed as extra positional arguments.
            client_args.append(
                "--filter {filter_expression}".format(
                    filter_expression=shlex.quote(rule.filter_expression)
                )
            )

        if rule.log_level_rule:
            if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
                log_level_option_name = "loglevel"
            elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
                log_level_option_name = "loglevel-only"
            else:
                raise Unsupported(
                    "Unsupported log level rule type `{log_level_rule_type}`".format(
                        log_level_rule_type=type(rule.log_level_rule).__name__
                    )
                )

            client_args.append(
                "--{option_name} {log_level}".format(
                    option_name=log_level_option_name,
                    log_level=rule.log_level_rule.level,
                )
            )

        if rule.name_pattern_exclusions:
            # Exclusions are passed as a single comma-separated list.
            client_args.append(
                "--exclude {exclusions}".format(
                    exclusions=",".join(rule.name_pattern_exclusions)
                )
            )

        self._client._run_cmd(" ".join(client_args))

    @property
    def name(self) -> str:
        """
        Name of the channel.
        """
        return self._name

    @property
    def domain(self) -> lttngctl.TracingDomain:
        """
        Tracing domain of the channel.
        """
        return self._domain
145
146
147 @enum.unique
148 class _ProcessAttribute(enum.Enum):
149 PID = "Process ID"
150 VPID = "Virtual Process ID"
151 UID = "User ID"
152 VUID = "Virtual User ID"
153 GID = "Group ID"
154 VGID = "Virtual Group ID"
155
156 def __repr__(self):
157 return "<%s.%s>" % (self.__class__.__name__, self.name)
158
159
def _get_process_attribute_option_name(attribute: _ProcessAttribute) -> str:
    """
    Return the name of the client option (without the leading `--`) that
    designates the process attribute `attribute`.

    Raises `KeyError` if `attribute` is not a known process attribute.
    """
    option_names = {
        _ProcessAttribute.PID: "pid",
        _ProcessAttribute.VPID: "vpid",
        _ProcessAttribute.UID: "uid",
        _ProcessAttribute.VUID: "vuid",
        _ProcessAttribute.GID: "gid",
        _ProcessAttribute.VGID: "vgid",
    }
    return option_names[attribute]
169
170
class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
    """
    Process attribute tracker implemented with the `track` and `untrack`
    commands of the `lttng` client.
    """

    def __init__(
        self,
        client: "LTTngClient",
        attribute: _ProcessAttribute,
        domain: lttngctl.TracingDomain,
        session: "_Session",
    ):
        self._client: LTTngClient = client
        self._tracked_attribute: _ProcessAttribute = attribute
        self._domain: lttngctl.TracingDomain = domain
        self._session: "_Session" = session

        # NOTE(review): only PID and VPID accept `str` values here, while the
        # other attributes are restricted to `int`; confirm that this is not
        # inverted with respect to what the client accepts.
        accepts_str = attribute in (_ProcessAttribute.PID, _ProcessAttribute.VPID)
        self._allowed_value_types: list[type] = [int, str] if accepts_str else [int]

    def _call_client(self, cmd_name: str, value: Union[int, str]) -> None:
        """
        Run the client command `cmd_name` for `value` against the tracked
        attribute of this tracker's session and domain.

        Raises `TypeError` if the exact type of `value` is not allowed for
        the tracked attribute.
        """
        value_type = type(value)
        if value_type not in self._allowed_value_types:
            raise TypeError(
                "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
                    value_type=value_type.__name__,
                    attribute_name=self._tracked_attribute.name,
                )
            )

        command = "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
            cmd_name=cmd_name,
            session_name=self._session.name,
            domain_name=_get_domain_option_name(self._domain),
            tracked_attribute_name=_get_process_attribute_option_name(
                self._tracked_attribute
            ),
            value=value,
        )
        self._client._run_cmd(command)

    def track(self, value: Union[int, str]) -> None:
        """
        Run the client's `track` command for `value`.
        """
        self._call_client("track", value)

    def untrack(self, value: Union[int, str]) -> None:
        """
        Run the client's `untrack` command for `value`.
        """
        self._call_client("untrack", value)
216
217
class _Session(lttngctl.Session):
    """
    Tracing session that is controlled through the `lttng` command line
    client.
    """

    def __init__(
        self,
        client: "LTTngClient",
        name: str,
        output: Optional[Type[lttngctl.SessionOutputLocation]],
    ):
        self._client: LTTngClient = client
        self._name: str = name
        self._output: Optional[Type[lttngctl.SessionOutputLocation]] = output

    @property
    def name(self) -> str:
        """
        Name of the session.
        """
        return self._name

    def add_channel(
        self, domain: lttngctl.TracingDomain, channel_name: Optional[str] = None
    ) -> lttngctl.Channel:
        """
        Create a channel in the domain `domain` of this session using the
        client's `enable-channel` command and return it.

        A channel name is generated when `channel_name` is not provided.

        Raises `Unsupported` if `domain` cannot be expressed with the client.
        """
        # Only generate a name when the caller did not provide one.
        channel_name = (
            channel_name if channel_name else lttngctl.Channel._generate_name()
        )
        domain_option_name = _get_domain_option_name(domain)
        # Name the session explicitly so that the channel is created in this
        # session rather than in whichever session the client defaults to.
        self._client._run_cmd(
            "enable-channel --session {session_name} --{domain_name} {channel_name}".format(
                session_name=self.name,
                domain_name=domain_option_name,
                channel_name=channel_name,
            )
        )
        return _Channel(self._client, channel_name, domain, self)

    def add_context(self, context_type: lttngctl.ContextType) -> None:
        """
        Currently a no-op: no client command is issued.
        """
        pass

    @property
    def output(self) -> Optional[Type[lttngctl.SessionOutputLocation]]:
        """
        Output location given at construction, if any.
        """
        return self._output

    def start(self) -> None:
        """
        Run the client's `start` command on this session.
        """
        self._client._run_cmd("start {session_name}".format(session_name=self.name))

    def stop(self) -> None:
        """
        Run the client's `stop` command on this session.
        """
        self._client._run_cmd("stop {session_name}".format(session_name=self.name))

    def destroy(self) -> None:
        """
        Run the client's `destroy` command on this session.
        """
        self._client._run_cmd("destroy {session_name}".format(session_name=self.name))

    # Each of the following properties returns a new tracker object bound to
    # this session for a given (process attribute, tracing domain) pair.

    @property
    def kernel_pid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.ProcessIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def kernel_vpid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def user_vpid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self)  # type: ignore

    @property
    def kernel_gid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.GroupIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def kernel_vgid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def user_vgid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self)  # type: ignore

    @property
    def kernel_uid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.UserIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def kernel_vuid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self)  # type: ignore

    @property
    def user_vuid_process_attribute_tracker(
        self,
    ) -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]:
        return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self)  # type: ignore
314
315
class LTTngClientError(lttngctl.ControlException):
    """
    Raised when an invocation of the `lttng` client exits with a non-zero
    status.
    """

    def __init__(self, command_args: str, error_output: str):
        # Initialize the base exception with a message so that the failure is
        # self-describing when it is printed or logged.
        super().__init__(
            "LTTng client command failed: `lttng {command_args}`\n{error_output}".format(
                command_args=command_args, error_output=error_output
            )
        )
        # Arguments that were passed to the client.
        self._command_args: str = command_args
        # Output produced by the failed client invocation.
        self._output: str = error_output
319 self._output: str = error_output
320
321
class LTTngClient(logger._Logger, lttngctl.Controller):
    """
    Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
    """

    def __init__(
        self,
        test_environment: environment._Environment,
        log: Optional[Callable[[str], None]],
    ):
        logger._Logger.__init__(self, log)
        self._environment: environment._Environment = test_environment

    def _run_cmd(self, command_args: str) -> None:
        """
        Invoke the `lttng` client with a set of arguments. The command is
        executed in the context of the client's test environment.

        `command_args` is split with `shlex.split()`; arguments that contain
        whitespace or quotes must therefore be quoted by the caller.

        Raises `LTTngClientError` if the client exits with a non-zero status.
        """
        args: list[str] = [str(self._environment.lttng_client_path)]
        args.extend(shlex.split(command_args))

        self._log("lttng {command_args}".format(command_args=command_args))

        client_env: dict[str, str] = os.environ.copy()
        client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)

        # stderr is merged into stdout so that a single stream holds all of
        # the client's output. The local name deliberately differs from the
        # module-level `process` import to avoid shadowing it.
        client_process = subprocess.Popen(
            args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=client_env
        )

        out = client_process.communicate()[0]

        if client_process.returncode != 0:
            decoded_output = out.decode("utf-8")
            for error_line in decoded_output.splitlines():
                self._log(error_line)
            raise LTTngClientError(command_args, decoded_output)

    def create_session(
        self,
        name: Optional[str] = None,
        output: Optional[lttngctl.SessionOutputLocation] = None,
    ) -> lttngctl.Session:
        """
        Create a tracing session using the client's `create` command and
        return it.

        A session name is generated when `name` is not provided. `output`
        must be a local output location, or `None` to create a session
        without output.

        Raises `TypeError` for any other kind of output location.
        """
        name = name if name else lttngctl.Session._generate_name()

        if isinstance(output, lttngctl.LocalSessionOutputLocation):
            # Quote the path: the command string is split by `shlex.split()`,
            # which would otherwise break a path that contains whitespace.
            output_option = "--output {output_path}".format(
                output_path=shlex.quote(str(output.path))
            )
        elif output is None:
            output_option = "--no-output"
        else:
            raise TypeError("LTTngClient only supports local or no output")

        self._run_cmd(
            "create {session_name} {output_option}".format(
                session_name=name, output_option=output_option
            )
        )
        return _Session(self, name, output)
378 )
379 return _Session(self, name, output)
This page took 0.03932 seconds and 4 git commands to generate.