# -*- coding: utf-8 -*-
#
# Copyright (C) 2014 - David Goulet
#
# This library is free software; you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; version 2.1 of the License.
#
# This library is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this library; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA

from __future__ import unicode_literals

import ctypes
import errno
import logging
import os
import sys
import threading
import struct
import select

from select import epoll, EPOLLIN, EPOLLERR, EPOLLHUP
from socket import *
from time import sleep

# Names exported by ``from <module> import *``. Every entry must be an actual
# attribute of this module, otherwise the star-import raises AttributeError.
__all__ = ["LTTngAgent"]
__author__ = "David Goulet "


class LTTngAgent():
    """
    LTTng agent python code.

    An LTTng agent spawns two client threads: one for the session daemon of
    the current UID and one for the root session daemon. Each thread
    registers to its daemon and then handles the tracing commands it sends.

    This class needs to be instantiated once. When __init__ returns, both
    clients have signalled their registration (or the registration timeout
    expired) and tracing can happen.
    """

    SESSIOND_ADDR = "127.0.0.1"
    # Number of client threads whose registration __init__ waits for.
    SEM_COUNT = 2
    # Timeout for the semaphore in seconds.
    SEM_TIMEOUT = 5
    # Delay, in seconds, between two non-blocking acquire attempts.
    SEM_WAIT_PERIOD = 0.2

    def __init__(self):
        # Session daemon register semaphore. Each client takes one slot in its
        # constructor and gives it back once registration is done (or has
        # failed); the loop below waits for both slots to come back.
        self.register_sem = threading.Semaphore(LTTngAgent.SEM_COUNT)

        self.client_user = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR,
                                          self.register_sem)
        self.client_user.start()

        self.client_root = LTTngTCPClient(LTTngAgent.SESSIOND_ADDR,
                                          self.register_sem)
        self.client_root.log_handler.is_root = True
        self.client_root.start()

        acquired = 0
        timeout = LTTngAgent.SEM_TIMEOUT
        # Quit once both clients signalled, or when the timeout reaches 0 or
        # below.
        while acquired < LTTngAgent.SEM_COUNT and timeout > 0:
            if self.register_sem.acquire(False):
                acquired += 1
            else:
                sleep(LTTngAgent.SEM_WAIT_PERIOD)
                timeout -= LTTngAgent.SEM_WAIT_PERIOD

    def __del__(self):
        self.destroy()

    def destroy(self):
        """
        Stop both client threads and wait for them to exit.

        Safe to call more than once, and safe from __del__ even when
        __init__ failed before the clients were created.
        """
        for client in (getattr(self, "client_user", None),
                       getattr(self, "client_root", None)):
            if client is None:
                continue
            client.destroy()
            # join() raises on a thread that was never started.
            if client.is_alive():
                client.join()


class LTTngCmdError(RuntimeError):
    """
    Command error thrown if an error is encountered in a command from the
    session daemon.
    """

    def __init__(self, code):
        super().__init__('LTTng command error: code {}'.format(code))
        self._code = code

    def get_code(self):
        return self._code


class LTTngUnknownCmdError(RuntimeError):
    pass


