--- /dev/null
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
+# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
+#
+# This library is free software; you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+from __future__ import unicode_literals
+from __future__ import print_function
+from __future__ import division
+import lttngust.debug as dbg
+import lttngust.loghandler
+import lttngust.cmd
+from io import open
+import threading
+import logging
+import socket
+import time
+import sys
+import os
+
+
+try:
+ # Python 2
+ import Queue as queue
+except ImportError:
+ # Python 3
+ import queue
+
+
+_PROTO_DOMAIN = 5
+_PROTO_MAJOR = 1
+_PROTO_MINOR = 0
+
+
+def _get_env_value_ms(key, default_s):
+ try:
+ val = int(os.getenv(key, default_s * 1000)) / 1000
+ except:
+ val = -1
+
+ if val < 0:
+ fmt = 'invalid ${} value; {} seconds will be used'
+ dbg._pwarning(fmt.format(key, default_s))
+ val = default_s
+
+ return val
+
+
+_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
+_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
+
+
+class _TcpClient(object):
+ def __init__(self, name, host, port, reg_queue):
+ super(self.__class__, self).__init__()
+ self._name = name
+ self._host = host
+ self._port = port
+
+ try:
+ self._log_handler = lttngust.loghandler._Handler()
+ except (OSError) as e:
+ dbg._pwarning('cannot load library: {}'.format(e))
+ raise e
+
+ self._root_logger = logging.getLogger()
+ self._root_logger.setLevel(logging.NOTSET)
+ self._ref_count = 0
+ self._sessiond_sock = None
+ self._reg_queue = reg_queue
+ self._server_cmd_handlers = {
+ lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
+ lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
+ lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
+ lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
+ }
+
+ def _debug(self, msg):
+ return 'client "{}": {}'.format(self._name, msg)
+
+ def run(self):
+ while True:
+ try:
+ # connect to the session daemon
+ dbg._pdebug(self._debug('connecting to session daemon'))
+ self._connect_to_sessiond()
+
+ # register to the session daemon after a successful connection
+ dbg._pdebug(self._debug('registering to session daemon'))
+ self._register()
+
+ # wait for commands from the session daemon
+ self._wait_server_cmd()
+ except (Exception) as e:
+ # Whatever happens here, we have to close the socket and
+ # retry to connect to the session daemon since either
+ # the socket was closed, a network timeout occured, or
+ # invalid data was received.
+ dbg._pdebug(self._debug('got exception: {}'.format(e)))
+ self._cleanup_socket()
+ dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
+ time.sleep(_RETRY_REG_DELAY)
+
+ def _recv_server_cmd_header(self):
+ data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE)
+
+ if not data:
+ dbg._pdebug(self._debug('received empty server command header'))
+ return None
+
+ assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE)
+ dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))
+
+ return lttngust.cmd._server_cmd_header_from_data(data)
+
+ def _recv_server_cmd(self):
+ server_cmd_header = self._recv_server_cmd_header()
+
+ if server_cmd_header is None:
+ return None
+
+ dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
+ dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
+ dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
+ data = bytes()
+
+ if server_cmd_header.data_size > 0:
+ data = self._sessiond_sock.recv(server_cmd_header.data_size)
+ assert(len(data) == server_cmd_header.data_size)
+
+ return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)
+
+ def _send_cmd_reply(self, cmd_reply):
+ data = cmd_reply.get_data()
+ dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
+ self._sessiond_sock.sendall(data)
+
+ def _handle_server_cmd_reg_done(self, server_cmd):
+ dbg._pdebug(self._debug('got "registration done" server command'))
+
+ if self._reg_queue is not None:
+ dbg._pdebug(self._debug('notifying _init_threads()'))
+
+ try:
+ self._reg_queue.put(True)
+ except (Exception) as e:
+ # read side could be closed by now; ignore it
+ pass
+
+ self._reg_queue = None
+
+ def _handle_server_cmd_enable(self, server_cmd):
+ dbg._pdebug(self._debug('got "enable" server command'))
+ self._ref_count += 1
+
+ if self._ref_count == 1:
+ dbg._pdebug(self._debug('adding our handler to the root logger'))
+ self._root_logger.addHandler(self._log_handler)
+
+ dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
+
+ return lttngust.cmd._ClientCmdReplyEnable()
+
+ def _handle_server_cmd_disable(self, server_cmd):
+ dbg._pdebug(self._debug('got "disable" server command'))
+ self._ref_count -= 1
+
+ if self._ref_count < 0:
+ # disable command could be sent again when a session is destroyed
+ self._ref_count = 0
+
+ if self._ref_count == 0:
+ dbg._pdebug(self._debug('removing our handler from the root logger'))
+ self._root_logger.removeHandler(self._log_handler)
+
+ dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
+
+ return lttngust.cmd._ClientCmdReplyDisable()
+
+ def _handle_server_cmd_list(self, server_cmd):
+ dbg._pdebug(self._debug('got "list" server command'))
+ names = logging.Logger.manager.loggerDict.keys()
+ dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
+ cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)
+
+ return cmd_reply
+
+ def _handle_server_cmd(self, server_cmd):
+ cmd_reply = None
+
+ if server_cmd is None:
+ dbg._pdebug(self._debug('bad server command'))
+ status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
+ cmd_reply = lttngust.cmd._ClientCmdReply(status)
+ elif type(server_cmd) in self._server_cmd_handlers:
+ cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
+ else:
+ dbg._pdebug(self._debug('unknown server command'))
+ status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
+ cmd_reply = lttngust.cmd._ClientCmdReply(status)
+
+ if cmd_reply is not None:
+ self._send_cmd_reply(cmd_reply)
+
+ def _wait_server_cmd(self):
+ while True:
+ try:
+ server_cmd = self._recv_server_cmd()
+ except socket.timeout:
+ # simply retry here; the protocol has no KA and we could
+ # wait for hours
+ continue
+
+ self._handle_server_cmd(server_cmd)
+
+ def _cleanup_socket(self):
+ try:
+ self._sessiond_sock.shutdown(socket.SHUT_RDWR)
+ self._sessiond_sock.close()
+ except:
+ pass
+
+ self._sessiond_sock = None
+
+ def _connect_to_sessiond(self):
+ # create session daemon TCP socket
+ if self._sessiond_sock is None:
+ self._sessiond_sock = socket.socket(socket.AF_INET,
+ socket.SOCK_STREAM)
+
+ # Use str(self._host) here. Since this host could be a string
+ # literal, and since we're importing __future__.unicode_literals,
+ # we want to make sure the host is a native string in Python 2.
+ # This avoids an indirect module import (unicode module to
+ # decode the unicode string, eventually imported by the
+ # socket module if needed), which is not allowed in a thread
+ # directly created by a module in Python 2 (our case).
+ #
+ # tl;dr: Do NOT remove str() here, or this call in Python 2
+ # _will_ block on an interpreter's mutex until the waiting
+ # register queue timeouts.
+ self._sessiond_sock.connect((str(self._host), self._port))
+
+ def _register(self):
+ cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
+ _PROTO_MAJOR, _PROTO_MINOR)
+ data = cmd.get_data()
+ self._sessiond_sock.sendall(data)
+
+
+def _get_port_from_file(path):
+ port = None
+ dbg._pdebug('reading port from file "{}"'.format(path))
+
+ try:
+ f = open(path)
+ r_port = int(f.readline())
+ f.close()
+
+ if r_port > 0 or r_port <= 65535:
+ port = r_port
+ except:
+ pass
+
+ return port
+
+
+def _get_user_home_path():
+ # $LTTNG_HOME overrides $HOME if it exists
+ return os.getenv('LTTNG_HOME', os.path.expanduser('~'))
+
+
+_initialized = False
+_SESSIOND_HOST = '127.0.0.1'
+
+
+def _client_thread_target(name, port, reg_queue):
+ dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
+ client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)
+ dbg._pdebug('starting client "{}"'.format(name))
+ client.run()
+
+
+def _init_threads():
+ global _initialized
+
+ dbg._pdebug('entering')
+
+ if _initialized:
+ dbg._pdebug('agent is already initialized')
+ return
+
+ # This makes sure that the appropriate modules for encoding and
+ # decoding strings/bytes are imported now, since no import should
+ # happen within a thread at import time (our case).
+ 'lttng'.encode().decode()
+
+ _initialized = True
+ sys_port = _get_port_from_file('/var/run/lttng/agent.port')
+ user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
+ user_port = _get_port_from_file(user_port_file)
+ reg_queue = queue.Queue()
+ reg_expecting = 0
+
+ dbg._pdebug('system session daemon port: {}'.format(sys_port))
+ dbg._pdebug('user session daemon port: {}'.format(user_port))
+
+ try:
+ if sys_port is not None:
+ dbg._pdebug('creating system client thread')
+ t = threading.Thread(target=_client_thread_target,
+ args=('system', sys_port, reg_queue))
+ t.name = 'system'
+ t.daemon = True
+ t.start()
+ dbg._pdebug('created and started system client thread')
+ reg_expecting += 1
+
+ if user_port is not None:
+ dbg._pdebug('creating user client thread')
+ t = threading.Thread(target=_client_thread_target,
+ args=('user', user_port, reg_queue))
+ t.name = 'user'
+ t.daemon = True
+ t.start()
+ dbg._pdebug('created and started user client thread')
+ reg_expecting += 1
+ except:
+ # cannot create threads for some reason; stop this initialization
+ dbg._pwarning('cannot create client threads')
+ return
+
+ if reg_expecting == 0:
+ # early exit: looks like there's not even one valid port
+ dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
+ return
+
+ cur_timeout = _REG_TIMEOUT
+
+ # We block here to make sure the agent is properly registered to
+ # the session daemon. If we timeout, the client threads will still
+ # continue to try to connect and register to the session daemon,
+ # but there is no guarantee that all following logging statements
+ # will make it to LTTng-UST.
+ #
+ # When a client thread receives a "registration done" confirmation
+ # from the session daemon it's connected to, it puts True in
+ # reg_queue.
+ while True:
+ try:
+ dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
+ cur_timeout))
+ t1 = time.clock()
+ reg_queue.get(timeout=cur_timeout)
+ t2 = time.clock()
+ reg_expecting -= 1
+ dbg._pdebug('unblocked')
+
+ if reg_expecting == 0:
+ # done!
+ dbg._pdebug('successfully registered to session daemon(s)')
+ break
+
+ cur_timeout -= (t2 - t1)
+
+ if cur_timeout <= 0:
+ # timeout
+ dbg._pdebug('ran out of time')
+ break
+ except queue.Empty:
+ dbg._pdebug('ran out of time')
+ break
+
+ dbg._pdebug('leaving')
+
+
+_init_threads()