Fix: python agent: 'time' has no attribute 'clock'
[lttng-ust.git] / python-lttngust / lttngust / agent.py
1 # -*- coding: utf-8 -*-
2 #
3 # Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
4 # Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
5 #
6 # This library is free software; you can redistribute it and/or modify it under
7 # the terms of the GNU Lesser General Public License as published by the Free
8 # Software Foundation; version 2.1 of the License.
9 #
10 # This library is distributed in the hope that it will be useful, but WITHOUT
11 # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
12 # FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
13 # details.
14 #
15 # You should have received a copy of the GNU Lesser General Public License
16 # along with this library; if not, write to the Free Software Foundation, Inc.,
17 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
18
19 from __future__ import unicode_literals
20 from __future__ import print_function
21 from __future__ import division
22 import lttngust.debug as dbg
23 import lttngust.loghandler
24 import lttngust.compat
25 import lttngust.cmd
26 from io import open
27 import threading
28 import logging
29 import socket
30 import time
31 import sys
32 import os
33
34
35 try:
36 # Python 2
37 import Queue as queue
38 except ImportError:
39 # Python 3
40 import queue
41
42
# Values which _TcpClient._register() puts in the registration command
# sent to the session daemon: the domain, then the major and minor
# numbers.
_PROTO_DOMAIN = 5
_PROTO_MAJOR = 2
_PROTO_MINOR = 0
46
47
def _get_env_value_ms(key, default_s):
    """Return a duration, in seconds, read from an environment variable.

    key: name of the environment variable; its value is interpreted as
        an integral number of milliseconds.
    default_s: value, in seconds, used when the variable is not set,
        or when its value is not an integer or is negative (a warning
        is printed in the two latter cases).
    """
    try:
        val = int(os.getenv(key, default_s * 1000)) / 1000
    except (TypeError, ValueError, OverflowError):
        # Only catch what int() can raise for a bad value: a bare
        # `except` would also swallow KeyboardInterrupt/SystemExit.
        val = -1

    if val < 0:
        fmt = 'invalid ${} value; {} seconds will be used'
        dbg._pwarning(fmt.format(key, default_s))
        val = default_s

    return val
