consumerd: move address computation from on_read_subbuffer_mmap
[lttng-tools.git] / src / common / consumer / consumer.c
index 1ea682f028e76ebe546b3dc892981bc10d2009a5..2eccee5cb9295fd84001a7f7cd39c009b423e92c 100644 (file)
@@ -603,6 +603,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
        stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
        stream->index_file = NULL;
        stream->last_sequence_number = -1ULL;
+       stream->rotate_position = -1ULL;
        pthread_mutex_init(&stream->lock, NULL);
        pthread_mutex_init(&stream->metadata_timer_lock, NULL);
 
@@ -1670,12 +1671,12 @@ end:
  */
 ssize_t lttng_consumer_on_read_subbuffer_mmap(
                struct lttng_consumer_local_data *ctx,
-               struct lttng_consumer_stream *stream, unsigned long len,
+               struct lttng_consumer_stream *stream,
+               const char *buffer,
+               unsigned long len,
                unsigned long padding,
                struct ctf_packet_index *index)
 {
-       unsigned long mmap_offset;
-       void *mmap_base;
        ssize_t ret = 0;
        off_t orig_offset = stream->out_fd_offset;
        /* Default is on the disk */
@@ -1697,36 +1698,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
                }
        }
 
-       /* get the offset inside the fd to mmap */
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               mmap_base = stream->mmap_base;
-               ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset);
-               if (ret < 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       goto end;
-               }
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               mmap_base = lttng_ustctl_get_mmap_base(stream);
-               if (!mmap_base) {
-                       ERR("read mmap get mmap base for stream %s", stream->name);
-                       ret = -EPERM;
-                       goto end;
-               }
-               ret = lttng_ustctl_get_mmap_read_offset(stream, &mmap_offset);
-               if (ret != 0) {
-                       PERROR("tracer ctl get_mmap_read_offset");
-                       ret = -EINVAL;
-                       goto end;
-               }
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-       }
-
        /* Handle stream on the relayd if the output is on the network */
        if (relayd) {
                unsigned long netlen = len;
@@ -1803,7 +1774,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(
         * This call guarantee that len or less is returned. It's impossible to
         * receive a ret value that is bigger than len.
         */
-       ret = lttng_write(outfd, mmap_base + mmap_offset, len);
+       ret = lttng_write(outfd, buffer, len);
        DBG("Consumer mmap write() ret %zd (len %lu)", ret, len);
        if (ret < 0 || ((size_t) ret != len)) {
                /*
@@ -3943,15 +3914,23 @@ int consumer_flush_buffer(struct lttng_consumer_stream *stream, int producer_act
 
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
-               ret = kernctl_buffer_flush(stream->wait_fd);
-               if (ret < 0) {
-                       ERR("Failed to flush kernel stream");
-                       goto end;
+               if (producer_active) {
+                       ret = kernctl_buffer_flush(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end;
+                       }
+               } else {
+                       ret = kernctl_buffer_flush_empty(stream->wait_fd);
+                       if (ret < 0) {
+                               ERR("Failed to flush kernel stream");
+                               goto end;
+                       }
                }
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustctl_flush_buffer(stream, producer_active);
+               lttng_ustconsumer_flush_buffer(stream, producer_active);
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -4006,7 +3985,7 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        ht->hash_fct(&channel->key, lttng_ht_seed),
                        ht->match_fct, &channel->key, &iter.iter,
                        stream, node_channel_id.node) {
-               unsigned long consumed_pos;
+               unsigned long produced_pos = 0, consumed_pos = 0;
 
                health_code_update();
 
@@ -4019,65 +3998,87 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                        rotating_to_new_chunk = false;
                }
 
-               ret = lttng_consumer_sample_snapshot_positions(stream);
-               if (ret < 0) {
-                       ERR("Failed to sample snapshot position during channel rotation");
-                       goto end_unlock_stream;
+               /*
+                * Do not flush an empty packet when rotating from a NULL trace
+                * chunk. The stream has no means to output data, and the prior
+                * rotation which rotated to NULL performed that side-effect already.
+                */
+               if (stream->trace_chunk) {
+                       /*
+                        * For metadata stream, do an active flush, which does not
+                        * produce empty packets. For data streams, empty-flush;
+                        * ensures we have at least one packet in each stream per trace
+                        * chunk, even if no data was produced.
+                        */
+                       ret = consumer_flush_buffer(stream, stream->metadata_flag ? 1 : 0);
+                       if (ret < 0) {
+                               ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+                                               stream->key);
+                               goto end_unlock_stream;
+                       }
                }
 
-               ret = lttng_consumer_get_produced_snapshot(stream,
-                               &stream->rotate_position);
-               if (ret < 0) {
-                       ERR("Failed to sample produced position during channel rotation");
+               ret = lttng_consumer_take_snapshot(stream);
+               if (ret < 0 && ret != -ENODATA && ret != -EAGAIN) {
+                       ERR("Failed to sample snapshot position during channel rotation");
                        goto end_unlock_stream;
                }
+               if (!ret) {
+                       ret = lttng_consumer_get_produced_snapshot(stream,
+                                       &produced_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample produced position during channel rotation");
+                               goto end_unlock_stream;
+                       }
 
-               lttng_consumer_get_consumed_snapshot(stream,
-                               &consumed_pos);
-               if (consumed_pos == stream->rotate_position) {
+                       ret = lttng_consumer_get_consumed_snapshot(stream,
+                                       &consumed_pos);
+                       if (ret < 0) {
+                               ERR("Failed to sample consumed position during channel rotation");
+                               goto end_unlock_stream;
+                       }
+               }
+               /*
+                * Align produced position on the start-of-packet boundary of the first
+                * packet going into the next trace chunk.
+                */
+               produced_pos = ALIGN_FLOOR(produced_pos, stream->max_sb_size);
+               if (consumed_pos == produced_pos) {
                        stream->rotate_ready = true;
                }
-
                /*
-                * Active flush; has no effect if the production position
-                * is at a packet boundary.
+                * The rotation position is based on the packet_seq_num of the
+                * packet following the last packet that was consumed for this
+                * stream, incremented by the offset between produced and
+                * consumed positions. This rotation position is a lower bound
+                * (inclusive) at which the next trace chunk starts. Since it
+                * is a lower bound, it is OK if the packet_seq_num does not
+                * correspond exactly to the same packet identified by the
+                * consumed_pos, which can happen in overwrite mode.
                 */
-               ret = consumer_flush_buffer(stream, 1);
-               if (ret < 0) {
-                       ERR("Failed to flush stream %" PRIu64 " during channel rotation",
+               if (stream->sequence_number_unavailable) {
+                       /*
+                        * Rotation should never be performed on a session which
+                        * interacts with a pre-2.8 lttng-modules, which does
+                        * not implement packet sequence number.
+                        */
+                       ERR("Failure to rotate stream %" PRIu64 ": sequence number unavailable",
                                        stream->key);
+                       ret = -1;
                        goto end_unlock_stream;
                }
+               stream->rotate_position = stream->last_sequence_number + 1 +
+                               ((produced_pos - consumed_pos) / stream->max_sb_size);
 
                if (!is_local_trace) {
                        /*
                         * The relay daemon control protocol expects a rotation
                         * position as "the sequence number of the first packet
-                        * _after_ the current trace chunk.
-                        *
-                        * At the moment when the positions of the buffers are
-                        * sampled, the production position does not necessarily
-                        * sit at a packet boundary. The 'active' flush
-                        * operation above will push the production position to
-                        * the next packet boundary _if_ it is not already
-                        * sitting at such a boundary.
-                        *
-                        * Assuming a current production position that is not
-                        * on the bound of a packet, the 'target' sequence
-                        * number is
-                        *   (consumed_pos / subbuffer_size) + 1
-                        * Note the '+ 1' to ensure the current packet is
-                        * part of the current trace chunk.
-                        *
-                        * However, if the production position is already at
-                        * a packet boundary, the '+ 1' is not necessary as the
-                        * last packet of the current chunk is already
-                        * 'complete'.
+                        * _after_ the current trace chunk".
                         */
                        const struct relayd_stream_rotation_position position = {
                                .stream_id = stream->relayd_stream_id,
-                               .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
-                                       !!(stream->rotate_position % stream->max_sb_size),
+                               .rotate_at_seq_num = stream->rotate_position,
                        };
 
                        ret = lttng_dynamic_array_add_element(
@@ -4140,44 +4141,39 @@ end:
  */
 int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream)
 {
-       int ret;
-       unsigned long consumed_pos;
-
-       if (!stream->rotate_position && !stream->rotate_ready) {
-               ret = 0;
-               goto end;
-       }
-
        if (stream->rotate_ready) {
-               ret = 1;
-               goto end;
+               return 1;
        }
 
        /*
-        * If we don't have the rotate_ready flag, check the consumed position
-        * to determine if we need to rotate.
+        * If packet seq num is unavailable, it means we are interacting
+        * with a pre-2.8 lttng-modules which does not implement the
+        * sequence number. Rotation should never be used by sessiond in this
+        * scenario.
         */
-       ret = lttng_consumer_sample_snapshot_positions(stream);
-       if (ret < 0) {
-               ERR("Taking snapshot positions");
-               goto end;
+       if (stream->sequence_number_unavailable) {
+               ERR("Internal error: rotation used on stream %" PRIu64
+                               " with unavailable sequence number",
+                               stream->key);
+               return -1;
        }
 
-       ret = lttng_consumer_get_consumed_snapshot(stream, &consumed_pos);
-       if (ret < 0) {
-               ERR("Consumed snapshot position");
-               goto end;
+       if (stream->rotate_position == -1ULL ||
+                       stream->last_sequence_number == -1ULL) {
+               return 0;
        }
 
-       /* Rotate position not reached yet (with check for overflow). */
-       if ((long) (consumed_pos - stream->rotate_position) < 0) {
-               ret = 0;
-               goto end;
+       /*
+        * Rotate position not reached yet. The stream rotate position is
+        * the position of the next packet belonging to the next trace chunk,
+        * but consumerd considers rotation ready when reaching the last
+        * packet of the current chunk, hence the "rotate_position - 1".
+        */
+       if (stream->last_sequence_number >= stream->rotate_position - 1) {
+               return 1;
        }
-       ret = 1;
 
-end:
-       return ret;
+       return 0;
 }
 
 /*
@@ -4185,7 +4181,7 @@ end:
  */
 void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream)
 {
-       stream->rotate_position = 0;
+       stream->rotate_position = -1ULL;
        stream->rotate_ready = false;
 }
 
This page took 0.042293 seconds and 4 git commands to generate.