X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=8ffb702e027a692ee88f6ab7581aafcff85d1f03;hb=5760f3f4e64af16249a4f208e680c7dcffb4f990;hp=f367dffc70190c3a040491349bd97eff0778d34d;hpb=68045fadc564005c6a103595e373e04e7b61244a;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index f367dffc..8ffb702e 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1,4 +1,4 @@ -/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1) +/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only) * * ring_buffer_frontend.c * @@ -40,18 +40,16 @@ #include #include #include +#include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include +#include +#include +#include +#include +#include /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -134,9 +132,9 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) struct channel *chan = buf->backend.chan; lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); - lttng_kvfree(buf->commit_hot); - lttng_kvfree(buf->commit_cold); - lttng_kvfree(buf->ts_end); + kvfree(buf->commit_hot); + kvfree(buf->commit_cold); + kvfree(buf->ts_end); lib_ring_buffer_backend_free(&buf->backend); } @@ -234,7 +232,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, return ret; buf->commit_hot = - lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_hot) + kvzalloc_node(ALIGN(sizeof(*buf->commit_hot) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), GFP_KERNEL | __GFP_NOWARN, @@ -245,7 +243,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, } buf->commit_cold = - lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_cold) + kvzalloc_node(ALIGN(sizeof(*buf->commit_cold) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), GFP_KERNEL | __GFP_NOWARN, @@ -256,7 +254,7 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, } buf->ts_end = - lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end) + kvzalloc_node(ALIGN(sizeof(*buf->ts_end) * chan->backend.num_subbuf, 1 << INTERNODE_CACHE_SHIFT), GFP_KERNEL | __GFP_NOWARN, @@ -300,26 +298,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, chan->backend.cpumask)); cpumask_set_cpu(cpu, chan->backend.cpumask); } - return 0; /* Error handling */ free_init: - lttng_kvfree(buf->ts_end); + kvfree(buf->ts_end); free_commit_cold: - lttng_kvfree(buf->commit_cold); + kvfree(buf->commit_cold); free_commit: - lttng_kvfree(buf->commit_hot); + kvfree(buf->commit_hot); free_chanbuf: lib_ring_buffer_backend_free(&buf->backend); return ret; } -static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) +static void switch_buffer_timer(struct timer_list *t) { - struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer); + struct lib_ring_buffer *buf = from_timer(buf, t, switch_timer); struct channel *chan = buf->backend.chan; - const struct lib_ring_buffer_config *config = &chan->backend.config; /* * Only flush buffers periodically if readers are active. @@ -327,12 +323,8 @@ static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) if (atomic_long_read(&buf->active_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_mod_timer_pinned(&buf->switch_timer, - jiffies + chan->switch_timer_interval); - else - mod_timer(&buf->switch_timer, - jiffies + chan->switch_timer_interval); + mod_timer(&buf->switch_timer, + jiffies + chan->switch_timer_interval); } /* @@ -348,9 +340,9 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - flags = LTTNG_TIMER_PINNED; + flags = TIMER_PINNED; - lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf); + timer_setup(&buf->switch_timer, switch_buffer_timer, flags); buf->switch_timer.expires = jiffies + chan->switch_timer_interval; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) @@ -378,9 +370,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) /* * Polling timer to check the channels for data. */ -static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) +static void read_buffer_timer(struct timer_list *t) { - struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer); + struct lib_ring_buffer *buf = from_timer(buf, t, read_timer); struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -392,12 +384,8 @@ static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) wake_up_interruptible(&chan->read_wait); } - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_mod_timer_pinned(&buf->read_timer, - jiffies + chan->read_timer_interval); - else - mod_timer(&buf->read_timer, - jiffies + chan->read_timer_interval); + mod_timer(&buf->read_timer, + jiffies + chan->read_timer_interval); } /* @@ -415,9 +403,9 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - flags = LTTNG_TIMER_PINNED; + flags = TIMER_PINNED; - lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf); + timer_setup(&buf->read_timer, read_buffer_timer, flags); buf->read_timer.expires = jiffies + chan->read_timer_interval; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) @@ -453,8 +441,6 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) buf->read_timer_enabled = 0; } -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) - enum cpuhp_state lttng_rb_hp_prepare; enum cpuhp_state lttng_rb_hp_online; @@ -524,70 +510,6 @@ int lttng_cpuhp_rb_frontend_offline(unsigned int cpu, } EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline); -#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ - -#ifdef CONFIG_HOTPLUG_CPU - -/** - * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback - * @nb: notifier block - * @action: hotplug action to take - * @hcpu: CPU number - * - * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) - */ -static -int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - struct channel *chan = container_of(nb, struct channel, - cpu_hp_notifier); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = &chan->backend.config; - - if (!chan->cpu_hp_enable) - return NOTIFY_DONE; - - CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); - - switch (action) { - case CPU_DOWN_FAILED: - case CPU_DOWN_FAILED_FROZEN: - case CPU_ONLINE: - case CPU_ONLINE_FROZEN: - wake_up_interruptible(&chan->hp_wait); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - return NOTIFY_OK; - - case CPU_DOWN_PREPARE: - case CPU_DOWN_PREPARE_FROZEN: - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - return NOTIFY_OK; - - case CPU_DEAD: - case CPU_DEAD_FROZEN: - /* - * Performing a buffer switch on a remote CPU. Performed by - * the CPU responsible for doing the hotunplug after the target - * CPU stopped running completely. Ensures that all data - * from that remote CPU is flushed. - */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - return NOTIFY_OK; - - default: - return NOTIFY_DONE; - } -} - -#endif - -#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ - #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* * For per-cpu buffers, call the reader wakeups before switching the buffer, so @@ -635,16 +557,16 @@ static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, raw_spin_unlock(&buf->raw_tick_nohz_spinlock); break; case TICK_NOHZ_STOP: - spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); - spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock)); break; case TICK_NOHZ_RESTART: - spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_lock(this_cpu_ptr(&ring_buffer_nohz_lock)); lib_ring_buffer_start_read_timer(buf); lib_ring_buffer_start_switch_timer(buf); - spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock)); + spin_unlock(this_cpu_ptr(&ring_buffer_nohz_lock)); break; } @@ -692,7 +614,6 @@ static void channel_unregister_notifiers(struct channel *chan) * concurrency. */ #endif /* CONFIG_NO_HZ */ -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) { int ret; @@ -703,31 +624,6 @@ static void channel_unregister_notifiers(struct channel *chan) &chan->cpuhp_prepare.node); WARN_ON(ret); } -#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ - { - int cpu; - -#ifdef CONFIG_HOTPLUG_CPU - get_online_cpus(); - chan->cpu_hp_enable = 0; - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - } - put_online_cpus(); - unregister_cpu_notifier(&chan->cpu_hp_notifier); -#else - for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - } -#endif - } -#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ } else { struct lib_ring_buffer *buf = chan->backend.buf; @@ -856,7 +752,6 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND; ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare, &chan->cpuhp_prepare.node); @@ -868,43 +763,6 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, &chan->cpuhp_online.node); if (ret) goto cpuhp_online_error; -#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ - { - int cpu; - /* - * In case of non-hotplug cpu, if the ring-buffer is allocated - * in early initcall, it will not be notified of secondary cpus. - * In that off case, we need to allocate for all possible cpus. - */ -#ifdef CONFIG_HOTPLUG_CPU - chan->cpu_hp_notifier.notifier_call = - lib_ring_buffer_cpu_hp_callback; - chan->cpu_hp_notifier.priority = 6; - register_cpu_notifier(&chan->cpu_hp_notifier); - - get_online_cpus(); - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } - chan->cpu_hp_enable = 1; - put_online_cpus(); -#else - for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } -#endif - } -#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ #if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) /* Only benefit from NO_HZ idle with per-cpu buffers for now. */ @@ -924,13 +782,11 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, return chan; -#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) cpuhp_online_error: ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare, &chan->cpuhp_prepare.node); WARN_ON(ret); cpuhp_prepare_error: -#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */ error_free_backend: channel_backend_free(&chan->backend); error: @@ -1023,11 +879,8 @@ int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) return -EBUSY; - if (!lttng_kref_get(&chan->ref)) { - atomic_long_dec(&buf->active_readers); - return -EOVERFLOW; - } - lttng_smp_mb__after_atomic(); + kref_get(&chan->ref); + smp_mb__after_atomic(); return 0; } EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read); @@ -1037,7 +890,7 @@ void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) struct channel *chan = buf->backend.chan; CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); - lttng_smp_mb__before_atomic(); + smp_mb__before_atomic(); atomic_long_dec(&buf->active_readers); kref_put(&chan->ref, channel_release); } @@ -1498,12 +1351,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, cpu); } +#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS static -void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu) +void lib_ring_buffer_print_records_count(struct channel *chan, + struct lib_ring_buffer *buf, + int cpu) { const struct lib_ring_buffer_config *config = &chan->backend.config; - void *priv = chan->backend.priv; if (!strcmp(chan->backend.name, "relay-metadata")) { printk(KERN_DEBUG "ring buffer %s: %lu records written, " @@ -1517,7 +1371,26 @@ void lib_ring_buffer_print_errors(struct channel *chan, chan->backend.name, cpu, v_read(config, &buf->records_count), v_read(config, &buf->records_overrun)); + } +} +#else +static +void lib_ring_buffer_print_records_count(struct channel *chan, + struct lib_ring_buffer *buf, + int cpu) +{ +} +#endif +static +void lib_ring_buffer_print_errors(struct channel *chan, + struct lib_ring_buffer *buf, int cpu) +{ + const struct lib_ring_buffer_config *config = &chan->backend.config; + void *priv = chan->backend.priv; + + lib_ring_buffer_print_records_count(chan, buf, cpu); + if (strcmp(chan->backend.name, "relay-metadata")) { if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) @@ -1937,6 +1810,16 @@ void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); +void lib_ring_buffer_clear(struct lib_ring_buffer *buf) +{ + struct lib_ring_buffer_backend *bufb = &buf->backend; + struct channel *chan = bufb->chan; + + lib_ring_buffer_switch_remote(buf); + lib_ring_buffer_clear_reader(buf, chan); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear); + /* * Returns : * 0 if ok