-/*
+/* SPDX-License-Identifier: (GPL-2.0 OR LGPL-2.1)
+ *
* ring_buffer_frontend.c
*
* Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
*
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; only
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- *
* Ring buffer wait-free buffer synchronization. Producer-consumer and flight
* recorder (overwrite) modes. See thesis:
*
#include <linux/delay.h>
#include <linux/module.h>
#include <linux/percpu.h>
+#include <asm/cacheflush.h>
#include <wrapper/ringbuffer/config.h>
#include <wrapper/ringbuffer/backend.h>
#include <wrapper/kref.h>
#include <wrapper/percpu-defs.h>
#include <wrapper/timer.h>
+#include <wrapper/vmalloc.h>
/*
* Internal structure representing offsets to use at a sub-buffer switch.
struct channel *chan = buf->backend.chan;
lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu);
- kfree(buf->commit_hot);
- kfree(buf->commit_cold);
+ lttng_kvfree(buf->commit_hot);
+ lttng_kvfree(buf->commit_cold);
lib_ring_buffer_backend_free(&buf->backend);
}
return ret;
buf->commit_hot =
- kzalloc_node(ALIGN(sizeof(*buf->commit_hot)
+ lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_hot)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
GFP_KERNEL | __GFP_NOWARN,
}
buf->commit_cold =
- kzalloc_node(ALIGN(sizeof(*buf->commit_cold)
+ lttng_kvzalloc_node(ALIGN(sizeof(*buf->commit_cold)
* chan->backend.num_subbuf,
1 << INTERNODE_CACHE_SHIFT),
GFP_KERNEL | __GFP_NOWARN,
/* Error handling */
free_init:
- kfree(buf->commit_cold);
+ lttng_kvfree(buf->commit_cold);
free_commit:
- kfree(buf->commit_hot);
+ lttng_kvfree(buf->commit_hot);
free_chanbuf:
lib_ring_buffer_backend_free(&buf->backend);
return ret;
}
-static void switch_buffer_timer(unsigned long data)
+static void switch_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
{
- struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+ struct lib_ring_buffer *buf = lttng_from_timer(buf, t, switch_timer);
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
+ unsigned int flags = 0;
if (!chan->switch_timer_interval || buf->switch_timer_enabled)
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_init_timer_pinned(&buf->switch_timer);
- else
- init_timer(&buf->switch_timer);
+ flags = LTTNG_TIMER_PINNED;
- buf->switch_timer.function = switch_buffer_timer;
+ lttng_timer_setup(&buf->switch_timer, switch_buffer_timer, flags, buf);
buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
- buf->switch_timer.data = (unsigned long)buf;
+
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
add_timer_on(&buf->switch_timer, buf->backend.cpu);
else
add_timer(&buf->switch_timer);
+
buf->switch_timer_enabled = 1;
}
/*
* Polling timer to check the channels for data.
*/
-static void read_buffer_timer(unsigned long data)
+static void read_buffer_timer(LTTNG_TIMER_FUNC_ARG_TYPE t)
{
- struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data;
+ struct lib_ring_buffer *buf = lttng_from_timer(buf, t, read_timer);
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
+ unsigned int flags;
if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
|| !chan->read_timer_interval
return;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
- lttng_init_timer_pinned(&buf->read_timer);
- else
- init_timer(&buf->read_timer);
+ flags = LTTNG_TIMER_PINNED;
- buf->read_timer.function = read_buffer_timer;
+ lttng_timer_setup(&buf->read_timer, read_buffer_timer, flags, buf);
buf->read_timer.expires = jiffies + chan->read_timer_interval;
- buf->read_timer.data = (unsigned long)buf;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
add_timer_on(&buf->read_timer, buf->backend.cpu);
else
add_timer(&buf->read_timer);
+
buf->read_timer_enabled = 1;
}
buf->read_timer_enabled = 0;
}
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+
+enum cpuhp_state lttng_rb_hp_prepare;
+enum cpuhp_state lttng_rb_hp_online;
+
+void lttng_rb_set_hp_prepare(enum cpuhp_state val)
+{
+ lttng_rb_hp_prepare = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_prepare);
+
+void lttng_rb_set_hp_online(enum cpuhp_state val)
+{
+ lttng_rb_hp_online = val;
+}
+EXPORT_SYMBOL_GPL(lttng_rb_set_hp_online);
+
+int lttng_cpuhp_rb_frontend_dead(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_prepare);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ /*
+ * Performing a buffer switch on a remote CPU. Performed by
+ * the CPU responsible for doing the hotunplug after the target
+ * CPU stopped running completely. Ensures that all data
+ * from that remote CPU is flushed.
+ */
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_dead);
+
+int lttng_cpuhp_rb_frontend_online(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_online);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ wake_up_interruptible(&chan->hp_wait);
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_online);
+
+int lttng_cpuhp_rb_frontend_offline(unsigned int cpu,
+ struct lttng_cpuhp_node *node)
+{
+ struct channel *chan = container_of(node, struct channel,
+ cpuhp_online);
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu);
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL);
+
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ return 0;
+}
+EXPORT_SYMBOL_GPL(lttng_cpuhp_rb_frontend_offline);
+
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#ifdef CONFIG_HOTPLUG_CPU
+
/**
* lib_ring_buffer_cpu_hp_callback - CPU hotplug callback
* @nb: notifier block
return NOTIFY_DONE;
}
}
+
#endif
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/*
* For per-cpu buffers, call the reader wakeups before switching the buffer, so
static void channel_unregister_notifiers(struct channel *chan)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- int cpu;
channel_iterator_unregister_notifiers(chan);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
* concurrency.
*/
#endif /* CONFIG_NO_HZ */
-#ifdef CONFIG_HOTPLUG_CPU
- get_online_cpus();
- chan->cpu_hp_enable = 0;
- for_each_online_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+ {
+ int ret;
+
+ ret = cpuhp_state_remove_instance(lttng_rb_hp_online,
+ &chan->cpuhp_online.node);
+ WARN_ON(ret);
+ ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ WARN_ON(ret);
}
- put_online_cpus();
- unregister_cpu_notifier(&chan->cpu_hp_notifier);
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+ {
+ int cpu;
+
+#ifdef CONFIG_HOTPLUG_CPU
+ get_online_cpus();
+ chan->cpu_hp_enable = 0;
+ for_each_online_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ }
+ put_online_cpus();
+ unregister_cpu_notifier(&chan->cpu_hp_notifier);
#else
- for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- lib_ring_buffer_stop_switch_timer(buf);
- lib_ring_buffer_stop_read_timer(buf);
- }
+ for_each_possible_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ lib_ring_buffer_stop_switch_timer(buf);
+ lib_ring_buffer_stop_read_timer(buf);
+ }
#endif
+ }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
} else {
struct lib_ring_buffer *buf = chan->backend.buf;
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval)
{
- int ret, cpu;
+ int ret;
struct channel *chan;
if (lib_ring_buffer_check_config(config, switch_timer_interval,
init_waitqueue_head(&chan->hp_wait);
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+ chan->cpuhp_prepare.component = LTTNG_RING_BUFFER_FRONTEND;
+ ret = cpuhp_state_add_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ if (ret)
+ goto cpuhp_prepare_error;
+
+ chan->cpuhp_online.component = LTTNG_RING_BUFFER_FRONTEND;
+ ret = cpuhp_state_add_instance(lttng_rb_hp_online,
+ &chan->cpuhp_online.node);
+ if (ret)
+ goto cpuhp_online_error;
+#else /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+ {
+ int cpu;
+ /*
+ * In case of non-hotplug cpu, if the ring-buffer is allocated
+ * in early initcall, it will not be notified of secondary cpus.
+ * In that off case, we need to allocate for all possible cpus.
+ */
+#ifdef CONFIG_HOTPLUG_CPU
+ chan->cpu_hp_notifier.notifier_call =
+ lib_ring_buffer_cpu_hp_callback;
+ chan->cpu_hp_notifier.priority = 6;
+ register_cpu_notifier(&chan->cpu_hp_notifier);
+
+ get_online_cpus();
+ for_each_online_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ }
+ chan->cpu_hp_enable = 1;
+ put_online_cpus();
+#else
+ for_each_possible_cpu(cpu) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+ spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ lib_ring_buffer_start_switch_timer(buf);
+ lib_ring_buffer_start_read_timer(buf);
+ spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
+ }
+#endif
+ }
+#endif /* #else #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
+
#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER)
/* Only benefit from NO_HZ idle with per-cpu buffers for now. */
chan->tick_nohz_notifier.notifier_call =
&chan->tick_nohz_notifier);
#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */
- /*
- * In case of non-hotplug cpu, if the ring-buffer is allocated
- * in early initcall, it will not be notified of secondary cpus.
- * In that off case, we need to allocate for all possible cpus.
- */
-#ifdef CONFIG_HOTPLUG_CPU
- chan->cpu_hp_notifier.notifier_call =
- lib_ring_buffer_cpu_hp_callback;
- chan->cpu_hp_notifier.priority = 6;
- register_cpu_notifier(&chan->cpu_hp_notifier);
-
- get_online_cpus();
- for_each_online_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
- chan->cpu_hp_enable = 1;
- put_online_cpus();
-#else
- for_each_possible_cpu(cpu) {
- struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
- cpu);
- spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu));
- lib_ring_buffer_start_switch_timer(buf);
- lib_ring_buffer_start_read_timer(buf);
- spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu));
- }
-#endif
} else {
struct lib_ring_buffer *buf = chan->backend.buf;
return chan;
+#if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0))
+cpuhp_online_error:
+ ret = cpuhp_state_remove_instance_nocalls(lttng_rb_hp_prepare,
+ &chan->cpuhp_prepare.node);
+ WARN_ON(ret);
+cpuhp_prepare_error:
+#endif /* #if (LINUX_VERSION_CODE >= KERNEL_VERSION(4,10,0)) */
error_free_backend:
channel_backend_free(&chan->backend);
error:
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ WRITE_ONCE(buf->finalized, 1);
wake_up_interruptible(&buf->read_wait);
}
} else {
* Perform flush before writing to finalized.
*/
smp_wmb();
- ACCESS_ONCE(buf->finalized) = 1;
+ WRITE_ONCE(buf->finalized, 1);
wake_up_interruptible(&buf->read_wait);
}
- ACCESS_ONCE(chan->finalized) = 1;
+ WRITE_ONCE(chan->finalized, 1);
wake_up_interruptible(&chan->hp_wait);
wake_up_interruptible(&chan->read_wait);
priv = chan->backend.priv;
int finalized;
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = READ_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot);
+/**
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same subbuffer.
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(struct lib_ring_buffer *buf,
+ unsigned long *consumed, unsigned long *produced)
+{
+ struct channel *chan = buf->backend.chan;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ smp_rmb();
+ *consumed = atomic_long_read(&buf->consumed);
+ /*
+ * No need to issue a memory barrier between consumed count read and
+ * write offset read, because consumed count can only change
+ * concurrently in overwrite mode, and we keep a sequence counter
+ * identifier derived from the write offset to check we are getting
+ * the same sub-buffer we are expecting (the sub-buffers are atomically
+ * "tagged" upon writes, tags are checked upon read).
+ */
+ *produced = v_read(config, &buf->offset);
+ return 0;
+}
+
/**
* lib_ring_buffer_put_snapshot - move consumed counter forward
*
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer);
+#if ARCH_IMPLEMENTS_FLUSH_DCACHE_PAGE
+static void lib_ring_buffer_flush_read_subbuf_dcache(
+ const struct lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lib_ring_buffer *buf)
+{
+ struct lib_ring_buffer_backend_pages *pages;
+ unsigned long sb_bindex, id, i, nr_pages;
+
+ if (config->output != RING_BUFFER_MMAP)
+ return;
+
+ /*
+ * Architectures with caches aliased on virtual addresses may
+ * use different cache lines for the linear mapping vs
+ * user-space memory mapping. Given that the ring buffer is
+ * based on the kernel linear mapping, aligning it with the
+ * user-space mapping is not straightforward, and would require
+ * extra TLB entries. Therefore, simply flush the dcache for the
+ * entire sub-buffer before reading it.
+ */
+ id = buf->backend.buf_rsb.id;
+ sb_bindex = subbuffer_id_get_index(config, id);
+ pages = buf->backend.array[sb_bindex];
+ nr_pages = buf->backend.num_pages_per_subbuf;
+ for (i = 0; i < nr_pages; i++) {
+ struct lib_ring_buffer_backend_page *backend_page;
+
+ backend_page = &pages->p[i];
+ flush_dcache_page(pfn_to_page(backend_page->pfn));
+ }
+}
+#else
+static void lib_ring_buffer_flush_read_subbuf_dcache(
+ const struct lib_ring_buffer_config *config,
+ struct channel *chan,
+ struct lib_ring_buffer *buf)
+{
+}
+#endif
+
/**
* lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading
* @buf: ring buffer
return -EBUSY;
}
retry:
- finalized = ACCESS_ONCE(buf->finalized);
+ finalized = READ_ONCE(buf->finalized);
/*
* Read finalized before counters.
*/
buf->get_subbuf_consumed = consumed;
buf->get_subbuf = 1;
+ lib_ring_buffer_flush_read_subbuf_dcache(config, chan, buf);
+
return 0;
nodata:
/*
* lib_ring_buffer_switch_old_start: Populate old subbuffer header.
*
- * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
- * or at buffer finalization (destroy).
+ * Only executed when the buffer is finalized, in SWITCH_FLUSH.
*/
static
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
unsigned long sb_index, commit_count;
/*
- * We are performing a SWITCH_FLUSH. There may be concurrent
- * writes into the buffer if e.g. invoked while performing a
- * snapshot on an active trace.
+ * We are performing a SWITCH_FLUSH. At this stage, there are no
+ * concurrent writes into the buffer.
*
- * If the client does not save any header information (sub-buffer
- * header size == 0), don't switch empty subbuffer on finalize,
- * because it is invalid to deliver a completely empty
- * subbuffer.
+ * The client does not save any header information. Don't
+ * switch empty subbuffer on finalize, because it is invalid to
+ * deliver a completely empty subbuffer.
*/
if (!config->cb.subbuffer_header_size())
return -1;
}
/*
- * Taking lock on CPU hotplug to ensure two things: first, that the
+ * Disabling preemption ensures two things: first, that the
* target cpu is not taken concurrently offline while we are within
- * smp_call_function_single() (I don't trust that get_cpu() on the
- * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be
- * confirmed)). Secondly, if it happens that the CPU is not online, our
- * own call to lib_ring_buffer_switch_slow() needs to be protected from
- * CPU hotplug handlers, which can also perform a remote subbuffer
- * switch.
+ * smp_call_function_single(). Secondly, if it happens that the
+ * CPU is not online, our own call to lib_ring_buffer_switch_slow()
+ * needs to be protected from CPU hotplug handlers, which can
+ * also perform a remote subbuffer switch.
*/
- get_online_cpus();
+ preempt_disable();
param.buf = buf;
param.mode = mode;
ret = smp_call_function_single(buf->backend.cpu,
/* Remote CPU is offline, do it ourself. */
lib_ring_buffer_switch_slow(buf, mode);
}
- put_online_cpus();
+ preempt_enable();
}
+/* Switch sub-buffer if current sub-buffer is non-empty. */
void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
{
_lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- struct lib_ring_buffer_ctx *ctx)
+ struct lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
unsigned long reserve_commit_diff, offset_cmp;
offsets->size = config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
* -EIO for other errors, else returns 0.
* It will take care of sub-buffer switching.
*/
-int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
struct channel *chan = ctx->chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
do {
ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
- ctx);
+ ctx, client_ctx);
if (unlikely(ret))
return ret;
} while (unlikely(v_cmpxchg(config, &buf->offset, offsets.old,