class LTTngLoggingHandler(logging.Handler):
    """
    Class handler for the Python logging API.

    Records are forwarded to the lttng-ust Python agent library through
    ctypes. The handler is attached to the root logger while at least one
    event is enabled.
    """

    def __init__(self):
        logging.Handler.__init__(self, level = logging.NOTSET)

        # Refcount tracking how many events have been enabled. This value
        # above 0 means that the handler is attached to the root logger.
        self.refcount = 0

        # Dict of enabled events. We track them so we know if it's ok to
        # disable the received event.
        self.enabled_events = {}

        # Am I root ?
        self.is_root = False

        # Using the logging formatter to extract the asctime only.
        self.log_fmt = logging.Formatter("%(asctime)s")
        self.setFormatter(self.log_fmt)

        # ctypes lib for lttng-ust. Stays None when the library cannot be
        # loaded, in which case emit() drops records instead of raising.
        # NOTE(review): LIBDIR_STR looks like a build-time placeholder that
        # is substituted at install time -- confirm.
        self.lttng_ust = None
        try:
            self.lttng_ust = ctypes.cdll.LoadLibrary("LIBDIR_STR/liblttng-ust-python-agent.so")
        except OSError:
            print("Unable to find libust for Python.")

    def emit(self, record):
        """
        Fire LTTng UST tracepoint with the given record.

        Does nothing if the lttng-ust library failed to load, so logging
        calls in the application never raise because of the agent.
        """
        if self.lttng_ust is None:
            return

        asctime = self.format(record)

        self.lttng_ust.py_tracepoint(asctime.encode(),
                                     record.getMessage().encode(),
                                     record.name.encode(),
                                     record.funcName.encode(),
                                     record.lineno, record.levelno,
                                     record.thread,
                                     record.threadName.encode())

    def enable_event(self, name):
        """
        Enable an event name which will ultimately add an handler to the root
        logger if none is present.
        """
        # Don't update the refcount if the event has been enabled prior.
        if name in self.enabled_events:
            return

        # Get the root logger and attach our handler.
        root_logger = logging.getLogger()
        # First thing first, we need to set the root logger to the loglevel
        # NOTSET so we can catch everything. The default is 30.
        root_logger.setLevel(logging.NOTSET)

        self.refcount += 1
        if self.refcount == 1:
            root_logger.addHandler(self)

        self.enabled_events[name] = True

    def disable_event(self, name):
        """
        Disable an event name which will ultimately remove the handler from
        the root logger once no enabled event is left.
        """
        if name not in self.enabled_events:
            # Event was not enabled prior, do nothing.
            return

        # Get the root logger and detach our handler.
        root_logger = logging.getLogger()

        self.refcount -= 1
        if self.refcount == 0:
            root_logger.removeHandler(self)
        del self.enabled_events[name]

    def list_logger(self):
        """
        Return a list of logger names.

        A snapshot list is returned rather than a live keys view, so it does
        not change while the caller iterates over it.
        """
        return list(logging.Logger.manager.loggerDict.keys())


class LTTngSessiondCmd():
    """
    Class handling session daemon command.
    """

    # Command values from the agent protocol
    CMD_LIST = 1
    CMD_ENABLE = 2
    CMD_DISABLE = 3
    CMD_REG_DONE = 4

    # Return code
    CODE_SUCCESS = 1
    CODE_INVALID_CMD = 2

    # Python Logger LTTng domain value taken from lttng/domain.h
    DOMAIN = 5

    # Protocol version
    MAJOR_VERSION = 1
    MINOR_VERSION = 0

    def execute(self):
        """
        This is part of the command interface. Must be implemented.
        """
        raise NotImplementedError


class LTTngCommandReply():
    """
    Object that contains the information that should be replied to the
    session daemon after a command execution.
    """

    def __init__(self, payload = None, reply = True):
        self.payload = payload
        self.reply = reply


class LTTngCommandEnable(LTTngSessiondCmd):
    """
    Handle the enable event command from the session daemon.

    Payload ABI:
        uint32 loglevel
        uint32 loglevel_type
        char   name[]   (may be followed by NULL bytes)
    """

    def __init__(self, log_handler, data):
        self.log_handler = log_handler
        # 4 bytes for loglevel and 4 bytes for loglevel_type thus 8.
        name_offset = 8

        data_size = len(data)
        # An empty payload, or one too short for the two uint32, is invalid.
        if data_size < name_offset:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            self.loglevel, self.loglevel_type, self.name = \
                    struct.unpack('>II%us' % (data_size - name_offset), data)
            # Remove trailing NULL bytes from name.
            self.name = self.name.decode().rstrip('\x00')
        except (struct.error, UnicodeDecodeError):
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        self.log_handler.enable_event(self.name)
        return LTTngCommandReply()


