-/*
+/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
+ *
* ring_buffer_frontend.c
*
* Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; only
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- *
* Ring buffer wait-free buffer synchronization. Producer-consumer and flight
* recorder (overwrite) modes. See thesis:
*
lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
lttng_kvfree(buf->commit_hot);
lttng_kvfree(buf->commit_cold);
+ lttng_kvfree(buf->ts_end);
lib_ring_buffer_backend_free(&buf->backend);
}
v_set(config, &buf->commit_hot[i].cc, 0);
v_set(config, &buf->commit_hot[i].seq, 0);
v_set(config, &buf->commit_cold[i].cc_sb, 0);
+ buf->ts_end[i] = 0;
}
atomic_long_set(&buf->consumed, 0);
atomic_set(&buf->record_disabled, 0);
goto free_commit;
}
+ buf->ts_end =
+ lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end)
+ * chan->backend.num_subbuf,
+ 1 << INTERNODE_CACHE_SHIFT),
+ GFP_KERNEL | __GFP_NOWARN,
+ cpu_to_node(max(cpu, 0)));
+ if (!buf->ts_end) {
+ ret = -ENOMEM;
+ goto free_commit_cold;
+ }
+
init_waitqueue_head(&buf->read_wait);
init_waitqueue_head(&buf->write_wait);
raw_spin_lock_init(&buf->raw_tick_nohz_spinlock);
/* Error handling */
free_init:
+ lttng_kvfree(buf->ts_end);
+free_commit_cold:
lttng_kvfree(buf->commit_cold);
free_commit:
lttng_kvfree(buf->commit_hot);
return ret;
}
-static void switch_buffer_timer(unsigned long data)
+static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
{
- struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+ struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer);
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
+ unsigned int flags = 0;
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_init_timer_pinned(&buf->switch_timer);
- else
- init_timer(&buf->switch_timer);
+ flags = LTTNG_TIMER_PINNED;
- buf->switch_timer.function = switch_buffer_timer;
+ lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf);
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
- buf->switch_timer.data = (unsigned long)buf;
+
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
add_timer_on(&buf->switch_timer, buf->backend.cpu);
else
add_timer(&buf->switch_timer);
+
buf->switch_timer_enabled = 1;
}
/*
* Polling timer to check the channels for data.
*/
-static void read_buffer_timer(unsigned long data)
+static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
{
- struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+ struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer);
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
+ unsigned int flags = 0;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
|| !chan->read_timer_interval
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_init_timer_pinned(&buf->read_timer);
- else
- init_timer(&buf->read_timer);
+ flags = LTTNG_TIMER_PINNED;
- buf->read_timer.function = read_buffer_timer;
+ lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf);
buf->read_timer.expires = jiffies + chan->read_timer_interval;
- buf->read_timer.data = (unsigned long)buf;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
add_timer_on(&buf->read_timer, buf->backend.cpu);
else
add_timer(&buf->read_timer);
+
buf->read_timer_enabled = 1;
}
config->cb.buffer_finalize(buf,
chan->backend.priv,
cpu);
- if (buf->backend.allocated)
- lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ WRITE_ONCE(buf->finalized, 1);
wake_up_interruptible(&buf->read_wait);
}
} else {
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
- if (buf->backend.allocated)
- lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ WRITE_ONCE(buf->finalized, 1);
wake_up_interruptible(&buf->read_wait);
}
- ACCESS_ONCE(chan->finalized) = 1;
+ WRITE_ONCE(chan->finalized, 1);
wake_up_interruptible(&chan->hp_wait);
wake_up_interruptible(&chan->read_wait);
priv = chan->backend.priv;
int finalized;
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = READ_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
return -EBUSY;
}
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = READ_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
cpu);
}
+#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS
static
-void lib_ring_buffer_print_errors(struct channel *chan,
- struct lib_ring_buffer *buf, int cpu)
+void lib_ring_buffer_print_records_count(struct channel *chan,
+ struct lib_ring_buffer *buf,
+ int cpu)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- void *priv = chan->backend.priv;
if (!strcmp(chan->backend.name, "relay-metadata")) {
printk(KERN_DEBUG "ring buffer %s: %lu records written, "
chan->backend.name, cpu,
v_read(config, &buf->records_count),
v_read(config, &buf->records_overrun));
+ }
+}
+#else
+static
+void lib_ring_buffer_print_records_count(struct channel *chan,
+ struct lib_ring_buffer *buf,
+ int cpu)
+{
+}
+#endif
+
+static
+void lib_ring_buffer_print_errors(struct channel *chan,
+ struct lib_ring_buffer *buf, int cpu)
+{
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ void *priv = chan->backend.priv;
+ lib_ring_buffer_print_records_count(chan, buf, cpu);
+ if (strcmp(chan->backend.name, "relay-metadata")) {
if (v_read(config, &buf->records_lost_full)
|| v_read(config, &buf->records_lost_wrap)
|| v_read(config, &buf->records_lost_big))
unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
unsigned long commit_count, padding_size, data_size;
struct commit_counters_hot *cc_hot;
+ u64 *ts_end;
data_size = subbuf_offset(offsets->old - 1, chan) + 1;
padding_size = chan->backend.subbuf_size - data_size;
subbuffer_set_data_size(config, &buf->backend, oldidx, data_size);
+ ts_end = &buf->ts_end[oldidx];
/*
- * Order all writes to buffer before the commit count update that will
- * determine that the subbuffer is full.
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
+
+ /*
+ * Order all writes to buffer and store to ts_end before the commit
+ * count update that will determine that the subbuffer is full.
*/
if (config->ipi == RING_BUFFER_IPI_BARRIER) {
/*
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
unsigned long endidx, data_size;
+ u64 *ts_end;
endidx = subbuf_index(offsets->end - 1, chan);
data_size = subbuf_offset(offsets->end - 1, chan) + 1;
subbuffer_set_data_size(config, &buf->backend, endidx, data_size);
+ ts_end = &buf->ts_end[endidx];
+ /*
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
}
/*
}
/*
- * Taking lock on CPU hotplug to ensure two things: first, that the
+ * Disabling preemption ensures two things: first, that the
* target cpu is not taken concurrently offline while we are within
- * smp_call_function_single() (I don't trust that get_cpu() on the
- * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be
- * confirmed)). Secondly, if it happens that the CPU is not online, our
- * own call to lib_ring_buffer_switch_slow() needs to be protected from
- * CPU hotplug handlers, which can also perform a remote subbuffer
- * switch.
+ * smp_call_function_single(). Secondly, if it happens that the
+ * CPU is not online, our own call to lib_ring_buffer_switch_slow()
+ * needs to be protected from CPU hotplug handlers, which can
+ * also perform a remote subbuffer switch.
*/
- get_online_cpus();
+ preempt_disable();
param.buf = buf;
param.mode = mode;
ret = smp_call_function_single(buf->backend.cpu,
/* Remote CPU is offline, do it ourself. */
lib_ring_buffer_switch_slow(buf, mode);
}
- put_online_cpus();
+ preempt_enable();
}
/* Switch sub-buffer if current sub-buffer is non-empty. */
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote_empty);
+void lib_ring_buffer_clear(struct lib_ring_buffer *buf)
+{
+ struct lib_ring_buffer_backend *bufb = &buf->backend;
+ struct channel *chan = bufb->chan;
+
+ lib_ring_buffer_switch_remote(buf);
+ lib_ring_buffer_clear_reader(buf, chan);
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear);
+
/*
* Returns :
* 0 if ok
if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
+ u64 *ts_end;
+
/*
* Start of exclusive subbuffer access. We are
* guaranteed to be the last writer in this subbuffer
* and any other writer trying to access this subbuffer
* in this state is required to drop records.
+ *
+ * We can read the ts_end for the current sub-buffer
+ * which has been saved by the very last space
+ * reservation for the current sub-buffer.
+ *
+ * Order increment of commit counter before reading ts_end.
*/
+ smp_mb();
+ ts_end = &buf->ts_end[idx];
deliver_count_events(config, buf, idx);
- config->cb.buffer_end(buf, tsc, idx,
+ config->cb.buffer_end(buf, *ts_end, idx,
lib_ring_buffer_get_data_size(config,
buf,
idx));