Move all sources to 'src/'
[lttng-ust.git] / src / python-lttngust / lttngust / agent.py
1 # -*- coding: utf-8 -*-
2 #
3 # SPDX-License-Identifier: LGPL-2.1-only
4 #
5 # Copyright (C) 2015 Philippe Proulx <pproulx@efficios.com>
6 # Copyright (C) 2014 David Goulet <dgoulet@efficios.com>
7
8 from __future__ import unicode_literals
9 from __future__ import print_function
10 from __future__ import division
11 import lttngust.debug as dbg
12 import lttngust.loghandler
13 import lttngust.compat
14 import lttngust.cmd
15 from io import open
16 import threading
17 import logging
18 import socket
19 import time
20 import sys
21 import os
22
23
24 try:
25 # Python 2
26 import Queue as queue
27 except ImportError:
28 # Python 3
29 import queue
30
31
# Values sent to the session daemon in the registration command (see
# _TcpClient._register()): the domain identifier and the major/minor
# version of the protocol.
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
35
36
def _get_env_value_ms(key, default_s):
    """Return a duration in seconds read from an environment variable.

    The environment variable named ``key`` is expected to hold an integer
    number of milliseconds. When the variable is not set, ``default_s``
    (expressed in seconds) is used.

    When the variable cannot be parsed as an integer, or when it is
    negative, a warning is printed and ``default_s`` is returned as is.

    :param key: Name of the environment variable to read.
    :param default_s: Fallback value, in seconds.
    :returns: The duration, in seconds.
    """
    try:
        # Explicit float divisor: the result does not depend on
        # `from __future__ import division` being in effect.
        val = int(os.getenv(key, default_s * 1000)) / 1000.0
    except (TypeError, ValueError):
        # Only conversion failures mean "invalid value"; anything else
        # (e.g. KeyboardInterrupt) must not be silently swallowed.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
49
50
# Both values are read from the environment in milliseconds and kept in
# seconds.
#
# _REG_TIMEOUT: how long _init_threads() waits for the "registration
# done" notifications (default: 5 s).
#
# _RETRY_REG_DELAY: how long a client sleeps before trying to reconnect
# after a failure (default: 3 s).
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
53
54
class _TcpClient(object):
    """TCP client of one session daemon.

    run() connects to the session daemon, registers to it, and then
    handles the commands it sends ("registration done", "enable",
    "disable", "list"), reconnecting after any failure.

    The "enable" and "disable" commands add/remove an LTTng-UST logging
    handler to/from the root logger, using a reference count so that the
    handler is only installed once.
    """

    def __init__(self, name, host, port, reg_queue):
        """Build a client; no connection is made until run() is called.

        :param name: Client name, only used in debug messages.
        :param host: Host of the session daemon.
        :param port: TCP port of the session daemon.
        :param reg_queue: Queue in which True is put once the session
            daemon confirms the registration.
        :raises OSError: If the logging handler cannot be created.
        """
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class is subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            # bare raise keeps the original traceback
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command type -> handler method
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return ``msg`` prefixed with the name of this client."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands forever.

        This method never returns: after any exception, the socket is
        closed and a new connection is attempted after
        _RETRY_REG_DELAY seconds.
        """
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Receive exactly ``size`` bytes from the session daemon socket.

        A single recv() call on a stream socket may return fewer bytes
        than requested, so this keeps receiving until the whole message
        is there.

        :param size: Number of bytes to receive.
        :returns: ``size`` bytes, or empty bytes if the peer closed the
            connection before sending anything.
        :raises EOFError: If the peer closed the connection in the
            middle of the message.
        :raises socket.timeout: If the socket times out before the
            first byte is received.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            try:
                chunk = self._sessiond_sock.recv(remaining)
            except socket.timeout:
                if not chunks:
                    # nothing consumed yet: the caller can safely retry
                    raise

                # Part of the message is already consumed: giving up
                # now would desynchronize the stream, so keep waiting.
                continue

            if not chunk:
                # connection closed by the peer
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        if chunks and remaining > 0:
            fmt = 'connection closed after {} of {} expected bytes'
            raise EOFError(fmt.format(size - remaining, size))

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        :returns: The decoded header, or None if no data was received.
        """
        data = self._recv_exact(lttngust.cmd._SERVER_CMD_HEADER_SIZE)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive a complete server command (header and payload).

        :returns: The result of lttngust.cmd._server_cmd_from_data(), or
            None if no header was received.
        :raises EOFError: If the connection is closed before the whole
            command is received.
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = bytes()

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            # Explicit check rather than an assert, which is stripped
            # when Python runs with -O.
            if len(data) != server_cmd_header.data_size:
                raise EOFError('connection closed before the command payload')

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Send the serialized ``cmd_reply`` to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Handle "registration done": notify the registration queue once.

        Returns None: no reply is sent for this command.
        """
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only notify for the first registration
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Handle "enable": install the log handler on the first reference.

        :returns: The reply to send to the session daemon.
        """
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Handle "disable": remove the log handler with the last reference.

        :returns: The reply to send to the session daemon.
        """
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Handle "list": reply with the names of the known loggers.

        :returns: The reply to send to the session daemon.
        """
        dbg._pdebug(self._debug('got "list" server command'))

        # Take a snapshot: application threads may create loggers (and
        # thus resize this dictionary) while the reply is being built.
        names = list(logging.Logger.manager.loggerDict.keys())
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))

        return lttngust.cmd._ClientCmdReplyList(names=names)

    def _handle_server_cmd(self, server_cmd):
        """Dispatch ``server_cmd`` to its handler and send the reply, if any.

        A missing (None) or unknown command is answered with an
        "invalid command" status.
        """
        handler = None

        if server_cmd is None:
            # NOTE(review): None is also what an empty header (peer
            # closed the connection) yields; a reply is still attempted.
            dbg._pdebug(self._debug('bad server command'))
        else:
            handler = self._server_cmd_handlers.get(type(server_cmd))

            if handler is None:
                dbg._pdebug(self._debug('unknown server command'))

        if handler is None:
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        else:
            cmd_reply = handler(server_cmd)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an exception occurs."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no keep-alive and
                # we could wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if any.

        Errors are ignored (best effort), but the socket is always
        closed, even when shutdown() fails, which is the case for a
        socket that never got connected.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass

        try:
            sock.close()
        except socket.error:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue times out.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command to the session daemon."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