class LTTngCommandDisable(LTTngSessiondCmd):
    """
    Handle the disable event command from the session daemon.

    Payload ABI:
        char name[]   (may be followed by NULL bytes)
    """

    def __init__(self, log_handler, data):
        self.log_handler = log_handler

        data_size = len(data)
        if data_size == 0:
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

        try:
            self.name = struct.unpack('>%us' % (data_size), data)[0]
            # Remove trailing NULL bytes from name.
            self.name = self.name.decode().rstrip('\x00')
        except (struct.error, UnicodeDecodeError):
            raise LTTngCmdError(LTTngSessiondCmd.CODE_INVALID_CMD)

    def execute(self):
        self.log_handler.disable_event(self.name)
        return LTTngCommandReply()


class LTTngCommandRegDone(LTTngSessiondCmd):
    """
    Handle register done command. This is sent back after a successful
    registration from the session daemon. We basically release the given
    semaphore so the agent can return to the caller.
    """

    def __init__(self, sem):
        self.sem = sem

    def execute(self):
        self.sem.release()
        return LTTngCommandReply(reply = False)


class LTTngCommandList(LTTngSessiondCmd):
    """
    Handle the list command from the session daemon on the given socket.
    """

    def __init__(self, log_handler):
        self.log_handler = log_handler

    def execute(self):
        """
        Build the logger list reply.

        Payload ABI:
            uint32 data_size   (bytes of all names, NULL bytes included)
            uint32 nb_event    (number of loggers)
            char   names[]     (each name NULL terminated)
        """
        loggers = self.log_handler.list_logger()

        names = bytearray()
        for logger in loggers:
            # Sizes are counted in encoded bytes, not characters, so
            # non-ASCII logger names are neither truncated nor mis-sized.
            encoded = logger.encode()
            # Pack logger name and NULL byte.
            names += struct.pack('>%usB' % len(encoded), encoded, 0)

        # Pack uint32_t data_size followed by nb event (number of logger).
        data = struct.pack('>II', len(names), len(loggers))
        data += names
        return LTTngCommandReply(payload = data)


