X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fust-consumer%2Fust-consumer.cpp;h=30d1f102142781289be72a965a9b8bfe719486ff;hb=9cc4ae91845c03b141af7ef58a86a2a9689dfafd;hp=a2bf099b236f94772c9fd1d9fbb0fa3831639848;hpb=edb555b5791df39308cb5d9b1fc9883aab21951e;p=lttng-tools.git diff --git a/src/common/ust-consumer/ust-consumer.cpp b/src/common/ust-consumer/ust-consumer.cpp index a2bf099b2..30d1f1021 100644 --- a/src/common/ust-consumer/ust-consumer.cpp +++ b/src/common/ust-consumer/ust-consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2017 Jérémie Galarneau * @@ -25,22 +25,22 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "ust-consumer.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ust-consumer.hpp" #define INT_MAX_STR_LEN 12 /* includes \0 */ @@ -49,46 +49,6 @@ extern int consumer_poll_timeout; LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE(); -/* - * Free channel object and all streams associated with it. This MUST be used - * only and only if the channel has _NEVER_ been added to the global channel - * hash table. - */ -static void destroy_channel(struct lttng_consumer_channel *channel) -{ - struct lttng_consumer_stream *stream, *stmp; - - LTTNG_ASSERT(channel); - - DBG("UST consumer cleaning stream list"); - - cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head, - send_node) { - - health_code_update(); - - cds_list_del(&stream->send_node); - lttng_ust_ctl_destroy_stream(stream->ustream); - lttng_trace_chunk_put(stream->trace_chunk); - free(stream); - } - - /* - * If a channel is available meaning that was created before the streams - * were, delete it. - */ - if (channel->uchan) { - lttng_ustconsumer_del_channel(channel); - lttng_ustconsumer_free_channel(channel); - } - - if (channel->trace_chunk) { - lttng_trace_chunk_put(channel->trace_chunk); - } - - free(channel); -} - /* * Add channel to internal consumer state. * @@ -203,7 +163,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream, * global. */ stream->globally_visible = 1; - cds_list_del(&stream->send_node); + cds_list_del_init(&stream->send_node); ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream)); if (ret < 0) { @@ -407,7 +367,7 @@ static int create_ust_channel(struct lttng_consumer_channel *channel, nr_stream_fds = 1; else nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel(); - stream_fds = (int *) zmalloc(nr_stream_fds * sizeof(*stream_fds)); + stream_fds = calloc(nr_stream_fds); if (!stream_fds) { ret = -1; goto error_alloc; @@ -742,6 +702,14 @@ static int flush_channel(uint64_t chan_key) next: pthread_mutex_unlock(&stream->lock); } + + /* + * Send one last buffer statistics update to the session daemon. This + * ensures that the session daemon gets at least one statistics update + * per channel even in the case of short-lived channels, such as when a + * short-lived app is traced in per-pid mode. + */ + sample_and_send_channel_buffer_stats(channel); error: rcu_read_unlock(); return ret; @@ -876,6 +844,8 @@ static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key) int ret; struct lttng_consumer_channel *metadata; + ASSERT_RCU_READ_LOCKED(); + DBG("UST consumer setup metadata key %" PRIu64, key); metadata = consumer_find_channel(key); @@ -948,7 +918,6 @@ error: * will make sure to clean that list. */ consumer_stream_destroy(metadata->metadata_stream, NULL); - cds_list_del(&metadata->metadata_stream->send_node); metadata->metadata_stream = NULL; send_streams_error: error_no_stream: @@ -971,6 +940,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); + ASSERT_RCU_READ_LOCKED(); DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); @@ -1004,7 +974,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, metadata_stream = metadata_channel->metadata_stream; LTTNG_ASSERT(metadata_stream); - pthread_mutex_lock(&metadata_stream->lock); + metadata_stream->read_subbuffer_ops.lock(metadata_stream); if (relayd_id != (uint64_t) -1ULL) { metadata_stream->net_seq_idx = relayd_id; ret = consumer_send_relayd_stream(metadata_stream, path); @@ -1012,14 +982,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, ret = consumer_stream_create_output_files(metadata_stream, false); } - pthread_mutex_unlock(&metadata_stream->lock); if (ret < 0) { goto error_stream; } do { health_code_update(); - ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true); if (ret < 0) { goto error_stream; @@ -1027,12 +995,12 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel, } while (ret > 0); error_stream: + metadata_stream->read_subbuffer_ops.unlock(metadata_stream); /* - * Clean up the stream completly because the next snapshot will use a new - * metadata stream. + * Clean up the stream completely because the next snapshot will use a + * new metadata stream. */ consumer_stream_destroy(metadata_stream, NULL); - cds_list_del(&metadata_stream->send_node); metadata_channel->metadata_stream = NULL; error: @@ -1087,6 +1055,7 @@ static int snapshot_channel(struct lttng_consumer_channel *channel, LTTNG_ASSERT(path); LTTNG_ASSERT(ctx); + ASSERT_RCU_READ_LOCKED(); rcu_read_lock(); @@ -1285,7 +1254,7 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len); - metadata_str = (char *) zmalloc(len * sizeof(char)); + metadata_str = calloc(len); if (!metadata_str) { PERROR("zmalloc metadata string"); ret_code = LTTCOMM_CONSUMERD_ENOMEM; @@ -1323,10 +1292,21 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset, * metadata position to ensure the metadata poll thread consumes * the whole cache. */ - pthread_mutex_lock(&channel->metadata_stream->lock); - metadata_stream_reset_cache_consumed_position( - channel->metadata_stream); - pthread_mutex_unlock(&channel->metadata_stream->lock); + + /* + * channel::metadata_stream can be null when the metadata + * channel is under a snapshot session type. No need to update + * the stream position in that scenario. + */ + if (channel->metadata_stream != NULL) { + pthread_mutex_lock(&channel->metadata_stream->lock); + metadata_stream_reset_cache_consumed_position( + channel->metadata_stream); + pthread_mutex_unlock(&channel->metadata_stream->lock); + } else { + /* Validate we are in snapshot mode. */ + LTTNG_ASSERT(!channel->monitor); + } /* Fall-through. */ case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT: /* @@ -1417,11 +1397,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + uint32_t major = msg.u.relayd_sock.major; + uint32_t minor = msg.u.relayd_sock.minor; + enum lttcomm_sock_proto protocol = + (enum lttcomm_sock_proto) msg.u.relayd_sock + .relayd_socket_protocol; + /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.type, ctx, sock, + consumer_sockpoll, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, major, + minor, protocol); goto end_nosignal; } case LTTNG_CONSUMER_DESTROY_RELAYD: @@ -2046,8 +2033,7 @@ error_push_metadata_fatal: */ rotate_channel = lttng_consumer_rotate_channel( found_channel, key, - msg.u.rotate_channel.relayd_id, - msg.u.rotate_channel.metadata, ctx); + msg.u.rotate_channel.relayd_id); if (rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -2074,8 +2060,7 @@ error_push_metadata_fatal: ret_rotate_read_streams = lttng_consumer_rotate_ready_streams( - found_channel, key, - ctx); + found_channel, key); if (ret_rotate_read_streams < 0) { ERR("Rotate channel failed"); } @@ -2116,9 +2101,11 @@ end_rotate_channel_nosignal: case LTTNG_CONSUMER_INIT: { int ret_send_status; + lttng_uuid sessiond_uuid; - ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); + std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); + ret_code = lttng_consumer_init_command(ctx, sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); if (ret_send_status < 0) { @@ -2298,11 +2285,7 @@ end_msg_sessiond: end_channel_error: if (channel) { - /* - * Free channel here since no one has a reference to it. We don't - * free after that because a stream can store this pointer. - */ - destroy_channel(channel); + consumer_del_channel(channel); } /* We have to send a status channel message indicating an error. */ { @@ -2448,8 +2431,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream) stream->quiescent = true; } } - pthread_mutex_unlock(&stream->lock); + stream->hangup_flush_done = 1; + pthread_mutex_unlock(&stream->lock); } void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan) @@ -2639,6 +2623,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata( LTTNG_ASSERT(ctx); LTTNG_ASSERT(metadata_stream); + ASSERT_RCU_READ_LOCKED(); metadata_channel = metadata_stream->chan; pthread_mutex_unlock(&metadata_stream->lock); @@ -3049,7 +3034,7 @@ end: } static int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream); @@ -3058,7 +3043,7 @@ static int put_next_subbuffer(struct lttng_consumer_stream *stream, } static int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;