Fix: ring buffer: handle concurrent update in nested buffer wrap around check
[lttng-modules.git] / lib / ringbuffer / ring_buffer_frontend.c
index 5ea140eecde59b8f2f569ad2790e52a2994939ca..38dbb937462d3140365b6cb22599d9de24a083c6 100644 (file)
@@ -1,7 +1,22 @@
 /*
  * ring_buffer_frontend.c
  *
- * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; only
+ * version 2.1 of the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
  *
  * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
  * recorder (overwrite) modes. See thesis:
@@ -34,8 +49,6 @@
  *   - splice one subbuffer worth of data to a pipe
  *   - splice the data from pipe to disk/network
  *   - put_subbuf
- *
- * Dual LGPL v2.1/GPL v2 license.
  */
 
 #include <linux/delay.h>
@@ -389,7 +402,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf)
  *     Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
  */
 static
-int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
+int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
                                              unsigned long action,
                                              void *hcpu)
 {
@@ -1411,6 +1424,19 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
         */
        if (mode == SWITCH_FLUSH || off > 0) {
                if (unlikely(off == 0)) {
+                        /*
+                        * A final flush that encounters an empty
+                        * sub-buffer cannot switch buffer if a
+                        * reader is located within this sub-buffer.
+                        * Anyway, the purpose of final flushing of a
+                        * sub-buffer at offset 0 is to handle the case
+                        * of entirely empty stream.
+                        */
+                       if (unlikely(subbuf_trunc(offsets->begin, chan)
+                                       - subbuf_trunc((unsigned long)
+                                               atomic_long_read(&buf->consumed), chan)
+                                       >= chan->backend.buf_size))
+                               return -1;
                        /*
                         * The client does not save any header information.
                         * Don't switch empty subbuffer on finalize, because it
@@ -1490,6 +1516,48 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
 
+static void remote_switch(void *info)
+{
+       struct lib_ring_buffer *buf = info;
+
+       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+}
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+       struct channel *chan = buf->backend.chan;
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+       int ret;
+
+       /*
+        * With global synchronization we don't need to use the IPI scheme.
+        */
+       if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+               return;
+       }
+
+       /*
+        * Taking lock on CPU hotplug to ensure two things: first, that the
+        * target cpu is not taken concurrently offline while we are within
+        * smp_call_function_single() (I don't trust that get_cpu() on the
+        * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be
+        * confirmed)). Secondly, if it happens that the CPU is not online, our
+        * own call to lib_ring_buffer_switch_slow() needs to be protected from
+        * CPU hotplug handlers, which can also perform a remote subbuffer
+        * switch.
+        */
+       get_online_cpus();
+       ret = smp_call_function_single(buf->backend.cpu,
+                                remote_switch, buf, 1);
+       if (ret) {
+               /* Remote CPU is offline, do it ourself. */
+               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+       }
+       put_online_cpus();
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+
 /*
  * Returns :
  * 0 if ok
@@ -1504,9 +1572,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                     struct lib_ring_buffer_ctx *ctx)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       unsigned long reserve_commit_diff;
+       unsigned long reserve_commit_diff, offset_cmp;
 
-       offsets->begin = v_read(config, &buf->offset);
+retry:
+       offsets->begin = offset_cmp = v_read(config, &buf->offset);
        offsets->old = offsets->begin;
        offsets->switch_new_start = 0;
        offsets->switch_new_end = 0;
@@ -1538,7 +1607,7 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                }
        }
        if (unlikely(offsets->switch_new_start)) {
-               unsigned long sb_index;
+               unsigned long sb_index, commit_count;
 
                /*
                 * We are typically not filling the previous buffer completely.
@@ -1549,12 +1618,31 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                                 + config->cb.subbuffer_header_size();
                /* Test new buffer integrity */
                sb_index = subbuf_index(offsets->begin, chan);
+               /*
+                * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+                * lib_ring_buffer_check_deliver() has the matching
+                * memory barriers required around commit_cold cc_sb
+                * updates to ensure reserve and commit counter updates
+                * are not seen reordered when updated by another CPU.
+                */
+               smp_rmb();
+               commit_count = v_read(config,
+                               &buf->commit_cold[sb_index].cc_sb);
+               /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+               smp_rmb();
+               if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+                       /*
+                        * The reserve counter have been concurrently updated
+                        * while we read the commit counter. This means the
+                        * commit counter we read might not match buf->offset
+                        * due to concurrent update. We therefore need to retry.
+                        */
+                       goto retry;
+               }
                reserve_commit_diff =
                  (buf_trunc(offsets->begin, chan)
                   >> chan->backend.num_subbuf_order)
-                 - ((unsigned long) v_read(config,
-                                           &buf->commit_cold[sb_index].cc_sb)
-                    & chan->commit_count_mask);
+                 - (commit_count & chan->commit_count_mask);
                if (likely(reserve_commit_diff == 0)) {
                        /* Next subbuffer not being written to. */
                        if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
@@ -1579,9 +1667,10 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
                } else {
                        /*
                         * Next subbuffer reserve offset does not match the
-                        * commit offset. Drop record in producer-consumer and
-                        * overwrite mode. Caused by either a writer OOPS or too
-                        * many nested writes over a reserve/commit pair.
+                        * commit offset, and this did not involve update to the
+                        * reserve counter. Drop record in producer-consumer and
+                        * overwrite mode.  Caused by either a writer OOPS or
+                        * too many nested writes over a reserve/commit pair.
                         */
                        v_inc(config, &buf->records_lost_wrap);
                        return -EIO;
This page took 0.025305 seconds and 4 git commands to generate.