X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=7ae40c57c564a10c2bffbdff5c99618cdf37fff1;hb=5f3ea0040a33e37c9db44494f7ad64a3f78f9c6b;hp=3415cfe92872b01d2df95a564dca380db3cc16ce;hpb=fc9390b19f80ea9fe64783156ea5fdcee8b93817;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 3415cfe92..7ae40c57c 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -320,6 +320,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -1071,7 +1072,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1083,6 +1084,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1093,9 +1095,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -2051,12 +2058,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, DBG3("Consumer delete metadata stream %d", stream->wait_fd); pthread_mutex_lock(&consumer_data.lock); + pthread_mutex_lock(&stream->chan->lock); + pthread_mutex_lock(&stream->lock); if (stream->chan->metadata_cache) { /* Only applicable to userspace consumers. */ pthread_mutex_lock(&stream->chan->metadata_cache->lock); } - pthread_mutex_lock(&stream->chan->lock); - pthread_mutex_lock(&stream->lock); /* Remove any reference to that stream. */ consumer_stream_delete(stream, ht); @@ -2080,11 +2087,11 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, */ stream->chan->metadata_stream = NULL; - pthread_mutex_unlock(&stream->lock); - pthread_mutex_unlock(&stream->chan->lock); if (stream->chan->metadata_cache) { pthread_mutex_unlock(&stream->chan->metadata_cache->lock); } + pthread_mutex_unlock(&stream->lock); + pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); if (free_chan) { @@ -2145,7 +2152,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_u64(ht, &stream->node); - lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, &stream->node_channel_id); /* @@ -2449,6 +2456,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2505,7 +2514,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2518,7 +2527,7 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && consumer_quit == 1) { + if (nb_fd == 0 && consumer_quit == 1 && nb_inactive_fd == 0) { err = 0; /* All is OK */ goto end; }