X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=python-lttngust%2Flttngust%2Fagent.py;fp=python-lttngust%2Flttngust%2Fagent.py;h=ebfa2de1dbb8abbf0b51eb89556558efe034ea74;hb=5b6ff569c0bf82b72f263a259e7a73a453903648;hp=0000000000000000000000000000000000000000;hpb=4ed2398507da76ed80dd09b996242066fa34e2a0;p=lttng-ust.git diff --git a/python-lttngust/lttngust/agent.py b/python-lttngust/lttngust/agent.py new file mode 100644 index 00000000..ebfa2de1 --- /dev/null +++ b/python-lttngust/lttngust/agent.py @@ -0,0 +1,395 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 - Philippe Proulx +# Copyright (C) 2014 - David Goulet +# +# This library is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from __future__ import unicode_literals +from __future__ import print_function +from __future__ import division +import lttngust.debug as dbg +import lttngust.loghandler +import lttngust.cmd +from io import open +import threading +import logging +import socket +import time +import sys +import os + + +try: + # Python 2 + import Queue as queue +except ImportError: + # Python 3 + import queue + + +_PROTO_DOMAIN = 5 +_PROTO_MAJOR = 2 +_PROTO_MINOR = 0 + + +def _get_env_value_ms(key, default_s): + try: + val = int(os.getenv(key, default_s * 1000)) / 1000 + except: + val = -1 + + if val < 0: + fmt = 'invalid ${} value; {} seconds will be used' + dbg._pwarning(fmt.format(key, default_s)) + val = default_s + + return val + + +_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5) +_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3) + + +class _TcpClient(object): + def __init__(self, name, host, port, reg_queue): + super(self.__class__, self).__init__() + self._name = name + self._host = host + self._port = port + + try: + self._log_handler = lttngust.loghandler._Handler() + except (OSError) as e: + dbg._pwarning('cannot load library: {}'.format(e)) + raise e + + self._root_logger = logging.getLogger() + self._root_logger.setLevel(logging.NOTSET) + self._ref_count = 0 + self._sessiond_sock = None + self._reg_queue = reg_queue + self._server_cmd_handlers = { + lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done, + lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable, + lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable, + lttngust.cmd._ServerCmdList: self._handle_server_cmd_list, + } + + def _debug(self, msg): + return 'client "{}": {}'.format(self._name, msg) + + def run(self): + while True: + try: + # connect to the session daemon + dbg._pdebug(self._debug('connecting to session daemon')) + self._connect_to_sessiond() + + # register to the session daemon after a successful connection + dbg._pdebug(self._debug('registering to session daemon')) + self._register() + + # wait for commands from the session daemon + self._wait_server_cmd() + except (Exception) as e: + # Whatever happens here, we have to close the socket and + # retry to connect to the session daemon since either + # the socket was closed, a network timeout occured, or + # invalid data was received. + dbg._pdebug(self._debug('got exception: {}'.format(e))) + self._cleanup_socket() + dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY))) + time.sleep(_RETRY_REG_DELAY) + + def _recv_server_cmd_header(self): + data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE) + + if not data: + dbg._pdebug(self._debug('received empty server command header')) + return None + + assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE) + dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data)))) + + return lttngust.cmd._server_cmd_header_from_data(data) + + def _recv_server_cmd(self): + server_cmd_header = self._recv_server_cmd_header() + + if server_cmd_header is None: + return None + + dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size))) + dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id))) + dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version))) + data = bytes() + + if server_cmd_header.data_size > 0: + data = self._sessiond_sock.recv(server_cmd_header.data_size) + assert(len(data) == server_cmd_header.data_size) + + return lttngust.cmd._server_cmd_from_data(server_cmd_header, data) + + def _send_cmd_reply(self, cmd_reply): + data = cmd_reply.get_data() + dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data)))) + self._sessiond_sock.sendall(data) + + def _handle_server_cmd_reg_done(self, server_cmd): + dbg._pdebug(self._debug('got "registration done" server command')) + + if self._reg_queue is not None: + dbg._pdebug(self._debug('notifying _init_threads()')) + + try: + self._reg_queue.put(True) + except (Exception) as e: + # read side could be closed by now; ignore it + pass + + self._reg_queue = None + + def _handle_server_cmd_enable(self, server_cmd): + dbg._pdebug(self._debug('got "enable" server command')) + self._ref_count += 1 + + if self._ref_count == 1: + dbg._pdebug(self._debug('adding our handler to the root logger')) + self._root_logger.addHandler(self._log_handler) + + dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) + + return lttngust.cmd._ClientCmdReplyEnable() + + def _handle_server_cmd_disable(self, server_cmd): + dbg._pdebug(self._debug('got "disable" server command')) + self._ref_count -= 1 + + if self._ref_count < 0: + # disable command could be sent again when a session is destroyed + self._ref_count = 0 + + if self._ref_count == 0: + dbg._pdebug(self._debug('removing our handler from the root logger')) + self._root_logger.removeHandler(self._log_handler) + + dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count))) + + return lttngust.cmd._ClientCmdReplyDisable() + + def _handle_server_cmd_list(self, server_cmd): + dbg._pdebug(self._debug('got "list" server command')) + names = logging.Logger.manager.loggerDict.keys() + dbg._pdebug(self._debug('found {} loggers'.format(len(names)))) + cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names) + + return cmd_reply + + def _handle_server_cmd(self, server_cmd): + cmd_reply = None + + if server_cmd is None: + dbg._pdebug(self._debug('bad server command')) + status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD + cmd_reply = lttngust.cmd._ClientCmdReply(status) + elif type(server_cmd) in self._server_cmd_handlers: + cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd) + else: + dbg._pdebug(self._debug('unknown server command')) + status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD + cmd_reply = lttngust.cmd._ClientCmdReply(status) + + if cmd_reply is not None: + self._send_cmd_reply(cmd_reply) + + def _wait_server_cmd(self): + while True: + try: + server_cmd = self._recv_server_cmd() + except socket.timeout: + # simply retry here; the protocol has no KA and we could + # wait for hours + continue + + self._handle_server_cmd(server_cmd) + + def _cleanup_socket(self): + try: + self._sessiond_sock.shutdown(socket.SHUT_RDWR) + self._sessiond_sock.close() + except: + pass + + self._sessiond_sock = None + + def _connect_to_sessiond(self): + # create session daemon TCP socket + if self._sessiond_sock is None: + self._sessiond_sock = socket.socket(socket.AF_INET, + socket.SOCK_STREAM) + + # Use str(self._host) here. Since this host could be a string + # literal, and since we're importing __future__.unicode_literals, + # we want to make sure the host is a native string in Python 2. + # This avoids an indirect module import (unicode module to + # decode the unicode string, eventually imported by the + # socket module if needed), which is not allowed in a thread + # directly created by a module in Python 2 (our case). + # + # tl;dr: Do NOT remove str() here, or this call in Python 2 + # _will_ block on an interpreter's mutex until the waiting + # register queue timeouts. + self._sessiond_sock.connect((str(self._host), self._port)) + + def _register(self): + cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(), + _PROTO_MAJOR, _PROTO_MINOR) + data = cmd.get_data() + self._sessiond_sock.sendall(data) + + +def _get_port_from_file(path): + port = None + dbg._pdebug('reading port from file "{}"'.format(path)) + + try: + f = open(path) + r_port = int(f.readline()) + f.close() + + if r_port > 0 or r_port <= 65535: + port = r_port + except: + pass + + return port + + +def _get_user_home_path(): + # $LTTNG_HOME overrides $HOME if it exists + return os.getenv('LTTNG_HOME', os.path.expanduser('~')) + + +_initialized = False +_SESSIOND_HOST = '127.0.0.1' + + +def _client_thread_target(name, port, reg_queue): + dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port)) + client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue) + dbg._pdebug('starting client "{}"'.format(name)) + client.run() + + +def _init_threads(): + global _initialized + + dbg._pdebug('entering') + + if _initialized: + dbg._pdebug('agent is already initialized') + return + + # This makes sure that the appropriate modules for encoding and + # decoding strings/bytes are imported now, since no import should + # happen within a thread at import time (our case). + 'lttng'.encode().decode() + + _initialized = True + sys_port = _get_port_from_file('/var/run/lttng/agent.port') + user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port') + user_port = _get_port_from_file(user_port_file) + reg_queue = queue.Queue() + reg_expecting = 0 + + dbg._pdebug('system session daemon port: {}'.format(sys_port)) + dbg._pdebug('user session daemon port: {}'.format(user_port)) + + if sys_port == user_port and sys_port is not None: + # The two session daemon ports are the same. This is not normal. + # Connect to only one. + dbg._pdebug('both user and system session daemon have the same port') + sys_port = None + + try: + if sys_port is not None: + dbg._pdebug('creating system client thread') + t = threading.Thread(target=_client_thread_target, + args=('system', sys_port, reg_queue)) + t.name = 'system' + t.daemon = True + t.start() + dbg._pdebug('created and started system client thread') + reg_expecting += 1 + + if user_port is not None: + dbg._pdebug('creating user client thread') + t = threading.Thread(target=_client_thread_target, + args=('user', user_port, reg_queue)) + t.name = 'user' + t.daemon = True + t.start() + dbg._pdebug('created and started user client thread') + reg_expecting += 1 + except: + # cannot create threads for some reason; stop this initialization + dbg._pwarning('cannot create client threads') + return + + if reg_expecting == 0: + # early exit: looks like there's not even one valid port + dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)') + return + + cur_timeout = _REG_TIMEOUT + + # We block here to make sure the agent is properly registered to + # the session daemon. If we timeout, the client threads will still + # continue to try to connect and register to the session daemon, + # but there is no guarantee that all following logging statements + # will make it to LTTng-UST. + # + # When a client thread receives a "registration done" confirmation + # from the session daemon it's connected to, it puts True in + # reg_queue. + while True: + try: + dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting, + cur_timeout)) + t1 = time.clock() + reg_queue.get(timeout=cur_timeout) + t2 = time.clock() + reg_expecting -= 1 + dbg._pdebug('unblocked') + + if reg_expecting == 0: + # done! + dbg._pdebug('successfully registered to session daemon(s)') + break + + cur_timeout -= (t2 - t1) + + if cur_timeout <= 0: + # timeout + dbg._pdebug('ran out of time') + break + except queue.Empty: + dbg._pdebug('ran out of time') + break + + dbg._pdebug('leaving') + + +_init_threads()