*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation; either version 2
- * of the License, or (at your option) any later version.
+ * as published by the Free Software Foundation; only version 2
+ * of the License.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>
+#include <assert.h>
#include "libkernelctl.h"
#include "liblttkconsumerd.h"
#include "lttngerr.h"
-/* Init the list of FDs */
-static struct kconsumerd_fd_list kconsumerd_fd_list = {
- .head = CDS_LIST_HEAD_INIT(kconsumerd_fd_list.head),
+static
+struct kconsumerd_global_data {
+ /*
+ * kconsumerd_data.lock protects kconsumerd_data.fd_list,
+ * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
+ * ensures the count matches the number of items in the fd_list.
+ * It ensures the list updates *always* trigger an fd_array
+ * update (therefore need to make list update vs
+ * kconsumerd_data.need_update flag update atomic, and also flag
+ * read, fd array and flag clear atomic).
+ */
+ pthread_mutex_t lock;
+ /*
+ * Number of element for the list below. Protected by
+ * kconsumerd_data.lock.
+ */
+ unsigned int fds_count;
+ /*
+ * List of FDs. Protected by kconsumerd_data.lock.
+ */
+ struct kconsumerd_fd_list fd_list;
+ /*
+ * Flag specifying if the local array of FDs needs update in the
+ * poll function. Protected by kconsumerd_data.lock.
+ */
+ unsigned int need_update;
+} kconsumerd_data = {
+ .fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};
-/* Number of element for the list below. */
-static unsigned int kconsumerd_fds_count;
-
-/* If the local array of FDs needs update in the poll function */
-static unsigned int kconsumerd_update_fd_array = 1;
-
-/* lock the fd array and structures */
-static pthread_mutex_t kconsumerd_lock_fds;
-
/* communication with splice */
static int kconsumerd_thread_pipe[2];
/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];
+/* to let the signal handler wake up the fd receiver thread */
+static int kconsumerd_should_quit[2];
+
/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;
/* socket to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;
-/* flag to inform the polling thread to kconsumerd_quit when all fd hung up */
-static int kconsumerd_quit = 0;
+/*
+ * flag to inform the polling thread to quit when all fd hung up.
+ * Updated by the kconsumerd_thread_receive_fds when it notices that all
+ * fds has hung up. Also updated by the signal handler
+ * (kconsumerd_should_exit()). Read by the polling threads.
+ */
+static volatile int kconsumerd_quit = 0;
/*
* kconsumerd_set_error_socket
kconsumerd_command_sock_path = sock;
}
+/*
+ * kconsumerd_find_session_fd
+ *
+ * Find a session fd in the global list.
+ * The kconsumerd_data.lock must be locked during this call
+ *
+ * Return 1 if found else 0
+ */
+static int kconsumerd_find_session_fd(int fd)
+{
+ struct kconsumerd_fd *iter;
+
+ cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
+ if (iter->sessiond_fd == fd) {
+ DBG("Duplicate session fd %d", fd);
+ return 1;
+ }
+ }
+
+ return 0;
+}
+
/*
* kconsumerd_del_fd
*
*/
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
- pthread_mutex_lock(&kconsumerd_lock_fds);
+ pthread_mutex_lock(&kconsumerd_data.lock);
cds_list_del(&lcf->list);
- if (kconsumerd_fds_count > 0) {
- kconsumerd_fds_count--;
+ if (kconsumerd_data.fds_count > 0) {
+ kconsumerd_data.fds_count--;
if (lcf != NULL) {
close(lcf->out_fd);
close(lcf->consumerd_fd);
lcf = NULL;
}
}
- pthread_mutex_unlock(&kconsumerd_lock_fds);
+ kconsumerd_data.need_update = 1;
+ pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
*/
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
- struct kconsumerd_fd *tmp_fd;
int ret;
+ struct kconsumerd_fd *tmp_fd;
+
+ pthread_mutex_lock(&kconsumerd_data.lock);
+ /* Check if already exist */
+ ret = kconsumerd_find_session_fd(buf->fd);
+ if (ret == 1) {
+ goto end;
+ }
tmp_fd = malloc(sizeof(struct kconsumerd_fd));
tmp_fd->sessiond_fd = buf->fd;
DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);
- pthread_mutex_lock(&kconsumerd_lock_fds);
- cds_list_add(&tmp_fd->list, &kconsumerd_fd_list.head);
- kconsumerd_fds_count++;
- pthread_mutex_unlock(&kconsumerd_lock_fds);
-
+ cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
+ kconsumerd_data.fds_count++;
+ kconsumerd_data.need_update = 1;
end:
+ pthread_mutex_unlock(&kconsumerd_data.lock);
return ret;
}
enum kconsumerd_fd_state state)
{
struct kconsumerd_fd *iter;
- cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+
+ pthread_mutex_lock(&kconsumerd_data.lock);
+ cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
if (iter->sessiond_fd == sessiond_fd) {
iter->state = state;
break;
}
}
+ kconsumerd_data.need_update = 1;
+ pthread_mutex_unlock(&kconsumerd_data.lock);
}
/*
* to avoid doing a lookup in the linked list and concurrency issues
* when writing is needed.
* Returns the number of fds in the structures
+ * Called with kconsumerd_data.lock held.
*/
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
struct kconsumerd_fd **local_kconsumerd_fd)
int i = 0;
DBG("Updating poll fd array");
- pthread_mutex_lock(&kconsumerd_lock_fds);
- cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+ cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
DBG("Inside for each");
if (iter->state == ACTIVE_FD) {
DBG("Active FD %d", iter->consumerd_fd);
*/
(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
(*pollfd)[i].events = POLLIN;
-
- kconsumerd_update_fd_array = 0;
- pthread_mutex_unlock(&kconsumerd_lock_fds);
return i;
}
}
switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
- case LTTNG_KERNEL_SPLICE:
+ case LTTNG_EVENT_SPLICE:
/* read the whole subbuffer */
err = kernctl_get_padded_subbuf_size(infd, &len);
if (err != 0) {
ERR("Error splicing to tracefile");
}
break;
- case LTTNG_KERNEL_MMAP:
+ case LTTNG_EVENT_MMAP:
/* read the used subbuffer size */
err = kernctl_get_subbuf_size(infd, &len);
if (err != 0) {
return ret;
}
+/*
+ * kconsumerd_poll_socket
+ *
+ * Poll on the should_quit pipe and the command socket
+ * return -1 on error and should exit, 0 if data is
+ * available on the command socket
+ */
+int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
+{
+ int num_rdy;
+
+ num_rdy = poll(kconsumerd_sockpoll, 2, -1);
+ if (num_rdy == -1) {
+ perror("Poll error");
+ goto exit;
+ }
+ if (kconsumerd_sockpoll[0].revents == POLLIN) {
+ DBG("kconsumerd_should_quit wake up");
+ goto exit;
+ }
+ return 0;
+
+exit:
+ return -1;
+}
+
/*
* kconsumerd_consumerd_recv_fd
*
* structures describing each fd (path name).
* Returns the size of received data
*/
-static int kconsumerd_consumerd_recv_fd(int sfd, int size,
+static int kconsumerd_consumerd_recv_fd(int sfd,
+ struct pollfd *kconsumerd_sockpoll, int size,
enum kconsumerd_command cmd_type)
{
- struct msghdr msg;
struct iovec iov[1];
int ret = 0, i, tmp2;
struct cmsghdr *cmsg;
/* the number of fds we are about to receive */
nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
+ /*
+ * nb_fd is the number of fds we receive. One fd per recvmsg.
+ */
for (i = 0; i < nb_fd; i++) {
- memset(&msg, 0, sizeof(msg));
+ struct msghdr msg = { 0 };
/* Prepare to receive the structures */
iov[0].iov_base = &lkm;
msg.msg_controllen = sizeof(recv_fd);
DBG("Waiting to receive fd");
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+
if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
perror("recvmsg");
continue;
kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
goto end;
}
+
/* if we received fds */
if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
switch (cmd_type) {
case ADD_STREAM:
- DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
- ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
+ DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, ((int *) CMSG_DATA(cmsg))[0]);
+ ret = kconsumerd_add_fd(&lkm, ((int *) CMSG_DATA(cmsg))[0]);
if (ret < 0) {
kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
goto end;
default:
break;
}
- /* flag to tell the polling thread to update its fd array */
- kconsumerd_update_fd_array = 1;
/* signal the poll thread */
tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
+ if (tmp2 < 0) {
+ perror("write kconsumerd poll");
+ }
} else {
ERR("Didn't received any fd");
kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
}
end:
- DBG("kconsumerd_consumerd_recv_fd thread exiting");
return ret;
}
struct pollfd *pollfd = NULL;
/* local view of the fds */
struct kconsumerd_fd **local_kconsumerd_fd = NULL;
- /* local view of kconsumerd_fds_count */
+ /* local view of kconsumerd_data.fds_count */
int nb_fd = 0;
char tmp;
int tmp2;
* the ltt_fd_list has been updated, we need to update our
* local array as well
*/
- if (kconsumerd_update_fd_array == 1) {
+ pthread_mutex_lock(&kconsumerd_data.lock);
+ if (kconsumerd_data.need_update) {
if (pollfd != NULL) {
free(pollfd);
pollfd = NULL;
free(local_kconsumerd_fd);
local_kconsumerd_fd = NULL;
}
+
/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
- pollfd = malloc((kconsumerd_fds_count + 1) * sizeof(struct pollfd));
+ pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
perror("pollfd malloc");
+ pthread_mutex_unlock(&kconsumerd_data.lock);
goto end;
}
+
/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
- local_kconsumerd_fd = malloc((kconsumerd_fds_count + 1) *
+ local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
sizeof(struct kconsumerd_fd));
if (local_kconsumerd_fd == NULL) {
perror("local_kconsumerd_fd malloc");
+ pthread_mutex_unlock(&kconsumerd_data.lock);
goto end;
}
ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
if (ret < 0) {
ERR("Error in allocating pollfd or local_outfds");
kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
+ pthread_mutex_unlock(&kconsumerd_data.lock);
goto end;
}
nb_fd = ret;
+ kconsumerd_data.need_update = 0;
}
+ pthread_mutex_unlock(&kconsumerd_data.lock);
/* poll on the array of fds */
DBG("polling on %d fd", nb_fd + 1);
}
/*
- * if only the kconsumerd_poll_pipe triggered poll to return just
- * return to the beginning of the loop to update the array
+ * If the kconsumerd_poll_pipe triggered poll go
+ * directly to the beginning of the loop to update the
+ * array. We want to prioritize array update over
+ * low-priority reads.
*/
- if (num_rdy == 1 && pollfd[nb_fd].revents == POLLIN) {
+ if (pollfd[nb_fd].revents == POLLIN) {
DBG("kconsumerd_poll_pipe wake up");
tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
+ if (tmp2 < 0) {
+ perror("read kconsumerd poll");
+ }
continue;
}
case POLLERR:
ERR("Error returned in polling fd %d.", pollfd[i].fd);
kconsumerd_del_fd(local_kconsumerd_fd[i]);
- kconsumerd_update_fd_array = 1;
num_hup++;
break;
case POLLHUP:
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
kconsumerd_del_fd(local_kconsumerd_fd[i]);
- kconsumerd_update_fd_array = 1;
num_hup++;
break;
case POLLNVAL:
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
kconsumerd_del_fd(local_kconsumerd_fd[i]);
- kconsumerd_update_fd_array = 1;
num_hup++;
break;
case POLLPRI:
free(local_kconsumerd_fd);
local_kconsumerd_fd = NULL;
}
- kconsumerd_cleanup();
return NULL;
}
/*
- * kconsumerd_create_poll_pipe
+ * kconsumerd_init(void)
*
- * create the pipe to wake to polling thread when needed
+ * initialise the necessary environnement :
+ * - inform the polling thread to update the polling array
+ * - create the poll_pipe
+ * - create the should_quit pipe (for signal handler)
*/
-int kconsumerd_create_poll_pipe()
+int kconsumerd_init(void)
{
- return pipe(kconsumerd_poll_pipe);
+ int ret;
+
+ /* need to update the polling array at init time */
+ kconsumerd_data.need_update = 1;
+
+ ret = pipe(kconsumerd_poll_pipe);
+ if (ret < 0) {
+ perror("Error creating poll pipe");
+ goto end;
+ }
+
+ ret = pipe(kconsumerd_should_quit);
+ if (ret < 0) {
+ perror("Error creating recv pipe");
+ goto end;
+ }
+
+end:
+ return ret;
}
/*
{
int sock, client_socket, ret;
struct lttcomm_kconsumerd_header tmp;
+ /*
+ * structure to poll for incoming data on communication socket
+ * avoids making blocking sockets
+ */
+ struct pollfd kconsumerd_sockpoll[2];
+
DBG("Creating command socket %s", kconsumerd_command_sock_path);
unlink(kconsumerd_command_sock_path);
goto end;
}
+ ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto end;
+ }
+
+ /* prepare the FDs to poll : to client socket and the should_quit pipe */
+ kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
+ kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
+ kconsumerd_sockpoll[1].fd = client_socket;
+ kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+ DBG("Connection on client_socket");
+
/* Blocking call, waiting for transmission */
sock = lttcomm_accept_unix_sock(client_socket);
if (sock <= 0) {
WARN("On accept");
goto end;
}
+ ret = fcntl(sock, F_SETFL, O_NONBLOCK);
+ if (ret < 0) {
+ perror("fcntl O_NONBLOCK");
+ goto end;
+ }
+
+ /* update the polling structure to poll on the established socket */
+ kconsumerd_sockpoll[1].fd = sock;
+ kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;
+
while (1) {
+ if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
+ goto end;
+ }
+ DBG("Incoming fds on sock");
+
/* We first get the number of fd we are about to receive */
ret = lttcomm_recv_unix_sock(sock, &tmp,
sizeof(struct lttcomm_kconsumerd_header));
DBG("Received STOP command");
goto end;
}
+ if (kconsumerd_quit) {
+ DBG("kconsumerd_thread_receive_fds received quit from signal");
+ goto end;
+ }
+
/* we received a command to add or update fds */
- ret = kconsumerd_consumerd_recv_fd(sock, tmp.payload_size, tmp.cmd_type);
+ ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
+ tmp.payload_size, tmp.cmd_type);
if (ret <= 0) {
ERR("Receiving the FD, exiting");
goto end;
}
+ DBG("received fds on sock");
}
end:
*
* Cleanup the daemon's socket on exit
*/
-void kconsumerd_cleanup()
+void kconsumerd_cleanup(void)
{
- struct kconsumerd_fd *iter;
+ struct kconsumerd_fd *iter, *tmp;
/* remove the socket file */
unlink(kconsumerd_command_sock_path);
- /* close all outfd */
- cds_list_for_each_entry(iter, &kconsumerd_fd_list.head, list) {
+ /*
+ * close all outfd. Called when there are no more threads
+ * running (after joining on the threads), no need to protect
+ * list iteration with mutex.
+ */
+ cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
kconsumerd_del_fd(iter);
}
}
+/*
+ * kconsumerd_should_exit
+ *
+ * Called from signal handler.
+ */
+void kconsumerd_should_exit(void)
+{
+ int ret;
+ kconsumerd_quit = 1;
+ ret = write(kconsumerd_should_quit[1], "4", 1);
+ if (ret < 0) {
+ perror("write kconsumerd quit");
+ }
+}
+
/*
* kconsumerd_send_error
*