Fix: eliminate timestamp overlap between packets
[lttng-modules.git] / lib / ringbuffer / ring_buffer_frontend.c
index 92aa78f57ddf2ed95f42c16e23728d32f58291b6..5043105e465ffcac0633d930f8f91cb03b2272da 100644 (file)
@@ -1015,7 +1015,7 @@ retry:
         */
        if (((commit_count - chan->backend.subbuf_size)
             & chan->commit_count_mask)
-           - (buf_trunc(consumed_cur, chan)
+           - (buf_trunc(consumed, chan)
               >> chan->backend.num_subbuf_order)
            != 0)
                goto nodata;
@@ -1024,7 +1024,7 @@ retry:
         * Check that we are not about to read the same subbuffer in
         * which the writer head is.
         */
-       if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan)
+       if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan)
            == 0)
                goto nodata;
 
@@ -1249,7 +1249,7 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
        commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
-                                     commit_count, oldidx);
+                                     commit_count, oldidx, tsc);
        lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
                                             offsets->old, commit_count,
                                             config->cb.subbuffer_header_size());
@@ -1293,7 +1293,7 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
        v_add(config, padding_size, &buf->commit_hot[oldidx].cc);
        commit_count = v_read(config, &buf->commit_hot[oldidx].cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
-                                     commit_count, oldidx);
+                                     commit_count, oldidx, tsc);
        lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
                                             offsets->old, commit_count,
                                             padding_size);
@@ -1336,7 +1336,7 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf,
        commit_count = v_read(config, &buf->commit_hot[beginidx].cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
-                                     commit_count, beginidx);
+                                     commit_count, beginidx, tsc);
        lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
                                             offsets->begin, commit_count,
                                             config->cb.subbuffer_header_size());
@@ -1397,7 +1397,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                                    u64 *tsc)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       unsigned long off;
+       unsigned long off, reserve_commit_diff;
 
        offsets->begin = v_read(config, &buf->offset);
        offsets->old = offsets->begin;
@@ -1422,36 +1422,68 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
         * (records and header timestamps) are visible to the reader. This is
         * required for quiescence guarantees for the fusion merge.
         */
-       if (mode == SWITCH_FLUSH || off > 0) {
-               if (unlikely(off == 0)) {
-                        /*
-                        * A final flush that encounters an empty
-                        * sub-buffer cannot switch buffer if a
-                        * reader is located within this sub-buffer.
-                        * Anyway, the purpose of final flushing of a
-                        * sub-buffer at offset 0 is to handle the case
-                        * of entirely empty stream.
-                        */
-                       if (unlikely(subbuf_trunc(offsets->begin, chan)
-                                       - subbuf_trunc((unsigned long)
-                                               atomic_long_read(&buf->consumed), chan)
-                                       >= chan->backend.buf_size))
-                               return -1;
-                       /*
-                        * The client does not save any header information.
-                        * Don't switch empty subbuffer on finalize, because it
-                        * is invalid to deliver a completely empty subbuffer.
-                        */
-                       if (!config->cb.subbuffer_header_size())
+       if (mode != SWITCH_FLUSH && !off)
+               return -1;      /* we do not have to switch : buffer is empty */
+
+       if (unlikely(off == 0)) {
+               unsigned long sb_index, commit_count;
+
+               /*
+                * We are performing a SWITCH_FLUSH. At this stage, there are no
+                * concurrent writes into the buffer.
+                *
+                * The client does not save any header information.  Don't
+                * switch empty subbuffer on finalize, because it is invalid to
+                * deliver a completely empty subbuffer.
+                */
+               if (!config->cb.subbuffer_header_size())
+                       return -1;
+
+               /* Test new buffer integrity */
+               sb_index = subbuf_index(offsets->begin, chan);
+               commit_count = v_read(config,
+                               &buf->commit_cold[sb_index].cc_sb);
+               reserve_commit_diff =
+                 (buf_trunc(offsets->begin, chan)
+                  >> chan->backend.num_subbuf_order)
+                 - (commit_count & chan->commit_count_mask);
+               if (likely(reserve_commit_diff == 0)) {
+                       /* Next subbuffer not being written to. */
+                       if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
+                               subbuf_trunc(offsets->begin, chan)
+                                - subbuf_trunc((unsigned long)
+                                    atomic_long_read(&buf->consumed), chan)
+                               >= chan->backend.buf_size)) {
+                               /*
+                                * We do not overwrite non consumed buffers
+                                * and we are full : don't switch.
+                                */
                                return -1;
+                       } else {
+                               /*
+                                * Next subbuffer not being written to, and we
+                                * are either in overwrite mode or the buffer is
+                                * not full. It's safe to write in this new
+                                * subbuffer.
+                                */
+                       }
+               } else {
                        /*
-                        * Need to write the subbuffer start header on finalize.
+                        * Next subbuffer reserve offset does not match the
+                        * commit offset. Don't perform switch in
+                        * producer-consumer and overwrite mode.  Caused by
+                        * either a writer OOPS or too many nested writes over a
+                        * reserve/commit pair.
                         */
-                       offsets->switch_old_start = 1;
+                       return -1;
                }
-               offsets->begin = subbuf_align(offsets->begin, chan);
-       } else
-               return -1;      /* we do not have to switch : buffer is empty */
+
+               /*
+                * Need to write the subbuffer start header on finalize.
+                */
+               offsets->switch_old_start = 1;
+       }
+       offsets->begin = subbuf_align(offsets->begin, chan);
        /* Note: old points to the next subbuf at offset 0 */
        offsets->end = offsets->begin;
        return 0;
