Fix: lost packet accounting always lost on snapshot
[lttng-tools.git] / src / common / kernel-consumer / kernel-consumer.c
index 65dcce40b34f2b07e892c7f7a1298b9709bbd180..1079047f0137ce5b0d992e6b723998137288b288 100644 (file)
@@ -137,8 +137,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
        }
 
        cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
-               /* Are we at a position _before_ the first available packet ? */
-               bool before_first_packet = true;
                unsigned long consumed_pos, produced_pos;
 
                health_code_update();
@@ -182,11 +180,23 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                ERR("sending streams sent to relayd");
                                goto end_unlock;
                        }
+                       channel->streams_sent_to_relayd = true;
                }
 
-               ret = kernctl_buffer_flush(stream->wait_fd);
+               ret = kernctl_buffer_flush_empty(stream->wait_fd);
                if (ret < 0) {
-                       ERR("Failed to flush kernel stream");
+                       /*
+                        * Doing a buffer flush which does not take into
+                        * account empty packets. This is not perfect
+                        * for stream intersection, but required as a
+                        * fall-back when "flush_empty" is not
+                        * implemented by lttng-modules.
+                        */
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end_unlock;
+                       }
                        goto end_unlock;
                }
 
@@ -224,7 +234,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                while (consumed_pos < produced_pos) {
                        ssize_t read_len;
                        unsigned long len, padded_len;
-                       int lost_packet = 0;
 
                        health_code_update();
 
@@ -238,15 +247,7 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                }
                                DBG("Kernel consumer get subbuf failed. Skipping it.");
                                consumed_pos += stream->max_sb_size;
-
-                               /*
-                                * Start accounting lost packets only when we
-                                * already have extracted packets (to match the
-                                * content of the final snapshot).
-                                */
-                               if (!before_first_packet) {
-                                       lost_packet = 1;
-                               }
+                               stream->chan->lost_packets++;
                                continue;
                        }
 
@@ -287,16 +288,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path,
                                goto end_unlock;
                        }
                        consumed_pos += stream->max_sb_size;
-
-                       /*
-                        * Only account lost packets located between
-                        * succesfully extracted packets (do not account before
-                        * and after since they are not visible in the
-                        * resulting snapshot).
-                        */
-                       stream->chan->lost_packets += lost_packet;
-                       lost_packet = 0;
-                       before_first_packet = false;
                }
 
                if (relayd_id == (uint64_t) -1ULL) {
@@ -716,6 +707,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                consumer_stream_free(new_stream);
                                goto end_nosignal;
                        }
+
+                       /*
+                        * If adding an extra stream to an already
+                        * existing channel (e.g. cpu hotplug), we need
+                        * to send the "streams_sent" command to relayd.
+                        */
+                       if (channel->streams_sent_to_relayd) {
+                               ret = consumer_send_relayd_streams_sent(
+                                               new_stream->net_seq_idx);
+                               if (ret < 0) {
+                                       goto end_nosignal;
+                               }
+                       }
                }
 
                /* Get the right pipe where the stream will be sent. */
@@ -809,6 +813,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (ret < 0) {
                                goto end_nosignal;
                        }
+                       channel->streams_sent_to_relayd = true;
                }
                break;
        }
@@ -1085,15 +1090,27 @@ static int get_index_values(struct ctf_packet_index *index, int infd)
 
        ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
        if (ret < 0) {
-               PERROR("kernctl_get_instance_id");
-               goto error;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->stream_instance_id = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_instance_id");
+                       goto error;
+               }
        }
        index->stream_instance_id = htobe64(index->stream_instance_id);
 
        ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
        if (ret < 0) {
-               PERROR("kernctl_get_sequence_number");
-               goto error;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       index->packet_seq_num = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto error;
+               }
        }
        index->packet_seq_num = htobe64(index->packet_seq_num);
 
@@ -1145,8 +1162,14 @@ int update_stream_stats(struct lttng_consumer_stream *stream)
 
        ret = kernctl_get_sequence_number(stream->wait_fd, &seq);
        if (ret < 0) {
-               PERROR("kernctl_get_sequence_number");
-               goto end;
+               if (ret == -ENOTTY) {
+                       /* Command not implemented by lttng-modules. */
+                       seq = -1ULL;
+                       ret = 0;
+               } else {
+                       PERROR("kernctl_get_sequence_number");
+                       goto end;
+               }
        }
 
        /*
@@ -1205,6 +1228,14 @@ int metadata_stream_check_version(int infd, struct lttng_consumer_stream *stream
 
        ret = kernctl_get_metadata_version(infd, &cur_version);
        if (ret < 0) {
+               if (ret == -ENOTTY) {
+                       /*
+                        * LTTng-modules does not implement this
+                        * command.
+                        */
+                       ret = 0;
+                       goto end;
+               }
                ERR("Failed to get the metadata version");
                goto end;
        }
@@ -1289,12 +1320,34 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
                ret = update_stream_stats(stream);
                if (ret < 0) {
+                       err = kernctl_put_subbuf(infd);
+                       if (err != 0) {
+                               if (err == -EFAULT) {
+                                       PERROR("Error in unreserving sub buffer\n");
+                               } else if (err == -EIO) {
+                                       /* Should never happen with newer LTTng versions */
+                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+                               }
+                               ret = err;
+                               goto end;
+                       }
                        goto end;
                }
        } else {
                write_index = 0;
                ret = metadata_stream_check_version(infd, stream);
                if (ret < 0) {
+                       err = kernctl_put_subbuf(infd);
+                       if (err != 0) {
+                               if (err == -EFAULT) {
+                                       PERROR("Error in unreserving sub buffer\n");
+                               } else if (err == -EIO) {
+                                       /* Should never happen with newer LTTng versions */
+                                       PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+                               }
+                               ret = err;
+                               goto end;
+                       }
                        goto end;
                }
        }
@@ -1449,14 +1502,17 @@ int lttng_kconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->tracefile_size_current = 0;
 
                if (!stream->metadata_flag) {
-                       ret = index_create_file(stream->chan->pathname,
+                       struct lttng_index_file *index_file;
+
+                       index_file = lttng_index_file_create(stream->chan->pathname,
                                        stream->name, stream->uid, stream->gid,
                                        stream->chan->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
+                                       stream->tracefile_count_current,
+                                       CTF_INDEX_MAJOR, CTF_INDEX_MINOR);
+                       if (!index_file) {
                                goto error;
                        }
-                       stream->index_fd = ret;
+                       stream->index_file = index_file;
                }
        }
 
This page took 0.026506 seconds and 4 git commands to generate.