/*
* ring_buffer_frontend.c
*
- * (C) Copyright 2005-2010 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; only
+ * version 2.1 of the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
*
* Ring buffer wait-free buffer synchronization. Producer-consumer and flight
* recorder (overwrite) modes. See thesis:
* - splice one subbuffer worth of data to a pipe
* - splice the data from pipe to disk/network
* - put_subbuf
- *
- * Dual LGPL v2.1/GPL v2 license.
*/
#include <linux/delay.h>
* Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD)
*/
static
-int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
+int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb,
unsigned long action,
void *hcpu)
{
*/
if (mode == SWITCH_FLUSH || off > 0) {
if (unlikely(off == 0)) {
+ /*
+ * A final flush that encounters an empty
+ * sub-buffer cannot switch buffer if a
+ * reader is located within this sub-buffer.
+ * Anyway, the purpose of final flushing of a
+ * sub-buffer at offset 0 is to handle the case
+ * of entirely empty stream.
+ */
+ if (unlikely(subbuf_trunc(offsets->begin, chan)
+ - subbuf_trunc((unsigned long)
+ atomic_long_read(&buf->consumed), chan)
+ >= chan->backend.buf_size))
+ return -1;
/*
* The client does not save any header information.
* Don't switch empty subbuffer on finalize, because it
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow);
+static void remote_switch(void *info)
+{
+ struct lib_ring_buffer *buf = info;
+
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+}
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+ struct channel *chan = buf->backend.chan;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+ int ret;
+
+ /*
+ * With global synchronization we don't need to use the IPI scheme.
+ */
+ if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ return;
+ }
+
+ /*
+ * Taking lock on CPU hotplug to ensure two things: first, that the
+ * target cpu is not taken concurrently offline while we are within
+ * smp_call_function_single() (I don't trust that get_cpu() on the
+ * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be
+ * confirmed)). Secondly, if it happens that the CPU is not online, our
+ * own call to lib_ring_buffer_switch_slow() needs to be protected from
+ * CPU hotplug handlers, which can also perform a remote subbuffer
+ * switch.
+ */
+ get_online_cpus();
+ ret = smp_call_function_single(buf->backend.cpu,
+ remote_switch, buf, 1);
+ if (ret) {
+ /* Remote CPU is offline, do it ourself. */
+ lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ }
+ put_online_cpus();
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
+
/*
* Returns :
* 0 if ok
struct lib_ring_buffer_ctx *ctx)
{
const struct lib_ring_buffer_config *config = &chan->backend.config;
- unsigned long reserve_commit_diff;
+ unsigned long reserve_commit_diff, offset_cmp;
- offsets->begin = v_read(config, &buf->offset);
+retry:
+ offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->old = offsets->begin;
offsets->switch_new_start = 0;
offsets->switch_new_end = 0;
}
}
if (unlikely(offsets->switch_new_start)) {
- unsigned long sb_index;
+ unsigned long sb_index, commit_count;
/*
* We are typically not filling the previous buffer completely.
+ config->cb.subbuffer_header_size();
/* Test new buffer integrity */
sb_index = subbuf_index(offsets->begin, chan);
+ /*
+ * Read buf->offset before buf->commit_cold[sb_index].cc_sb.
+ * lib_ring_buffer_check_deliver() has the matching
+ * memory barriers required around commit_cold cc_sb
+ * updates to ensure reserve and commit counter updates
+ * are not seen reordered when updated by another CPU.
+ */
+ smp_rmb();
+ commit_count = v_read(config,
+ &buf->commit_cold[sb_index].cc_sb);
+ /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */
+ smp_rmb();
+ if (unlikely(offset_cmp != v_read(config, &buf->offset))) {
+ /*
+ * The reserve counter have been concurrently updated
+ * while we read the commit counter. This means the
+ * commit counter we read might not match buf->offset
+ * due to concurrent update. We therefore need to retry.
+ */
+ goto retry;
+ }
reserve_commit_diff =
(buf_trunc(offsets->begin, chan)
>> chan->backend.num_subbuf_order)
- - ((unsigned long) v_read(config,
- &buf->commit_cold[sb_index].cc_sb)
- & chan->commit_count_mask);
+ - (commit_count & chan->commit_count_mask);
if (likely(reserve_commit_diff == 0)) {
/* Next subbuffer not being written to. */
if (unlikely(config->mode != RING_BUFFER_OVERWRITE &&
} else {
/*
* Next subbuffer reserve offset does not match the
- * commit offset. Drop record in producer-consumer and
- * overwrite mode. Caused by either a writer OOPS or too
- * many nested writes over a reserve/commit pair.
+ * commit offset, and this did not involve update to the
+ * reserve counter. Drop record in producer-consumer and
+ * overwrite mode. Caused by either a writer OOPS or
+ * too many nested writes over a reserve/commit pair.
*/
v_inc(config, &buf->records_lost_wrap);
return -EIO;