consumerd: move rotation logic to domain-agnostic read path
authorJérémie Galarneau <jeremie.galarneau@efficios.com>
Fri, 8 May 2020 20:00:11 +0000 (16:00 -0400)
committerJérémie Galarneau <jeremie.galarneau@efficios.com>
Tue, 26 May 2020 20:25:31 +0000 (16:25 -0400)
The "rotation ready" logic is duplicated in both user space and kernel
specializations of the read subbuffer functions.

It is moved to the domain-agnostic caller where it is needed only
once. This makes it easier to implement a follow-up fix and reduces
code duplication.

Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
Change-Id: Iae952a2cd52fa458cec956ae219492557e4adf79

src/common/consumer/consumer.c
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index e2e7438f62d29a942740f37dcd255c6e9242cbf7..f13e90a6881ac4ab87312c61a7e632004d97e81d 100644 (file)
@@ -3413,6 +3413,7 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        ssize_t ret;
+       int rotation_ret;
 
        pthread_mutex_lock(&stream->chan->lock);
        pthread_mutex_lock(&stream->lock);
@@ -3420,6 +3421,19 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                pthread_mutex_lock(&stream->metadata_rdv_lock);
        }
 
+       /*
+        * If the stream was flagged to be ready for rotation before we extract
+        * the next packet, rotate it now.
+        */
+       if (stream->rotate_ready) {
+               DBG("Rotate stream before consuming data");
+               ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (ret < 0) {
+                       ERR("Stream rotation error before consuming data");
+                       goto end;
+               }
+       }
+
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
                ret = lttng_kconsumer_read_subbuffer(stream, ctx);
@@ -3435,13 +3449,38 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                break;
        }
 
+       if (ret < 0) {
+               goto end;
+       }
+
+       /*
+        * After extracting the packet, we check if the stream is now ready to
+        * be rotated and perform the action immediately.
+        *
+        * Don't overwrite `ret` as callers expect the number of bytes
+        * consumed to be returned on success.
+        */
+       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
+       if (rotation_ret == 1) {
+               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+               if (rotation_ret < 0) {
+                       ret = rotation_ret;
+                       ERR("Stream rotation error after consuming data");
+                       goto end;
+               }
+       } else if (rotation_ret < 0) {
+               ret = rotation_ret;
+               ERR("Failed to check if stream was ready to rotate after consuming data");
+               goto end;
+       }
+
+end:
        if (stream->metadata_flag) {
                pthread_cond_broadcast(&stream->metadata_rdv);
                pthread_mutex_unlock(&stream->metadata_rdv_lock);
        }
        pthread_mutex_unlock(&stream->lock);
        pthread_mutex_unlock(&stream->chan->lock);
-
        return ret;
 }
 
index 7032a7f7ffcc9bea224db0c6059f1928de72aa47..4a2e4e07d7c0b8e5eac8e34927afad5f8762da4a 100644 (file)
@@ -1590,26 +1590,14 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1;
        ssize_t ret = 0;
        int infd = stream->wait_fd;
        struct ctf_packet_index index = {};
+       bool in_error_state = false;
 
        DBG("In read_subbuffer (infd : %d)", infd);
 
-       /*
-        * If the stream was flagged to be ready for rotation before we extract the
-        * next packet, rotate it now.
-        */
-       if (stream->rotate_ready) {
-               DBG("Rotate stream before extracting data");
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       }
 
        /* Get the next subbuffer */
        err = kernctl_get_next_subbuf(infd);
@@ -1795,11 +1783,13 @@ error_put_subbuf:
                }
                ret = err;
                goto error;
+       } else if (in_error_state) {
+               goto error;
        }
 
        /* Write index if needed. */
        if (!write_index) {
-               goto rotate;
+               goto end;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -1832,25 +1822,7 @@ error_put_subbuf:
                goto error;
        }
 
-rotate:
-       /*
-        * After extracting the packet, we check if the stream is now ready to be
-        * rotated and perform the action immediately.
-        */
-       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
-       if (rotation_ret == 1) {
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       } else if (rotation_ret < 0) {
-               ERR("Checking if stream is ready to rotate");
-               ret = -1;
-               goto error;
-       }
-
+end:
 error:
        return ret;
 }
index e5143cd4a5f38afbc8e77189135c68fff25e4e96..2f09eac80763e2c20ae60b83e38e4593d5f84346 100644 (file)
@@ -2789,7 +2789,7 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                struct lttng_consumer_local_data *ctx)
 {
        unsigned long len, subbuf_size, padding;
-       int err, write_index = 1, rotation_ret;
+       int err, write_index = 1;
        long ret = 0;
        struct ustctl_consumer_stream *ustream;
        struct ctf_packet_index index;
@@ -2827,20 +2827,6 @@ int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
        }
 
-       /*
-        * If the stream was flagged to be ready for rotation before we extract the
-        * next packet, rotate it now.
-        */
-       if (stream->rotate_ready) {
-               DBG("Rotate stream before extracting data");
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       }
-
 retry:
        /* Get the next subbuffer */
        err = ustctl_get_next_subbuf(ustream);
@@ -2952,7 +2938,7 @@ error_put_subbuf:
 
        /* Write index if needed. */
        if (!write_index) {
-               goto rotate;
+               goto end;
        }
 
        if (stream->chan->live_timer_interval && !stream->metadata_flag) {
@@ -2987,24 +2973,7 @@ error_put_subbuf:
                goto error;
        }
 
-rotate:
-       /*
-        * After extracting the packet, we check if the stream is now ready to be
-        * rotated and perform the action immediately.
-        */
-       rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
-       if (rotation_ret == 1) {
-               rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
-               if (rotation_ret < 0) {
-                       ERR("Stream rotation error");
-                       ret = -1;
-                       goto error;
-               }
-       } else if (rotation_ret < 0) {
-               ERR("Checking if stream is ready to rotate");
-               ret = -1;
-               goto error;
-       }
+end:
 error:
        return ret;
 }
This page took 0.029423 seconds and 4 git commands to generate.