*/
#define NR_LTTNG_RELAY_READY 3
static int lttng_relay_ready = NR_LTTNG_RELAY_READY;
+
+/* Size of receive buffer. */
+#define RECV_DATA_BUFFER_SIZE 65536
+
static int recv_child_signal; /* Set to 1 when a SIGUSR1 signal is received. */
static pid_t child_ppid; /* Internal parent PID use with daemonize. */
static void sighandler(int sig)
{
switch (sig) {
- case SIGPIPE:
- DBG("SIGPIPE caught");
- return;
case SIGINT:
DBG("SIGINT caught");
if (lttng_relay_stop_threads()) {
return ret;
}
- sa.sa_handler = sighandler;
sa.sa_mask = sigset;
sa.sa_flags = 0;
+
+ sa.sa_handler = sighandler;
if ((ret = sigaction(SIGTERM, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
return ret;
}
- if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
+ if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
- if ((ret = sigaction(SIGUSR1, &sa, NULL)) < 0) {
+ sa.sa_handler = SIG_IGN;
+ if ((ret = sigaction(SIGPIPE, &sa, NULL)) < 0) {
PERROR("sigaction");
return ret;
}
goto exit;
}
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("socket poll error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
/*
* A new connection is requested, therefore a
* sessiond/consumerd connection is allocated in
* exchange in cds_wfcq_enqueue.
*/
futex_nto1_wake(&relay_conn_queue.futex);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("socket poll error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
}
}
static int relay_recv_metadata(struct lttcomm_relayd_hdr *recv_hdr,
struct relay_connection *conn)
{
- int ret = htobe32(LTTNG_OK);
+ int ret = 0;
ssize_t size_ret;
struct relay_session *session = conn->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
}
memset(data_buffer, 0, data_size);
DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
- ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (ret < 0 || ret != data_size) {
- if (ret == 0) {
+ size_ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
+ if (size_ret < 0 || size_ret != data_size) {
+ if (size_ret == 0) {
/* Orderly shutdown. Not necessary to print an error. */
DBG("Socket %d did an orderly shutdown", conn->sock->fd);
} else {
goto end_put;
}
- ret = write_padding_to_file(metadata_stream->stream_fd->fd,
+ size_ret = write_padding_to_file(metadata_stream->stream_fd->fd,
be32toh(metadata_struct->padding_size));
- if (ret < 0) {
+ if (size_ret < 0) {
goto end_put;
}
end_put:
pthread_mutex_unlock(&metadata_stream->lock);
stream_put(metadata_stream);
-
end:
return ret;
}
/* Get data offset because we are about to update the index. */
data_offset = htobe64(stream->tracefile_size_current);
- DBG("handle_index_data: stream %" PRIu64 " data offset %" PRIu64,
- stream->stream_handle, stream->tracefile_size_current);
+ DBG("handle_index_data: stream %" PRIu64 " net_seq_num %" PRIu64 " data offset %" PRIu64,
+ stream->stream_handle, net_seq_num, stream->tracefile_size_current);
/*
* Lookup for an existing index for that stream id/sequence
uint32_t data_size;
struct relay_session *session;
bool new_stream = false, close_requested = false;
+ size_t chunk_size = RECV_DATA_BUFFER_SIZE;
+ size_t recv_off = 0;
+ char data_buffer[chunk_size];
ret = conn->sock->ops->recvmsg(conn->sock, &data_hdr,
sizeof(struct lttcomm_relayd_data_hdr), 0);
stream_id = be64toh(data_hdr.stream_id);
stream = stream_get_by_id(stream_id);
if (!stream) {
+ ERR("relay_process_data: Cannot find stream %" PRIu64, stream_id);
ret = -1;
goto end;
}
session = stream->trace->session;
data_size = be32toh(data_hdr.data_size);
- if (data_buffer_size < data_size) {
- char *tmp_data_ptr;
-
- tmp_data_ptr = realloc(data_buffer, data_size);
- if (!tmp_data_ptr) {
- ERR("Allocating data buffer");
- free(data_buffer);
- ret = -1;
- goto end_stream_put;
- }
- data_buffer = tmp_data_ptr;
- data_buffer_size = data_size;
- }
- memset(data_buffer, 0, data_size);
net_seq_num = be64toh(data_hdr.net_seq_num);
DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
data_size, stream_id, net_seq_num);
- ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, data_size, 0);
- if (ret <= 0) {
- if (ret == 0) {
- /* Orderly shutdown. Not necessary to print an error. */
- DBG("Socket %d did an orderly shutdown", conn->sock->fd);
- }
- ret = -1;
- goto end_stream_put;
- }
pthread_mutex_lock(&stream->lock);
if (session->minor >= 4 && !session->snapshot) {
ret = handle_index_data(stream, net_seq_num, rotate_index);
if (ret < 0) {
+ ERR("handle_index_data: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
goto end_stream_unlock;
}
}
- /* Write data to stream output fd. */
- size_ret = lttng_write(stream->stream_fd->fd, data_buffer, data_size);
- if (size_ret < data_size) {
- ERR("Relay error writing data to file");
- ret = -1;
- goto end_stream_unlock;
- }
+ for (recv_off = 0; recv_off < data_size; recv_off += chunk_size) {
+ size_t recv_size = min(data_size - recv_off, chunk_size);
+
+ ret = conn->sock->ops->recvmsg(conn->sock, data_buffer, recv_size, 0);
+ if (ret <= 0) {
+ if (ret == 0) {
+ /* Orderly shutdown. Not necessary to print an error. */
+ DBG("Socket %d did an orderly shutdown", conn->sock->fd);
+ } else {
+ ERR("Socket %d error %d", conn->sock->fd, ret);
+ }
+ ret = -1;
+ goto end_stream_unlock;
+ }
+
+ /* Write data to stream output fd. */
+ size_ret = lttng_write(stream->stream_fd->fd, data_buffer,
+ recv_size);
+ if (size_ret < recv_size) {
+ ERR("Relay error writing data to file");
+ ret = -1;
+ goto end_stream_unlock;
+ }
- DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
- size_ret, stream->stream_handle);
+ DBG2("Relay wrote %zd bytes to tracefile for stream id %" PRIu64,
+ size_ret, stream->stream_handle);
+ }
ret = write_padding_to_file(stream->stream_fd->fd,
be32toh(data_hdr.padding_size));
if (ret < 0) {
+ ERR("write_padding_to_file: fail stream %" PRIu64 " net_seq_num %" PRIu64 " ret %d",
+ stream->stream_handle, net_seq_num, ret);
goto end_stream_unlock;
}
stream->tracefile_size_current +=
uatomic_set(&session->new_streams, 1);
pthread_mutex_unlock(&session->lock);
}
-end_stream_put:
stream_put(stream);
end:
return ret;
/* Inspect the relay conn pipe for new connection */
if (pollfd == relay_conn_pipe[0]) {
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- ERR("Relay connection pipe error");
- goto error;
- } else if (revents & LPOLLIN) {
+ if (revents & LPOLLIN) {
struct relay_connection *conn;
ret = lttng_read(relay_conn_pipe[0], &conn, sizeof(conn));
LPOLLIN | LPOLLRDHUP);
connection_ht_add(relay_connections_ht, conn);
DBG("Connection socket %d added", conn->sock->fd);
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ ERR("Relay connection pipe error");
+ goto error;
+ } else {
+ ERR("Unexpected poll events %u for sock %d", revents, pollfd);
+ goto error;
}
} else {
struct relay_connection *ctrl_conn;
/* If not found, there is a synchronization issue. */
assert(ctrl_conn);
- if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
- relay_thread_close_connection(&events, pollfd, ctrl_conn);
- if (last_seen_data_fd == pollfd) {
- last_seen_data_fd = last_notdel_data_fd;
- }
- } else if (revents & LPOLLIN) {
- if (ctrl_conn->type == RELAY_CONTROL) {
- ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock, &recv_hdr,
- sizeof(recv_hdr), 0);
- if (ret <= 0) {
- /* Connection closed */
- relay_thread_close_connection(&events, pollfd,
- ctrl_conn);
- } else {
- ret = relay_process_control(&recv_hdr, ctrl_conn);
- if (ret < 0) {
- /* Clear the session on error. */
- relay_thread_close_connection(&events, pollfd,
- ctrl_conn);
- }
- seen_control = 1;
- }
- } else {
+ if (ctrl_conn->type == RELAY_DATA) {
+ if (revents & LPOLLIN) {
/*
* Flag the last seen data fd not deleted. It will be
* used as the last seen fd if any fd gets deleted in
*/
last_notdel_data_fd = pollfd;
}
+ goto put_ctrl_connection;
+ }
+ assert(ctrl_conn->type == RELAY_CONTROL);
+
+ if (revents & LPOLLIN) {
+ ret = ctrl_conn->sock->ops->recvmsg(ctrl_conn->sock,
+ &recv_hdr, sizeof(recv_hdr), 0);
+ if (ret <= 0) {
+ /* Connection closed */
+ relay_thread_close_connection(&events, pollfd,
+ ctrl_conn);
+ } else {
+ ret = relay_process_control(&recv_hdr, ctrl_conn);
+ if (ret < 0) {
+ /* Clear the session on error. */
+ relay_thread_close_connection(&events,
+ pollfd, ctrl_conn);
+ }
+ seen_control = 1;
+ }
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ relay_thread_close_connection(&events,
+ pollfd, ctrl_conn);
+ if (last_seen_data_fd == pollfd) {
+ last_seen_data_fd = last_notdel_data_fd;
+ }
} else {
- ERR("Unknown poll events %u for sock %d", revents, pollfd);
+ ERR("Unexpected poll events %u for control sock %d",
+ revents, pollfd);
+ connection_put(ctrl_conn);
+ goto error;
}
+ put_ctrl_connection:
connection_put(ctrl_conn);
}
}
/* Skip it. Might be removed before. */
continue;
}
+ if (data_conn->type == RELAY_CONTROL) {
+ goto put_data_connection;
+ }
+ assert(data_conn->type == RELAY_DATA);
if (revents & LPOLLIN) {
- if (data_conn->type != RELAY_DATA) {
- goto put_connection;
- }
-
ret = relay_process_data(data_conn);
/* Connection closed */
if (ret < 0) {
relay_thread_close_connection(&events, pollfd,
- data_conn);
+ data_conn);
/*
* Every goto restart call sets the last seen fd where
* here we don't really care since we gracefully
connection_put(data_conn);
goto restart;
}
+ } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
+ relay_thread_close_connection(&events, pollfd,
+ data_conn);
+ } else {
+ ERR("Unknown poll events %u for data sock %d",
+ revents, pollfd);
}
- put_connection:
+ put_data_connection:
connection_put(data_conn);
}
last_seen_data_fd = -1;
DBG("Thread exited with error");
}
DBG("Worker thread cleanup complete");
- free(data_buffer);
error_testpoint:
if (err) {
health_error();
goto exit_init_data;
}
- /* Check if daemon is UID = 0 */
- if (!getuid()) {
- if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
- ERR("Need to be root to use ports < 1024");
- retval = -1;
- goto exit_init_data;
- }
- }
-
/* Setup the thread apps communication pipe. */
if (create_relay_conn_pipe()) {
retval = -1;