Fix: potential leaks in error paths
[lttng-ust.git] / liblttng-ust-python-agent / lttngust / agent.py
CommitLineData
de4dee04
PP
1# -*- coding: utf-8 -*-
2#
3# Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
4# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
5#
6# This library is free software; you can redistribute it and/or modify it under
7# the terms of the GNU Lesser General Public License as published by the Free
8# Software Foundation; version 2.1 of the License.
9#
10# This library is distributed in the hope that it will be useful, but WITHOUT
11# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13# details.
14#
15# You should have received a copy of the GNU Lesser General Public License
16# along with this library; if not, write to the Free Software Foundation, Inc.,
17# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19from __future__ import unicode_literals
20from __future__ import print_function
21from __future__ import division
22import lttngust.debug as dbg
23import lttngust.loghandler
24import lttngust.cmd
25from io import open
26import threading
27import logging
28import socket
29import time
30import sys
31import os
32
33
34try:
35 # Python 2
36 import Queue as queue
37except ImportError:
38 # Python 3
39 import queue
40
41
# Values passed, together with the process ID, to
# lttngust.cmd._ClientRegisterCmd when registering to a session daemon
# (see _TcpClient._register()).
# NOTE(review): presumably the agent domain identifier and the
# major/minor protocol version expected by the session daemon --
# confirm against the session daemon's agent protocol.
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 1
_PROTO_MINOR = 0
45
46
def _get_env_value_ms(key, default_s):
    """Read a duration from an environment variable.

    The environment variable named `key` is expected to hold an integer
    number of milliseconds. The returned value is expressed in seconds.

    `default_s` (seconds) is used when the variable is not set. It is
    also used, after printing a warning, when the variable does not
    hold a valid integer or holds a negative one.
    """
    try:
        # os.getenv() returns either the variable's string or the
        # integer default; int() accepts both. Dividing by a float keeps
        # the result fractional regardless of the division mode in use.
        val = int(os.getenv(key, default_s * 1000)) / 1000.0
    except (ValueError, TypeError):
        # Not an integer: flag it as invalid so that the check below
        # warns and falls back to the default. Only conversion errors
        # are caught, so KeyboardInterrupt/SystemExit still propagate.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