60
61
# Durations, in seconds, which can be overridden by environment
# variables holding milliseconds. _REG_TIMEOUT bounds how long
# _init_threads() waits for the registration confirmations;
# _RETRY_REG_DELAY is how long a client sleeps before reconnecting.
_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
64
65
class _TcpClient(object):
    """TCP client of one LTTng session daemon.

    run() connects to the session daemon, registers this process, then
    handles the commands which the session daemon sends. Whenever any
    of those steps fails, the socket is dropped and the whole sequence
    is retried after _RETRY_REG_DELAY seconds.
    """

    def __init__(self, name, host, port, reg_queue):
        """Prepare the client; no connection is made before run().

        name: client name, used to prefix debug messages.
        host, port: TCP address of the session daemon.
        reg_queue: queue in which True is put when the first
            "registration done" command is received, or None.

        Raises OSError if the LTTng-UST log handler cannot be created.
        """
        # Name the class explicitly: super(self.__class__, self) would
        # recurse forever if this class was ever subclassed.
        super(_TcpClient, self).__init__()
        self._name = name
        self._host = host
        self._port = port

        try:
            self._log_handler = lttngust.loghandler._Handler()
        except OSError as e:
            dbg._pwarning('cannot load library: {}'.format(e))
            raise

        self._root_logger = logging.getLogger()
        self._root_logger.setLevel(logging.NOTSET)

        # number of "enable" commands not yet balanced by a "disable"
        self._ref_count = 0
        self._sessiond_sock = None
        self._reg_queue = reg_queue

        # server command type -> method handling it
        self._server_cmd_handlers = {
            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
        }

    def _debug(self, msg):
        """Return `msg` prefixed with this client's name."""
        return 'client "{}": {}'.format(self._name, msg)

    def run(self):
        """Connect, register and handle server commands, forever."""
        while True:
            try:
                # connect to the session daemon
                dbg._pdebug(self._debug('connecting to session daemon'))
                self._connect_to_sessiond()

                # register to the session daemon after a successful connection
                dbg._pdebug(self._debug('registering to session daemon'))
                self._register()

                # wait for commands from the session daemon
                self._wait_server_cmd()
            except Exception as e:
                # Whatever happens here, we have to close the socket and
                # retry to connect to the session daemon since either
                # the socket was closed, a network timeout occurred, or
                # invalid data was received.
                dbg._pdebug(self._debug('got exception: {}'.format(e)))
                self._cleanup_socket()
                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
                time.sleep(_RETRY_REG_DELAY)

    def _recv_exact(self, size):
        """Receive `size` bytes from the session daemon socket.

        recv() is allowed to return fewer bytes than requested, so it
        is called until `size` bytes are accumulated. The returned
        bytes object is shorter than `size` (possibly empty) only when
        recv() reports the end of the stream first.
        """
        chunks = []
        remaining = size

        while remaining > 0:
            chunk = self._sessiond_sock.recv(remaining)

            if not chunk:
                # end of stream
                break

            chunks.append(chunk)
            remaining -= len(chunk)

        return b''.join(chunks)

    def _recv_server_cmd_header(self):
        """Receive and decode a server command header.

        Returns None if nothing could be received. Raises EOFError if
        the stream ends in the middle of the header.
        """
        header_size = lttngust.cmd._SERVER_CMD_HEADER_SIZE
        data = self._recv_exact(header_size)

        if not data:
            dbg._pdebug(self._debug('received empty server command header'))
            return None

        if len(data) != header_size:
            # explicit check: an assert would be stripped with -O
            fmt = 'truncated server command header ({} bytes instead of {})'
            raise EOFError(fmt.format(len(data), header_size))

        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))

        return lttngust.cmd._server_cmd_header_from_data(data)

    def _recv_server_cmd(self):
        """Receive a whole server command (header, then payload).

        Returns None if no header could be received, otherwise whatever
        lttngust.cmd._server_cmd_from_data() returns. Raises EOFError
        if the stream ends in the middle of the command.
        """
        server_cmd_header = self._recv_server_cmd_header()

        if server_cmd_header is None:
            return None

        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
        data = bytes()

        if server_cmd_header.data_size > 0:
            data = self._recv_exact(server_cmd_header.data_size)

            if len(data) != server_cmd_header.data_size:
                fmt = 'truncated server command payload ({} bytes instead of {})'
                raise EOFError(fmt.format(len(data), server_cmd_header.data_size))

        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)

    def _send_cmd_reply(self, cmd_reply):
        """Send the serialized form of `cmd_reply` to the session daemon."""
        data = cmd_reply.get_data()
        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
        self._sessiond_sock.sendall(data)

    def _handle_server_cmd_reg_done(self, server_cmd):
        """Handle "registration done": notify the register queue, once.

        Returns None: no reply is sent for this command.
        """
        dbg._pdebug(self._debug('got "registration done" server command'))

        if self._reg_queue is not None:
            dbg._pdebug(self._debug('notifying _init_threads()'))

            try:
                self._reg_queue.put(True)
            except Exception:
                # read side could be closed by now; ignore it
                pass

            # only the first confirmation is reported
            self._reg_queue = None

    def _handle_server_cmd_enable(self, server_cmd):
        """Handle "enable": attach our handler on the first reference."""
        dbg._pdebug(self._debug('got "enable" server command'))
        self._ref_count += 1

        if self._ref_count == 1:
            dbg._pdebug(self._debug('adding our handler to the root logger'))
            self._root_logger.addHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyEnable()

    def _handle_server_cmd_disable(self, server_cmd):
        """Handle "disable": detach our handler when no reference is left."""
        dbg._pdebug(self._debug('got "disable" server command'))
        self._ref_count -= 1

        if self._ref_count < 0:
            # disable command could be sent again when a session is destroyed
            self._ref_count = 0

        if self._ref_count == 0:
            dbg._pdebug(self._debug('removing our handler from the root logger'))
            self._root_logger.removeHandler(self._log_handler)

        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))

        return lttngust.cmd._ClientCmdReplyDisable()

    def _handle_server_cmd_list(self, server_cmd):
        """Handle "list": reply with the names of the known loggers."""
        dbg._pdebug(self._debug('got "list" server command'))

        # Take a snapshot: in Python 3, keys() is a live view of a
        # dictionary which other threads grow when they create loggers.
        names = list(logging.Logger.manager.loggerDict.keys())
        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)

        return cmd_reply

    def _handle_server_cmd(self, server_cmd):
        """Dispatch `server_cmd` to its handler and send the reply, if any.

        A None or unknown command is answered with an "invalid command"
        status reply.
        """
        cmd_reply = None

        if server_cmd is None:
            dbg._pdebug(self._debug('bad server command'))
            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
            cmd_reply = lttngust.cmd._ClientCmdReply(status)
        else:
            handler = self._server_cmd_handlers.get(type(server_cmd))

            if handler is not None:
                cmd_reply = handler(server_cmd)
            else:
                dbg._pdebug(self._debug('unknown server command'))
                status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
                cmd_reply = lttngust.cmd._ClientCmdReply(status)

        if cmd_reply is not None:
            self._send_cmd_reply(cmd_reply)

    def _wait_server_cmd(self):
        """Receive and handle server commands until an error is raised."""
        while True:
            try:
                server_cmd = self._recv_server_cmd()
            except socket.timeout:
                # simply retry here; the protocol has no KA and we could
                # wait for hours
                continue

            self._handle_server_cmd(server_cmd)

    def _cleanup_socket(self):
        """Shut down and close the session daemon socket, if any.

        Never raises: this is called from run()'s exception handler.
        """
        sock = self._sessiond_sock
        self._sessiond_sock = None

        if sock is None:
            return

        try:
            sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            # e.g. the socket was never connected; still close it below
            pass

        try:
            sock.close()
        except Exception:
            pass

    def _connect_to_sessiond(self):
        """Create the TCP socket if needed and connect it to the session daemon."""
        # create session daemon TCP socket
        if self._sessiond_sock is None:
            self._sessiond_sock = socket.socket(socket.AF_INET,
                                                socket.SOCK_STREAM)

        # Use str(self._host) here. Since this host could be a string
        # literal, and since we're importing __future__.unicode_literals,
        # we want to make sure the host is a native string in Python 2.
        # This avoids an indirect module import (unicode module to
        # decode the unicode string, eventually imported by the
        # socket module if needed), which is not allowed in a thread
        # directly created by a module in Python 2 (our case).
        #
        # tl;dr: Do NOT remove str() here, or this call in Python 2
        # _will_ block on an interpreter's mutex until the waiting
        # register queue timeouts.
        self._sessiond_sock.connect((str(self._host), self._port))

    def _register(self):
        """Send the registration command for this process to the session daemon."""
        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
                                              _PROTO_MAJOR, _PROTO_MINOR)
        data = cmd.get_data()
        self._sessiond_sock.sendall(data)
