X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=92aa78f57ddf2ed95f42c16e23728d32f58291b6;hb=294d35794ac83c4321e4bd3d56317844771517ae;hp=bf0db1163493b0ae3e279f9fe84f18258daa4bd8;hpb=5a8fd222992df9bbb709836e5bf4ca053dd776c3;p=lttng-modules.git diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index bf0db116..92aa78f5 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -1,7 +1,22 @@ /* * ring_buffer_frontend.c * - * (C) Copyright 2005-2010 - Mathieu Desnoyers + * Copyright (C) 2005-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: @@ -34,8 +49,6 @@ * - splice one subbuffer worth of data to a pipe * - splice the data from pipe to disk/network * - put_subbuf - * - * Dual LGPL v2.1/GPL v2 license. */ #include @@ -389,7 +402,7 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) */ static -int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, +int lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, unsigned long action, void *hcpu) { @@ -1173,24 +1186,31 @@ void lib_ring_buffer_print_errors(struct channel *chan, const struct lib_ring_buffer_config *config = &chan->backend.config; void *priv = chan->backend.priv; - printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " - "%lu records overrun\n", - chan->backend.name, cpu, - v_read(config, &buf->records_count), - v_read(config, &buf->records_overrun)); - - if (v_read(config, &buf->records_lost_full) - || v_read(config, &buf->records_lost_wrap) - || v_read(config, &buf->records_lost_big)) - printk(KERN_WARNING - "ring buffer %s, cpu %d: records were lost. Caused by:\n" - " [ %lu buffer full, %lu nest buffer wrap-around, " - "%lu event too big ]\n", - chan->backend.name, cpu, - v_read(config, &buf->records_lost_full), - v_read(config, &buf->records_lost_wrap), - v_read(config, &buf->records_lost_big)); - + if (!strcmp(chan->backend.name, "relay-metadata")) { + printk(KERN_DEBUG "ring buffer %s: %lu records written, " + "%lu records overrun\n", + chan->backend.name, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + } else { + printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " + "%lu records overrun\n", + chan->backend.name, cpu, + v_read(config, &buf->records_count), + v_read(config, &buf->records_overrun)); + + if (v_read(config, &buf->records_lost_full) + || v_read(config, &buf->records_lost_wrap) + || v_read(config, &buf->records_lost_big)) + printk(KERN_WARNING + "ring buffer %s, cpu %d: records were lost. Caused by:\n" + " [ %lu buffer full, %lu nest buffer wrap-around, " + "%lu event too big ]\n", + chan->backend.name, cpu, + v_read(config, &buf->records_lost_full), + v_read(config, &buf->records_lost_wrap), + v_read(config, &buf->records_lost_big)); + } lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu); } @@ -1404,6 +1424,19 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, */ if (mode == SWITCH_FLUSH || off > 0) { if (unlikely(off == 0)) { + /* + * A final flush that encounters an empty + * sub-buffer cannot switch buffer if a + * reader is located within this sub-buffer. + * Anyway, the purpose of final flushing of a + * sub-buffer at offset 0 is to handle the case + * of entirely empty stream. + */ + if (unlikely(subbuf_trunc(offsets->begin, chan) + - subbuf_trunc((unsigned long) + atomic_long_read(&buf->consumed), chan) + >= chan->backend.buf_size)) + return -1; /* * The client does not save any header information. * Don't switch empty subbuffer on finalize, because it @@ -1483,6 +1516,48 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m } EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); +static void remote_switch(void *info) +{ + struct lib_ring_buffer *buf = info; + + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); +} + +void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf) +{ + struct channel *chan = buf->backend.chan; + const struct lib_ring_buffer_config *config = &chan->backend.config; + int ret; + + /* + * With global synchronization we don't need to use the IPI scheme. + */ + if (config->sync == RING_BUFFER_SYNC_GLOBAL) { + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + return; + } + + /* + * Taking lock on CPU hotplug to ensure two things: first, that the + * target cpu is not taken concurrently offline while we are within + * smp_call_function_single() (I don't trust that get_cpu() on the + * _local_ CPU actually inhibit CPU hotplug for the _remote_ CPU (to be + * confirmed)). Secondly, if it happens that the CPU is not online, our + * own call to lib_ring_buffer_switch_slow() needs to be protected from + * CPU hotplug handlers, which can also perform a remote subbuffer + * switch. + */ + get_online_cpus(); + ret = smp_call_function_single(buf->backend.cpu, + remote_switch, buf, 1); + if (ret) { + /* Remote CPU is offline, do it ourself. */ + lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); + } + put_online_cpus(); +} +EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote); + /* * Returns : * 0 if ok