wrapper: remove timer wrapper
[lttng-modules.git] / lib / ringbuffer / ring_buffer_frontend.c
index bdd31add4382dfb0a4cf41cfc3b9f124075a4df0..2ca778a06ddca1dc5cadf0f0cbec00fa24713855 100644 (file)
@@ -1,23 +1,9 @@
-/*
+/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
+ *
  * ring_buffer_frontend.c
  *
  * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; only
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- *
  * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
  * recorder (overwrite) modes. See thesis:
  *
@@ -54,6 +40,9 @@
 #include <linux/delay.h>
 #include <linux/module.h>
 #include <linux/percpu.h>
+#include <linux/kref.h>
+#include <linux/percpu-defs.h>
+#include <linux/timer.h>
 #include <asm/cacheflush.h>
 
 #include <wrapper/ringbuffer/config.h>
 #include <wrapper/ringbuffer/iterator.h>
 #include <wrapper/ringbuffer/nohz.h>
 #include <wrapper/atomic.h>
-#include <wrapper/kref.h>
-#include <wrapper/percpu-defs.h>
-#include <wrapper/timer.h>
-#include <wrapper/vmalloc.h>
 
 /*
  * Internal structure representing offsets to use at a sub-buffer switch.
@@ -148,8 +133,9 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf)
        struct channel *chan = buf->backend.chan;
 
        lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
-       lttng_kvfree(buf->commit_hot);
-       lttng_kvfree(buf->commit_cold);
+       kvfree(buf->commit_hot);
+       kvfree(buf->commit_cold);
+       kvfree(buf->ts_end);
 
        lib_ring_buffer_backend_free(&buf->backend);
 }
@@ -179,6 +165,7 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf)
                v_set(config, &buf->commit_hot[i].cc, 0);
                v_set(config, &buf->commit_hot[i].seq, 0);
                v_set(config, &buf->commit_cold[i].cc_sb, 0);
+               buf->ts_end[i] = 0;
        }
        atomic_long_set(&buf->consumed, 0);
        atomic_set(&buf->record_disabled, 0);
@@ -246,7 +233,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                return ret;
 
        buf->commit_hot =
-               lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_hot)
+               kvzalloc_node(ALIGN(sizeof(*buf->commit_hot)
                                   * chan->backend.num_subbuf,
                                   1 << INTERNODE_CACHE_SHIFT),
                        GFP_KERNEL | __GFP_NOWARN,
@@ -257,7 +244,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
        }
 
        buf->commit_cold =
-               lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_cold)
+               kvzalloc_node(ALIGN(sizeof(*buf->commit_cold)
                                   * chan->backend.num_subbuf,
                                   1 << INTERNODE_CACHE_SHIFT),
                        GFP_KERNEL | __GFP_NOWARN,
@@ -267,6 +254,17 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                goto free_commit;
        }
 
+       buf->ts_end =
+               kvzalloc_node(ALIGN(sizeof(*buf->ts_end)
+                                  * chan->backend.num_subbuf,
+                                  1 << INTERNODE_CACHE_SHIFT),
+                       GFP_KERNEL | __GFP_NOWARN,
+                       cpu_to_node(max(cpu, 0)));
+       if (!buf->ts_end) {
+               ret = -ENOMEM;
+               goto free_commit_cold;
+       }
+
        init_waitqueue_head(&buf->read_wait);
        init_waitqueue_head(&buf->write_wait);
        raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
@@ -301,24 +299,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf,
                             chan->backend.cpumask));
                cpumask_set_cpu(cpu, chan->backend.cpumask);
        }
-
        return 0;
 
        /* Error handling */
 free_init:
-       lttng_kvfree(buf->commit_cold);
+       kvfree(buf->ts_end);
+free_commit_cold:
+       kvfree(buf->commit_cold);
 free_commit:
-       lttng_kvfree(buf->commit_hot);
+       kvfree(buf->commit_hot);
 free_chanbuf:
        lib_ring_buffer_backend_free(&buf->backend);
        return ret;
 }
 
-static void switch_buffer_timer(unsigned long data)
+static void switch_buffer_timer(struct timer_list *t)
 {
-       struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+       struct lib_ring_buffer *buf = from_timer(buf, t, switch_timer);
        struct channel *chan = buf->backend.chan;
-       const struct lib_ring_buffer_config *config = &chan->backend.config;
 
        /*
         * Only flush buffers periodically if readers are active.
@@ -326,12 +324,8 @@ static void switch_buffer_timer(unsigned long data)
        if (atomic_long_read(&buf->active_readers))
                lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
 
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               lttng_mod_timer_pinned(&buf->switch_timer,
-                                jiffies + chan->switch_timer_interval);
-       else
-               mod_timer(&buf->switch_timer,
-                         jiffies + chan->switch_timer_interval);
+       mod_timer(&buf->switch_timer,
+                 jiffies + chan->switch_timer_interval);
 }
 
 /*
@@ -341,22 +335,22 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf)
 {
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
+       unsigned int flags = 0;
 
        if (!chan->switch_timer_interval || buf->switch_timer_enabled)
                return;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               lttng_init_timer_pinned(&buf->switch_timer);
-       else
-               init_timer(&buf->switch_timer);
+               flags = TIMER_PINNED;
 
-       buf->switch_timer.function = switch_buffer_timer;
+       timer_setup(&buf->switch_timer, switch_buffer_timer, flags);
        buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
-       buf->switch_timer.data = (unsigned long)buf;
+
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                add_timer_on(&buf->switch_timer, buf->backend.cpu);
        else
                add_timer(&buf->switch_timer);
+
        buf->switch_timer_enabled = 1;
 }
 
@@ -377,9 +371,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf)
 /*
  * Polling timer to check the channels for data.
  */
-static void read_buffer_timer(unsigned long data)
+static void read_buffer_timer(struct timer_list *t)
 {
-       struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+       struct lib_ring_buffer *buf = from_timer(buf, t, read_timer);
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
 
@@ -391,12 +385,8 @@ static void read_buffer_timer(unsigned long data)
                wake_up_interruptible(&chan->read_wait);
        }
 
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               lttng_mod_timer_pinned(&buf->read_timer,
-                                jiffies + chan->read_timer_interval);
-       else
-               mod_timer(&buf->read_timer,
-                         jiffies + chan->read_timer_interval);
+       mod_timer(&buf->read_timer,
+                 jiffies + chan->read_timer_interval);
 }
 
 /*
@@ -406,6 +396,7 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
 {
        struct channel *chan = buf->backend.chan;
        const struct lib_ring_buffer_config *config = &chan->backend.config;
+       unsigned int flags = 0;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
            || !chan->read_timer_interval
@@ -413,18 +404,16 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf)
                return;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               lttng_init_timer_pinned(&buf->read_timer);
-       else
-               init_timer(&buf->read_timer);
+               flags = TIMER_PINNED;
 
-       buf->read_timer.function = read_buffer_timer;
+       timer_setup(&buf->read_timer, read_buffer_timer, flags);
        buf->read_timer.expires = jiffies + chan->read_timer_interval;
-       buf->read_timer.data = (unsigned long)buf;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                add_timer_on(&buf->read_timer, buf->backend.cpu);
        else
                add_timer(&buf->read_timer);
+
        buf->read_timer_enabled = 1;
 }
 
@@ -635,16 +624,16 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb,
                raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
                break;
        case TICK_NOHZ_STOP:
-               spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+               spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
                lib_ring_buffer_stop_switch_timer(buf);
                lib_ring_buffer_stop_read_timer(buf);
-               spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+               spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
                break;
        case TICK_NOHZ_RESTART:
-               spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+               spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
                lib_ring_buffer_start_read_timer(buf);
                lib_ring_buffer_start_switch_timer(buf);
-               spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+               spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
                break;
        }
 
@@ -978,13 +967,11 @@ void *channel_destroy(struct channel *chan)
                                config->cb.buffer_finalize(buf,
                                                           chan->backend.priv,
                                                           cpu);
-                       if (buf->backend.allocated)
-                               lib_ring_buffer_set_quiescent(buf);
                        /*
                         * Perform flush before writing to finalized.
                         */
                        smp_wmb();
-                       ACCESS_ONCE(buf->finalized) = 1;
+                       WRITE_ONCE(buf->finalized, 1);
                        wake_up_interruptible(&buf->read_wait);
                }
        } else {
@@ -992,16 +979,14 @@ void *channel_destroy(struct channel *chan)
 
                if (config->cb.buffer_finalize)
                        config->cb.buffer_finalize(buf, chan->backend.priv, -1);
-               if (buf->backend.allocated)
-                       lib_ring_buffer_set_quiescent(buf);
                /*
                 * Perform flush before writing to finalized.
                 */
                smp_wmb();
-               ACCESS_ONCE(buf->finalized) = 1;
+               WRITE_ONCE(buf->finalized, 1);
                wake_up_interruptible(&buf->read_wait);
        }
-       ACCESS_ONCE(chan->finalized) = 1;
+       WRITE_ONCE(chan->finalized, 1);
        wake_up_interruptible(&chan->hp_wait);
        wake_up_interruptible(&chan->read_wait);
        priv = chan->backend.priv;
@@ -1027,10 +1012,7 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf)
 
        if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
                return -EBUSY;
-       if (!lttng_kref_get(&chan->ref)) {
-               atomic_long_dec(&buf->active_readers);
-               return -EOVERFLOW;
-       }
+       kref_get(&chan->ref);
        lttng_smp_mb__after_atomic();
        return 0;
 }
@@ -1078,7 +1060,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf,
        int finalized;
 
 retry:
-       finalized = ACCESS_ONCE(buf->finalized);
+       finalized = READ_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
@@ -1249,7 +1231,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf,
                return -EBUSY;
        }
 retry:
-       finalized = ACCESS_ONCE(buf->finalized);
+       finalized = READ_ONCE(buf->finalized);
        /*
         * Read finalized before counters.
         */
@@ -1502,12 +1484,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf,
                                                       cpu);
 }
 
+#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS
 static
-void lib_ring_buffer_print_errors(struct channel *chan,
-                                 struct lib_ring_buffer *buf, int cpu)
+void lib_ring_buffer_print_records_count(struct channel *chan,
+                                        struct lib_ring_buffer *buf,
+                                        int cpu)
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
-       void *priv = chan->backend.priv;
 
        if (!strcmp(chan->backend.name, "relay-metadata")) {
                printk(KERN_DEBUG "ring buffer %s: %lu records written, "
@@ -1521,7 +1504,26 @@ void lib_ring_buffer_print_errors(struct channel *chan,
                        chan->backend.name, cpu,
                        v_read(config, &buf->records_count),
                        v_read(config, &buf->records_overrun));
+       }
+}
+#else
+static
+void lib_ring_buffer_print_records_count(struct channel *chan,
+                                        struct lib_ring_buffer *buf,
+                                        int cpu)
+{
+}
+#endif
 
+static
+void lib_ring_buffer_print_errors(struct channel *chan,
+                                 struct lib_ring_buffer *buf, int cpu)
+{
+       const struct lib_ring_buffer_config *config = &chan->backend.config;
+       void *priv = chan->backend.priv;
+
+       lib_ring_buffer_print_records_count(chan, buf, cpu);
+       if (strcmp(chan->backend.name, "relay-metadata")) {
                if (v_read(config, &buf->records_lost_full)
                    || v_read(config, &buf->records_lost_wrap)
                    || v_read(config, &buf->records_lost_big))
@@ -1597,14 +1599,26 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf,
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
        unsigned long commit_count, padding_size, data_size;
        struct commit_counters_hot *cc_hot;
+       u64 *ts_end;
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
        subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
 
+       ts_end = &buf->ts_end[oldidx];
        /*
-        * Order all writes to buffer before the commit count update that will
-        * determine that the subbuffer is full.
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
+
+       /*
+        * Order all writes to buffer and store to ts_end before the commit
+        * count update that will determine that the subbuffer is full.
         */
        if (config->ipi == RING_BUFFER_IPI_BARRIER) {
                /*
@@ -1685,10 +1699,21 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf,
 {
        const struct lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long endidx, data_size;
+       u64 *ts_end;
 
        endidx = subbuf_index(offsets->end - 1, chan);
        data_size = subbuf_offset(offsets->end - 1, chan) + 1;
        subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+       ts_end = &buf->ts_end[endidx];
+       /*
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
 }
 
 /*
@@ -1918,6 +1943,16 @@ void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf)
 }
 EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
 
+void lib_ring_buffer_clear(struct lib_ring_buffer *buf)
+{
+       struct lib_ring_buffer_backend *bufb = &buf->backend;
+       struct channel *chan = bufb->chan;
+
+       lib_ring_buffer_switch_remote(buf);
+       lib_ring_buffer_clear_reader(buf, chan);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear);
+
 /*
  * Returns :
  * 0 if ok
@@ -2252,14 +2287,24 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con
        if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
                                 old_commit_count, old_commit_count + 1)
                   == old_commit_count)) {
+               u64 *ts_end;
+
                /*
                 * Start of exclusive subbuffer access. We are
                 * guaranteed to be the last writer in this subbuffer
                 * and any other writer trying to access this subbuffer
                 * in this state is required to drop records.
+                *
+                * We can read the ts_end for the current sub-buffer
+                * which has been saved by the very last space
+                * reservation for the current sub-buffer.
+                *
+                * Order increment of commit counter before reading ts_end.
                 */
+               smp_mb();
+               ts_end = &buf->ts_end[idx];
                deliver_count_events(config, buf, idx);
-               config->cb.buffer_end(buf, tsc, idx,
+               config->cb.buffer_end(buf, *ts_end, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
                                                                idx));
This page took 0.028833 seconds and 4 git commands to generate.