b99293424aac705728c13c47716b5b9f516951ab
[lttng-ust.git] / src / python-lttngust / lttngust / agent.py
1 # -*- coding: utf-8 -*-
2 #
3 # SPDX-License-Identifier: LGPL-2.1-only
4 #
5 # Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com>
6 # Copyright (C) 2014 David Goulet <dgoulet@efficios.com>
7
8 from __future__ import unicode_literals
9 from __future__ import print_function
10 from __future__ import division
11 import lttngust.debug as dbg
12 import lttngust.loghandler
13 import lttngust.compat
14 import lttngust.cmd
15 from io import open
16 import threading
17 import logging
18 import socket
19 import time
20 import sys
21 import os
22
23
24 try:
25 # Python 2
26 import Queue as queue
27 except ImportError:
28 # Python 3
29 import queue
30
31
# Values placed in the registration command that _TcpClient._register()
# sends to the session daemon, together with this process's PID:
# a domain value and the protocol version as (major, minor).
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
35
36
37 def _get_env_value_ms(key, default_s):
38 try:
39 val = int(os.getenv(key, default_s * 1000)) / 1000
40 except:
41 val = -1
42
43 if val < 0:
44 fmt = 'invalid ${} value; {} seconds will be used'
45 dbg._pwarning(fmt.format(key, default_s))
46 val = default_s
47
48 return val
49
50
# Timing settings, in seconds. Each one is read from an environment
# variable holding milliseconds, with a default given in seconds.
#
# _REG_TIMEOUT: total time _init_threads() waits for the "registration
# done" notifications.
# _RETRY_REG_DELAY: time a client sleeps after an error before trying to
# connect to the session daemon again.
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
53
54
55 class _TcpClient(object):
56 def __init__(self, name, host, port, reg_queue):
57 super(self.__class__, self).__init__()
58 self._name = name
59 self._host = host
60 self._port = port
61
62 try:
63 self._log_handler = lttngust.loghandler._Handler()
64 except (OSError) as e:
65 dbg._pwarning('cannot load library: {}'.format(e))
66 raise e
67
68 self._root_logger = logging.getLogger()
69 self._root_logger.setLevel(logging.NOTSET)
70 self._ref_count = 0
71 self._sessiond_sock = None
72 self._reg_queue = reg_queue
73 self._server_cmd_handlers = {
74 lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
75 lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
76 lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
77 lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
78 }
79
80 def _debug(self, msg):
81 return 'client "{}": {}'.format(self._name, msg)
82
83 def run(self):
84 while True:
85 try:
86 # connect to the session daemon
87 dbg._pdebug(self._debug('connecting to session daemon'))
88 self._connect_to_sessiond()
89
90 # register to the session daemon after a successful connection
91 dbg._pdebug(self._debug('registering to session daemon'))
92 self._register()
93
94 # wait for commands from the session daemon
95 self._wait_server_cmd()
96 except (Exception) as e:
97 # Whatever happens here, we have to close the socket and
98 # retry to connect to the session daemon since either
99 # the socket was closed, a network timeout occurred, or
100 # invalid data was received.
101 dbg._pdebug(self._debug('got exception: {}'.format(e)))
102 self._cleanup_socket()
103 dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
104 time.sleep(_RETRY_REG_DELAY)
105
106 def _recv_server_cmd_header(self):
107 data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE)
108
109 if not data:
110 dbg._pdebug(self._debug('received empty server command header'))
111 return None
112
113 assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE)
114 dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))
115
116 return lttngust.cmd._server_cmd_header_from_data(data)
117
118 def _recv_server_cmd(self):
119 server_cmd_header = self._recv_server_cmd_header()
120
121 if server_cmd_header is None:
122 return None
123
124 dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
125 dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
126 dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
127 data = bytes()
128
129 if server_cmd_header.data_size > 0:
130 data = self._sessiond_sock.recv(server_cmd_header.data_size)
131 assert(len(data) == server_cmd_header.data_size)
132
133 return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)
134
135 def _send_cmd_reply(self, cmd_reply):
136 data = cmd_reply.get_data()
137 dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
138 self._sessiond_sock.sendall(data)
139
140 def _handle_server_cmd_reg_done(self, server_cmd):
141 dbg._pdebug(self._debug('got "registration done" server command'))
142
143 if self._reg_queue is not None:
144 dbg._pdebug(self._debug('notifying _init_threads()'))
145
146 try:
147 self._reg_queue.put(True)
148 except (Exception) as e:
149 # read side could be closed by now; ignore it
150 pass
151
152 self._reg_queue = None
153
154 def _handle_server_cmd_enable(self, server_cmd):
155 dbg._pdebug(self._debug('got "enable" server command'))
156 self._ref_count += 1
157
158 if self._ref_count == 1:
159 dbg._pdebug(self._debug('adding our handler to the root logger'))
160 self._root_logger.addHandler(self._log_handler)
161
162 dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
163
164 return lttngust.cmd._ClientCmdReplyEnable()
165
166 def _handle_server_cmd_disable(self, server_cmd):
167 dbg._pdebug(self._debug('got "disable" server command'))
168 self._ref_count -= 1
169
170 if self._ref_count < 0:
171 # disable command could be sent again when a session is destroyed
172 self._ref_count = 0
173
174 if self._ref_count == 0:
175 dbg._pdebug(self._debug('removing our handler from the root logger'))
176 self._root_logger.removeHandler(self._log_handler)
177
178 dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
179
180 return lttngust.cmd._ClientCmdReplyDisable()
181
182 def _handle_server_cmd_list(self, server_cmd):
183 dbg._pdebug(self._debug('got "list" server command'))
184 names = logging.Logger.manager.loggerDict.keys()
185 dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
186 cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)
187
188 return cmd_reply
189
190 def _handle_server_cmd(self, server_cmd):
191 cmd_reply = None
192
193 if server_cmd is None:
194 dbg._pdebug(self._debug('bad server command'))
195 status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
196 cmd_reply = lttngust.cmd._ClientCmdReply(status)
197 elif type(server_cmd) in self._server_cmd_handlers:
198 cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
199 else:
200 dbg._pdebug(self._debug('unknown server command'))
201 status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
202 cmd_reply = lttngust.cmd._ClientCmdReply(status)
203
204 if cmd_reply is not None:
205 self._send_cmd_reply(cmd_reply)
206
207 def _wait_server_cmd(self):
208 while True:
209 try:
210 server_cmd = self._recv_server_cmd()
211 except socket.timeout:
212 # simply retry here; the protocol has no KA and we could
213 # wait for hours
214 continue
215
216 self._handle_server_cmd(server_cmd)
217
218 def _cleanup_socket(self):
219 try:
220 self._sessiond_sock.shutdown(socket.SHUT_RDWR)
221 self._sessiond_sock.close()
222 except:
223 pass
224
225 self._sessiond_sock = None
226
227 def _connect_to_sessiond(self):
228 # create session daemon TCP socket
229 if self._sessiond_sock is None:
230 self._sessiond_sock = socket.socket(socket.AF_INET,
231 socket.SOCK_STREAM)
232
233 # Use str(self._host) here. Since this host could be a string
234 # literal, and since we're importing __future__.unicode_literals,
235 # we want to make sure the host is a native string in Python 2.
236 # This avoids an indirect module import (unicode module to
237 # decode the unicode string, eventually imported by the
238 # socket module if needed), which is not allowed in a thread
239 # directly created by a module in Python 2 (our case).
240 #
241 # tl;dr: Do NOT remove str() here, or this call in Python 2
242 # _will_ block on an interpreter's mutex until the waiting
243 # register queue timeouts.
244 self._sessiond_sock.connect((str(self._host), self._port))
245
246 def _register(self):
247 cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
248 _PROTO_MAJOR, _PROTO_MINOR)
249 data = cmd.get_data()
250 self._sessiond_sock.sendall(data)
251
252
def _get_port_from_file(path):
    """Return the TCP port written on the first line of a file.

    Returns None when the file cannot be opened or read, when its first
    line is not an integer, or when the integer is not a valid TCP port
    (1 to 65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when the conversion below fails
        with open(path) as f:
            port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file or non-numeric content: no port
        return None

    # Both bounds must hold; `>0 or <=65535` would accept any integer.
    if 0 < port <= 65535:
        return port

    return None
268
269
270 def _get_user_home_path():
271 # $LTTNG_UST_HOME overrides $LTTNG_HOME if it exist.
272 # In turn, $LTTNG_HOME overrides $HOME if it exists
273 return os.getenv('LTTNG_UST_HOME', os.getenv('LTTNG_HOME',
274 os.path.expanduser('~')))
275
276
# _initialized: set to True by _init_threads() the first time it runs, so
# that any later call returns immediately.
# _SESSIOND_HOST: host which every client thread connects to; only the
# port differs between the system and user session daemons.
_initialized = False
_SESSIOND_HOST = '127.0.0.1'
279
280
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: build a TCP client and run it.

    `name` identifies the client in debug messages, `port` is the TCP
    port to connect to on `_SESSIOND_HOST`, and `reg_queue` is handed
    over to the client as is.
    """
    creation_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creation_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    start_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(start_msg)
    tcp_client.run()
286
287
def _start_client_thread(name, port, reg_queue):
    # Create and start the daemon thread running the client named `name`.
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the client threads and wait for their registration.

    One daemon thread is started for each session daemon (system and
    user) for which a port could be read from its port file. This
    function then blocks until each started client reports a
    "registration done", or until `_REG_TIMEOUT` seconds have elapsed
    in total, whichever comes first.

    Only the first call does anything; subsequent calls return
    immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # Cannot create threads for some reason; stop this
        # initialization. KeyboardInterrupt and SystemExit are
        # deliberately not caught here.
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we time out, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                              cur_timeout))
            t1 = lttngust.compat._clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = lttngust.compat._clock()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            # the time budget is shared by all the expected registrations
            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
385
386
# Start the agent as soon as this module is imported.
_init_threads()
This page took 0.045826 seconds and 3 git commands to generate.