#include <common/utils.h>
#include <common/consumer/consumer.h>
#include <common/consumer/consumer-timer.h>
+#include <common/consumer/metadata-bucket.h>
#include "consumer-stream.h"
{
int ret = 0;
uint64_t sequence_number;
- const uint64_t discarded_events =
- LTTNG_OPTIONAL_GET(subbuf->info.data.sequence_number);
+ const uint64_t discarded_events = subbuf->info.data.events_discarded;
if (!subbuf->info.data.sequence_number.is_set) {
/* Command not supported by the tracer. */
const unsigned long padding_size =
subbuffer->info.data.padded_subbuf_size -
subbuffer->info.data.subbuf_size;
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_mmap(
+ stream, &subbuffer->buffer.buffer, padding_size);
- return lttng_consumer_on_read_subbuffer_mmap(
- ctx, stream, &subbuffer->buffer.buffer, padding_size);
+ if (stream->net_seq_idx == -1ULL) {
+ /*
+ * When writing on disk, check that only the subbuffer (no
+ * padding) was written to disk.
+ */
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer on disk (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+ } else {
+ /*
+ * When streaming over the network, check that the entire
+ * subbuffer including padding was successfully written.
+ */
+ if (written_bytes != subbuffer->info.data.subbuf_size) {
+ DBG("Failed to write only the subbuffer over the network (written_bytes: %zd, subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.subbuf_size);
+ }
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_mmap()` returned an error, pass
+ * it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading mmap subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static ssize_t consumer_stream_consume_splice(
struct lttng_consumer_stream *stream,
const struct stream_subbuffer *subbuffer)
{
- return lttng_consumer_on_read_subbuffer_splice(ctx, stream,
- subbuffer->info.data.padded_subbuf_size, 0);
+ const ssize_t written_bytes = lttng_consumer_on_read_subbuffer_splice(
+ ctx, stream, subbuffer->info.data.padded_subbuf_size, 0);
+
+ if (written_bytes != subbuffer->info.data.padded_subbuf_size) {
+ DBG("Failed to write the entire padded subbuffer (written_bytes: %zd, padded subbuffer size %lu)",
+ written_bytes,
+ subbuffer->info.data.padded_subbuf_size);
+ }
+
+ /*
+ * If `lttng_consumer_on_read_subbuffer_splice()` returned an error,
+ * pass it along to the caller, else return zero.
+ */
+ if (written_bytes < 0) {
+ ERR("Error reading splice subbuffer: %zd", written_bytes);
+ }
+
+ return written_bytes;
}
static int consumer_stream_send_index(
struct lttng_consumer_local_data *ctx)
{
int ret;
+ enum sync_metadata_status status;
assert(metadata);
assert(metadata->metadata_flag);
/*
* Empty the metadata cache and flush the current stream.
*/
- ret = lttng_kconsumer_sync_metadata(metadata);
+ status = lttng_kconsumer_sync_metadata(metadata);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_sync_metadata(ctx, metadata);
+ status = lttng_ustconsumer_sync_metadata(ctx, metadata);
break;
default:
- assert(0);
- ret = -1;
- break;
+ abort();
}
- /*
- * Error or no new metadata, we exit here.
- */
- if (ret <= 0 || ret == ENODATA) {
+
+ switch (status) {
+ case SYNC_METADATA_STATUS_NEW_DATA:
+ break;
+ case SYNC_METADATA_STATUS_NO_DATA:
+ ret = 0;
goto end_unlock_mutex;
+ case SYNC_METADATA_STATUS_ERROR:
+ ret = -1;
+ goto end_unlock_mutex;
+ default:
+ abort();
}
/*
*/
pthread_cond_wait(&metadata->metadata_rdv, &metadata->metadata_rdv_lock);
pthread_mutex_unlock(&metadata->metadata_rdv_lock);
- } while (ret == EAGAIN);
+ } while (status == SYNC_METADATA_STATUS_NEW_DATA);
/* Success */
return 0;
}
DBG("New metadata version detected");
- stream->metadata_version = subbuffer->info.metadata.version;
- stream->reset_metadata_flag = 1;
+ consumer_stream_metadata_set_version(stream,
+ subbuffer->info.metadata.version);
if (stream->read_subbuffer_ops.reset_metadata) {
stream->read_subbuffer_ops.reset_metadata(stream);
stream->index_file = NULL;
stream->last_sequence_number = -1ULL;
stream->rotate_position = -1ULL;
+ /* Buffer is created with an open packet. */
+ stream->opened_packet_in_current_trace_chunk = true;
pthread_mutex_init(&stream->lock, NULL);
pthread_mutex_init(&stream->metadata_timer_lock, NULL);
{
assert(stream);
+ metadata_bucket_destroy(stream->metadata_bucket);
call_rcu(&stream->node.head, free_stream_rcu);
}
assert(stream);
return cds_lfht_is_node_deleted(&stream->node.node);
}
+
+static ssize_t metadata_bucket_flush(
+ const struct stream_subbuffer *buffer, void *data)
+{
+ ssize_t ret;
+ struct lttng_consumer_stream *stream = data;
+
+ ret = consumer_stream_consume_mmap(NULL, stream, buffer);
+ if (ret < 0) {
+ goto end;
+ }
+end:
+ return ret;
+}
+
+static ssize_t metadata_bucket_consume(
+ struct lttng_consumer_local_data *unused,
+ struct lttng_consumer_stream *stream,
+ const struct stream_subbuffer *subbuffer)
+{
+ ssize_t ret;
+ enum metadata_bucket_status status;
+
+ status = metadata_bucket_fill(stream->metadata_bucket, subbuffer);
+ switch (status) {
+ case METADATA_BUCKET_STATUS_OK:
+ /* Return consumed size. */
+ ret = subbuffer->buffer.buffer.size;
+ break;
+ default:
+ ret = -1;
+ }
+
+ return ret;
+}
+
+int consumer_stream_enable_metadata_bucketization(
+ struct lttng_consumer_stream *stream)
+{
+ int ret = 0;
+
+ assert(stream->metadata_flag);
+ assert(!stream->metadata_bucket);
+ assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+ stream->metadata_bucket = metadata_bucket_create(
+ metadata_bucket_flush, stream);
+ if (!stream->metadata_bucket) {
+ ret = -1;
+ goto end;
+ }
+
+ stream->read_subbuffer_ops.consume_subbuffer = metadata_bucket_consume;
+end:
+ return ret;
+}
+
+void consumer_stream_metadata_set_version(
+ struct lttng_consumer_stream *stream, uint64_t new_version)
+{
+ assert(new_version > stream->metadata_version);
+ stream->metadata_version = new_version;
+ stream->reset_metadata_flag = 1;
+
+ if (stream->metadata_bucket) {
+ metadata_bucket_reset(stream->metadata_bucket);
+ }
+}