262
263
def _get_port_from_file(path):
    """Return the TCP port written on the first line of file `path`.

    Returns None if the file cannot be read, if its first line is not
    an integer, or if this integer is not a valid TCP port (1 to
    65535).
    """
    dbg._pdebug('reading port from file "{}"'.format(path))

    try:
        # `with` closes the file even when int() raises
        with open(path) as port_file:
            r_port = int(port_file.readline())
    except (IOError, OSError, ValueError):
        return None

    # Both bounds must hold: the previous `> 0 or <= 65535` test was
    # true for any integer, so out-of-range values were accepted.
    if 0 < r_port <= 65535:
        return r_port

    return None
279
280
def _get_user_home_path():
    """Return $LTTNG_HOME if it is set, else the user's home directory."""
    fallback_home = os.path.expanduser('~')

    # $LTTNG_HOME overrides $HOME if it exists
    return os.environ.get('LTTNG_HOME', fallback_home)
284
285
# Set to True by _init_threads() so that the agent is only initialized
# once.
_initialized = False

# Host of the session daemons: _client_thread_target() passes it to
# every _TcpClient it creates.
_SESSIOND_HOST = '127.0.0.1'
288
289
def _client_thread_target(name, port, reg_queue):
    """Thread entry point: create the client named `name` and run it.

    The client connects to _SESSIOND_HOST on TCP port `port`;
    `reg_queue` is handed over to it as is.
    """
    creating_msg = 'creating client "{}" using TCP port {}'.format(name, port)
    dbg._pdebug(creating_msg)
    tcp_client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)

    starting_msg = 'starting client "{}"'.format(name)
    dbg._pdebug(starting_msg)
    tcp_client.run()