59
60
# Both values are in seconds; the environment variables which override
# them are read as milliseconds (see _get_env_value_ms()).
#
# _REG_TIMEOUT: initial time budget used by _init_threads() while
# waiting for the "registration done" notifications.
#
# _RETRY_REG_DELAY: time a client sleeps in _TcpClient.run() after a
# failure before trying to connect again.
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
63
64
class _TcpClient(object):
    """Client of one session daemon, talking to it over TCP.

    run() loops forever: it connects to the session daemon, registers
    this process, then serves the commands the session daemon sends
    (registration done, enable, disable, list). On any error, the
    socket is dropped and the whole sequence is retried after
    _RETRY_REG_DELAY seconds.

    Parameters:
      name: label used to prefix this client's debug messages.
      host: session daemon host to connect to.
      port: session daemon TCP port to connect to.
      reg_queue: queue in which True is put once, when the first
        "registration done" command is received.

    Raises OSError if the log handler cannot be created.
    """

    def __init__(self, name, host, port, reg_queue):
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever as soon as this class gets subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except (OSError) as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)

        # number of "enable" commands not yet balanced by a "disable"
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command type -> handler returning the reply to send
        # (or None when no reply is needed)
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return `msg` prefixed with this client's name."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and serve commands, forever."""
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except (Exception) as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Receive up to `size` bytes from the session daemon socket.

        recv() on a stream socket may return fewer bytes than asked
        for, so keep reading until `size` bytes are gathered. If the
        peer closes the connection first, the bytes received so far
        (possibly none) are returned; callers must check the length.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._sessiond_sock.recv(remaining)

            if not chunk:
                # connection closed by the peer
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        Returns None if nothing could be received. Raises EOFError if
        the connection is closed in the middle of the header.
        """
        expected_size = lttngust.cmd._SERVER_CMD_HEADER_SIZE
        data = self._recv_exact(expected_size)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        if len(data) != expected_size:
            # An explicit error instead of an assert: asserts are
            # stripped with -O. run() catches it and reconnects.
            fmt = 'truncated server command header ({} of {} bytes)'
            raise EOFError(fmt.format(len(data), expected_size))

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive a complete server command (header and payload).

        Returns None if no header could be received. Raises EOFError if
        the connection is closed before the whole payload is received.
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = bytes()

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            if len(data) != server_cmd_header.data_size:
                fmt = 'truncated server command payload ({} of {} bytes)'
                raise EOFError(fmt.format(len(data),
                                          server_cmd_header.data_size))

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Serialize `cmd_reply` and send it to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Notify _init_threads(), once, that registration is done.

        Returns None: no reply is sent for this command.
        """
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only the first registration is reported
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Count one more enabling; install the handler on the first."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Count one less enabling; remove the handler when none is left."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Reply with the names of the currently known loggers."""
        dbg._pdebug(self._debug('got "list" server command'))

        # Take a snapshot: in Python 3, keys() is a live view of a
        # dictionary which other threads may grow by creating loggers.
        names = list(logging.Logger.manager.loggerDict)
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch `server_cmd` to its handler and send the reply, if any.

        A missing (None) or unknown command is answered with an
        "invalid command" reply.
        """
        cmd_reply = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        elif type(server_cmd) in self._server_cmd_handlers:
            cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
        else:
            dbg._pdebug(self._debug('unknown server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an error is raised."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no KA and we could
                # wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if any.

        This is best effort: errors are ignored. close() is attempted
        even if shutdown() fails (which it does on a socket which never
        got connected), so that the descriptor is not leaked.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # e.g. not connected, or already reset by the peer
            pass

        try:
            sock.close()
        except Exception:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect it."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue timeouts.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command to the session daemon."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
261
262
def _get_port_from_file(path):
    """Read a TCP port number from the first line of the file at `path`.

    Returns the port as an integer, or None if the file cannot be read,
    if its first line is not an integer, or if this integer is not a
    valid TCP port (1 to 65535).
    """
    port = None
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # the context manager closes the file even if int() raises
        with open(path) as f:
            r_port = int(f.readline())

        # both bounds must hold ("or" would accept any integer)
        if 0 < r_port <= 65535:
            port = r_port
    except (IOError, OSError, ValueError):
        # missing/unreadable file or invalid content: no port
        pass

    return port
278
279
def _get_user_home_path():
    """Return the directory under which the user's `.lttng` files live.

    This is the value of $LTTNG_HOME when this variable is set (even
    to an empty string), else the current user's home directory.
    """
    lttng_home = os.getenv('LTTNG_HOME')

    if lttng_home is None:
        # no override: fall back to the user's home directory
        return os.path.expanduser('~')

    return lttng_home
283
284
# Set to True by _init_threads() on its first call; later calls return
# immediately.
_initialized = False

# Host passed to every _TcpClient created by _client_thread_target().
_SESSIOND_HOST = '127.0.0.1'
287
288
def _client_thread_target(name, port, reg_queue):
    """Entry point of a client thread.

    Creates a _TcpClient named `name`, targeting TCP port `port` of
    _SESSIOND_HOST, then runs it. `reg_queue` is handed to the client
    as is.
    """
    creating_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creating_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    starting_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(starting_msg)
    tcp_client.run()
294
295
def _init_threads():
    """Start the client threads and wait for their registration.

    One daemon thread is started for each session daemon (system-wide
    and per-user) for which a valid port file is found. This function
    then blocks until every started client reports that its
    registration is done, or until _REG_TIMEOUT seconds are spent
    waiting, whichever comes first.

    Only the first call does something; following calls return
    immediately.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    try:
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is None:
                continue

            dbg._pdebug('creating {} client thread'.format(name))
            t = threading.Thread(target=_client_thread_target,
                                 args=(name, port, reg_queue))
            t.name = name
            t.daemon = True
            t.start()
            dbg._pdebug('created and started {} client thread'.format(name))
            reg_expecting += 1
    except Exception:
        # cannot create threads for some reason; stop this initialization
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # time.clock() must not be used here: it was removed in Python 3.8
    # and, on Unix, it measured CPU time, so the time spent blocked in
    # the queue was never deducted from the budget. Prefer the
    # monotonic clock and fall back to wall-clock time where it does
    # not exist (Python 2).
    now = getattr(time, 'monotonic', time.time)

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                                cur_timeout))
            t1 = now()
            reg_queue.get(timeout=cur_timeout)
            t2 = now()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            # the next wait only gets what is left of the budget
            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
387
388
# Executed when this module is imported: start the client threads and
# wait for their registration (see _init_threads()).
_init_threads()
This page took 0.035766 seconds and 4 git commands to generate.