Fix: relayd vs consumerd compatibility
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 2ea5fa114223f101362e4d780d106b25b3812238..43a5d366348d98a1037ec4a9bef3024b0ba6a5e2 100644 (file)
@@ -118,7 +118,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                struct lttng_consumer_local_data *ctx)
 {
        int ret;
-       unsigned long consumed_pos, produced_pos;
        struct lttng_consumer_channel *channel;
        struct lttng_consumer_stream *stream;
 
@@ -141,6 +140,9 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               /* Are we at a position _before_ the first available packet ? */
+               bool before_first_packet = true;
+               unsigned long consumed_pos, produced_pos;
 
                health_code_update();
 
@@ -227,6 +229,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
+                       int lost_packet = 0;
 
                        health_code_update();
 
@@ -241,6 +244,15 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
+
+                               /*
+                                * Start accounting lost packets only when we
+                                * already have extracted packets (to match the
+                                * content of the final snapshot).
+                                */
+                               if (!before_first_packet) {
+                                       lost_packet = 1;
+                               }
                                continue;
                        }
 
@@ -284,6 +296,16 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
+
+                       /*
+                        * Only account lost packets located between
+                        * succesfully extracted packets (do not account before
+                        * and after since they are not visible in the
+                        * resulting snapshot).
+                        */
+                       stream->chan->lost_packets += lost_packet;
+                       lost_packet = 0;
+                       before_first_packet = false;
                }
 
                if (relayd_id == (uint64_t) -1ULL) {
@@ -947,18 +969,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint64_t id = msg.u.discarded_events.session_id;
                uint64_t key = msg.u.discarded_events.channel_key;
 
+               DBG("Kernel consumer discarded events command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
                channel = consumer_find_channel(key);
                if (!channel) {
                        ERR("Kernel consumer discarded events channel %"
                                        PRIu64 " not found", key);
-                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       ret = 0;
+               } else {
+                       ret = channel->discarded_events;
                }
 
-               DBG("Kernel consumer discarded events command for session id %"
-                               PRIu64 ", channel key %" PRIu64, id, key);
-
-               ret = channel->discarded_events;
-
                health_code_update();
 
                /* Send back returned value to session daemon */
@@ -977,18 +999,18 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                uint64_t id = msg.u.lost_packets.session_id;
                uint64_t key = msg.u.lost_packets.channel_key;
 
+               DBG("Kernel consumer lost packets command for session id %"
+                               PRIu64 ", channel key %" PRIu64, id, key);
+
                channel = consumer_find_channel(key);
                if (!channel) {
                        ERR("Kernel consumer lost packets channel %"
                                        PRIu64 " not found", key);
-                       ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+                       ret = 0;
+               } else {
+                       ret = channel->lost_packets;
                }
 
-               DBG("Kernel consumer lost packets command for session id %"
-                               PRIu64 ", channel key %" PRIu64, id, key);
-
-               ret = channel->lost_packets;
-
                health_code_update();
 
                /* Send back returned value to session daemon */
@@ -1071,6 +1093,32 @@ static int get_index_values(struct ctf_packet_index *index, int infd)
        }
        index->stream_id = htobe64(index->stream_id);
 
+       ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
+       if (ret < 0) {
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->stream_instance_id = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_instance_id");
+                       goto error;
+               }
+       }
+       index->stream_instance_id = htobe64(index->stream_instance_id);
+
+       ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
+       if (ret < 0) {
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->packet_seq_num = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto error;
+               }
+       }
+       index->packet_seq_num = htobe64(index->packet_seq_num);
+
 error:
        return ret;
 }
@@ -1119,8 +1167,14 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
 
        ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
        if (ret < 0) {
-               PERROR("kernctl_get_sequence_number");
-               goto end;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       seq = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto end;
+               }
        }
 
        /*
@@ -1150,8 +1204,8 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
        }
        if (discarded < stream->last_discarded_events) {
                /*
-                * Overflow has occured. We assume only one wrap-around
-                * has occured.
+                * Overflow has occurred. We assume only one wrap-around
+                * has occurred.
                 */
                stream->chan->discarded_events += (1ULL << (CAA_BITS_PER_LONG - 1)) -
                        stream->last_discarded_events + discarded;
@@ -1166,6 +1220,45 @@ end:
        return ret;
 }
 
+/*
+ * Check if the local version of the metadata stream matches with the version
+ * of the metadata stream in the kernel. If it was updated, set the reset flag
+ * on the stream.
+ */
+static
+int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream)
+{
+       int ret;
+       uint64_t cur_version;
+
+       ret = kernctl_get_metadata_version(infd, &cur_version);
+       if (ret < 0) {
+               if (ret == -ENOTTY) {
+                       /*
+                        * LTTng-modules does not implement this
+                        * command.
+                        */
+                       ret = 0;
+                       goto end;
+               }
+               ERR("Failed to get the metadata version");
+               goto end;
+       }
+
+       if (stream->metadata_version == cur_version) {
+               ret = 0;
+               goto end;
+       }
+
+       DBG("New metadata version detected");
+       stream->metadata_version = cur_version;
+       stream->reset_metadata_flag = 1;
+       ret = 0;
+
+end:
+       return ret;
+}
+
 /*
  * Consume data on a file descriptor and write it on a trace file.
  */
@@ -1236,6 +1329,10 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
        } else {
                write_index = 0;
+               ret = metadata_stream_check_version(infd, stream);
+               if (ret < 0) {
+                       goto end;
+               }
        }
 
        switch (stream->chan->output) {
@@ -1388,14 +1485,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       stream->index_file = index_file;
                }
        }
 
This page took 0.025691 seconds and 4 git commands to generate.