ebfa2de1dbb8abbf0b51eb89556558efe034ea74
[lttng-ust.git] / python-lttngust / lttngust / agent.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
4 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
5 #
6 # This library is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation; version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with this library; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from __future__ import unicode_literals
20 from __future__ import print_function
21 from __future__ import division
22 import lttngust.debug as dbg
23 import lttngust.loghandler
24 import lttngust.cmd
25 from io import open
26 import threading
27 import logging
28 import socket
29 import time
30 import sys
31 import os
32
33
34 try:
35 # Python 2
36 import Queue as queue
37 except ImportError:
38 # Python 3
39 import queue
40
41
# Agent protocol identification sent to the session daemon when a client
# registers (see _TcpClient._register): the domain value and the
# major/minor protocol version.
_PROTO_DOMAIN = 5
_PROTO_MAJOR, _PROTO_MINOR = 2, 0
45
46
def _get_env_value_ms(key, default_s):
    """Read a duration expressed in milliseconds from an environment variable.

    :param key: name of the environment variable; its value must be an
        integer number of milliseconds
    :param default_s: duration, in seconds, used when the variable is unset,
        or when its value is not an integer or is negative
    :returns: the duration in seconds (true division, so it may be a float)
    """
    try:
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError):
        # Not an integer: force the "invalid value" path below. Catching
        # only conversion errors keeps KeyboardInterrupt/SystemExit intact.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
59
60
# How long _init_threads() waits for registration, and how long a client
# sleeps before reconnecting, both in seconds. The environment variables
# override them in milliseconds.
_REG_TIMEOUT = _get_env_value_ms(key='LTTNG_UST_PYTHON_REGISTER_TIMEOUT',
                                 default_s=5)
_RETRY_REG_DELAY = _get_env_value_ms(key='LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY',
                                     default_s=3)
