X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=3cab365264e87431530c449648b8a9d6314449e4;hb=c245d0d3253680f0f8caf751ae99d10bd74034bf;hp=bdd31add4382dfb0a4cf41cfc3b9f124075a4df0;hpb=f78cce4bcb07f2e98c6a1f6fd15af12819bbd5c7;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index bdd31add..3cab3652 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1,23 +1,9 @@ -/* +/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1) + * * ring_buffer_frontend.c * * Copyright (C) 2005-2012 Mathieu Desnoyers * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; only - * version 2.1 of the License. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: * @@ -150,6 +136,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); lttng_kvfree(buf->commit_hot); lttng_kvfree(buf->commit_cold); + lttng_kvfree(buf->ts_end); lib_ring_buffer_backend_free(&buf->backend); } @@ -179,6 +166,7 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) v_set(config, &buf->commit_hot[i].cc, 0); v_set(config, &buf->commit_hot[i].seq, 0); v_set(config, &buf->commit_cold[i].cc_sb, 0); + buf->ts_end[i] = 0; } atomic_long_set(&buf->consumed, 0); atomic_set(&buf->record_disabled, 0); @@ -267,6 +255,17 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } + buf->ts_end = + lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end) + * chan->backend.num_subbuf, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); + if (!buf->ts_end) { + ret = -ENOMEM; + goto free_commit_cold; + } + init_waitqueue_head(&buf->read_wait); init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); @@ -306,6 +305,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, /* Error handling */ free_init: + lttng_kvfree(buf->ts_end); +free_commit_cold: lttng_kvfree(buf->commit_cold); free_commit: lttng_kvfree(buf->commit_hot); @@ -314,9 +315,9 @@ free_chanbuf: return ret; } -static void switch_buffer_timer(unsigned long data) +static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer); struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -341,22 +342,22 @@ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; + unsigned int flags = 0; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_init_timer_pinned(&buf->switch_timer); - else - init_timer(&buf->switch_timer); + flags = LTTNG_TIMER_PINNED; - buf->switch_timer.function = switch_buffer_timer; + lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf); buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - buf->switch_timer.data = (unsigned long)buf; + if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) add_timer_on(&buf->switch_timer, buf->backend.cpu); else add_timer(&buf->switch_timer); + buf->switch_timer_enabled = 1; } @@ -377,9 +378,9 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) /* * Polling timer to check the channels for data. */ -static void read_buffer_timer(unsigned long data) +static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t) { - struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; + struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer); struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; @@ -406,6 +407,7 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { struct channel *chan = buf->backend.chan; const struct lib_ring_buffer_config *config = &chan->backend.config; + unsigned int flags = 0; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER || !chan->read_timer_interval @@ -413,18 +415,16 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) return; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - lttng_init_timer_pinned(&buf->read_timer); - else - init_timer(&buf->read_timer); + flags = LTTNG_TIMER_PINNED; - buf->read_timer.function = read_buffer_timer; + lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf); buf->read_timer.expires = jiffies + chan->read_timer_interval; - buf->read_timer.data = (unsigned long)buf; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) add_timer_on(&buf->read_timer, buf->backend.cpu); else add_timer(&buf->read_timer); + buf->read_timer_enabled = 1; } @@ -978,13 +978,11 @@ void *channel_destroy(struct channel *chan) config->cb.buffer_finalize(buf, chan->backend.priv, cpu); - if (buf->backend.allocated) - lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ smp_wmb(); - ACCESS_ONCE(buf->finalized) = 1; + WRITE_ONCE(buf->finalized, 1); wake_up_interruptible(&buf->read_wait); } } else { @@ -992,16 +990,14 @@ void *channel_destroy(struct channel *chan) if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); - if (buf->backend.allocated) - lib_ring_buffer_set_quiescent(buf); /* * Perform flush before writing to finalized. */ smp_wmb(); - ACCESS_ONCE(buf->finalized) = 1; + WRITE_ONCE(buf->finalized, 1); wake_up_interruptible(&buf->read_wait); } - ACCESS_ONCE(chan->finalized) = 1; + WRITE_ONCE(chan->finalized, 1); wake_up_interruptible(&chan->hp_wait); wake_up_interruptible(&chan->read_wait); priv = chan->backend.priv; @@ -1078,7 +1074,7 @@ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, int finalized; retry: - finalized = ACCESS_ONCE(buf->finalized); + finalized = READ_ONCE(buf->finalized); /* * Read finalized before counters. */ @@ -1249,7 +1245,7 @@ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, return -EBUSY; } retry: - finalized = ACCESS_ONCE(buf->finalized); + finalized = READ_ONCE(buf->finalized); /* * Read finalized before counters. */ @@ -1502,12 +1498,13 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, cpu); } +#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS static -void lib_ring_buffer_print_errors(struct channel *chan, - struct lib_ring_buffer *buf, int cpu) +void lib_ring_buffer_print_records_count(struct channel *chan, + struct lib_ring_buffer *buf, + int cpu) { const struct lib_ring_buffer_config *config = &chan->backend.config; - void *priv = chan->backend.priv; if (!strcmp(chan->backend.name, "relay-metadata")) { printk(KERN_DEBUG "ring buffer %s: %lu records written, " @@ -1521,7 +1518,26 @@ void lib_ring_buffer_print_errors(struct channel *chan, chan->backend.name, cpu, v_read(config, &buf->records_count), v_read(config, &buf->records_overrun)); + } +} +#else +static +void lib_ring_buffer_print_records_count(struct channel *chan, + struct lib_ring_buffer *buf, + int cpu) +{ +} +#endif + +static +void lib_ring_buffer_print_errors(struct channel *chan, + struct lib_ring_buffer *buf, int cpu) +{ + const struct lib_ring_buffer_config *config = &chan->backend.config; + void *priv = chan->backend.priv; + lib_ring_buffer_print_records_count(chan, buf, cpu); + if (strcmp(chan->backend.name, "relay-metadata")) { if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) @@ -1597,14 +1613,26 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; struct commit_counters_hot *cc_hot; + u64 *ts_end; data_size = subbuf_offset(offsets->old - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; subbuffer_set_data_size(config, &buf->backend, oldidx, data_size); + ts_end = &buf->ts_end[oldidx]; /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; + + /* + * Order all writes to buffer and store to ts_end before the commit + * count update that will determine that the subbuffer is full. */ if (config->ipi == RING_BUFFER_IPI_BARRIER) { /* @@ -1685,10 +1713,21 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, { const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long endidx, data_size; + u64 *ts_end; endidx = subbuf_index(offsets->end - 1, chan); data_size = subbuf_offset(offsets->end - 1, chan) + 1; subbuffer_set_data_size(config, &buf->backend, endidx, data_size); + ts_end = &buf->ts_end[endidx]; + /* + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; } /* @@ -1918,6 +1957,16 @@ void lib_ring_buffer_switch_remote_empty(struct lib_ring_buffer *buf) } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty); +void lib_ring_buffer_clear(struct lib_ring_buffer *buf) +{ + struct lib_ring_buffer_backend *bufb = &buf->backend; + struct channel *chan = bufb->chan; + + lib_ring_buffer_switch_remote(buf); + lib_ring_buffer_clear_reader(buf, chan); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_clear); + /* * Returns : * 0 if ok @@ -2252,14 +2301,24 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { + u64 *ts_end; + /* * Start of exclusive subbuffer access. We are * guaranteed to be the last writer in this subbuffer * and any other writer trying to access this subbuffer * in this state is required to drop records. + * + * We can read the ts_end for the current sub-buffer + * which has been saved by the very last space + * reservation for the current sub-buffer. + * + * Order increment of commit counter before reading ts_end. */ + smp_mb(); + ts_end = &buf->ts_end[idx]; deliver_count_events(config, buf, idx); - config->cb.buffer_end(buf, tsc, idx, + config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, idx));