Tests: python: use quoted annotations to support python <= 3.6
[lttng-tools.git] / tests / utils / lttngtest / lttng.py
CommitLineData
ef945e4d
JG
1#!/usr/bin/env python3
2#
3# Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4#
5# SPDX-License-Identifier: GPL-2.0-only
6
7from concurrent.futures import process
8from . import lttngctl, logger, environment
9import pathlib
10import os
11from typing import Callable, Optional, Type, Union
12import shlex
13import subprocess
14import enum
15
16"""
17Implementation of the lttngctl interface based on the `lttng` command line client.
18"""
19
20
21class Unsupported(lttngctl.ControlException):
ce8470c9
MJ
22 def __init__(self, msg):
23 # type: (str) -> None
ef945e4d
JG
24 super().__init__(msg)
25
26
ce8470c9
MJ
27def _get_domain_option_name(domain):
28 # type: (lttngctl.TracingDomain) -> str
ef945e4d
JG
29 if domain == lttngctl.TracingDomain.User:
30 return "userspace"
31 elif domain == lttngctl.TracingDomain.Kernel:
32 return "kernel"
33 elif domain == lttngctl.TracingDomain.Log4j:
34 return "log4j"
35 elif domain == lttngctl.TracingDomain.JUL:
36 return "jul"
37 elif domain == lttngctl.TracingDomain.Python:
38 return "python"
39 else:
40 raise Unsupported("Domain `{domain_name}` is not supported by the LTTng client")
41
42
ce8470c9
MJ
43def _get_context_type_name(context):
44 # type: (lttngctl.ContextType) -> str
ef945e4d
JG
45 if isinstance(context, lttngctl.VgidContextType):
46 return "vgid"
47 elif isinstance(context, lttngctl.VuidContextType):
48 return "vuid"
49 elif isinstance(context, lttngctl.VpidContextType):
50 return "vpid"
51 elif isinstance(context, lttngctl.JavaApplicationContextType):
52 return "$app.{retriever}:{field}".format(
53 retriever=context.retriever_name, field=context.field_name
54 )
55 else:
56 raise Unsupported(
57 "Context `{context_name}` is not supported by the LTTng client".format(
58 type(context).__name__
59 )
60 )
61
62
63class _Channel(lttngctl.Channel):
64 def __init__(
65 self,
ce8470c9
MJ
66 client, # type: LTTngClient
67 name, # type: str
68 domain, # type: lttngctl.TracingDomain
69 session, # type: _Session
ef945e4d 70 ):
ce8470c9
MJ
71 self._client = client # type: LTTngClient
72 self._name = name # type: str
73 self._domain = domain # type: lttngctl.TracingDomain
74 self._session = session # type: _Session
ef945e4d 75
ce8470c9
MJ
76 def add_context(self, context_type):
77 # type: (lttngctl.ContextType) -> None
ef945e4d
JG
78 domain_option_name = _get_domain_option_name(self.domain)
79 context_type_name = _get_context_type_name(context_type)
80 self._client._run_cmd(
81 "add-context --{domain_option_name} --type {context_type_name}".format(
82 domain_option_name=domain_option_name,
83 context_type_name=context_type_name,
84 )
85 )
86
ce8470c9
MJ
87 def add_recording_rule(self, rule):
88 # type: (Type[lttngctl.EventRule]) -> None
ef945e4d
JG
89 client_args = (
90 "enable-event --session {session_name} --channel {channel_name}".format(
91 session_name=self._session.name, channel_name=self.name
92 )
93 )
94 if isinstance(rule, lttngctl.TracepointEventRule):
95 domain_option_name = (
96 "userspace"
97 if isinstance(rule, lttngctl.UserTracepointEventRule)
98 else "kernel"
99 )
100 client_args = client_args + " --{domain_option_name}".format(
101 domain_option_name=domain_option_name
102 )
103
104 if rule.name_pattern:
105 client_args = client_args + " " + rule.name_pattern
106 else:
107 client_args = client_args + " --all"
108
109 if rule.filter_expression:
110 client_args = client_args + " " + rule.filter_expression
111
112 if rule.log_level_rule:
113 if isinstance(rule.log_level_rule, lttngctl.LogLevelRuleAsSevereAs):
114 client_args = client_args + " --loglevel {log_level}".format(
115 log_level=rule.log_level_rule.level
116 )
117 elif isinstance(rule.log_level_rule, lttngctl.LogLevelRuleExactly):
118 client_args = client_args + " --loglevel-only {log_level}".format(
119 log_level=rule.log_level_rule.level
120 )
121 else:
122 raise Unsupported(
123 "Unsupported log level rule type `{log_level_rule_type}`".format(
124 log_level_rule_type=type(rule.log_level_rule).__name__
125 )
126 )
127
128 if rule.name_pattern_exclusions:
129 client_args = client_args + " --exclude "
130 for idx, pattern in enumerate(rule.name_pattern_exclusions):
131 if idx != 0:
132 client_args = client_args + ","
133 client_args = client_args + pattern
134 else:
135 raise Unsupported(
136 "event rule type `{event_rule_type}` is unsupported by LTTng client".format(
137 event_rule_type=type(rule).__name__
138 )
139 )
140
141 self._client._run_cmd(client_args)
142
143 @property
ce8470c9
MJ
144 def name(self):
145 # type: () -> str
ef945e4d
JG
146 return self._name
147
148 @property
ce8470c9
MJ
149 def domain(self):
150 # type: () -> lttngctl.TracingDomain
ef945e4d
JG
151 return self._domain
152
153
544d8425 154@enum.unique
ef945e4d 155class _ProcessAttribute(enum.Enum):
544d8425
MJ
156 PID = "Process ID"
157 VPID = "Virtual Process ID"
158 UID = "User ID"
159 VUID = "Virtual User ID"
160 GID = "Group ID"
161 VGID = "Virtual Group ID"
162
163 def __repr__(self):
164 return "<%s.%s>" % (self.__class__.__name__, self.name)
ef945e4d
JG
165
166
ce8470c9
MJ
167def _get_process_attribute_option_name(attribute):
168 # type: (_ProcessAttribute) -> str
ef945e4d
JG
169 return {
170 _ProcessAttribute.PID: "pid",
171 _ProcessAttribute.VPID: "vpid",
172 _ProcessAttribute.UID: "uid",
173 _ProcessAttribute.VUID: "vuid",
174 _ProcessAttribute.GID: "gid",
175 _ProcessAttribute.VGID: "vgid",
176 }[attribute]
177
178
179class _ProcessAttributeTracker(lttngctl.ProcessAttributeTracker):
180 def __init__(
181 self,
ce8470c9
MJ
182 client, # type: LTTngClient
183 attribute, # type: _ProcessAttribute
184 domain, # type: lttngctl.TracingDomain
185 session, # type: _Session
ef945e4d 186 ):
ce8470c9
MJ
187 self._client = client # type: LTTngClient
188 self._tracked_attribute = attribute # type: _ProcessAttribute
189 self._domain = domain # type: lttngctl.TracingDomain
190 self._session = session # type: _Session
ef945e4d 191 if attribute == _ProcessAttribute.PID or attribute == _ProcessAttribute.VPID:
ce8470c9 192 self._allowed_value_types = [int, str] # type: list[type]
ef945e4d 193 else:
ce8470c9 194 self._allowed_value_types = [int] # type: list[type]
ef945e4d 195
ce8470c9
MJ
196 def _call_client(self, cmd_name, value):
197 # type: (str, Union[int, str]) -> None
ef945e4d
JG
198 if type(value) not in self._allowed_value_types:
199 raise TypeError(
200 "Value of type `{value_type}` is not allowed for process attribute {attribute_name}".format(
201 value_type=type(value).__name__,
202 attribute_name=self._tracked_attribute.name,
203 )
204 )
205
206 process_attribute_option_name = _get_process_attribute_option_name(
207 self._tracked_attribute
208 )
209 domain_name = _get_domain_option_name(self._domain)
210 self._client._run_cmd(
211 "{cmd_name} --session {session_name} --{domain_name} --{tracked_attribute_name} {value}".format(
212 cmd_name=cmd_name,
213 session_name=self._session.name,
214 domain_name=domain_name,
215 tracked_attribute_name=process_attribute_option_name,
216 value=value,
217 )
218 )
219
ce8470c9
MJ
220 def track(self, value):
221 # type: (Union[int, str]) -> None
ef945e4d
JG
222 self._call_client("track", value)
223
ce8470c9
MJ
224 def untrack(self, value):
225 # type: (Union[int, str]) -> None
ef945e4d
JG
226 self._call_client("untrack", value)
227
228
229class _Session(lttngctl.Session):
230 def __init__(
231 self,
ce8470c9
MJ
232 client, # type: LTTngClient
233 name, # type: str
234 output, # type: Optional[lttngctl.SessionOutputLocation]
ef945e4d 235 ):
ce8470c9
MJ
236 self._client = client # type: LTTngClient
237 self._name = name # type: str
238 self._output = output # type: Optional[lttngctl.SessionOutputLocation]
ef945e4d
JG
239
240 @property
ce8470c9
MJ
241 def name(self):
242 # type: () -> str
ef945e4d
JG
243 return self._name
244
ce8470c9
MJ
245 def add_channel(self, domain, channel_name=None):
246 # type: (lttngctl.TracingDomain, Optional[str]) -> lttngctl.Channel
ef945e4d
JG
247 channel_name = lttngctl.Channel._generate_name()
248 domain_option_name = _get_domain_option_name(domain)
249 self._client._run_cmd(
250 "enable-channel --{domain_name} {channel_name}".format(
251 domain_name=domain_option_name, channel_name=channel_name
252 )
253 )
254 return _Channel(self._client, channel_name, domain, self)
255
ce8470c9
MJ
256 def add_context(self, context_type):
257 # type: (lttngctl.ContextType) -> None
ef945e4d
JG
258 pass
259
260 @property
ce8470c9
MJ
261 def output(self):
262 # type: () -> "Optional[Type[lttngctl.SessionOutputLocation]]"
263 return self._output # type: ignore
ef945e4d 264
ce8470c9
MJ
265 def start(self):
266 # type: () -> None
ef945e4d
JG
267 self._client._run_cmd("start {session_name}".format(session_name=self.name))
268
ce8470c9
MJ
269 def stop(self):
270 # type: () -> None
ef945e4d
JG
271 self._client._run_cmd("stop {session_name}".format(session_name=self.name))
272
ce8470c9
MJ
273 def destroy(self):
274 # type: () -> None
ef945e4d
JG
275 self._client._run_cmd("destroy {session_name}".format(session_name=self.name))
276
277 @property
ce8470c9
MJ
278 def kernel_pid_process_attribute_tracker(self):
279 # type: () -> Type[lttngctl.ProcessIDProcessAttributeTracker]
ef945e4d
JG
280 return _ProcessAttributeTracker(self._client, _ProcessAttribute.PID, lttngctl.TracingDomain.Kernel, self) # type: ignore
281
282 @property
ce8470c9
MJ
283 def kernel_vpid_process_attribute_tracker(self):
284 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
ef945e4d
JG
285 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.Kernel, self) # type: ignore
286
287 @property
ce8470c9
MJ
288 def user_vpid_process_attribute_tracker(self):
289 # type: () -> Type[lttngctl.VirtualProcessIDProcessAttributeTracker]
ef945e4d
JG
290 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VPID, lttngctl.TracingDomain.User, self) # type: ignore
291
292 @property
ce8470c9
MJ
293 def kernel_gid_process_attribute_tracker(self):
294 # type: () -> Type[lttngctl.GroupIDProcessAttributeTracker]
ef945e4d
JG
295 return _ProcessAttributeTracker(self._client, _ProcessAttribute.GID, lttngctl.TracingDomain.Kernel, self) # type: ignore
296
297 @property
ce8470c9
MJ
298 def kernel_vgid_process_attribute_tracker(self):
299 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
ef945e4d
JG
300 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.Kernel, self) # type: ignore
301
302 @property
ce8470c9
MJ
303 def user_vgid_process_attribute_tracker(self):
304 # type: () -> Type[lttngctl.VirtualGroupIDProcessAttributeTracker]
ef945e4d
JG
305 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VGID, lttngctl.TracingDomain.User, self) # type: ignore
306
307 @property
ce8470c9
MJ
308 def kernel_uid_process_attribute_tracker(self):
309 # type: () -> Type[lttngctl.UserIDProcessAttributeTracker]
ef945e4d
JG
310 return _ProcessAttributeTracker(self._client, _ProcessAttribute.UID, lttngctl.TracingDomain.Kernel, self) # type: ignore
311
312 @property
ce8470c9
MJ
313 def kernel_vuid_process_attribute_tracker(self):
314 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
ef945e4d
JG
315 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.Kernel, self) # type: ignore
316
317 @property
ce8470c9
MJ
318 def user_vuid_process_attribute_tracker(self):
319 # type: () -> Type[lttngctl.VirtualUserIDProcessAttributeTracker]
ef945e4d
JG
320 return _ProcessAttributeTracker(self._client, _ProcessAttribute.VUID, lttngctl.TracingDomain.User, self) # type: ignore
321
322
323class LTTngClientError(lttngctl.ControlException):
ce8470c9
MJ
324 def __init__(
325 self,
326 command_args, # type: str
327 error_output, # type: str
328 ):
329 self._command_args = command_args # type: str
330 self._output = error_output # type: str
ef945e4d
JG
331
332
333class LTTngClient(logger._Logger, lttngctl.Controller):
334 """
335 Implementation of a LTTngCtl Controller that uses the `lttng` client as a back-end.
336 """
337
338 def __init__(
339 self,
ce8470c9
MJ
340 test_environment, # type: environment._Environment
341 log, # type: Optional[Callable[[str], None]]
ef945e4d
JG
342 ):
343 logger._Logger.__init__(self, log)
ce8470c9 344 self._environment = test_environment # type: environment._Environment
ef945e4d 345
ce8470c9
MJ
346 def _run_cmd(self, command_args):
347 # type: (str) -> None
ef945e4d
JG
348 """
349 Invoke the `lttng` client with a set of arguments. The command is
350 executed in the context of the client's test environment.
351 """
ce8470c9 352 args = [str(self._environment.lttng_client_path)] # type: list[str]
ef945e4d
JG
353 args.extend(shlex.split(command_args))
354
355 self._log("lttng {command_args}".format(command_args=command_args))
356
ce8470c9 357 client_env = os.environ.copy() # type: dict[str, str]
ef945e4d
JG
358 client_env["LTTNG_HOME"] = str(self._environment.lttng_home_location)
359
360 process = subprocess.Popen(
361 args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, env=client_env
362 )
363
364 out = process.communicate()[0]
365
366 if process.returncode != 0:
367 decoded_output = out.decode("utf-8")
368 for error_line in decoded_output.splitlines():
369 self._log(error_line)
370 raise LTTngClientError(command_args, decoded_output)
371
ce8470c9
MJ
372 def create_session(self, name=None, output=None):
373 # type: (Optional[str], Optional[lttngctl.SessionOutputLocation]) -> lttngctl.Session
ef945e4d
JG
374 name = name if name else lttngctl.Session._generate_name()
375
376 if isinstance(output, lttngctl.LocalSessionOutputLocation):
377 output_option = "--output {output_path}".format(output_path=output.path)
378 elif output is None:
379 output_option = "--no-output"
380 else:
381 raise TypeError("LTTngClient only supports local or no output")
382
383 self._run_cmd(
384 "create {session_name} {output_option}".format(
385 session_name=name, output_option=output_option
386 )
387 )
388 return _Session(self, name, output)
This page took 0.040275 seconds and 4 git commands to generate.