X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=0f381f7f5e46d7e450d0b54824882865af4ac626;hb=6550503d44c075f45b5c15e42d5770a0a5a7d82c;hp=53eaca2cbf944327b95aa0fced58bb564bb8013f;hpb=fa91dc52d62347d1c1ce56e995525f2c57adfc13;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 53eaca2cb..0f381f7f5 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -44,6 +44,7 @@ #include #include #include +#include #include #include #include @@ -232,6 +233,11 @@ int set_option(int opt, const char *arg, const char *optname) break; case 'g': tracing_group_name = strdup(arg); + if (tracing_group_name == NULL) { + ret = -errno; + PERROR("strdup"); + goto end; + } tracing_group_name_override = 1; break; case 'h': @@ -250,7 +256,10 @@ int set_option(int opt, const char *arg, const char *optname) if (arg) { lttng_opt_verbose = config_parse_value(arg); } else { - lttng_opt_verbose += 1; + /* Only 3 level of verbosity (-vvv). */ + if (lttng_opt_verbose < 3) { + lttng_opt_verbose += 1; + } } break; default: @@ -825,6 +834,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -1201,6 +1215,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, stream->session_id = session->id; stream->index_fd = -1; stream->read_index_fd = -1; + stream->ctf_stream_id = -1ULL; lttng_ht_node_init_u64(&stream->node, stream->stream_handle); pthread_mutex_init(&stream->lock, NULL); @@ -1937,9 +1952,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, DBG("Received live beacon for stream %" PRIu64, stream->stream_handle); /* - * Only flag a stream inactive when it has already received data. + * Only flag a stream inactive when it has already received data + * and no indexes are in flight. */ - if (stream->total_index_received > 0) { + if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) { stream->beacon_ts_end = be64toh(index_info.timestamp_end); } ret = 0; @@ -1956,9 +1972,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } index_created = 1; + stream->indexes_in_flight++; } copy_index_control_data(index, &index_info); + if (stream->ctf_stream_id == -1ULL) { + stream->ctf_stream_id = be64toh(index_info.stream_id); + } if (index_created) { /* @@ -1983,6 +2003,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, goto end_rcu_unlock; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } end_rcu_unlock: @@ -2146,6 +2168,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } index_created = 1; + stream->indexes_in_flight++; } if (rotate_index || stream->index_fd < 0) { @@ -2188,6 +2211,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num, goto error; } stream->total_index_received++; + stream->indexes_in_flight--; + assert(stream->indexes_in_flight >= 0); } error: @@ -2316,7 +2341,6 @@ int relay_process_data(struct relay_connection *conn) stream->tracefile_size, stream->tracefile_count, relayd_uid, relayd_gid, stream->fd, &(stream->tracefile_count_current), &stream->fd); - stream->total_index_received = 0; pthread_mutex_unlock(&stream->viewer_stream_rotation_lock); if (ret < 0) { ERR("Rotating stream output file"); @@ -2389,7 +2413,7 @@ static void destroy_connection(struct lttng_ht *relay_connections_ht, connection_delete(relay_connections_ht, conn); /* For the control socket, we try to destroy the session. */ - if (conn->type == RELAY_CONTROL) { + if (conn->type == RELAY_CONTROL && conn->session) { destroy_session(conn->session, conn->sessions_ht); } @@ -2411,6 +2435,7 @@ void *relay_thread_worker(void *data) struct lttcomm_relayd_hdr recv_hdr; struct relay_local_data *relay_ctx = (struct relay_local_data *) data; struct lttng_ht *sessions_ht = relay_ctx->sessions_ht; + struct relay_index *index; DBG("[thread] Relay worker started"); @@ -2481,6 +2506,11 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Thread quit pipe has been closed. Killing thread. */ ret = check_thread_quit_pipe(pollfd, revents); if (ret) { @@ -2583,45 +2613,49 @@ restart: health_code_update(); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + /* Skip the command pipe. It's handled in the first loop. */ if (pollfd == relay_conn_pipe[0]) { continue; } - if (revents) { - rcu_read_lock(); - conn = connection_find_by_sock(relay_connections_ht, pollfd); - if (!conn) { - /* Skip it. Might be removed before. */ + rcu_read_lock(); + conn = connection_find_by_sock(relay_connections_ht, pollfd); + if (!conn) { + /* Skip it. Might be removed before. */ + rcu_read_unlock(); + continue; + } + + if (revents & LPOLLIN) { + if (conn->type != RELAY_DATA) { rcu_read_unlock(); continue; } - if (revents & LPOLLIN) { - if (conn->type != RELAY_DATA) { - continue; - } - - ret = relay_process_data(conn); - /* Connection closed */ - if (ret < 0) { - cleanup_connection_pollfd(&events, pollfd); - destroy_connection(relay_connections_ht, conn); - DBG("Data connection closed with %d", pollfd); - /* - * Every goto restart call sets the last seen fd where - * here we don't really care since we gracefully - * continue the loop after the connection is deleted. - */ - } else { - /* Keep last seen port. */ - last_seen_data_fd = pollfd; - rcu_read_unlock(); - goto restart; - } + ret = relay_process_data(conn); + /* Connection closed */ + if (ret < 0) { + cleanup_connection_pollfd(&events, pollfd); + destroy_connection(relay_connections_ht, conn); + DBG("Data connection closed with %d", pollfd); + /* + * Every goto restart call sets the last seen fd where + * here we don't really care since we gracefully + * continue the loop after the connection is deleted. + */ + } else { + /* Keep last seen port. */ + last_seen_data_fd = pollfd; + rcu_read_unlock(); + goto restart; } - rcu_read_unlock(); } + rcu_read_unlock(); } last_seen_data_fd = -1; } @@ -2642,6 +2676,14 @@ error: } rcu_read_unlock(); error_poll_create: + rcu_read_lock(); + cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, + index_n.node) { + health_code_update(); + relay_index_delete(index); + relay_index_free_safe(index); + } + rcu_read_unlock(); lttng_ht_destroy(indexes_ht); indexes_ht_error: lttng_ht_destroy(relay_connections_ht);