Refactor Python agent build and install
[lttng-ust.git] / python-lttngust / lttngust / agent.py
diff --git a/python-lttngust/lttngust/agent.py b/python-lttngust/lttngust/agent.py
new file mode 100644 (file)
index 0000000..ebfa2de
--- /dev/null
@@ -0,0 +1,395 @@
+# -*- coding: utf-8 -*-
+#
+# Copyright (C) 2015 - Philippe Proulx <pproulx@efficios.com>
+# Copyright (C) 2014 - David Goulet <dgoulet@efficios.com>
+#
+# This library is free software; you can redistribute it and/or modify it under
+# the terms of the GNU Lesser General Public License as published by the Free
+# Software Foundation; version 2.1 of the License.
+#
+# This library is distributed in the hope that it will be useful, but WITHOUT
+# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+# FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Lesser General Public License
+# along with this library; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301 USA
+
+from __future__ import unicode_literals
+from __future__ import print_function
+from __future__ import division
+import lttngust.debug as dbg
+import lttngust.loghandler
+import lttngust.cmd
+from io import open
+import threading
+import logging
+import socket
+import time
+import sys
+import os
+
+
+try:
+    # Python 2
+    import Queue as queue
+except ImportError:
+    # Python 3
+    import queue
+
+
+_PROTO_DOMAIN = 5
+_PROTO_MAJOR = 2
+_PROTO_MINOR = 0
+
+
+def _get_env_value_ms(key, default_s):
+    try:
+        val = int(os.getenv(key, default_s * 1000)) / 1000
+    except:
+        val = -1
+
+    if val < 0:
+        fmt = 'invalid ${} value; {} seconds will be used'
+        dbg._pwarning(fmt.format(key, default_s))
+        val = default_s
+
+    return val
+
+
+_REG_TIMEOUT = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_TIMEOUT', 5)
+_RETRY_REG_DELAY = _get_env_value_ms('LTTNG_UST_PYTHON_REGISTER_RETRY_DELAY', 3)
+
+
+class _TcpClient(object):
+    def __init__(self, name, host, port, reg_queue):
+        super(self.__class__, self).__init__()
+        self._name = name
+        self._host = host
+        self._port = port
+
+        try:
+            self._log_handler = lttngust.loghandler._Handler()
+        except (OSError) as e:
+            dbg._pwarning('cannot load library: {}'.format(e))
+            raise e
+
+        self._root_logger = logging.getLogger()
+        self._root_logger.setLevel(logging.NOTSET)
+        self._ref_count = 0
+        self._sessiond_sock = None
+        self._reg_queue = reg_queue
+        self._server_cmd_handlers = {
+            lttngust.cmd._ServerCmdRegistrationDone: self._handle_server_cmd_reg_done,
+            lttngust.cmd._ServerCmdEnable: self._handle_server_cmd_enable,
+            lttngust.cmd._ServerCmdDisable: self._handle_server_cmd_disable,
+            lttngust.cmd._ServerCmdList: self._handle_server_cmd_list,
+        }
+
+    def _debug(self, msg):
+        return 'client "{}": {}'.format(self._name, msg)
+
+    def run(self):
+        while True:
+            try:
+                # connect to the session daemon
+                dbg._pdebug(self._debug('connecting to session daemon'))
+                self._connect_to_sessiond()
+
+                # register to the session daemon after a successful connection
+                dbg._pdebug(self._debug('registering to session daemon'))
+                self._register()
+
+                # wait for commands from the session daemon
+                self._wait_server_cmd()
+            except (Exception) as e:
+                # Whatever happens here, we have to close the socket and
+                # retry to connect to the session daemon since either
+                # the socket was closed, a network timeout occured, or
+                # invalid data was received.
+                dbg._pdebug(self._debug('got exception: {}'.format(e)))
+                self._cleanup_socket()
+                dbg._pdebug(self._debug('sleeping for {} s'.format(_RETRY_REG_DELAY)))
+                time.sleep(_RETRY_REG_DELAY)
+
+    def _recv_server_cmd_header(self):
+        data = self._sessiond_sock.recv(lttngust.cmd._SERVER_CMD_HEADER_SIZE)
+
+        if not data:
+            dbg._pdebug(self._debug('received empty server command header'))
+            return None
+
+        assert(len(data) == lttngust.cmd._SERVER_CMD_HEADER_SIZE)
+        dbg._pdebug(self._debug('received server command header ({} bytes)'.format(len(data))))
+
+        return lttngust.cmd._server_cmd_header_from_data(data)
+
+    def _recv_server_cmd(self):
+        server_cmd_header = self._recv_server_cmd_header()
+
+        if server_cmd_header is None:
+            return None
+
+        dbg._pdebug(self._debug('server command header: data size: {} bytes'.format(server_cmd_header.data_size)))
+        dbg._pdebug(self._debug('server command header: command ID: {}'.format(server_cmd_header.cmd_id)))
+        dbg._pdebug(self._debug('server command header: command version: {}'.format(server_cmd_header.cmd_version)))
+        data = bytes()
+
+        if server_cmd_header.data_size > 0:
+            data = self._sessiond_sock.recv(server_cmd_header.data_size)
+            assert(len(data) == server_cmd_header.data_size)
+
+        return lttngust.cmd._server_cmd_from_data(server_cmd_header, data)
+
+    def _send_cmd_reply(self, cmd_reply):
+        data = cmd_reply.get_data()
+        dbg._pdebug(self._debug('sending command reply ({} bytes)'.format(len(data))))
+        self._sessiond_sock.sendall(data)
+
+    def _handle_server_cmd_reg_done(self, server_cmd):
+        dbg._pdebug(self._debug('got "registration done" server command'))
+
+        if self._reg_queue is not None:
+            dbg._pdebug(self._debug('notifying _init_threads()'))
+
+            try:
+                self._reg_queue.put(True)
+            except (Exception) as e:
+                # read side could be closed by now; ignore it
+                pass
+
+            self._reg_queue = None
+
+    def _handle_server_cmd_enable(self, server_cmd):
+        dbg._pdebug(self._debug('got "enable" server command'))
+        self._ref_count += 1
+
+        if self._ref_count == 1:
+            dbg._pdebug(self._debug('adding our handler to the root logger'))
+            self._root_logger.addHandler(self._log_handler)
+
+        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
+
+        return lttngust.cmd._ClientCmdReplyEnable()
+
+    def _handle_server_cmd_disable(self, server_cmd):
+        dbg._pdebug(self._debug('got "disable" server command'))
+        self._ref_count -= 1
+
+        if self._ref_count < 0:
+            # disable command could be sent again when a session is destroyed
+            self._ref_count = 0
+
+        if self._ref_count == 0:
+            dbg._pdebug(self._debug('removing our handler from the root logger'))
+            self._root_logger.removeHandler(self._log_handler)
+
+        dbg._pdebug(self._debug('ref count is {}'.format(self._ref_count)))
+
+        return lttngust.cmd._ClientCmdReplyDisable()
+
+    def _handle_server_cmd_list(self, server_cmd):
+        dbg._pdebug(self._debug('got "list" server command'))
+        names = logging.Logger.manager.loggerDict.keys()
+        dbg._pdebug(self._debug('found {} loggers'.format(len(names))))
+        cmd_reply = lttngust.cmd._ClientCmdReplyList(names=names)
+
+        return cmd_reply
+
+    def _handle_server_cmd(self, server_cmd):
+        cmd_reply = None
+
+        if server_cmd is None:
+            dbg._pdebug(self._debug('bad server command'))
+            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
+            cmd_reply = lttngust.cmd._ClientCmdReply(status)
+        elif type(server_cmd) in self._server_cmd_handlers:
+            cmd_reply = self._server_cmd_handlers[type(server_cmd)](server_cmd)
+        else:
+            dbg._pdebug(self._debug('unknown server command'))
+            status = lttngust.cmd._CLIENT_CMD_REPLY_STATUS_INVALID_CMD
+            cmd_reply = lttngust.cmd._ClientCmdReply(status)
+
+        if cmd_reply is not None:
+            self._send_cmd_reply(cmd_reply)
+
+    def _wait_server_cmd(self):
+        while True:
+            try:
+                server_cmd = self._recv_server_cmd()
+            except socket.timeout:
+                # simply retry here; the protocol has no KA and we could
+                # wait for hours
+                continue
+
+            self._handle_server_cmd(server_cmd)
+
+    def _cleanup_socket(self):
+        try:
+            self._sessiond_sock.shutdown(socket.SHUT_RDWR)
+            self._sessiond_sock.close()
+        except:
+            pass
+
+        self._sessiond_sock = None
+
+    def _connect_to_sessiond(self):
+        # create session daemon TCP socket
+        if self._sessiond_sock is None:
+            self._sessiond_sock = socket.socket(socket.AF_INET,
+                                                socket.SOCK_STREAM)
+
+        # Use str(self._host) here. Since this host could be a string
+        # literal, and since we're importing __future__.unicode_literals,
+        # we want to make sure the host is a native string in Python 2.
+        # This avoids an indirect module import (unicode module to
+        # decode the unicode string, eventually imported by the
+        # socket module if needed), which is not allowed in a thread
+        # directly created by a module in Python 2 (our case).
+        #
+        # tl;dr: Do NOT remove str() here, or this call in Python 2
+        # _will_ block on an interpreter's mutex until the waiting
+        # register queue timeouts.
+        self._sessiond_sock.connect((str(self._host), self._port))
+
+    def _register(self):
+        cmd = lttngust.cmd._ClientRegisterCmd(_PROTO_DOMAIN, os.getpid(),
+                                              _PROTO_MAJOR, _PROTO_MINOR)
+        data = cmd.get_data()
+        self._sessiond_sock.sendall(data)
+
+
+def _get_port_from_file(path):
+    port = None
+    dbg._pdebug('reading port from file "{}"'.format(path))
+
+    try:
+        f = open(path)
+        r_port = int(f.readline())
+        f.close()
+
+        if r_port > 0 or r_port <= 65535:
+            port = r_port
+    except:
+        pass
+
+    return port
+
+
+def _get_user_home_path():
+    # $LTTNG_HOME overrides $HOME if it exists
+    return os.getenv('LTTNG_HOME', os.path.expanduser('~'))
+
+
+_initialized = False
+_SESSIOND_HOST = '127.0.0.1'
+
+
+def _client_thread_target(name, port, reg_queue):
+    dbg._pdebug('creating client "{}" using TCP port {}'.format(name, port))
+    client = _TcpClient(name, _SESSIOND_HOST, port, reg_queue)
+    dbg._pdebug('starting client "{}"'.format(name))
+    client.run()
+
+
+def _init_threads():
+    global _initialized
+
+    dbg._pdebug('entering')
+
+    if _initialized:
+        dbg._pdebug('agent is already initialized')
+        return
+
+    # This makes sure that the appropriate modules for encoding and
+    # decoding strings/bytes are imported now, since no import should
+    # happen within a thread at import time (our case).
+    'lttng'.encode().decode()
+
+    _initialized = True
+    sys_port = _get_port_from_file('/var/run/lttng/agent.port')
+    user_port_file = os.path.join(_get_user_home_path(), '.lttng', 'agent.port')
+    user_port = _get_port_from_file(user_port_file)
+    reg_queue = queue.Queue()
+    reg_expecting = 0
+
+    dbg._pdebug('system session daemon port: {}'.format(sys_port))
+    dbg._pdebug('user session daemon port: {}'.format(user_port))
+
+    if sys_port == user_port and sys_port is not None:
+        # The two session daemon ports are the same. This is not normal.
+        # Connect to only one.
+        dbg._pdebug('both user and system session daemon have the same port')
+        sys_port = None
+
+    try:
+        if sys_port is not None:
+            dbg._pdebug('creating system client thread')
+            t = threading.Thread(target=_client_thread_target,
+                                 args=('system', sys_port, reg_queue))
+            t.name = 'system'
+            t.daemon = True
+            t.start()
+            dbg._pdebug('created and started system client thread')
+            reg_expecting += 1
+
+        if user_port is not None:
+            dbg._pdebug('creating user client thread')
+            t = threading.Thread(target=_client_thread_target,
+                                 args=('user', user_port, reg_queue))
+            t.name = 'user'
+            t.daemon = True
+            t.start()
+            dbg._pdebug('created and started user client thread')
+            reg_expecting += 1
+    except:
+        # cannot create threads for some reason; stop this initialization
+        dbg._pwarning('cannot create client threads')
+        return
+
+    if reg_expecting == 0:
+        # early exit: looks like there's not even one valid port
+        dbg._pwarning('no valid LTTng session daemon port found (is the session daemon started?)')
+        return
+
+    cur_timeout = _REG_TIMEOUT
+
+    # We block here to make sure the agent is properly registered to
+    # the session daemon. If we timeout, the client threads will still
+    # continue to try to connect and register to the session daemon,
+    # but there is no guarantee that all following logging statements
+    # will make it to LTTng-UST.
+    #
+    # When a client thread receives a "registration done" confirmation
+    # from the session daemon it's connected to, it puts True in
+    # reg_queue.
+    while True:
+        try:
+            dbg._pdebug('waiting for registration done (expecting {}, timeout is {} s)'.format(reg_expecting,
+                                                                                               cur_timeout))
+            t1 = time.clock()
+            reg_queue.get(timeout=cur_timeout)
+            t2 = time.clock()
+            reg_expecting -= 1
+            dbg._pdebug('unblocked')
+
+            if reg_expecting == 0:
+                # done!
+                dbg._pdebug('successfully registered to session daemon(s)')
+                break
+
+            cur_timeout -= (t2 - t1)
+
+            if cur_timeout <= 0:
+                # timeout
+                dbg._pdebug('ran out of time')
+                break
+        except queue.Empty:
+            dbg._pdebug('ran out of time')
+            break
+
+    dbg._pdebug('leaving')
+
+
+_init_threads()
This page took 0.026222 seconds and 4 git commands to generate.