DBG("Consumer delete channel key %" PRIu64, channel->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
call_rcu(&channel->node.head, free_channel_rcu);
end:
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
}
DBG3("Adding consumer stream %" PRIu64, stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
rcu_read_lock();
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
channel->tracefile_size = tracefile_size;
channel->tracefile_count = tracefile_count;
channel->monitor = monitor;
+ pthread_mutex_init(&channel->lock, NULL);
/*
* In monitor mode, the streams associated with the channel will be put in
struct lttng_ht_iter iter;
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&channel->lock);
rcu_read_lock();
lttng_ht_lookup(consumer_data.channel_ht, &channel->key, &iter);
end:
rcu_read_unlock();
+ pthread_mutex_unlock(&channel->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (!ret && channel->wait_fd != -1 &&
}
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
switch (consumer_data.type) {
stream->chan->metadata_stream = NULL;
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
if (free_chan) {
DBG3("Adding metadata stream %" PRIu64 " to hash table", stream->key);
pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
/*
rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
+ pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
return lttcomm_send_unix_sock(sock, &msg, sizeof(msg));
}
+
+/*
+ * Using a maximum stream size with the produced and consumed position of a
+ * stream, computes the new consumed position to be as close as possible to the
+ * maximum possible stream size.
+ *
+ * If maximum stream size is lower than the possible buffer size (produced -
+ * consumed), the consumed_pos given is returned untouched else the new value
+ * is returned.
+ */
+unsigned long consumer_get_consumed_maxsize(unsigned long consumed_pos,
+ unsigned long produced_pos, uint64_t max_stream_size)
+{
+ if (max_stream_size && max_stream_size < (produced_pos - consumed_pos)) {
+ /* Offset from the produced position to get the latest buffers. */
+ return produced_pos - max_stream_size;
+ }
+
+ return consumed_pos;
+}