251
252
def _parse_port(text):
    """Return ``text`` as a TCP port number, or None if it is not one.

    A valid port is an integer in the range [1, 65535].
    """
    try:
        port = int(text)
    except (TypeError, ValueError):
        return None

    # Both bounds must hold: the previous `or` made this check always true.
    if 0 < port <= 65535:
        return port

    return None


def _get_port_from_file(path):
    """Read a TCP port number from the first line of the file ``path``.

    :param path: Path of the file containing the port number.
    :returns: The port number, or None if the file cannot be read or
        if its first line is not a valid port number.
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the context manager closes the file on every path
        with open(path) as f:
            first_line = f.readline()
    except (IOError, OSError, ValueError):
        # Missing or unreadable file, or undecodable content: this is
        # expected when the session daemon is not running.
        return None

    return _parse_port(first_line)
268
269
def _get_user_home_path():
    """Return the user's home directory, preferring $LTTNG_HOME if set."""
    lttng_home = os.getenv('LTTNG_HOME')

    if lttng_home is None:
        # no override: fall back to the regular home directory
        lttng_home = os.path.expanduser('~')

    return lttng_home
273
274
# Set to True by _init_threads() when it runs, so that any later call
# returns early.
_initialized = False

# Host to which every client connects (see _client_thread_target()).
_SESSIOND_HOST = '127.0.0.1'
277
278
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: build the client named ``name`` and run it.

    The client connects to _SESSIOND_HOST on TCP port ``port`` and gets
    ``reg_queue`` to report its registration.
    """
    creating_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creating_msg)
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)

    starting_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(starting_msg)
    tcp_client.run()
284
285
def _start_client_thread(name, port, reg_queue):
    """Create and start the daemon thread running the ``name`` client."""
    dbg._pdebug('creating {} client thread'.format(name))
    t = threading.Thread(target=_client_thread_target,
                         args=(name, port, reg_queue))
    t.name = name
    t.daemon = True
    t.start()
    dbg._pdebug('created and started {} client thread'.format(name))


def _init_threads():
    """Start the client threads and wait for their registration.

    One client thread is started for each session daemon (system and
    user) for which a valid port is found. This function then blocks
    until all the started clients are registered, or until _REG_TIMEOUT
    seconds have elapsed.

    Only the first call does something; later calls return immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is not None:
                _start_client_thread(name, port, reg_queue)
                reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                                cur_timeout))
            t1 = lttngust.compat._clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = lttngust.compat._clock()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            # the remaining clients share what is left of the timeout
            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
383
384
# Start the agent as soon as this module is imported.
_init_threads()
This page took 0.03782 seconds and 4 git commands to generate.