const char *progname;
+const char *tracing_group_name = DEFAULT_TRACING_GROUP;
+
/*
* Quit pipe for all threads. This permits a single cancellation point
* for all threads when receiving an event on the pipe.
static pthread_t listener_thread;
static pthread_t dispatcher_thread;
static pthread_t worker_thread;
+static pthread_t health_thread;
static uint64_t last_relay_stream_id;
static uint64_t last_relay_session_id;
struct lttng_ht *indexes_ht;
/* Relayd health monitoring */
-static struct health_app *health_relayd;
+struct health_app *health_relayd;
/*
* usage function on stderr
fprintf(stderr, " -D, --data-port URL Data port listening.\n");
fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n");
fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n");
+ fprintf(stderr, " -g, --group NAME Specify the tracing group name. (default: tracing)\n");
}
static
{ "control-port", 1, 0, 'C', },
{ "data-port", 1, 0, 'D', },
{ "daemonize", 0, 0, 'd', },
+ { "group", 1, 0, 'g', },
{ "help", 0, 0, 'h', },
{ "output", 1, 0, 'o', },
{ "verbose", 0, 0, 'v', },
while (1) {
int option_index = 0;
- c = getopt_long(argc, argv, "dhv" "C:D:o:",
+ c = getopt_long(argc, argv, "dhv" "C:D:o:g:",
long_options, &option_index);
if (c == -1) {
break;
case 'd':
opt_daemon = 1;
break;
+ case 'g':
+ tracing_group_name = optarg;
+ break;
case 'h':
usage();
exit(EXIT_FAILURE);
static
int notify_thread_pipe(int wpipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(wpipe, "!", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(wpipe, "!", 1);
+ if (ret < 1) {
PERROR("write poll pipe");
}
return ret;
}
+static void notify_health_quit_pipe(int *pipe)
+{
+ ssize_t ret;
+
+ ret = lttng_write(pipe[1], "4", 1);
+ if (ret < 1) {
+ PERROR("write relay health quit");
+ }
+}
+
/*
* Stop all threads by closing the thread quit pipe.
*/
ERR("write error on thread quit pipe");
}
+ notify_health_quit_pipe(health_quit_pipe);
+
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_cmd_queue.futex);
static
void *relay_thread_dispatcher(void *data)
{
- int ret, err = -1;
+ int err = -1;
+ ssize_t ret;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
* call is blocking so we can be assured that the data will be read
* at some point in time or wait to the end of the world :)
*/
- do {
- ret = write(relay_cmd_pipe[1], relay_cmd,
- sizeof(struct relay_command));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(relay_cmd_pipe[1], relay_cmd,
+ sizeof(struct relay_command));
free(relay_cmd);
- if (ret < 0 || ret != sizeof(struct relay_command)) {
+ if (ret < sizeof(struct relay_command)) {
PERROR("write cmd pipe");
goto error;
}
* RCU read side lock MUST be acquired. If NO close_stream_check() was called
* BEFORE the stream lock MUST be acquired.
*/
-static void destroy_stream(struct relay_stream *stream,
- struct lttng_ht *ctf_traces_ht)
+static void destroy_stream(struct relay_stream *stream)
{
int delret;
struct relay_viewer_stream *vstream;
delret = lttng_ht_del(relay_streams_ht, &iter);
assert(!delret);
iter.iter.node = &stream->ctf_trace_node.node;
- delret = lttng_ht_del(ctf_traces_ht, &iter);
+ delret = lttng_ht_del(stream->ctf_traces_ht, &iter);
assert(!delret);
call_rcu(&stream->rcu_node, deferred_free_stream);
DBG("Closed tracefile %d from close stream", stream->fd);
}
stream = caa_container_of(node, struct relay_stream, stream_n);
if (stream->session == cmd->session) {
- destroy_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
+ cmd->session->stream_count--;
+ assert(cmd->session->stream_count >= 0);
}
}
stream->ctf_trace->metadata_stream = stream;
}
ctf_trace_assign(cmd->ctf_traces_ht, stream);
+ stream->ctf_traces_ht = cmd->ctf_traces_ht;
lttng_ht_node_init_ulong(&stream->stream_n,
(unsigned long) stream->stream_handle);
lttng_ht_node_init_str(&stream->ctf_trace_node, stream->path_name);
lttng_ht_add_str(cmd->ctf_traces_ht, &stream->ctf_trace_node);
+ session->stream_count++;
DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name,
stream->stream_handle);
stream->last_net_seq_num = be64toh(stream_info.last_net_seq_num);
stream->close_flag = 1;
+ session->stream_count--;
+ assert(session->stream_count >= 0);
if (close_stream_check(stream)) {
- destroy_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
}
end_unlock:
*/
static int write_padding_to_file(int fd, uint32_t size)
{
- int ret = 0;
+ ssize_t ret = 0;
char *zeros;
if (size == 0) {
goto end;
}
- do {
- ret = write(fd, zeros, size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != size) {
+ ret = lttng_write(fd, zeros, size);
+ if (ret < size) {
PERROR("write padding to file");
}
struct relay_command *cmd)
{
int ret = htobe32(LTTNG_OK);
+ ssize_t size_ret;
struct relay_session *session = cmd->session;
struct lttcomm_relayd_metadata_payload *metadata_struct;
struct relay_stream *metadata_stream;
goto end_unlock;
}
- do {
- ret = write(metadata_stream->fd, metadata_struct->payload,
- payload_size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != payload_size) {
+ size_ret = lttng_write(metadata_stream->fd, metadata_struct->payload,
+ payload_size);
+ if (size_ret < payload_size) {
ERR("Relay error writing metadata on file");
ret = -1;
goto end_unlock;
int relay_process_data(struct relay_command *cmd)
{
int ret = 0, rotate_index = 0;
+ ssize_t size_ret;
struct relay_stream *stream;
struct lttcomm_relayd_data_hdr data_hdr;
uint64_t stream_id;
}
/* Write data to stream output fd. */
- do {
- ret = write(stream->fd, data_buffer, data_size);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != data_size) {
+ size_ret = lttng_write(stream->fd, data_buffer, data_size);
+ if (size_ret < data_size) {
ERR("Relay error writing data to file");
ret = -1;
goto end_rcu_unlock;
/* Check if we need to close the FD */
if (close_stream_check(stream)) {
- destroy_stream(stream, cmd->ctf_traces_ht);
+ destroy_stream(stream);
}
end_rcu_unlock:
struct lttng_ht *relay_connections_ht)
{
struct relay_command *relay_connection;
- int ret;
+ ssize_t ret;
relay_connection = zmalloc(sizeof(struct relay_command));
if (relay_connection == NULL) {
PERROR("Relay command zmalloc");
goto error;
}
- do {
- ret = read(fd, relay_connection, sizeof(struct relay_command));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(struct relay_command)) {
+ ret = lttng_read(fd, relay_connection, sizeof(struct relay_command));
+ if (ret < sizeof(struct relay_command)) {
PERROR("read relay cmd pipe");
goto error_read;
}
- relay_connection->ctf_traces_ht = lttng_ht_new(0, LTTNG_HT_TYPE_STRING);
- if (!relay_connection->ctf_traces_ht) {
- goto error_read;
+ /*
+ * Only used by the control side and the reference is copied inside each
+ * stream from that connection. Thus a destroy HT must be done after every
+ * stream has been destroyed.
+ */
+ if (relay_connection->type == RELAY_CONTROL) {
+ relay_connection->ctf_traces_ht = lttng_ht_new(0,
+ LTTNG_HT_TYPE_STRING);
+ if (!relay_connection->ctf_traces_ht) {
+ goto error_read;
+ }
}
lttng_ht_node_init_ulong(&relay_connection->sock_n,
struct relay_command *relay_connection =
caa_container_of(head, struct relay_command, rcu_node);
- lttng_ht_destroy(relay_connection->ctf_traces_ht);
lttcomm_destroy_sock(relay_connection->sock);
free(relay_connection);
}
ret = lttng_ht_del(relay_connections_ht, iter);
assert(!ret);
+
if (relay_connection->type == RELAY_CONTROL) {
relay_delete_session(relay_connection, sessions_ht);
+ lttng_ht_destroy(relay_connection->ctf_traces_ht);
}
- call_rcu(&relay_connection->rcu_node,
- deferred_free_connection);
+ call_rcu(&relay_connection->rcu_node, deferred_free_connection);
}
/*
goto exit_health_app_create;
}
+ ret = utils_create_pipe(health_quit_pipe);
+ if (ret < 0) {
+ goto error_health_pipe;
+ }
+
+ /* Create thread to manage the client socket */
+ ret = pthread_create(&health_thread, NULL,
+ thread_manage_health, (void *) NULL);
+ if (ret != 0) {
+ PERROR("pthread_create health");
+ goto health_error;
+ }
+
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
goto exit_live;
}
- live_stop_threads();
-
exit_live:
ret = pthread_join(listener_thread, &status);
if (ret != 0) {
}
exit_dispatcher:
+ ret = pthread_join(health_thread, &status);
+ if (ret != 0) {
+ PERROR("pthread_join health thread");
+ goto error; /* join error, exit without cleanup */
+ }
+
+ /*
+ * Stop live threads only after joining other threads.
+ */
+ live_stop_threads();
+
+health_error:
+ utils_close_pipe(health_quit_pipe);
+
+error_health_pipe:
health_app_destroy(health_relayd);
exit_health_app_create: