-/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1)
+/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
*
* ring_buffer_frontend.c
*
#include <linux/delay.h>
#include <linux/module.h>
#include <linux/percpu.h>
+#include <linux/kref.h>
+#include <linux/percpu-defs.h>
+#include <linux/timer.h>
#include <asm/cacheflush.h>
#include <wrapper/ringbuffer/config.h>
#include <wrapper/ringbuffer/frontend.h>
#include <wrapper/ringbuffer/iterator.h>
#include <wrapper/ringbuffer/nohz.h>
-#include <wrapper/atomic.h>
-#include <wrapper/kref.h>
-#include <wrapper/percpu-defs.h>
-#include <wrapper/timer.h>
-#include <wrapper/vmalloc.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
struct channel *chan = buf->backend.chan;
lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
- lttng_kvfree(buf->commit_hot);
- lttng_kvfree(buf->commit_cold);
- lttng_kvfree(buf->ts_end);
+ kvfree(buf->commit_hot);
+ kvfree(buf->commit_cold);
+ kvfree(buf->ts_end);
lib_ring_buffer_backend_free(&buf->backend);
}
return ret;
buf->commit_hot =
- lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_hot)
+ kvzalloc_node(ALIGN(sizeof(*buf->commit_hot)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
GFP_KERNEL | __GFP_NOWARN,
}
buf->commit_cold =
- lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_cold)
+ kvzalloc_node(ALIGN(sizeof(*buf->commit_cold)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
GFP_KERNEL | __GFP_NOWARN,
}
buf->ts_end =
- lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end)
+ kvzalloc_node(ALIGN(sizeof(*buf->ts_end)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
GFP_KERNEL | __GFP_NOWARN,
chan->backend.cpumask));
cpumask_set_cpu(cpu, chan->backend.cpumask);
}
-
return 0;
/* Error handling */
free_init:
- lttng_kvfree(buf->ts_end);
+ kvfree(buf->ts_end);
free_commit_cold:
- lttng_kvfree(buf->commit_cold);
+ kvfree(buf->commit_cold);
free_commit:
- lttng_kvfree(buf->commit_hot);
+ kvfree(buf->commit_hot);
free_chanbuf:
lib_ring_buffer_backend_free(&buf->backend);
return ret;
}
-static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
+static void switch_buffer_timer(struct timer_list *t)
{
- struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer);
+ struct lib_ring_buffer *buf = from_timer(buf, t, switch_timer);
struct channel *chan = buf->backend.chan;
- const struct lib_ring_buffer_config *config = &chan->backend.config;
/*
* Only flush buffers periodically if readers are active.
if (atomic_long_read(&buf->active_readers))
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_mod_timer_pinned(&buf->switch_timer,
- jiffies + chan->switch_timer_interval);
- else
- mod_timer(&buf->switch_timer,
- jiffies + chan->switch_timer_interval);
+ mod_timer(&buf->switch_timer,
+ jiffies + chan->switch_timer_interval);
}
/*
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- flags = LTTNG_TIMER_PINNED;
+ flags = TIMER_PINNED;
- lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf);
+ timer_setup(&buf->switch_timer, switch_buffer_timer, flags);
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
/*
* Polling timer to check the channels for data.
*/
-static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
+static void read_buffer_timer(struct timer_list *t)
{
- struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer);
+ struct lib_ring_buffer *buf = from_timer(buf, t, read_timer);
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
wake_up_interruptible(&chan->read_wait);
}
- if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_mod_timer_pinned(&buf->read_timer,
- jiffies + chan->read_timer_interval);
- else
- mod_timer(&buf->read_timer,
- jiffies + chan->read_timer_interval);
+ mod_timer(&buf->read_timer,
+ jiffies + chan->read_timer_interval);
}
/*
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned int flags;
+ unsigned int flags = 0;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
|| !chan->read_timer_interval
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- flags = LTTNG_TIMER_PINNED;
+ flags = TIMER_PINNED;
- lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf);
+ timer_setup(&buf->read_timer, read_buffer_timer, flags);
buf->read_timer.expires = jiffies + chan->read_timer_interval;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
buf->read_timer_enabled = 0;
}
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
-
enum cpuhp_state lttng_rb_hp_prepare;
enum cpuhp_state lttng_rb_hp_online;
}
EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
-
-#ifdef CONFIG_HOTPLUG_CPU
-
-/**
- * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
- * @nb: notifier block
- * @action: hotplug action to take
- * @hcpu: CPU number
- *
- * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
- */
-static
-int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
- unsigned long action,
- void *hcpu)
-{
- unsigned int cpu = (unsigned long)hcpu;
- struct channel *chan = container_of(nb, struct channel,
- cpu_hp_notifier);
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
- const struct lib_ring_buffer_config *config = &chan->backend.config;
-
- if (!chan->cpu_hp_enable)
- return NOTIFY_DONE;
-
- CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
-
- switch (action) {
- case CPU_DOWN_FAILED:
- case CPU_DOWN_FAILED_FROZEN:
- case CPU_ONLINE:
- case CPU_ONLINE_FROZEN:
- wake_up_interruptible(&chan->hp_wait);
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- return NOTIFY_OK;
-
- case CPU_DOWN_PREPARE:
- case CPU_DOWN_PREPARE_FROZEN:
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
- return NOTIFY_OK;
-
- case CPU_DEAD:
- case CPU_DEAD_FROZEN:
- /*
- * Performing a buffer switch on a remote CPU. Performed by
- * the CPU responsible for doing the hotunplug after the target
- * CPU stopped running completely. Ensures that all data
- * from that remote CPU is flushed.
- */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
- return NOTIFY_OK;
-
- default:
- return NOTIFY_DONE;
- }
-}
-
-#endif
-
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
-
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/*
* For per-cpu buffers, call the reader wakeups before switching the buffer, so
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
+ spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
* concurrency.
*/
#endif /* CONFIG_NO_HZ */
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
{
int ret;
&chan->cpuhp_prepare.node);
WARN_ON(ret);
}
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
- {
- int cpu;
-
-#ifdef CONFIG_HOTPLUG_CPU
- get_online_cpus();
- chan->cpu_hp_enable = 0;
- for_each_online_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
- }
- put_online_cpus();
- unregister_cpu_notifier(&chan->cpu_hp_notifier);
-#else
- for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
- }
-#endif
- }
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
} else {
struct lib_ring_buffer *buf = chan->backend.buf;
init_waitqueue_head(&chan->hp_wait);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
&chan->cpuhp_prepare.node);
&chan->cpuhp_online.node);
if (ret)
goto cpuhp_online_error;
-#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
- {
- int cpu;
- /*
- * In case of non-hotplug cpu, if the ring-buffer is allocated
- * in early initcall, it will not be notified of secondary cpus.
- * In that off case, we need to allocate for all possible cpus.
- */
-#ifdef CONFIG_HOTPLUG_CPU
- chan->cpu_hp_notifier.notifier_call =
- lib_ring_buffer_cpu_hp_callback;
- chan->cpu_hp_notifier.priority = 6;
- register_cpu_notifier(&chan->cpu_hp_notifier);
-
- get_online_cpus();
- for_each_online_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
- chan->cpu_hp_enable = 1;
- put_online_cpus();
-#else
- for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
-#endif
- }
-#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/* Only benefit from NO_HZ idle with per-cpu buffers for now. */
return chan;
-#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
cpuhp_online_error:
ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
&chan->cpuhp_prepare.node);
WARN_ON(ret);
cpuhp_prepare_error:
-#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
error_free_backend:
channel_backend_free(&chan->backend);
error:
config->cb.buffer_finalize(buf,
chan->backend.priv,
cpu);
- if (buf->backend.allocated)
- lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
- if (buf->backend.allocated)
- lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
return -EBUSY;
- if (!lttng_kref_get(&chan->ref)) {
- atomic_long_dec(&buf->active_readers);
- return -EOVERFLOW;
- }
- lttng_smp_mb__after_atomic();
+ kref_get(&chan->ref);
+ smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- lttng_smp_mb__before_atomic();
+ smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
cpu);
}
+#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS
static
-void lib_ring_buffer_print_errors(struct channel *chan,
- struct lib_ring_buffer *buf, int cpu)
+void lib_ring_buffer_print_records_count(struct channel *chan,
+ struct lib_ring_buffer *buf,
+ int cpu)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- void *priv = chan->backend.priv;
if (!strcmp(chan->backend.name, "relay-metadata")) {
printk(KERN_DEBUG "ring buffer %s: %lu records written, "
chan->backend.name, cpu,
v_read(config, &buf->records_count),
v_read(config, &buf->records_overrun));
+ }
+}
+#else
+/*
+ * LTTNG_RING_BUFFER_COUNT_EVENTS is disabled: record-count statistics are
+ * not maintained, so printing them is compiled out to this empty stub.
+ * Keeping the same signature lets lib_ring_buffer_print_errors() call it
+ * unconditionally.
+ */
+static
+void lib_ring_buffer_print_records_count(struct channel *chan,
+					 struct lib_ring_buffer *buf,
+					 int cpu)
+{
+}
+#endif
+static
+void lib_ring_buffer_print_errors(struct channel *chan,
+ struct lib_ring_buffer *buf, int cpu)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ void *priv = chan->backend.priv;
+
+ lib_ring_buffer_print_records_count(chan, buf, cpu);
+ if (strcmp(chan->backend.name, "relay-metadata")) {
if (v_read(config, &buf->records_lost_full)
|| v_read(config, &buf->records_lost_wrap)
|| v_read(config, &buf->records_lost_big))
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+/*
+ * lib_ring_buffer_clear - discard the data present in a ring buffer.
+ * @buf: ring buffer to clear.
+ *
+ * Performs a remote (cross-CPU-safe) buffer switch to flush the currently
+ * written sub-buffer, then resets the reader-side state via
+ * lib_ring_buffer_clear_reader().
+ * NOTE(review): the bodies of lib_ring_buffer_switch_remote() and
+ * lib_ring_buffer_clear_reader() are not visible in this hunk; presumably
+ * the combination drops all unread data — confirm against their definitions.
+ */
+void lib_ring_buffer_clear(struct lib_ring_buffer *buf)
+{
+	struct lib_ring_buffer_backend *bufb = &buf->backend;
+	struct channel *chan = bufb->chan;
+
+	lib_ring_buffer_switch_remote(buf);
+	lib_ring_buffer_clear_reader(buf, chan);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear);
+
/*
* Returns :
* 0 if ok