consumer: explicitly set endpoint status to active at init
[lttng-tools.git] / src / common / consumer.c
index 560ec6cd06a24366daf5879fb779c4aa6a19ff4d..feeb9163bb89cf59e668571ef487ff81091e6fba 100644 (file)
@@ -292,6 +292,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
        DBG("Consumer delete channel key %" PRIu64, channel->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -321,6 +322,7 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
 
        call_rcu(&channel->node.head, free_channel_rcu);
 end:
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
@@ -594,6 +596,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->gid = gid;
        stream->net_seq_idx = relayd_id;
        stream->session_id = session_id;
+       stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        pthread_mutex_init(&stream->lock, NULL);
 
        /* If channel is the metadata, flag this stream as metadata. */
@@ -651,6 +654,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding consumer stream %" PRIu64, stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
        rcu_read_lock();
 
@@ -694,6 +698,7 @@ static int add_stream(struct lttng_consumer_stream *stream,
 
        rcu_read_unlock();
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        return ret;
@@ -879,6 +884,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->output = output;
        channel->tracefile_size = tracefile_size;
        channel->tracefile_count = tracefile_count;
+       pthread_mutex_init(&channel->lock, NULL);
 
        strncpy(channel->pathname, pathname, sizeof(channel->pathname));
        channel->pathname[sizeof(channel->pathname) - 1] = '\0';
@@ -911,6 +917,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
        struct lttng_ht_iter iter;
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&channel->lock);
        rcu_read_lock();
 
        lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
@@ -927,6 +934,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel,
 
 end:
        rcu_read_unlock();
+       pthread_mutex_unlock(&channel->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (!ret && channel->wait_fd != -1 &&
@@ -1886,6 +1894,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
        }
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        switch (consumer_data.type) {
@@ -1980,6 +1989,7 @@ end:
        stream->chan->metadata_stream = NULL;
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
 
        if (free_chan) {
@@ -2008,6 +2018,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
 
        pthread_mutex_lock(&consumer_data.lock);
+       pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
 
        /*
@@ -2059,6 +2070,7 @@ static int add_metadata_stream(struct lttng_consumer_stream *stream,
        rcu_read_unlock();
 
        pthread_mutex_unlock(&stream->lock);
+       pthread_mutex_unlock(&stream->chan->lock);
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
This page took 0.024676 seconds and 4 git commands to generate.