class LTTngTCPClient(threading.Thread):
    """
    TCP client that registers to, and receives commands from, the session
    daemon.
    """

    SYSTEM_PORT_FILE = "/var/run/lttng/agent.port"
    USER_PORT_FILE = os.path.join(os.path.expanduser("~"), ".lttng/agent.port")

    # The time in seconds this client should wait before trying again to
    # register back to the session daemon.
    WAIT_TIME = 3

    def __init__(self, host, sem):
        threading.Thread.__init__(self)

        # Which host to connect to. The port is fetched dynamically.
        self.sessiond_host = host

        # The session daemon register done semaphore. Needs to be released
        # when receiving a CMD_REG_DONE command.
        self.register_sem = sem
        self.register_sem.acquire()

        # Indicate that we have to quit thus stop the main loop.
        self.quit_flag = False
        # Quit pipe. The thread polls on it to know when to quit.
        self.quit_pipe = os.pipe()

        # Socket on which we communicate with the session daemon.
        self.sessiond_sock = None
        # LTTng Logging Handler
        self.log_handler = LTTngLoggingHandler()

    def cleanup_socket(self, epfd = None):
        """
        Close the session daemon socket, if any, and forget it.

        Each teardown step is attempted independently, so a socket that was
        never registered in *epfd* or never connected is still closed.
        """
        sock = self.sessiond_sock
        if sock is None:
            return
        self.sessiond_sock = None

        if epfd is not None:
            try:
                epfd.unregister(sock)
            except (select.error, IOError, ValueError):
                # Not registered (e.g. connect failed) or already closed.
                pass

        try:
            sock.shutdown(SHUT_RDWR)
        except IOError:
            # Not connected or already shut down; closing is still required.
            pass

        sock.close()

    def destroy(self):
        """
        Ask the client thread to quit by writing to its quit pipe.

        Only the first call has an effect. The write end of the pipe is
        closed by that call, and its fd number may be reused afterwards, so
        it must never be touched again.
        """
        if self.quit_flag:
            return
        self.quit_flag = True
        try:
            with os.fdopen(self.quit_pipe[1], 'w') as fp:
                fp.write("42")
        except OSError:
            pass

    def register(self):
        """
        Register to session daemon using the previously connected socket of
        the class.

        Command ABI:
            uint32 domain
            uint32 pid
            uint32 major_version
            uint32 minor_version
        """
        data = struct.pack('>IIII', LTTngSessiondCmd.DOMAIN, os.getpid(),
                           LTTngSessiondCmd.MAJOR_VERSION,
                           LTTngSessiondCmd.MINOR_VERSION)
        # sendall() keeps sending until the whole message is written.
        self.sessiond_sock.sendall(data)

    def run(self):
        """
        Start the TCP client thread by registering to the session daemon and
        polling on that socket for commands.

        On any IOError the socket is torn down and the register semaphore is
        released, so the agent's constructor is not left waiting. The
        connection is then retried after WAIT_TIME seconds.
        """
        epfd = epoll()
        epfd.register(self.quit_pipe[0], EPOLLIN)

        # Main loop to handle session daemon command and disconnection.
        while not self.quit_flag:
            try:
                # First, connect to the session daemon.
                self.connect_sessiond()

                # Register to session daemon after a successful connection.
                self.register()
                # Add registered socket to poll set.
                epfd.register(self.sessiond_sock, EPOLLIN | EPOLLERR | EPOLLHUP)

                self.quit_flag = self.wait_cmd(epfd)
            except IOError:
                # Whatever happens here, we have to close down everything
                # and retry to connect to the session daemon since either the
                # socket is closed or invalid data was sent.
                self.cleanup_socket(epfd)
                self.register_sem.release()
                sleep(LTTngTCPClient.WAIT_TIME)
                continue

        self.cleanup_socket(epfd)
        os.close(self.quit_pipe[0])
        epfd.close()

    def _recv_exact(self, sock, size):
        """
        Read exactly *size* bytes from *sock*.

        recv() on a stream socket may return fewer bytes than requested, so
        keep reading. Raises IOError(ESHUTDOWN) if the peer closes first.
        """
        buf = bytearray()
        while len(buf) < size:
            chunk = sock.recv(size - len(buf))
            if not chunk:
                raise IOError(errno.ESHUTDOWN)
            buf += chunk
        return bytes(buf)

    def recv_header(self, sock):
        """
        Receive the command header from the given socket and return it as a
        (data_size, cmd, cmd_version) tuple.

        Header ABI is defined like this:
            uint64 data_size
            uint32 cmd
            uint32 cmd_version

        Raises IOError(ESHUTDOWN) if the connection is closed before the
        full header has been received.
        """
        s_pack = struct.Struct('>QII')

        pack_data = self._recv_exact(sock, s_pack.size)
        try:
            return s_pack.unpack(pack_data)
        except struct.error:
            raise IOError(errno.EINVAL)

    def create_command(self, cmd_type, version, data):
        """
        Return the right command object using the given command type. The
        command version is unused since we only have one for now.

        Raises LTTngUnknownCmdError for an unknown command type, and
        whatever the command constructor raises (LTTngCmdError).
        """
        cmd_dict = {
            LTTngSessiondCmd.CMD_LIST: \
                    lambda: LTTngCommandList(self.log_handler),
            LTTngSessiondCmd.CMD_ENABLE: \
                    lambda: LTTngCommandEnable(self.log_handler, data),
            LTTngSessiondCmd.CMD_DISABLE: \
                    lambda: LTTngCommandDisable(self.log_handler, data),
            LTTngSessiondCmd.CMD_REG_DONE: \
                    lambda: LTTngCommandRegDone(self.register_sem),
        }

        if cmd_type in cmd_dict:
            return cmd_dict[cmd_type]()
        else:
            raise LTTngUnknownCmdError()

    def pack_code(self, code):
        return struct.pack('>I', code)

    def handle_command(self, data, cmd_type, cmd_version):
        """
        Handle the given command type with the received payload.

        The reply starts with a uint32 return code followed by the command's
        payload, if any. Error replies are always sent. A successful
        command's reply is sent only if the command asks for it
        (CMD_REG_DONE does not).
        """
        payload = bytearray()
        # Default to replying so error paths send their code instead of
        # referencing a reply object that was never created.
        send_reply = True

        try:
            cmd = self.create_command(cmd_type, cmd_version, data)
            cmd_reply = cmd.execute()
        except LTTngCmdError as e:
            # Set error code in payload
            payload += self.pack_code(e.get_code())
        except LTTngUnknownCmdError:
            # Set error code in payload
            payload += self.pack_code(LTTngSessiondCmd.CODE_INVALID_CMD)
        else:
            # Set success code in data
            payload += self.pack_code(LTTngSessiondCmd.CODE_SUCCESS)
            if cmd_reply.payload is not None:
                payload += cmd_reply.payload
            send_reply = cmd_reply.reply

        # Send response only if asked for.
        if send_reply:
            self.sessiond_sock.sendall(payload)

    def wait_cmd(self, epfd):
        """
        Poll for session daemon commands until asked to quit.

        Returns True once the quit pipe becomes readable. Raises IOError when
        the session daemon socket errors, hangs up or is closed.
        """
        while True:
            try:
                # Poll on socket for command.
                events = epfd.poll()
            except select.error as e:
                # On Python 3 select.error is OSError, which has no
                # ``message`` attribute; build the IOError from errno/strerror.
                raise IOError(e.errno, e.strerror)

            for fileno, event in events:
                if fileno == self.quit_pipe[0]:
                    return True
                elif event & (EPOLLERR | EPOLLHUP):
                    raise IOError(errno.ESHUTDOWN)
                elif event & EPOLLIN:
                    data = bytearray()

                    data_size, cmd, cmd_version = self.recv_header(self.sessiond_sock)
                    if data_size:
                        # Read the whole payload, not just the first chunk.
                        data += self._recv_exact(self.sessiond_sock, data_size)

                    self.handle_command(data, cmd, cmd_version)
                else:
                    raise IOError(errno.ESHUTDOWN)

    def get_port_from_file(self, path):
        """
        Open the session daemon agent port file and return the port it holds.

        0 is returned when the file cannot be read, when its first line is
        not an integer, or when the value is not a TCP port (1-65535).
        """
        # By default, the port is set to 0 so if we can not find the agent
        # port file simply don't try to connect. A value set to 0 indicates
        # that.
        port = 0

        try:
            # "with" closes the file even when int() raises.
            with open(path, "r") as f:
                r_port = int(f.readline())
        except (IOError, ValueError):
            return port

        if 0 < r_port <= 65535:
            port = r_port

        return port

    def connect_sessiond(self):
        """
        Connect sessiond_sock to running session daemon using the port file.
        """
        # Create session daemon TCP socket
        if not self.sessiond_sock:
            self.sessiond_sock = socket(AF_INET, SOCK_STREAM)

        if self.log_handler.is_root:
            port = self.get_port_from_file(LTTngTCPClient.SYSTEM_PORT_FILE)
        else:
            port = self.get_port_from_file(LTTngTCPClient.USER_PORT_FILE)

        # No session daemon available
        if port == 0:
            raise IOError(errno.ECONNREFUSED)

        # Can raise an IOError so caller must catch it.
        self.sessiond_sock.connect((self.sessiond_host, port))