@@ -1572,9 +1604,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct lib_ring_buffer_ctx *ctx)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       unsigned long reserve_commit_diff;
+       unsigned long reserve_commit_diff, offset_cmp;
 
-       offsets->begin = v_read(config, &buf->offset);
+retry:
+       offsets->begin = offset_cmp = v_read(config, &buf->offset);
        offsets->old = offsets->begin;
        offsets->switch_new_start = 0;
        offsets->switch_new_end = 0;
@@ -1606,7 +1639,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                }
        }
        if (unlikely(offsets->switch_new_start)) {
-               unsigned long sb_index;
+               unsigned long sb_index, commit_count;
 
                /*
                 * We are typically not filling the previous buffer completely.
@@ -1617,12 +1650,31 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                 + config->cb.subbuffer_header_size();
                /* Test new buffer integrity */
                sb_index = subbuf_index(offsets->begin, chan);
+               /*
+                * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+                * lib_ring_buffer_check_deliver() has the matching
+                * memory barriers required around commit_cold cc_sb
+                * updates to ensure reserve and commit counter updates
+                * are not seen reordered when updated by another CPU.
+                */
+               smp_rmb();
+               commit_count = v_read(config,
+                               &buf->commit_cold[sb_index].cc_sb);
+               /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+               smp_rmb();
+               if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+                       /*
+                        * The reserve counter have been concurrently updated
+                        * while we read the commit counter. This means the
+                        * commit counter we read might not match buf->offset
+                        * due to concurrent update. We therefore need to retry.
+                        */
+                       goto retry;
+               }
                reserve_commit_diff =
                  (buf_trunc(offsets->begin, chan)
                   >> chan->backend.num_subbuf_order)
-                 - ((unsigned long) v_read(config,
-                                           &buf->commit_cold[sb_index].cc_sb)
-                    & chan->commit_count_mask);
+                 - (commit_count & chan->commit_count_mask);
                if (likely(reserve_commit_diff == 0)) {
                        /* Next subbuffer not being written to. */
                        if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
@@ -1647,9 +1699,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                } else {
                        /*
                         * Next subbuffer reserve offset does not match the
-                        * commit offset. Drop record in producer-consumer and
-                        * overwrite mode. Caused by either a writer OOPS or too
-                        * many nested writes over a reserve/commit pair.
+                        * commit offset, and this did not involve update to the
+                        * reserve counter. Drop record in producer-consumer and
+                        * overwrite mode.  Caused by either a writer OOPS or
+                        * too many nested writes over a reserve/commit pair.
                         */
                        v_inc(config, &buf->records_lost_wrap);
                        return -EIO;
This page took 0.025628 seconds and 4 git commands to generate.