/*
- * Copyright (C) 2012 - Julien Desfossez <jdesfossez@efficios.com>
- * David Goulet <dgoulet@efficios.com>
- * 2013 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
- * 2015 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2012 Julien Desfossez <jdesfossez@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2013 Jérémie Galarneau <jeremie.galarneau@efficios.com>
+ * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License, version 2 only,
- * as published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along
- * with this program; if not, write to the Free Software Foundation, Inc.,
- * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
#include <common/buffer-view.h>
#include <common/string-utils/format.h>
#include <common/fd-tracker/fd-tracker.h>
+#include <common/fd-tracker/utils.h>
#include "backward-compatibility-group-by.h"
#include "cmd.h"
static struct relay_conn_queue relay_conn_queue;
/* Cap of file desriptors to be in simultaneous use by the relay daemon. */
-static unsigned int lttng_opt_fd_cap;
+static unsigned int lttng_opt_fd_pool_size = -1;
/* Global relay stream hash table. */
struct lttng_ht *relay_streams_ht;
{ "daemonize", 0, 0, 'd', },
{ "background", 0, 0, 'b', },
{ "group", 1, 0, 'g', },
- { "fd-cap", 1, 0, '\0', },
+ { "fd-pool-size", 1, 0, '\0', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
switch (opt) {
case 0:
- if (!strcmp(optname, "fd-cap")) {
+ if (!strcmp(optname, "fd-pool-size")) {
unsigned long v;
errno = 0;
v = strtoul(arg, NULL, 0);
- if (errno != 0 || !isdigit(arg[0])) {
- ERR("Wrong value in --fd-cap parameter: %s",
- arg);
+ if (errno != 0 || !isdigit((unsigned char) arg[0])) {
+ ERR("Wrong value in --fd-pool-size parameter: %s", arg);
ret = -1;
goto end;
}
- if (v < DEFAULT_RELAYD_MINIMAL_FD_CAP) {
- ERR("File descriptor cap must be set to at least %d",
- DEFAULT_RELAYD_MINIMAL_FD_CAP);
- }
if (v >= UINT_MAX) {
- ERR("File descriptor cap overflow in --fd-cap parameter: %s",
- arg);
+ ERR("File descriptor cap overflow in --fd-pool-size parameter: %s", arg);
ret = -1;
goto end;
}
- lttng_opt_fd_cap = (unsigned int) v;
- DBG3("File descriptor cap set to %u", lttng_opt_fd_cap);
+ lttng_opt_fd_pool_size = (unsigned int) v;
} else {
fprintf(stderr, "unknown option %s", optname);
if (arg) {
return ret;
}
+static int set_fd_pool_size(void)
+{
+ int ret = 0;
+ struct rlimit rlimit;
+
+ ret = getrlimit(RLIMIT_NOFILE, &rlimit);
+ if (ret) {
+ PERROR("Failed to get file descriptor limit");
+ ret = -1;
+ goto end;
+ }
+
+ DBG("File descriptor count limits are %" PRIu64 " (soft) and %" PRIu64 " (hard)",
+ (uint64_t) rlimit.rlim_cur,
+ (uint64_t) rlimit.rlim_max);
+ if (lttng_opt_fd_pool_size == -1) {
+ /* Use default value (soft limit - reserve). */
+ if (rlimit.rlim_cur < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+ ERR("The process' file number limit is too low (%" PRIu64 "). The process' file number limit must be set to at least %i.",
+ (uint64_t) rlimit.rlim_cur, DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+ ret = -1;
+ goto end;
+ }
+ lttng_opt_fd_pool_size = rlimit.rlim_cur -
+ DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+ goto end;
+ }
+
+ if (lttng_opt_fd_pool_size < DEFAULT_RELAYD_MIN_FD_POOL_SIZE) {
+ ERR("File descriptor pool size must be set to at least %d",
+ DEFAULT_RELAYD_MIN_FD_POOL_SIZE);
+ ret = -1;
+ goto end;
+ }
+
+ if (lttng_opt_fd_pool_size > rlimit.rlim_cur) {
+ ERR("File descriptor pool size argument (%u) exceeds the process' soft limit (%" PRIu64 ").",
+ lttng_opt_fd_pool_size, (uint64_t) rlimit.rlim_cur);
+ ret = -1;
+ goto end;
+ }
+
+ DBG("File descriptor pool size argument (%u) adjusted to %u to accommodates transient fd uses",
+ lttng_opt_fd_pool_size,
+ lttng_opt_fd_pool_size - DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE);
+ lttng_opt_fd_pool_size -= DEFAULT_RELAYD_FD_POOL_SIZE_RESERVE;
+end:
+ return ret;
+}
+
static int set_options(int argc, char **argv)
{
int c, ret = 0, option_index = 0, retval = 0;
int orig_optopt = optopt, orig_optind = optind;
char *default_address, *optstring;
- const char *config_path = NULL;
+ char *config_path = NULL;
optstring = utils_generate_optstring(long_options,
sizeof(long_options) / sizeof(struct option));
WARN("Getting '%s' argument from setuid/setgid binary refused for security reasons.",
"-f, --config");
} else {
+ free(config_path);
config_path = utils_expand_path(optarg);
if (!config_path) {
ERR("Failed to resolve path: %s", optarg);
goto exit;
}
}
- if (lttng_opt_fd_cap == 0) {
- int ret;
- struct rlimit rlimit;
-
- ret = getrlimit(RLIMIT_NOFILE, &rlimit);
- if (ret) {
- PERROR("Failed to get file descriptor limit");
- retval = -1;
- }
-
- lttng_opt_fd_cap = rlimit.rlim_cur;
+ ret = set_fd_pool_size();
+ if (ret) {
+ retval = -1;
+ goto exit;
}
if (opt_group_output_by == RELAYD_GROUP_OUTPUT_BY_UNKNOWN) {
}
exit:
+ free(config_path);
free(optstring);
return retval;
}
print_sessions();
}
+static int noop_close(void *data, int *fds)
+{
+ return 0;
+}
+
+static void untrack_stdio(void)
+{
+ int fds[] = { fileno(stdout), fileno(stderr) };
+
+ /*
+ * noop_close is used since we don't really want to close
+ * the stdio output fds; we merely want to stop tracking them.
+ */
+ (void) fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+ fds, 2, noop_close, NULL);
+}
+
/*
* Cleanup the daemon
*/
health_app_destroy(health_relayd);
}
/* Close thread quit pipes */
- utils_close_pipe(health_quit_pipe);
- utils_close_pipe(thread_quit_pipe);
-
+ if (health_quit_pipe[0] != -1) {
+ (void) fd_tracker_util_pipe_close(
+ the_fd_tracker, health_quit_pipe);
+ }
+ if (thread_quit_pipe[0] != -1) {
+ (void) fd_tracker_util_pipe_close(
+ the_fd_tracker, thread_quit_pipe);
+ }
if (sessiond_trace_chunk_registry) {
sessiond_trace_chunk_registry_destroy(
sessiond_trace_chunk_registry);
}
if (the_fd_tracker) {
+ untrack_stdio();
+ /*
+ * fd_tracker_destroy() will log the contents of the fd-tracker
+ * if a leak is detected.
+ */
fd_tracker_destroy(the_fd_tracker);
}
if (tracing_group_name_override) {
free((void *) tracing_group_name);
}
- fd_tracker_log(the_fd_tracker);
}
/*
*/
static int init_thread_quit_pipe(void)
{
- int ret;
-
- ret = utils_create_pipe_cloexec(thread_quit_pipe);
+ return fd_tracker_util_pipe_open_cloexec(
+ the_fd_tracker, "Quit pipe", thread_quit_pipe);
+}
- return ret;
+/*
+ * Init health quit pipe.
+ *
+ * Return -1 on error or 0 if all pipes are created.
+ */
+static int init_health_quit_pipe(void)
+{
+ return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+ "Health quit pipe", health_quit_pipe);
}
/*
* Create a poll set with O_CLOEXEC and add the thread quit pipe to the set.
*/
-static int create_thread_poll_set(struct lttng_poll_event *events, int size)
+static int create_named_thread_poll_set(struct lttng_poll_event *events,
+ int size, const char *name)
{
int ret;
goto error;
}
- ret = lttng_poll_create(events, size, LTTNG_CLOEXEC);
- if (ret < 0) {
+ ret = fd_tracker_util_poll_create(the_fd_tracker,
+ name, events, 1, LTTNG_CLOEXEC);
+ if (ret) {
+ PERROR("Failed to create \"%s\" poll file descriptor", name);
goto error;
}
return 0;
}
+static int create_sock(void *data, int *out_fd)
+{
+ int ret;
+ struct lttcomm_sock *sock = data;
+
+ ret = lttcomm_create_sock(sock);
+ if (ret < 0) {
+ goto end;
+ }
+
+ *out_fd = sock->fd;
+end:
+ return ret;
+}
+
+static int close_sock(void *data, int *in_fd)
+{
+ struct lttcomm_sock *sock = data;
+
+ return sock->ops->close(sock);
+}
+
+static int accept_sock(void *data, int *out_fd)
+{
+ int ret = 0;
+ /* Socks is an array of in_sock, out_sock. */
+ struct lttcomm_sock **socks = data;
+ struct lttcomm_sock *in_sock = socks[0];
+
+ socks[1] = in_sock->ops->accept(in_sock);
+ if (!socks[1]) {
+ ret = -1;
+ goto end;
+ }
+ *out_fd = socks[1]->fd;
+end:
+ return ret;
+}
+
/*
* Create and init socket from uri.
*/
-static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri)
+static struct lttcomm_sock *relay_socket_create(struct lttng_uri *uri,
+ const char *name)
{
- int ret;
+ int ret, sock_fd;
struct lttcomm_sock *sock = NULL;
+ char uri_str[PATH_MAX];
+ char *formated_name = NULL;
sock = lttcomm_alloc_sock_from_uri(uri);
if (sock == NULL) {
goto error;
}
- ret = lttcomm_create_sock(sock);
- if (ret < 0) {
+ /*
+ * Don't fail to create the socket if the name can't be built as it is
+ * only used for debugging purposes.
+ */
+ ret = uri_to_str_url(uri, uri_str, sizeof(uri_str));
+ uri_str[sizeof(uri_str) - 1] = '\0';
+ if (ret >= 0) {
+ ret = asprintf(&formated_name, "%s socket @ %s", name,
+ uri_str);
+ if (ret < 0) {
+ formated_name = NULL;
+ }
+ }
+
+ ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
+ (const char **) (formated_name ? &formated_name : NULL),
+ 1, create_sock, sock);
+ if (ret) {
+ PERROR("Failed to open \"%s\" relay socket",
+ formated_name ?: "Unknown");
goto error;
}
- DBG("Listening on sock %d", sock->fd);
+ DBG("Listening on %s socket %d", name, sock->fd);
ret = sock->ops->bind(sock);
if (ret < 0) {
}
+ free(formated_name);
return sock;
error:
if (sock) {
lttcomm_destroy_sock(sock);
}
+ free(formated_name);
return NULL;
}
+static
+struct lttcomm_sock *accept_relayd_sock(struct lttcomm_sock *listening_sock,
+ const char *name)
+{
+ int out_fd, ret;
+ struct lttcomm_sock *socks[2] = { listening_sock, NULL };
+ struct lttcomm_sock *new_sock = NULL;
+
+ ret = fd_tracker_open_unsuspendable_fd(
+ the_fd_tracker, &out_fd,
+ (const char **) &name,
+ 1, accept_sock, &socks);
+ if (ret) {
+ goto end;
+ }
+ new_sock = socks[1];
+ DBG("%s accepted, socket %d", name, new_sock->fd);
+end:
+ return new_sock;
+}
+
/*
* This thread manages the listening for new connections on the network
*/
DBG("[thread] Relay listener started");
+ rcu_register_thread();
health_register(health_relayd, HEALTH_RELAYD_TYPE_LISTENER);
health_code_update();
- control_sock = relay_socket_create(control_uri);
+ control_sock = relay_socket_create(control_uri, "Control listener");
if (!control_sock) {
goto error_sock_control;
}
- data_sock = relay_socket_create(data_uri);
+ data_sock = relay_socket_create(data_uri, "Data listener");
if (!data_sock) {
goto error_sock_relay;
}
* Pass 3 as size here for the thread quit pipe, control and
* data socket.
*/
- ret = create_thread_poll_set(&events, 3);
+ ret = create_named_thread_poll_set(&events, 3, "Listener thread epoll");
if (ret < 0) {
goto error_create_poll;
}
*/
int val = 1;
struct relay_connection *new_conn;
- struct lttcomm_sock *newsock;
+ struct lttcomm_sock *newsock = NULL;
enum connection_type type;
if (pollfd == data_sock->fd) {
type = RELAY_DATA;
- newsock = data_sock->ops->accept(data_sock);
- DBG("Relay data connection accepted, socket %d",
- newsock->fd);
+ newsock = accept_relayd_sock(data_sock,
+ "Data socket to relayd");
} else {
- assert(pollfd == control_sock->fd);
+ LTTNG_ASSERT(pollfd == control_sock->fd);
type = RELAY_CONTROL;
- newsock = control_sock->ops->accept(control_sock);
- DBG("Relay control connection accepted, socket %d",
- newsock->fd);
+ newsock = accept_relayd_sock(control_sock,
+ "Control socket to relayd");
}
if (!newsock) {
PERROR("accepting sock");
error:
error_poll_add:
error_testpoint:
- lttng_poll_clean(&events);
+ (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_create_poll:
if (data_sock->fd >= 0) {
- ret = data_sock->ops->close(data_sock);
+ int data_sock_fd = data_sock->fd;
+
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+ &data_sock_fd, 1, close_sock,
+ data_sock);
if (ret) {
- PERROR("close");
+ PERROR("Failed to close the data listener socket file descriptor");
}
+ data_sock->fd = -1;
}
lttcomm_destroy_sock(data_sock);
error_sock_relay:
if (control_sock->fd >= 0) {
- ret = control_sock->ops->close(control_sock);
+ int control_sock_fd = control_sock->fd;
+
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker,
+ &control_sock_fd, 1, close_sock,
+ control_sock);
if (ret) {
- PERROR("close");
+ PERROR("Failed to close the control listener socket file descriptor");
}
+ control_sock->fd = -1;
}
lttcomm_destroy_sock(control_sock);
error_sock_control:
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
+ rcu_unregister_thread();
DBG("Relay listener thread cleanup complete");
lttng_relay_stop_threads();
return NULL;
ret = -1;
goto send_reply;
}
- assert(!conn->session);
+ LTTNG_ASSERT(!conn->session);
conn->session = session;
DBG("Created session %" PRIu64, session->id);
} else if (conn->minor >= 4 && conn->minor < 11) {
char *group_by_session_path_name;
- assert(session->session_name[0] != '\0');
+ LTTNG_ASSERT(session->session_name[0] != '\0');
group_by_session_path_name =
backward_compat_group_by_session(
path_name,
- session->session_name);
+ session->session_name,
+ session->creation_time.value);
if (!group_by_session_path_name) {
ERR("Failed to apply group by session to stream of session %" PRIu64,
session->id);
packet_view = lttng_buffer_view_from_view(payload,
sizeof(metadata_payload_header), metadata_payload_size);
- if (!packet_view.data) {
+ if (!lttng_buffer_view_is_valid(&packet_view)) {
ERR("Invalid metadata packet length announced by header");
ret = -1;
goto end_put;
struct lttcomm_relayd_generic_reply reply;
struct relay_stream *stream;
- assert(recv_hdr);
- assert(conn);
+ LTTNG_ASSERT(recv_hdr);
+ LTTNG_ASSERT(conn);
DBG("Init streams for data pending");
struct relay_stream *stream;
size_t msg_len;
- assert(conn);
+ LTTNG_ASSERT(conn);
DBG("Relay receiving index");
ssize_t send_ret;
struct lttcomm_relayd_generic_reply reply;
- assert(conn);
+ LTTNG_ASSERT(conn);
DBG("Relay receiving streams_sent");
struct lttcomm_relayd_create_trace_chunk *msg;
struct lttcomm_relayd_generic_reply reply = {};
struct lttng_buffer_view header_view;
- struct lttng_buffer_view chunk_name_view;
struct lttng_trace_chunk *chunk = NULL, *published_chunk = NULL;
enum lttng_error_code reply_code = LTTNG_OK;
enum lttng_trace_chunk_status chunk_status;
- struct lttng_directory_handle *session_output = NULL;
const char *new_path;
if (!session || !conn->version_check_done) {
}
header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
- if (!header_view.data) {
+ if (!lttng_buffer_view_is_valid(&header_view)) {
ERR("Failed to receive payload of chunk creation command");
ret = -1;
goto end_no_reply;
reply_code = LTTNG_ERR_NOMEM;
goto end;
}
+ lttng_trace_chunk_set_fd_tracker(chunk, the_fd_tracker);
if (msg->override_name_length) {
const char *name;
+ const struct lttng_buffer_view chunk_name_view =
+ lttng_buffer_view_from_view(payload,
+ sizeof(*msg),
+ msg->override_name_length);
+
+ if (!lttng_buffer_view_is_valid(&chunk_name_view)) {
+ ERR("Invalid payload of chunk creation command (protocol error): buffer too short for expected name length");
+ ret = -1;
+ reply_code = LTTNG_ERR_INVALID;
+ goto end;
+ }
- chunk_name_view = lttng_buffer_view_from_view(payload,
- sizeof(*msg),
- msg->override_name_length);
name = chunk_name_view.data;
- if (!name || name[msg->override_name_length - 1]) {
- ERR("Failed to receive payload of chunk creation command");
+ if (name[msg->override_name_length - 1]) {
+ ERR("Invalid payload of chunk creation command (protocol error): name is not null-terminated");
ret = -1;
reply_code = LTTNG_ERR_INVALID;
goto end;
goto end;
}
- session_output = session_create_output_directory_handle(
- conn->session);
- if (!session_output) {
- reply_code = LTTNG_ERR_CREATE_DIR_FAIL;
- goto end;
- }
- chunk_status = lttng_trace_chunk_set_as_owner(chunk, session_output);
- lttng_directory_handle_put(session_output);
- session_output = NULL;
+ LTTNG_ASSERT(conn->session->output_directory);
+ chunk_status = lttng_trace_chunk_set_as_owner(chunk,
+ conn->session->output_directory);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
reply_code = LTTNG_ERR_UNK;
ret = -1;
end_no_reply:
lttng_trace_chunk_put(chunk);
lttng_trace_chunk_put(published_chunk);
- lttng_directory_handle_put(session_output);
return ret;
}
}
header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
- if (!header_view.data) {
+ if (!lttng_buffer_view_is_valid(&header_view)) {
ERR("Failed to receive payload of chunk close command");
ret = -1;
goto end_no_reply;
new_path);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
- goto end;
+ goto end_unlock_session;
}
session->ongoing_rotation = false;
}
chunk_status = lttng_trace_chunk_rename_path(chunk, old_path);
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ret = -1;
- goto end;
+ goto end_unlock_session;
}
}
chunk_status = lttng_trace_chunk_set_close_timestamp(
bool chunk_exists;
if (!session || !conn->version_check_done) {
- ERR("Trying to close a trace chunk before version check");
+ ERR("Trying to check for the existance of a trace chunk before version check");
ret = -1;
goto end_no_reply;
}
if (session->major == 2 && session->minor < 11) {
- ERR("Chunk close command is unsupported before 2.11");
+ ERR("Chunk exists command is unsupported before 2.11");
ret = -1;
goto end_no_reply;
}
header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
- if (!header_view.data) {
- ERR("Failed to receive payload of chunk close command");
+ if (!lttng_buffer_view_is_valid(&header_view)) {
+ ERR("Failed to receive payload of chunk exists command");
ret = -1;
goto end_no_reply;
}
uint64_t result_flags = 0;
header_view = lttng_buffer_view_from_view(payload, 0, sizeof(*msg));
- if (!header_view.data) {
+ if (!lttng_buffer_view_is_valid(&header_view)) {
ERR("Failed to receive payload of chunk close command");
ret = -1;
goto end_no_reply;
goto end;
}
- assert(ret > 0);
- assert(ret <= state->left_to_receive);
+ LTTNG_ASSERT(ret > 0);
+ LTTNG_ASSERT(ret <= state->left_to_receive);
state->left_to_receive -= ret;
state->received += ret;
struct ctrl_connection_state_receive_header *state =
&conn->protocol.ctrl.state.receive_header;
- assert(state->left_to_receive != 0);
+ LTTNG_ASSERT(state->left_to_receive != 0);
ret = conn->sock->ops->recvmsg(conn->sock,
reception_buffer->data + state->received,
goto end;
}
- assert(ret > 0);
- assert(ret <= state->left_to_receive);
+ LTTNG_ASSERT(ret > 0);
+ LTTNG_ASSERT(ret <= state->left_to_receive);
state->left_to_receive -= ret;
state->received += ret;
struct lttcomm_relayd_data_hdr header;
struct relay_stream *stream;
- assert(state->left_to_receive != 0);
+ LTTNG_ASSERT(state->left_to_receive != 0);
ret = conn->sock->ops->recvmsg(conn->sock,
state->header_reception_buffer + state->received,
goto end;
}
- assert(ret > 0);
- assert(ret <= state->left_to_receive);
+ LTTNG_ASSERT(ret > 0);
+ LTTNG_ASSERT(ret <= state->left_to_receive);
state->left_to_receive -= ret;
state->received += ret;
packet_chunk = lttng_buffer_view_init(data_buffer,
0, recv_size);
- assert(packet_chunk.data);
+ LTTNG_ASSERT(packet_chunk.data);
ret = stream_write(stream, &packet_chunk, 0);
if (ret) {
(void) lttng_poll_del(events, pollfd);
- ret = close(pollfd);
+ ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1,
+ fd_tracker_util_close_fd, NULL);
if (ret < 0) {
ERR("Closing pollfd %d", pollfd);
}
goto relay_connections_ht_error;
}
- ret = create_thread_poll_set(&events, 2);
+ ret = create_named_thread_poll_set(&events, 2, "Worker thread epoll");
if (ret < 0) {
goto error_poll_create;
}
ctrl_conn = connection_get_by_sock(relay_connections_ht, pollfd);
/* If not found, there is a synchronization issue. */
- assert(ctrl_conn);
+ LTTNG_ASSERT(ctrl_conn);
if (ctrl_conn->type == RELAY_DATA) {
if (revents & LPOLLIN) {
}
goto put_ctrl_connection;
}
- assert(ctrl_conn->type == RELAY_CONTROL);
+ LTTNG_ASSERT(ctrl_conn->type == RELAY_CONTROL);
if (revents & LPOLLIN) {
enum relay_connection_status status;
if (data_conn->type == RELAY_CONTROL) {
goto put_data_connection;
}
- assert(data_conn->type == RELAY_DATA);
+ LTTNG_ASSERT(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
enum relay_connection_status status;
}
rcu_read_unlock();
- lttng_poll_clean(&events);
+ (void) fd_tracker_util_poll_clean(the_fd_tracker, &events);
error_poll_create:
lttng_ht_destroy(relay_connections_ht);
relay_connections_ht_error:
/* Close relay conn pipes */
- utils_close_pipe(relay_conn_pipe);
+ (void) fd_tracker_util_pipe_close(the_fd_tracker,
+ relay_conn_pipe);
if (err) {
DBG("Thread exited with error");
}
*/
static int create_relay_conn_pipe(void)
{
- int ret;
+ return fd_tracker_util_pipe_open_cloexec(the_fd_tracker,
+ "Relayd connection pipe", relay_conn_pipe);
+}
- ret = utils_create_pipe_cloexec(relay_conn_pipe);
+static int stdio_open(void *data, int *fds)
+{
+ fds[0] = fileno(stdout);
+ fds[1] = fileno(stderr);
+ return 0;
+}
- return ret;
+static int track_stdio(void)
+{
+ int fds[2];
+ const char *names[] = { "stdout", "stderr" };
+
+ return fd_tracker_open_unsuspendable_fd(the_fd_tracker, fds,
+ names, 2, stdio_open, NULL);
}
/*
bool thread_is_rcu_registered = false;
int ret = 0, retval = 0;
void *status;
+ char *unlinked_file_directory_path = NULL, *output_path = NULL;
/* Parse environment variables */
ret = parse_env_options();
rcu_register_thread();
thread_is_rcu_registered = true;
- the_fd_tracker = fd_tracker_create(lttng_opt_fd_cap);
+ output_path = create_output_path("");
+ if (!output_path) {
+ ERR("Failed to get output path");
+ retval = -1;
+ goto exit_options;
+ }
+ ret = asprintf(&unlinked_file_directory_path, "%s/%s", output_path,
+ DEFAULT_UNLINKED_FILES_DIRECTORY);
+ free(output_path);
+ if (ret < 0) {
+ ERR("Failed to format unlinked file directory path");
+ retval = -1;
+ goto exit_options;
+ }
+ the_fd_tracker = fd_tracker_create(
+ unlinked_file_directory_path, lttng_opt_fd_pool_size);
+ free(unlinked_file_directory_path);
if (!the_fd_tracker) {
retval = -1;
goto exit_options;
}
+ ret = track_stdio();
+ if (ret) {
+ retval = -1;
+ goto exit_options;
+ }
+
/* Initialize thread health monitoring */
health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
if (!health_relayd) {
goto exit_options;
}
- ret = utils_create_pipe(health_quit_pipe);
+ ret = init_health_quit_pipe();
if (ret) {
retval = -1;
goto exit_options;