From c3e14096db731a8624b55fb91a231e7a97a5e7cd Mon Sep 17 00:00:00 2001 From: David Goulet Date: Thu, 9 Oct 2014 13:21:02 -0400 Subject: [PATCH] Add Python agent support This adds a new library in this repository being the liblttng-ust-python-agent. It contains the Python agent and a library for the tracepoints used by the agent. To enable the python agent, use --enable-python-agent to build it. This new agent is bound to the LTTng python domain in lttng-tools which implements the -p,--python option for agent's command. It works and tested for python 2.7.x and 3.x Signed-off-by: David Goulet Signed-off-by: Mathieu Desnoyers --- .gitignore | 3 + Makefile.am | 4 + configure.ac | 12 + liblttng-ust-python-agent/Makefile.am | 20 + liblttng-ust-python-agent/lttng_agent.py.in | 567 +++++++++++++++++++ liblttng-ust-python-agent/lttng_ust_python.c | 32 ++ liblttng-ust-python-agent/lttng_ust_python.h | 56 ++ 7 files changed, 694 insertions(+) create mode 100644 liblttng-ust-python-agent/Makefile.am create mode 100644 liblttng-ust-python-agent/lttng_agent.py.in create mode 100644 liblttng-ust-python-agent/lttng_ust_python.c create mode 100644 liblttng-ust-python-agent/lttng_ust_python.h diff --git a/.gitignore b/.gitignore index 10e4176c..166255e8 100644 --- a/.gitignore +++ b/.gitignore @@ -61,3 +61,6 @@ log4j-jni-header.stamp org_lttng_ust_agent_jul_LTTngLogHandler.h org_lttng_ust_agent_log4j_LTTngLogAppender.h liblttng-ust-java-agent/java/liblttng-ust-jul.jar + +# Python agent +lttng_agent.py diff --git a/Makefile.am b/Makefile.am index 9fb95565..16663eef 100644 --- a/Makefile.am +++ b/Makefile.am @@ -20,6 +20,10 @@ if BUILD_JAVA_AGENT SUBDIRS += liblttng-ust-java-agent endif +if BUILD_PYTHON_AGENT +SUBDIRS += liblttng-ust-python-agent +endif + SUBDIRS += tests doc #temporarily disabled diff --git a/configure.ac b/configure.ac index f09eb519..55cec234 100644 --- a/configure.ac +++ b/configure.ac @@ -299,6 +299,14 @@ if test "x$java_agent_log4j" = "xyes"; then fi fi +# Option to build the python agent +AC_ARG_ENABLE([python-agent], + [AS_HELP_STRING([--enable-python-agent],[build the LTTng UST Python agent [default=no]])], + [python_agent=$enableval], + [:] +) +AM_CONDITIONAL([BUILD_PYTHON_AGENT], [test "x$python_agent" = "xyes"]) + # sdt.h integration AC_ARG_WITH([sdt], [AS_HELP_STRING([--with-sdt],[provide SystemTap integration via sdt.h [default=no]])], @@ -366,6 +374,7 @@ AC_CONFIG_FILES([ liblttng-ust-java-agent/jni/log4j/Makefile liblttng-ust-libc-wrapper/Makefile liblttng-ust-cyg-profile/Makefile + liblttng-ust-python-agent/Makefile tools/Makefile tests/Makefile tests/hello/Makefile @@ -398,6 +407,9 @@ AS_IF([test "x$java_agent_log4j" = "xyes"], [AS_ECHO("Enabled")], [AS_ECHO("Disa AS_ECHO_N("JNI interface (JNI): ") AS_IF([test "x$jni_interface" = "xyes"], [AS_ECHO("Enabled")], [AS_ECHO("Disabled")]) +AS_ECHO_N("Python agent: ") +AS_IF([test "x$python_agent" = "xyes"], [AS_ECHO("Enabled")], [AS_ECHO("Disabled")]) + AS_ECHO_N("sdt.h integration: ") AS_IF([test "x$with_sdt" = "xyes"], [AS_ECHO("Enabled")], [AS_ECHO("Disabled")]) diff --git a/liblttng-ust-python-agent/Makefile.am b/liblttng-ust-python-agent/Makefile.am new file mode 100644 index 00000000..7b514470 --- /dev/null +++ b/liblttng-ust-python-agent/Makefile.am @@ -0,0 +1,20 @@ + +AM_CPPFLAGS = $(PYTHON_INCLUDE) -I$(top_srcdir)/include/ +AM_CFLAGS = -fno-strict-aliasing + +EXTRA_DIST = lttng-agent.py.in + +lttng_agent_PYTHON = lttng_agent.py +lttng_agentdir = $(pythondir) + +lib_LTLIBRARIES = liblttng-ust-python-agent.la + +nodist_liblttng_ust_python_agent_la_SOURCES = lttng_agent.py +liblttng_ust_python_agent_la_SOURCES = lttng_ust_python.c lttng_ust_python.h +liblttng_ust_python_agent_la_LIBADD = -lc -llttng-ust \ + -L$(top_builddir)/liblttng-ust/.libs + +all: + $(SED) 's|LIBDIR_STR|$(libdir)|g' < lttng_agent.py.in > lttng_agent.py + +CLEANFILES = lttng_agent.py diff --git a/liblttng-ust-python-agent/lttng_agent.py.in b/liblttng-ust-python-agent/lttng_agent.py.in new file mode 100644 index 00000000..9e8cf611 --- /dev/null +++ b/liblttng-ust-python-agent/lttng_agent.py.in @@ -0,0 +1,567 @@ +# -*- coding: utf-8 -*- +# +# Copyright (C) 2014 - David Goulet +# +# This library is free software; you can redistribute it and/or modify it under +# the terms of the GNU Lesser General Public License as published by the Free +# Software Foundation; version 2.1 of the License. +# +# This library is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more +# details. +# +# You should have received a copy of the GNU Lesser General Public License +# along with this library; if not, write to the Free Software Foundation, Inc., +# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +from __future__ import unicode_literals + +import ctypes +import errno +import logging +import os +import sys +import threading +import struct +import select + +from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP +from socket import * +from time import sleep + +__all__ = ["lttng-agent"] +__author__ = "David Goulet " + +class LTTngAgent(): + """ + LTTng agent python code. A LTTng Agent is responsible to spawn two threads, + the current UID and root session daemon. Those two threads register to the + right daemon and handle the tracing. + + This class needs to be instantiate once and once the init returns, tracing + is ready to happen. + """ + + SESSIOND_ADDR = "127.0.0.1" + SEM_COUNT = 2 + # Timeout for the sempahore in seconds. + SEM_TIMEOUT = 5 + SEM_WAIT_PERIOD = 0.2 + + def __init__(self): + # Session daemon register semaphore. + self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT); + + self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem) + self.client_user.start() + + self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR, self.register_sem) + self.client_root.log_handler.is_root = True + self.client_root.start() + + acquire = 0 + timeout = LTTngAgent.SEM_TIMEOUT + while True: + # Quit if timeout has reached 0 or below. + if acquire == LTTngAgent.SEM_COUNT or timeout <= 0: + break; + + # Acquire semaphore for *user* thread. + if not self.register_sem.acquire(False): + sleep(LTTngAgent.SEM_WAIT_PERIOD) + timeout -= LTTngAgent.SEM_WAIT_PERIOD + else: + acquire += 1 + + def __del__(self): + self.destroy() + + def destroy(self): + self.client_user.destroy() + self.client_user.join() + + self.client_root.destroy() + self.client_root.join() + +class LTTngCmdError(RuntimeError): + """ + Command error thrown if an error is encountered in a command from the + session daemon. + """ + + def __init__(self, code): + super().__init__('LTTng command error: code {}'.format(code)) + self._code = code + + def get_code(self): + return self._code + +class LTTngUnknownCmdError(RuntimeError): + pass + +class LTTngLoggingHandler(logging.Handler): + """ + Class handler for the Python logging API. + """ + + def __init__(self): + logging.Handler.__init__(self, level = logging.NOTSET) + + # Refcount tracking how many events have been enabled. This value above + # 0 means that the handler is attached to the root logger. + self.refcount = 0 + + # Dict of enabled event. We track them so we know if it's ok to disable + # the received event. + self.enabled_events = {} + + # Am I root ? + self.is_root = False + + # Using the logging formatter to extract the asctime only. + self.log_fmt = logging.Formatter("%(asctime)s") + self.setFormatter(self.log_fmt) + + # ctypes lib for lttng-ust + try: + self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so") + except OSError as e: + print("Unable to find libust for Python.") + + def emit(self, record): + """ + Fire LTTng UST tracepoint with the given record. + """ + asctime = self.format(record) + + self.lttng_ust.py_tracepoint(asctime.encode(), + record.getMessage().encode(), record.name.encode(), + record.funcName.encode(), record.lineno, record.levelno, + record.thread, record.threadName.encode()) + + def enable_event(self, name): + """ + Enable an event name which will ultimately add an handler to the root + logger if none is present. + """ + # Don't update the refcount if the event has been enabled prior. + if name in self.enabled_events: + return + + # Get the root logger and attach our handler. + root_logger = logging.getLogger() + # First thing first, we need to set the root logger to the loglevel + # NOTSET so we can catch everything. The default is 30. + root_logger.setLevel(logging.NOTSET) + + self.refcount += 1 + if self.refcount == 1: + root_logger.addHandler(self) + + self.enabled_events[name] = True + + def disable_event(self, name): + """ + Disable an event name which will ultimately add an handler to the root + logger if none is present. + """ + + if name not in self.enabled_events: + # Event was not enabled prior, do nothing. + return + + # Get the root logger and attach our handler. + root_logger = logging.getLogger() + + self.refcount -= 1 + if self.refcount == 0: + root_logger.removeHandler(self) + del self.enabled_events[name] + + def list_logger(self): + """ + Return a list of logger name. + """ + return logging.Logger.manager.loggerDict.keys() + +class LTTngSessiondCmd(): + """ + Class handling session daemon command. + """ + + # Command values from the agent protocol + CMD_LIST = 1 + CMD_ENABLE = 2 + CMD_DISABLE = 3 + CMD_REG_DONE = 4 + + # Return code + CODE_SUCCESS = 1 + CODE_INVALID_CMD = 2 + + # Python Logger LTTng domain value taken from lttng/domain.h + DOMAIN = 5 + + # Protocol version + MAJOR_VERSION = 1 + MINOR_VERSION = 0 + + def execute(self): + """ + This is part of the command interface. Must be implemented. + """ + raise NotImplementedError + +class LTTngCommandReply(): + """ + Object that contains the information that should be replied to the session + daemon after a command execution. + """ + + def __init__(self, payload = None, reply = True): + self.payload = payload + self.reply = reply + +class LTTngCommandEnable(LTTngSessiondCmd): + """ + Handle the enable event command from the session daemon. + """ + + def __init__(self, log_handler, data): + self.log_handler = log_handler + # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8. + name_offset = 8; + + data_size = len(data) + if data_size == 0: + raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) + + try: + self.loglevel, self.loglevel_type, self.name = \ + struct.unpack('>II%us' % (data_size - name_offset), data) + # Remove trailing NULL bytes from name. + self.name = self.name.decode().rstrip('\x00') + except struct.error: + raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) + + def execute(self): + self.log_handler.enable_event(self.name) + return LTTngCommandReply() + +class LTTngCommandDisable(LTTngSessiondCmd): + """ + Handle the disable event command from the session daemon. + """ + + def __init__(self, log_handler, data): + self.log_handler = log_handler + + data_size = len(data) + if data_size == 0: + raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) + + try: + self.name = struct.unpack('>%us' % (data_size), data)[0] + # Remove trailing NULL bytes from name. + self.name = self.name.decode().rstrip('\x00') + except struct.error: + raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD) + + def execute(self): + self.log_handler.disable_event(self.name) + return LTTngCommandReply() + +class LTTngCommandRegDone(LTTngSessiondCmd): + """ + Handle register done command. This is sent back after a successful + registration from the session daemon. We basically release the given + semaphore so the agent can return to the caller. + """ + + def __init__(self, sem): + self.sem = sem + + def execute(self): + self.sem.release() + return LTTngCommandReply(reply = False) + +class LTTngCommandList(LTTngSessiondCmd): + """ + Handle the list command from the session daemon on the given socket. + """ + + def __init__(self, log_handler): + self.log_handler = log_handler + + def execute(self): + data_size = 0 + data = logger_data = bytearray() + + loggers = self.log_handler.list_logger() + # First, pack nb_event that must preceed the data. + logger_data += struct.pack('>I', len(loggers)) + + # Populate payload with logger name. + for logger in loggers: + # Increment data size plus the NULL byte at the end of the name. + data_size += len(logger) + 1 + # Pack logger name and NULL byte. + logger_data += struct.pack('>%usB' % (len(logger)), \ + bytes(bytearray(str.encode(logger))), 0) + + # Pack uint32_t data_size followed by nb event (number of logger) + data = struct.pack('>I', data_size) + data += logger_data + return LTTngCommandReply(payload = data) + +class LTTngTCPClient(threading.Thread): + """ + TCP client that register and receives command from the session daemon. + """ + + SYSTEM_PORT_FILE = "/var/run/lttng/agent.port" + USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port") + + # The time in seconds this client should wait before trying again to + # register back to the session daemon. + WAIT_TIME = 3 + + def __init__(self, host, sem): + threading.Thread.__init__(self) + + # Which host to connect to. The port is fetch dynamically. + self.sessiond_host = host + + # The session daemon register done semaphore. Needs to be released when + # receiving a CMD_REG_DONE command. + self.register_sem = sem + self.register_sem.acquire() + + # Indicate that we have to quit thus stop the main loop. + self.quit_flag = False + # Quit pipe. The thread poll on it to know when to quit. + self.quit_pipe = os.pipe() + + # Socket on which we communicate with the session daemon. + self.sessiond_sock = None + # LTTng Logging Handler + self.log_handler = LTTngLoggingHandler() + + def cleanup_socket(self, epfd = None): + # Ease our life a bit. + sock = self.sessiond_sock + if not sock: + return + + try: + if epfd is not None: + epfd.unregister(sock) + sock.shutdown(SHUT_RDWR) + sock.close() + except select.error: + # Cleanup fail, we can't do anything much... + pass + except IOError: + pass + + self.sessiond_sock = None + + def destroy(self): + self.quit_flag = True + try: + fp = os.fdopen(self.quit_pipe[1], 'w') + fp.write("42") + fp.close() + except OSError as e: + pass + + def register(self): + """ + Register to session daemon using the previously connected socket of the + class. + + Command ABI: + uint32 domain + uint32 pid + """ + data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(), \ + LTTngSessiondCmd.MAJOR_VERSION, LTTngSessiondCmd.MINOR_VERSION) + self.sessiond_sock.send(data) + + def run(self): + """ + Start the TCP client thread by registering to the session daemon and polling + on that socket for commands. + """ + + epfd = epoll() + epfd.register(self.quit_pipe[0], EPOLLIN) + + # Main loop to handle session daemon command and disconnection. + while not self.quit_flag: + try: + # First, connect to the session daemon. + self.connect_sessiond() + + # Register to session daemon after a successful connection. + self.register() + # Add registered socket to poll set. + epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP) + + self.quit_flag = self.wait_cmd(epfd) + except IOError as e: + # Whatever happens here, we have to close down everything and + # retry to connect to the session daemon since either the + # socket is closed or invalid data was sent. + self.cleanup_socket(epfd) + self.register_sem.release() + sleep(LTTngTCPClient.WAIT_TIME) + continue + + self.cleanup_socket(epfd) + os.close(self.quit_pipe[0]) + epfd.close() + + def recv_header(self, sock): + """ + Receive the command header from the given socket. Set the internal + state of this object with the header data. + + Header ABI is defined like this: + uint64 data_size + uint32 cmd + uint32 cmd_version + """ + s_pack = struct.Struct('>QII') + + pack_data = sock.recv(s_pack.size) + data_received = len(pack_data) + if data_received == 0: + raise IOError(errno.ESHUTDOWN) + + try: + return s_pack.unpack(pack_data) + except struct.error: + raise IOError(errno.EINVAL) + + def create_command(self, cmd_type, version, data): + """ + Return the right command object using the given command type. The + command version is unused since we only have once for now. + """ + + cmd_dict = { + LTTngSessiondCmd.CMD_LIST: \ + lambda: LTTngCommandList(self.log_handler), + LTTngSessiondCmd.CMD_ENABLE: \ + lambda: LTTngCommandEnable(self.log_handler, data), + LTTngSessiondCmd.CMD_DISABLE: \ + lambda: LTTngCommandDisable(self.log_handler, data), + LTTngSessiondCmd.CMD_REG_DONE: \ + lambda: LTTngCommandRegDone(self.register_sem), + } + + if cmd_type in cmd_dict: + return cmd_dict[cmd_type]() + else: + raise LTTngUnknownCmdError() + + def pack_code(self, code): + return struct.pack('>I', code) + + def handle_command(self, data, cmd_type, cmd_version): + """ + Handle the given command type with the received payload. This function + sends back data to the session daemon using to the return value of the + command. + """ + payload = bytearray() + + try: + cmd = self.create_command(cmd_type, cmd_version, data) + cmd_reply = cmd.execute() + # Set success code in data + payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS) + if cmd_reply.payload is not None: + payload += cmd_reply.payload + except LTTngCmdError as e: + # Set error code in payload + payload += self.pack_code(e.get_code()) + except LTTngUnknownCmdError: + # Set error code in payload + payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD) + + # Send response only if asked for. + if cmd_reply.reply: + self.sessiond_sock.send(payload) + + def wait_cmd(self, epfd): + """ + """ + + while True: + try: + # Poll on socket for command. + events = epfd.poll() + except select.error as e: + raise IOError(e.errno, e.message) + + for fileno, event in events: + if fileno == self.quit_pipe[0]: + return True + elif event & (EPOLLERR | EPOLLHUP): + raise IOError(errno.ESHUTDOWN) + elif event & EPOLLIN: + data = bytearray() + + data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock) + if data_size: + data += self.sessiond_sock.recv(data_size) + + self.handle_command(data, cmd, cmd_version) + else: + raise IOError(errno.ESHUTDOWN) + + def get_port_from_file(self, path): + """ + Open the session daemon agent port file and returns the value. If none + found, 0 is returned. + """ + + # By default, the port is set to 0 so if we can not find the agent port + # file simply don't try to connect. A value set to 0 indicates that. + port = 0 + + try: + f = open(path, "r") + r_port = int(f.readline()) + if r_port > 0 or r_port <= 65535: + port = r_port + f.close() + except IOError as e: + pass + except ValueError as e: + pass + + return port + + def connect_sessiond(self): + """ + Connect sessiond_sock to running session daemon using the port file. + """ + # Create session daemon TCP socket + if not self.sessiond_sock: + self.sessiond_sock = socket(AF_INET, SOCK_STREAM) + + if self.log_handler.is_root: + port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE) + else: + port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE) + + # No session daemon available + if port == 0: + raise IOError(errno.ECONNREFUSED) + + # Can raise an IOError so caller must catch it. + self.sessiond_sock.connect((self.sessiond_host, port)) diff --git a/liblttng-ust-python-agent/lttng_ust_python.c b/liblttng-ust-python-agent/lttng_ust_python.c new file mode 100644 index 00000000..d10daf14 --- /dev/null +++ b/liblttng-ust-python-agent/lttng_ust_python.c @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2014 - David Goulet + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by the + * Free Software Foundation; version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#define TRACEPOINT_DEFINE +#define TRACEPOINT_CREATE_PROBES +#include "lttng_ust_python.h" + +/* + * The tracepoint fired by the agent. + */ + +void py_tracepoint(const char *asctime, const char *msg, + const char *logger_name, const char *funcName, unsigned int lineno, + unsigned int int_loglevel, unsigned int thread, const char *threadName) +{ + tracepoint(lttng_python, user_event, asctime, msg, logger_name, funcName, + lineno, int_loglevel, thread, threadName); +} diff --git a/liblttng-ust-python-agent/lttng_ust_python.h b/liblttng-ust-python-agent/lttng_ust_python.h new file mode 100644 index 00000000..8ac7604f --- /dev/null +++ b/liblttng-ust-python-agent/lttng_ust_python.h @@ -0,0 +1,56 @@ +#undef TRACEPOINT_PROVIDER +#define TRACEPOINT_PROVIDER lttng_python + +#if !defined(_TRACEPOINT_LTTNG_UST_PYTHON_H) || defined(TRACEPOINT_HEADER_MULTI_READ) +#define _TRACEPOINT_LTTNG_UST_PYTHON_H + +/* + * Copyright (C) 2014 - David Goulet + * + * This library is free software; you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published by the + * Free Software Foundation; version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License + * for more details. + * + * You should have received a copy of the GNU Lesser General Public License + * along with this library; if not, write to the Free Software Foundation, + * Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + */ + +#include +#include + +TRACEPOINT_EVENT(lttng_python, user_event, + TP_ARGS( + const char *, asctime, + const char *, msg, + const char *, logger_name, + const char *, funcName, + int, lineno, + int, int_loglevel, + int, thread, + const char *, threadName + ), + TP_FIELDS( + ctf_string(asctime, asctime) + ctf_string(msg, msg) + ctf_string(logger_name, logger_name) + ctf_string(funcName, funcName) + ctf_integer(unsigned int, lineno, lineno) + ctf_integer(unsigned int, int_loglevel, int_loglevel) + ctf_integer(unsigned int, thread, thread) + ctf_string(threadName, threadName) + ) +) + +#endif /* _TRACEPOINT_LTTNG_UST_PYTHON_H */ + +#undef TRACEPOINT_INCLUDE +#define TRACEPOINT_INCLUDE "./lttng_ust_python.h" + +/* This part must be outside ifdef protection */ +#include -- 2.34.1