Reviewed-by: Julien Desfossez <julien.desfossez@efficios.com>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
cache = channel->metadata_cache;
pthread_mutex_lock(&consumer_data.lock);
cache = channel->metadata_cache;
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
if (cache->rb_pushed >= offset) {
pthread_mutex_lock(&channel->metadata_cache->lock);
if (cache->rb_pushed >= offset) {
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
pthread_mutex_unlock(&consumer_data.lock);
return ret;
*/
if (stream->globally_visible) {
pthread_mutex_lock(&consumer_data.lock);
*/
if (stream->globally_visible) {
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
consumer_stream_delete(stream, ht);
consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
} else {
/*
pthread_mutex_unlock(&consumer_data.lock);
} else {
/*
DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
call_rcu(&channel->node.head, free_channel_rcu);
end:
call_rcu(&channel->node.head, free_channel_rcu);
end:
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
}
pthread_mutex_unlock(&consumer_data.lock);
}
DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
pthread_mutex_unlock(&consumer_data.lock);
return ret;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ pthread_mutex_init(&channel->lock, NULL);
/*
* In monitor mode, the streams associated with the channel will be put in
/*
* In monitor mode, the streams associated with the channel will be put in
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
}
pthread_mutex_lock(&consumer_data.lock);
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
/*
pthread_mutex_lock(&stream->lock);
/*
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
* monitor list of the channel.
*/
unsigned int monitor;
* monitor list of the channel.
*/
unsigned int monitor;
+
+ /*
+ * Channel lock.
+ *
+ * This is nested INSIDE the consumer data lock.
+ * This is nested OUTSIDE the metadata cache lock.
+ * This is nested OUTSIDE stream lock.
+ * This is nested OUTSIDE consumer_relayd_sock_pair lock.
+ */
+ pthread_mutex_t lock;
*
* This is nested INSIDE the consumer_data lock.
* This is nested INSIDE the metadata cache lock.
*
* This is nested INSIDE the consumer_data lock.
* This is nested INSIDE the metadata cache lock.
+ * This is nested INSIDE the channel lock.
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
* This is nested OUTSIDE consumer_relayd_sock_pair lock.
*/
pthread_mutex_t lock;
}
pthread_mutex_lock(&consumer_data.lock);
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
if (cds_lfht_is_node_deleted(&channel->node.node)) {
goto error_unlock;
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
pthread_mutex_unlock(&consumer_data.lock);
error:
return ret;
* and ultimately try to get rid of this global consumer data lock.
*/
pthread_mutex_lock(&consumer_data.lock);
* and ultimately try to get rid of this global consumer data lock.
*/
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
* waiting for the metadata cache to be flushed.
*/
pthread_mutex_unlock(&channel->metadata_cache->lock);
* waiting for the metadata cache to be flushed.
*/
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
goto end_free;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
pthread_mutex_unlock(&consumer_data.lock);
goto end_free;
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {
pthread_mutex_unlock(&consumer_data.lock);
while (consumer_metadata_cache_flushed(channel, offset + len)) {