X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=59888c280c341cc1c16a5c89d652a88802787ec2;hb=efc8a87fb1ec94af764008a13b3576e793ae288c;hp=f12b1a335b6bd0eaf99a49bfa3bc234149fc833d;hpb=4c80d9a2ff26498f74b0731f11f33f1fd593ff8f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index f12b1a335..59888c280 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -101,9 +101,6 @@ static struct relay_cmd_queue relay_cmd_queue; static char *data_buffer; static unsigned int data_buffer_size; -/* Global hash table that stores relay index object. */ -static struct lttng_ht *indexes_ht; - /* We need those values for the file/dir creation. */ static uid_t relayd_uid; static gid_t relayd_gid; @@ -114,6 +111,9 @@ struct lttng_ht *relay_streams_ht; /* Global relay viewer stream hash table. */ struct lttng_ht *viewer_streams_ht; +/* Global hash table that stores relay index object. */ +struct lttng_ht *indexes_ht; + /* * usage function on stderr */ @@ -802,6 +802,9 @@ static void destroy_stream(struct relay_stream *stream, vstream->total_index_received = stream->total_index_received; } + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle); + iter.iter.node = &stream->stream_n.node; delret = lttng_ht_del(relay_streams_ht, &iter); assert(!delret); @@ -841,8 +844,6 @@ void relay_delete_session(struct relay_command *cmd, if (stream->session == cmd->session) { destroy_stream(stream, cmd->ctf_traces_ht); } - /* Cleanup index of that stream. */ - relay_index_destroy_by_stream_id(stream->stream_handle, indexes_ht); } /* Make this session not visible anymore. */ @@ -1652,7 +1653,7 @@ end_no_session: */ static int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *indexes_ht) + struct relay_command *cmd) { int ret, send_ret, index_created = 0; struct relay_session *session = cmd->session; @@ -1663,7 +1664,6 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, uint64_t net_seq_num; assert(cmd); - assert(indexes_ht); DBG("Relay receiving index"); @@ -1711,7 +1711,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, stream->beacon_ts_end = -1ULL; } - index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); + index = relay_index_find(stream->stream_handle, net_seq_num); if (!index) { /* A successful creation will add the object to the HT. */ index = relay_index_create(stream->stream_handle, net_seq_num); @@ -1729,7 +1729,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, * already exist, destroy back the index created, set the data in this * object and write it on disk. */ - relay_index_add(index, indexes_ht, &wr_index); + relay_index_add(index, &wr_index); if (wr_index) { copy_index_control_data(wr_index, &index_info); free(index); @@ -1752,7 +1752,7 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, stream->index_fd = ret; } - ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + ret = relay_index_write(wr_index->fd, wr_index); if (ret < 0) { goto end_rcu_unlock; } @@ -1818,7 +1818,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_end_data_pending(recv_hdr, cmd); break; case RELAYD_SEND_INDEX: - ret = relay_recv_index(recv_hdr, cmd, indexes_ht); + ret = relay_recv_index(recv_hdr, cmd); break; case RELAYD_UPDATE_SYNC_INFO: default: @@ -1836,8 +1836,7 @@ end: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, - struct lttng_ht *indexes_ht) +int relay_process_data(struct relay_command *cmd) { int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; @@ -1924,7 +1923,7 @@ int relay_process_data(struct relay_command *cmd, * exists, the control thread already received the data for it thus we need * to write it on disk. */ - index = relay_index_find(stream_id, net_seq_num, indexes_ht); + index = relay_index_find(stream_id, net_seq_num); if (!index) { /* A successful creation will add the object to the HT. */ index = relay_index_create(stream->stream_handle, net_seq_num); @@ -1954,7 +1953,7 @@ int relay_process_data(struct relay_command *cmd, * Try to add the relay index object to the hash table. If an object * already exist, destroy back the index created and set the data. */ - relay_index_add(index, indexes_ht, &wr_index); + relay_index_add(index, &wr_index); if (wr_index) { /* Copy back data from the created index. */ wr_index->fd = index->fd; @@ -1980,7 +1979,7 @@ int relay_process_data(struct relay_command *cmd, stream->index_fd = ret; } - ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + ret = relay_index_write(wr_index->fd, wr_index); if (ret < 0) { goto end_rcu_unlock; } @@ -2311,7 +2310,7 @@ restart: continue; } - ret = relay_process_data(relay_connection, indexes_ht); + ret = relay_process_data(relay_connection); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2355,7 +2354,7 @@ error_poll_create: { struct relay_index *index; cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index, index_n.node) { - relay_index_delete(index, indexes_ht); + relay_index_delete(index); } lttng_ht_destroy(indexes_ht); }