X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.c;h=17672fb20950e77f5d61d2738ef5109f059a161f;hb=65121c2141d0df9a1d6fc759904cea4bbc016331;hp=366f8550bf03774db161b6d70bd34765f9005869;hpb=3ab4ee7d2fec586872739cd0f6e58bd4ed54b78a;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index 366f8550b..17672fb20 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -768,10 +768,19 @@ static int flush_channel(uint64_t chan_key) health_code_update(); pthread_mutex_lock(&stream->lock); + + /* + * Protect against concurrent teardown of a stream. + */ + if (cds_lfht_is_node_deleted(&stream->node.node)) { + goto next; + } + if (!stream->quiescent) { ustctl_flush_buffer(stream->ustream, 0); stream->quiescent = true; } +next: pthread_mutex_unlock(&stream->lock); } error: @@ -831,6 +840,7 @@ static int close_metadata(uint64_t chan_key) { int ret = 0; struct lttng_consumer_channel *channel; + unsigned int channel_monitor; DBG("UST consumer close metadata key %" PRIu64, chan_key); @@ -849,13 +859,48 @@ static int close_metadata(uint64_t chan_key) pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&channel->lock); - + channel_monitor = channel->monitor; if (cds_lfht_is_node_deleted(&channel->node.node)) { goto error_unlock; } lttng_ustconsumer_close_metadata(channel); + pthread_mutex_unlock(&channel->lock); + pthread_mutex_unlock(&consumer_data.lock); + /* + * The ownership of a metadata channel depends on the type of + * session to which it belongs. In effect, the monitor flag is checked + * to determine if this metadata channel is in "snapshot" mode or not. + * + * In the non-snapshot case, the metadata channel is created along with + * a single stream which will remain present until the metadata channel + * is destroyed (on the destruction of its session). In this case, the + * metadata stream in "monitored" by the metadata poll thread and holds + * the ownership of its channel. + * + * Closing the metadata will cause the metadata stream's "metadata poll + * pipe" to be closed. Closing this pipe will wake-up the metadata poll + * thread which will teardown the metadata stream which, in return, + * deletes the metadata channel. + * + * In the snapshot case, the metadata stream is created and destroyed + * on every snapshot record. Since the channel doesn't have an owner + * other than the session daemon, it is safe to destroy it immediately + * on reception of the CLOSE_METADATA command. + */ + if (!channel_monitor) { + /* + * The channel and consumer_data locks must be + * released before this call since consumer_del_channel + * re-acquires the channel and consumer_data locks to teardown + * the channel and queue its reclamation by the "call_rcu" + * worker thread. + */ + consumer_del_channel(channel); + } + + return ret; error_unlock: pthread_mutex_unlock(&channel->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -1101,12 +1146,6 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } - if (relayd_id != -1ULL) { - ret = consumer_send_relayd_streams_sent(relayd_id); - if (ret < 0) { - goto error_unlock; - } - } /* * If tracing is active, we want to perform a "full" buffer flush. @@ -1144,7 +1183,7 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, produced_pos, nb_packets_per_stream, stream->max_sb_size); - while (consumed_pos < produced_pos) { + while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; @@ -2159,62 +2198,69 @@ static int get_index_values(struct ctf_packet_index *index, struct ustctl_consumer_stream *ustream) { int ret; + uint64_t packet_size, content_size, timestamp_begin, timestamp_end, + events_discarded, stream_id, stream_instance_id, + packet_seq_num; - ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin); + ret = ustctl_get_timestamp_begin(ustream, ×tamp_begin); if (ret < 0) { PERROR("ustctl_get_timestamp_begin"); goto error; } - index->timestamp_begin = htobe64(index->timestamp_begin); - ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end); + ret = ustctl_get_timestamp_end(ustream, ×tamp_end); if (ret < 0) { PERROR("ustctl_get_timestamp_end"); goto error; } - index->timestamp_end = htobe64(index->timestamp_end); - ret = ustctl_get_events_discarded(ustream, &index->events_discarded); + ret = ustctl_get_events_discarded(ustream, &events_discarded); if (ret < 0) { PERROR("ustctl_get_events_discarded"); goto error; } - index->events_discarded = htobe64(index->events_discarded); - ret = ustctl_get_content_size(ustream, &index->content_size); + ret = ustctl_get_content_size(ustream, &content_size); if (ret < 0) { PERROR("ustctl_get_content_size"); goto error; } - index->content_size = htobe64(index->content_size); - ret = ustctl_get_packet_size(ustream, &index->packet_size); + ret = ustctl_get_packet_size(ustream, &packet_size); if (ret < 0) { PERROR("ustctl_get_packet_size"); goto error; } - index->packet_size = htobe64(index->packet_size); - ret = ustctl_get_stream_id(ustream, &index->stream_id); + ret = ustctl_get_stream_id(ustream, &stream_id); if (ret < 0) { PERROR("ustctl_get_stream_id"); goto error; } - index->stream_id = htobe64(index->stream_id); - ret = ustctl_get_instance_id(ustream, &index->stream_instance_id); + ret = ustctl_get_instance_id(ustream, &stream_instance_id); if (ret < 0) { PERROR("ustctl_get_instance_id"); goto error; } - index->stream_instance_id = htobe64(index->stream_instance_id); - ret = ustctl_get_sequence_number(ustream, &index->packet_seq_num); + ret = ustctl_get_sequence_number(ustream, &packet_seq_num); if (ret < 0) { PERROR("ustctl_get_sequence_number"); goto error; } - index->packet_seq_num = htobe64(index->packet_seq_num); + + *index = (typeof(*index)) { + .offset = index->offset, + .packet_size = htobe64(packet_size), + .content_size = htobe64(content_size), + .timestamp_begin = htobe64(timestamp_begin), + .timestamp_end = htobe64(timestamp_end), + .events_discarded = htobe64(events_discarded), + .stream_id = htobe64(stream_id), + .stream_instance_id = htobe64(stream_instance_id), + .packet_seq_num = htobe64(packet_seq_num), + }; error: return ret; @@ -2291,6 +2337,13 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream) stream->ust_metadata_pushed); ret = write_len; + /* + * Switch packet (but don't open the next one) on every commit of + * a metadata packet. Since the subbuffer is fully filled (with padding, + * if needed), the stream is "quiescent" after this commit. + */ + ustctl_flush_buffer(stream->ustream, 1); + stream->quiescent = true; end: pthread_mutex_unlock(&stream->chan->metadata_cache->lock); return ret; @@ -2335,7 +2388,6 @@ int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx, retry = 1; } - ustctl_flush_buffer(metadata->ustream, 1); ret = ustctl_snapshot(metadata->ustream); if (ret < 0) { if (errno != EAGAIN) { @@ -2525,7 +2577,6 @@ retry: if (ret <= 0) { goto end; } - ustctl_flush_buffer(stream->ustream, 1); goto retry; } @@ -2892,7 +2943,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx, request.key = channel->key; DBG("Sending metadata request to sessiond, session id %" PRIu64 - ", per-pid %" PRIu64 ", app UID %u and channek key %" PRIu64, + ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64, request.session_id, request.session_id_per_pid, request.uid, request.key);