Fix: consumerd: slow metadata push slows down application registration
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
index bfecf09e2a6ae36bf578052f91b2b4c95855ebff..b43ae58ffd84b4be5c836130341d6c8b200b672c 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
  *
@@ -68,7 +68,7 @@ static void destroy_channel(struct lttng_consumer_channel *channel)
 
                health_code_update();
 
-               cds_list_del(&stream->send_node);
+               cds_list_del_init(&stream->send_node);
                lttng_ust_ctl_destroy_stream(stream->ustream);
                lttng_trace_chunk_put(stream->trace_chunk);
                free(stream);
@@ -204,7 +204,7 @@ static int send_stream_to_thread(struct lttng_consumer_stream *stream,
         * global.
         */
        stream->globally_visible = 1;
-       cds_list_del(&stream->send_node);
+       cds_list_del_init(&stream->send_node);
 
        ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
        if (ret < 0) {
@@ -949,8 +949,9 @@ error:
         * will make sure to clean that list.
         */
        consumer_stream_destroy(metadata->metadata_stream, NULL);
-       cds_list_del(&metadata->metadata_stream->send_node);
        metadata->metadata_stream = NULL;
+       lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
 send_streams_error:
 error_no_stream:
 end:
@@ -986,7 +987,7 @@ static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
         * Ask the sessiond if we have new metadata waiting and update the
         * consumer metadata cache.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
        if (ret < 0) {
                goto error;
        }
@@ -1032,8 +1033,8 @@ error_stream:
         * new metadata stream.
         */
        consumer_stream_destroy(metadata_stream, NULL);
-       cds_list_del(&metadata_stream->send_node);
        metadata_channel->metadata_stream = NULL;
+       lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
 
 error:
        rcu_read_unlock();
@@ -1120,13 +1121,13 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                if (use_relayd) {
                        ret = consumer_send_relayd_stream(stream, path);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                } else {
                        ret = consumer_stream_create_output_files(stream,
                                        false);
                        if (ret < 0) {
-                               goto error_unlock;
+                               goto error_close_stream;
                        }
                        DBG("UST consumer snapshot stream (%" PRIu64 ")",
                                        stream->key);
@@ -1148,19 +1149,19 @@ static int snapshot_channel(struct lttng_consumer_channel *channel,
                ret = lttng_ustconsumer_take_snapshot(stream);
                if (ret < 0) {
                        ERR("Taking UST snapshot");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
                if (ret < 0) {
                        ERR("Produced UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
                if (ret < 0) {
                        ERR("Consumerd UST snapshot position");
-                       goto error_unlock;
+                       goto error_close_stream;
                }
 
                /*
@@ -1277,7 +1278,7 @@ void metadata_stream_reset_cache_consumed_position(
  */
 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
                uint64_t len, uint64_t version,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+               struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
 {
        int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        char *metadata_str;
@@ -1366,13 +1367,8 @@ int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
        if (!wait) {
                goto end_free;
        }
-       while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
-               DBG("Waiting for metadata to be flushed");
 
-               health_code_update();
-
-               usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
-       }
+       consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
 
 end_free:
        free(metadata_str);
@@ -1428,11 +1424,18 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
        switch (msg.cmd_type) {
        case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
        {
+               uint32_t major = msg.u.relayd_sock.major;
+               uint32_t minor = msg.u.relayd_sock.minor;
+               enum lttcomm_sock_proto protocol =
+                               (enum lttcomm_sock_proto) msg.u.relayd_sock
+                                               .relayd_socket_protocol;
+
                /* Session daemon status message are handled in the following call. */
                consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
-                               msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
-                               &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
-                               msg.u.relayd_sock.relayd_session_id);
+                               msg.u.relayd_sock.type, ctx, sock,
+                               consumer_sockpoll, msg.u.relayd_sock.session_id,
+                               msg.u.relayd_sock.relayd_session_id, major,
+                               minor, protocol);
                goto end_nosignal;
        }
        case LTTNG_CONSUMER_DESTROY_RELAYD:
@@ -1816,7 +1819,7 @@ end_get_channel_nosignal:
                health_code_update();
 
                ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
-                               version, found_channel, 0, 1);
+                               version, found_channel, false, 1);
                if (ret < 0) {
                        /* error receiving from sessiond */
                        goto error_push_metadata_fatal;
@@ -2458,8 +2461,9 @@ void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
                        stream->quiescent = true;
                }
        }
-       pthread_mutex_unlock(&stream->lock);
+
        stream->hangup_flush_done = 1;
+       pthread_mutex_unlock(&stream->lock);
 }
 
 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
@@ -2607,6 +2611,7 @@ int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
                goto end;
        }
        stream->ust_metadata_pushed += write_len;
+       lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
 
        assert(stream->chan->metadata_cache->contents.size >=
                        stream->ust_metadata_pushed);
@@ -2656,7 +2661,7 @@ enum sync_metadata_status lttng_ustconsumer_sync_metadata(
         * Request metadata from the sessiond, but don't wait for the flush
         * because we locked the metadata thread.
         */
-       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+       ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
        pthread_mutex_lock(&metadata_stream->lock);
        if (ret < 0) {
                status = SYNC_METADATA_STATUS_ERROR;
@@ -3306,7 +3311,7 @@ void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
  * pushed out due to concurrent interaction with the session daemon.
  */
 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_channel *channel, int timer, int wait)
+               struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
 {
        struct lttcomm_metadata_request_msg request;
        struct lttcomm_consumer_msg msg;
@@ -3414,7 +3419,7 @@ int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
        health_code_update();
 
        ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
-                       key, offset, len, version, channel, timer, wait);
+                       key, offset, len, version, channel, invoked_by_timer, wait);
        if (ret >= 0) {
                /*
                 * Only send the status msg if the sessiond is alive meaning a positive
This page took 0.02837 seconds and 4 git commands to generate.