295
296
def _init_threads():
    """Start the client threads and wait for their registration.

    One daemon thread is started per session daemon (system and user)
    for which a valid port file is found. This function then blocks
    until each started client reports a "registration done"
    confirmation, or until _REG_TIMEOUT seconds have elapsed in total.

    Does nothing when the agent is already initialized.
    """
    global _initialized

    dbg._pdebug('entering')

    if _initialized:
        dbg._pdebug('agent is already initialized')
        return

    # This makes sure that the appropriate modules for encoding and
    # decoding strings/bytes are imported now, since no import should
    # happen within a thread at import time (our case).
    'lttng'.encode().decode()

    _initialized = True
    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
    user_port = _get_port_from_file(user_port_file)
    reg_queue = queue.Queue()
    reg_expecting = 0

    dbg._pdebug('system session daemon port: {}'.format(sys_port))
    dbg._pdebug('user session daemon port: {}'.format(user_port))

    if sys_port == user_port and sys_port is not None:
        # The two session daemon ports are the same. This is not normal.
        # Connect to only one.
        dbg._pdebug('both user and system session daemon have the same port')
        sys_port = None

    try:
        # same procedure for both clients, system first
        for name, port in (('system', sys_port), ('user', user_port)):
            if port is None:
                continue

            dbg._pdebug('creating {} client thread'.format(name))
            t = threading.Thread(target=_client_thread_target,
                                 args=(name, port, reg_queue))
            t.name = name
            t.daemon = True
            t.start()
            dbg._pdebug('created and started {} client thread'.format(name))
            reg_expecting += 1
    except Exception:
        # Cannot create threads for some reason; stop this
        # initialization. KeyboardInterrupt and SystemExit are not
        # swallowed here.
        dbg._pwarning('cannot create client threads')
        return

    if reg_expecting == 0:
        # early exit: looks like there's not even one valid port
        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
        return

    cur_timeout = _REG_TIMEOUT

    # We block here to make sure the agent is properly registered to
    # the session daemon. If we timeout, the client threads will still
    # continue to try to connect and register to the session daemon,
    # but there is no guarantee that all following logging statements
    # will make it to LTTng-UST.
    #
    # When a client thread receives a "registration done" confirmation
    # from the session daemon it's connected to, it puts True in
    # reg_queue.
    while True:
        try:
            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
                                                                                              cur_timeout))
            t1 = lttngust.compat._clock()
            reg_queue.get(timeout=cur_timeout)
            t2 = lttngust.compat._clock()
            reg_expecting -= 1
            dbg._pdebug('unblocked')

            if reg_expecting == 0:
                # done!
                dbg._pdebug('successfully registered to session daemon(s)')
                break

            # the time spent waiting for this confirmation is taken
            # from the budget of the next ones
            cur_timeout -= (t2 - t1)

            if cur_timeout <= 0:
                # timeout
                dbg._pdebug('ran out of time')
                break
        except queue.Empty:
            dbg._pdebug('ran out of time')
            break

    dbg._pdebug('leaving')
394
395
# Initialize the agent as soon as this module is imported.
_init_threads()
This page took 0.036288 seconds and 4 git commands to generate.