/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
*
#include <unistd.h>
#include <common/common.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer-timer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/consumer.h>
+#include <common/consumer/metadata-bucket.h>
+#include <common/consumer/metadata-bucket.h>
#include <common/index/index.h>
#include <common/kernel-consumer/kernel-consumer.h>
+#include <common/kernel-ctl/kernel-ctl.h>
+#include <common/macros.h>
#include <common/relayd/relayd.h>
#include <common/ust-consumer/ust-consumer.h>
#include <common/utils.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/metadata-bucket.h>
-#include <common/kernel-ctl/kernel-ctl.h>
#include "consumer-stream.h"
pthread_mutex_unlock(&stream->chan->lock);
}
+static void consumer_stream_data_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+ ASSERT_LOCKED(stream->lock);
+ ASSERT_LOCKED(stream->chan->lock);
+}
+
static void consumer_stream_metadata_lock_all(struct lttng_consumer_stream *stream)
{
consumer_stream_data_lock_all(stream);
consumer_stream_data_unlock_all(stream);
}
+static void consumer_stream_metadata_assert_locked_all(struct lttng_consumer_stream *stream)
+{
+ ASSERT_LOCKED(stream->metadata_rdv_lock);
+ consumer_stream_data_assert_locked_all(stream);
+}
+
/* Only used for data streams. */
static int consumer_stream_update_stats(struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuf)
assert(ctx);
/* Ease our life a bit. */
- ht = consumer_data.stream_list_ht;
+ ht = the_consumer_data.stream_list_ht;
rcu_read_lock();
const struct stream_subbuffer *subbuffer,
struct lttng_consumer_local_data *ctx)
{
+ bool missed_metadata_flush;
int ret;
/* Block until all the metadata is sent. */
pthread_mutex_lock(&stream->metadata_timer_lock);
stream->waiting_on_metadata = false;
- if (stream->missed_metadata_flush) {
+ missed_metadata_flush = stream->missed_metadata_flush;
+ if (missed_metadata_flush) {
stream->missed_metadata_flush = false;
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- (void) stream->read_subbuffer_ops.send_live_beacon(stream);
- } else {
- pthread_mutex_unlock(&stream->metadata_timer_lock);
}
+ pthread_mutex_unlock(&stream->metadata_timer_lock);
if (ret < 0) {
goto end;
}
ret = consumer_stream_send_index(stream, subbuffer, ctx);
+ /*
+ * Send the live inactivity beacon to handle the situation where
+ * the live timer is prevented from sampling this stream
+ * because the stream lock was being held while this stream is
+ * waiting on metadata. This ensures live viewer progress in the
+ * unlikely scenario where a live timer would be prevented from
+ * locking a stream lock repeatedly due to a steady flow of
+ * incoming metadata, for a stream which is mostly inactive.
+ *
+ * It is important to send the inactivity beacon packet to
+ * relayd _after_ sending the index associated with the data
+ * that was just sent, otherwise this can cause live viewers to
+ * observe timestamps going backwards between an inactivity
+ * beacon and a following trace packet.
+ */
+ if (missed_metadata_flush) {
+ (void) stream->read_subbuffer_ops.send_live_beacon(stream);
+ }
end:
return ret;
}
goto end;
}
+ rcu_read_lock();
+
if (trace_chunk && !lttng_trace_chunk_get(trace_chunk)) {
ERR("Failed to acquire trace chunk reference during the creation of a stream");
ret = -1;
goto error;
}
- rcu_read_lock();
+ stream->send_node = CDS_LIST_HEAD_INIT(stream->send_node);
stream->chan = channel;
stream->key = stream_key;
stream->trace_chunk = trace_chunk;
consumer_stream_metadata_lock_all;
stream->read_subbuffer_ops.unlock =
consumer_stream_metadata_unlock_all;
+ stream->read_subbuffer_ops.assert_locked =
+ consumer_stream_metadata_assert_locked_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
metadata_stream_check_version;
} else {
stream->read_subbuffer_ops.lock = consumer_stream_data_lock_all;
stream->read_subbuffer_ops.unlock =
consumer_stream_data_unlock_all;
+ stream->read_subbuffer_ops.assert_locked =
+ consumer_stream_data_assert_locked_all;
stream->read_subbuffer_ops.pre_consume_subbuffer =
consumer_stream_update_stats;
}
assert(stream);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (stream->mmap_base != NULL) {
ret = munmap(stream->mmap_base, stream->mmap_len);
* that did not add the stream to a (all) hash table. Same goes for the
* next call ht del call.
*/
- (void) lttng_ht_del(consumer_data.stream_per_chan_id_ht, &iter);
+ (void) lttng_ht_del(the_consumer_data.stream_per_chan_id_ht, &iter);
/* Delete from the global stream list. */
iter.iter.node = &stream->node_session_id.node;
/* See the previous ht del on why we ignore the returned value. */
- (void) lttng_ht_del(consumer_data.stream_list_ht, &iter);
+ (void) lttng_ht_del(the_consumer_data.stream_list_ht, &iter);
rcu_read_unlock();
if (!stream->metadata_flag) {
/* Decrement the stream count of the global consumer data. */
- assert(consumer_data.stream_count > 0);
- consumer_data.stream_count--;
+ assert(the_consumer_data.stream_count > 0);
+ the_consumer_data.stream_count--;
}
}
{
assert(stream);
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
case LTTNG_CONSUMER32_UST:
{
assert(stream);
+ cds_list_del_init(&stream->send_node);
+
/* Stream is in monitor mode. */
if (stream->monitor) {
struct lttng_consumer_channel *free_chan = NULL;
* stream thus being globally visible.
*/
if (stream->globally_visible) {
- pthread_mutex_lock(&consumer_data.lock);
+ pthread_mutex_lock(&the_consumer_data.lock);
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->lock);
/* Remove every reference of the stream in the consumer. */
free_chan = unref_channel(stream);
/* Indicates that the consumer data state MUST be updated after this. */
- consumer_data.need_update = 1;
+ the_consumer_data.need_update = 1;
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
- pthread_mutex_unlock(&consumer_data.lock);
+ pthread_mutex_unlock(&the_consumer_data.lock);
} else {
/*
* If the stream is not visible globally, this needs to be done
* outside of the consumer data lock section.
*/
+ destroy_close_stream(stream);
free_chan = unref_channel(stream);
}
goto end;
}
stream->out_fd = -1;
- }
+ }
DBG("Opening stream output file \"%s\"", stream_path);
chunk_status = lttng_trace_chunk_open_file(stream->trace_chunk, stream_path,
flags, mode, &stream->out_fd, false);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to open stream file \"%s\"", stream->name);
ret = -1;
goto end;
- }
+ }
if (!stream->metadata_flag && (create_index || stream->index_file)) {
if (stream->index_file) {
{
int ret = 0;
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
if (producer_active) {
ret = kernctl_buffer_flush(stream->wait_fd);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
+ ret = lttng_ustconsumer_flush_buffer(stream, (int) producer_active);
break;
default:
ERR("Unknown consumer_data type");