63
64
class _TcpClient(object):
    """Agent client talking to one LTTng session daemon over TCP.

    The client connects to ``host:port``, registers this process, then
    serves the commands sent by the session daemon: registration done,
    enable, disable and list. While at least one "enable" is outstanding,
    the client's log handler is attached to the root logger. Any exception
    drops the connection, and the client retries after ``_RETRY_REG_DELAY``
    seconds, in an endless loop (see ``run()``).
    """

    def __init__(self, name, host, port, reg_queue):
        """Create a client; no connection is made until ``run()``.

        :param name: client name, used only in debug messages
        :param host: session daemon host
        :param port: session daemon TCP port
        :param reg_queue: queue on which ``True`` is put once the session
            daemon confirms registration, or ``None`` for no notification
        :raises OSError: if ``lttngust.loghandler._Handler()`` raises it
            (reported as a library load failure)
        """
        # Name the class explicitly: super(self.__class__, self) recurses
        # forever as soon as this class is subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            # Bare raise keeps the original traceback.
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)
        # Number of "enable" commands not yet matched by a "disable"; the
        # handler is attached to the root logger only while this is > 0.
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue
        # Dispatch table: exact server command type -> handler method.
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return ``msg`` prefixed with this client's name, for debug output."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve session daemon commands forever.

        This does not return normally. Any ``Exception`` (connection refused,
        closed socket, invalid data, ...) closes the socket, and the whole
        sequence is retried after ``_RETRY_REG_DELAY`` seconds.
        """
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Receive ``size`` bytes from the session daemon socket.

        A single ``recv()`` on a stream socket may return fewer bytes than
        requested, so this keeps reading until ``size`` bytes have arrived.
        The result is shorter than ``size`` only when the peer closed the
        connection (``recv()`` returned an empty string).
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._sessiond_sock.recv(remaining)

            if not chunk:
                # orderly shutdown by the peer
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode one server command header.

        :raises EOFError: if the connection is closed before a complete
            header is received. ``run()`` catches this and reconnects,
            instead of answering a dead (or half-closed) peer forever.
        """
        size = lttngust.cmd._SERVER_CMD_HEADER_SIZE
        data = self._recv_exact(size)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            raise EOFError('session daemon closed the connection')

        if len(data) != size:
            raise EOFError('truncated server command header ({} of {} bytes)'.format(len(data), size))

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive one complete server command (header and payload).

        :returns: whatever ``lttngust.cmd._server_cmd_from_data()`` builds;
            ``_handle_server_cmd()`` treats ``None`` as a bad command
        :raises EOFError: if the connection is closed mid-command
        """
        server_cmd_header = self._recv_server_cmd_header()

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = bytes()

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            if len(data) != server_cmd_header.data_size:
                raise EOFError('truncated server command payload ({} of {} bytes)'.format(len(data), server_cmd_header.data_size))

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Serialize ``cmd_reply`` with ``get_data()`` and send all of it."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Notify the registration queue once; this command gets no reply."""
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # Notify only once, even after a later reconnection.
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Count one more "enable"; attach the handler on the first one."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Count one less "enable"; detach the handler when none remain."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Reply with the names of all loggers known to the logging module."""
        dbg._pdebug(self._debug('got "list" server command'))
        # Snapshot the keys: application threads may create loggers while
        # the reply is being built, and iterating a live dict view would
        # then fail with "dictionary changed size during iteration".
        names = list(logging.Logger.manager.loggerDict.keys())
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch ``server_cmd`` to its handler and send the reply, if any.

        ``None`` and unknown command types get an INVALID_CMD reply.
        Dispatch is on the exact type, not on ``isinstance``.
        """
        handler = self._server_cmd_handlers.get(type(server_cmd))

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        elif handler is not None:
            cmd_reply = handler(server_cmd)
        else:
            dbg._pdebug(self._debug('unknown server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an exception escapes."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no keep-alive and we
                # could wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Best-effort shutdown and close of the session daemon socket.

        ``close()`` is attempted even when ``shutdown()`` fails (e.g. the
        socket never connected), instead of relying on garbage collection
        to release the descriptor.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass

        try:
            sock.close()
        except Exception:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect it to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue timeouts.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command (domain, PID, protocol version)."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
261
262
def _get_port_from_file(path):
    """Read a session daemon TCP port from the first line of ``path``.

    :param path: path of an ``agent.port`` file
    :returns: the port as an int when the file exists, its first line is an
        integer and that integer is a valid TCP port (1-65535); otherwise
        ``None``
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # "with" closes the file even when int() raises.
        with open(path) as f:
            r_port = int(f.readline())
    except (IOError, OSError, ValueError):
        # missing/unreadable file, or first line not an integer
        return None

    # Both bounds must hold; the previous "or" accepted any integer.
    if 0 < r_port <= 65535:
        return r_port

    return None
278
279
def _get_user_home_path():
    """Return the directory that holds the user's ``.lttng`` directory.

    ``$LTTNG_HOME`` wins when it is set; otherwise the current user's home
    directory (``~`` expanded) is used.
    """
    home = os.path.expanduser('~')
    return os.environ.get('LTTNG_HOME', home)
283
284
# Loopback address the client threads use to reach the session daemon(s).
_SESSIOND_HOST = '127.0.0.1'

# Set by _init_threads() so the client threads are started at most once.
_initialized = False
287
288
def _client_thread_target(name, port, reg_queue):
    """Client thread entry point: build a TCP client for ``port`` and run it.

    ``_TcpClient.run()`` loops forever, so this normally never returns.
    """
    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
    tcp_client = _TcpClient(name=name, host=_SESSIOND_HOST, port=port,
                            reg_queue=reg_queue)
    dbg._pdebug('starting client "{}"'.format(name))
    tcp_client.run()
294
295
def _init_threads():
    """Start the session daemon client threads and wait for registration.

    One daemon thread is started per valid ``agent.port`` file: the system
    one under ``/var/run/lttng``, and the user one under the user's
    ``.lttng`` directory. This function then blocks for up to
    ``_REG_TIMEOUT`` seconds, until every started client reports
    "registration done". Calls after the first one return immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        # system client first, then user client
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is None:
                continue

            dbg._pdebug('creating {} client thread'.format(name))
            t = threading.Thread(target=_client_thread_target,
                                 args=(name, port, reg_queue))
            t.name = name
            t.daemon = True
            t.start()
            dbg._pdebug('created and started {} client thread'.format(name))
            reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    # Measure the wait with a wall clock. time.clock() measured CPU time on
    # Unix, so blocking in get() barely consumed the budget, and it was
    # removed in Python 3.8. time.monotonic() is absent on Python 2.
    clock = getattr(time, 'monotonic', time.time)
    deadline = clock() + _REG_TIMEOUT
    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                          cur_timeout))
            reg_queue.get(timeout=cur_timeout)
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

        reg_expecting -= 1
        dbg._pdebug('unblocked')

        if reg_expecting == 0:
            # done!
            dbg._pdebug('successfully registered to session daemon(s)')
            break

        cur_timeout = deadline - clock()

        if cur_timeout <= 0:
            # timeout
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
393
394
# Importing this module starts the agent: client threads are created and
# the import blocks until registration completes or times out.
_init_threads()
This page took 0.036811 seconds and 3 git commands to generate.