#include "../../wrapper/ringbuffer/frontend.h"
#include "../../wrapper/ringbuffer/iterator.h"
#include "../../wrapper/ringbuffer/nohz.h"
+#include "../../wrapper/atomic.h"
+#include "../../wrapper/percpu-defs.h"
/*
* Internal structure representing offsets to use at a sub-buffer switch.
static
void lib_ring_buffer_print_errors(struct channel *chan,
struct lib_ring_buffer *buf, int cpu);
+static
+void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode);
/*
* Must be called under cpu hotplug protection.
raw_spin_unlock(&buf->raw_tick_nohz_spinlock);
break;
case TICK_NOHZ_STOP:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_stop_switch_timer(buf);
lib_ring_buffer_stop_read_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
case TICK_NOHZ_RESTART:
- spin_lock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_lock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
lib_ring_buffer_start_read_timer(buf);
lib_ring_buffer_start_switch_timer(buf);
- spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock));
+ spin_unlock(lttng_this_cpu_ptr(&ring_buffer_nohz_lock));
break;
}
channel_backend_unregister_notifiers(&chan->backend);
}
+static void lib_ring_buffer_set_quiescent(struct lib_ring_buffer *buf)
+{
+ if (!buf->quiescent) {
+ buf->quiescent = true;
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+ }
+}
+
+static void lib_ring_buffer_clear_quiescent(struct lib_ring_buffer *buf)
+{
+ buf->quiescent = false;
+}
+
+void lib_ring_buffer_set_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_set_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_set_quiescent_channel);
+
+void lib_ring_buffer_clear_quiescent_channel(struct channel *chan)
+{
+ int cpu;
+ const struct lib_ring_buffer_config *config = &chan->backend.config;
+
+ if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+ get_online_cpus();
+ for_each_channel_cpu(cpu, chan) {
+ struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf,
+ cpu);
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+ put_online_cpus();
+ } else {
+ struct lib_ring_buffer *buf = chan->backend.buf;
+
+ lib_ring_buffer_clear_quiescent(buf);
+ }
+}
+EXPORT_SYMBOL_GPL(lib_ring_buffer_clear_quiescent_channel);
+
static void channel_free(struct channel *chan)
{
if (chan->backend.release_priv_ops) {
chan->backend.priv,
cpu);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (config->cb.buffer_finalize)
config->cb.buffer_finalize(buf, chan->backend.priv, -1);
if (buf->backend.allocated)
- lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH);
+ lib_ring_buffer_set_quiescent(buf);
/*
* Perform flush before writing to finalized.
*/
if (!atomic_long_add_unless(&buf->active_readers, 1, 1))
return -EBUSY;
kref_get(&chan->ref);
- smp_mb__after_atomic_inc();
+ lttng_smp_mb__after_atomic();
return 0;
}
EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read);
struct channel *chan = buf->backend.chan;
CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1);
- smp_mb__before_atomic_dec();
+ lttng_smp_mb__before_atomic();
atomic_long_dec(&buf->active_readers);
kref_put(&chan->ref, channel_release);
}
unsigned long consumed_cur, write_offset;
int finalized;
+ /*
+ * First, ensure we perform a "final" flush onto the stream. This will
+ * ensure we create a packet of padding if we encounter an empty
+ * packet. This ensures the time-stamps right before the snapshot is
+ * used as end of packet timestamp.
+ */
+ if (!buf->quiescent)
+ _lib_ring_buffer_switch_remote(buf, SWITCH_FLUSH);
+
retry:
finalized = ACCESS_ONCE(buf->finalized);
/*
/*
* lib_ring_buffer_switch_old_start: Populate old subbuffer header.
*
- * Only executed when the buffer is finalized, in SWITCH_FLUSH.
+ * Only executed by SWITCH_FLUSH, which can be issued while tracing is active
+ * or at buffer finalization (destroy).
*/
static
void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf,
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->old + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
commit_count, oldidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx,
- offsets->old, commit_count,
- padding_size);
+ offsets->old + padding_size, commit_count);
}
/*
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
commit_count, beginidx, tsc);
lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx,
- offsets->begin, commit_count,
- config->cb.subbuffer_header_size());
+ offsets->begin + config->cb.subbuffer_header_size(),
+ commit_count);
}
/*
unsigned long sb_index, commit_count;
/*
- * We are performing a SWITCH_FLUSH. At this stage, there are no
- * concurrent writes into the buffer.
+ * We are performing a SWITCH_FLUSH. There may be concurrent
+ * writes into the buffer if e.g. invoked while performing a
+ * snapshot on an active trace.
*
- * The client does not save any header information. Don't
- * switch empty subbuffer on finalize, because it is invalid to
- * deliver a completely empty subbuffer.
+ * If the client does not save any header information (sub-buffer
+ * header size == 0), don't switch empty subbuffer on finalize,
+ * because it is invalid to deliver a completely empty
+ * subbuffer.
*/
if (!config->cb.subbuffer_header_size())
return -1;
lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
}
-void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+static void _lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf,
+ enum switch_mode mode)
{
struct channel *chan = buf->backend.chan;
const struct lib_ring_buffer_config *config = &chan->backend.config;
* With global synchronization we don't need to use the IPI scheme.
*/
if (config->sync == RING_BUFFER_SYNC_GLOBAL) {
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
return;
}
remote_switch, buf, 1);
if (ret) {
/* Remote CPU is offline, do it ourself. */
- lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE);
+ lib_ring_buffer_switch_slow(buf, mode);
}
put_online_cpus();
}
+
+void lib_ring_buffer_switch_remote(struct lib_ring_buffer *buf)
+{
+ _lib_ring_buffer_switch_remote(buf, SWITCH_ACTIVE);
+}
EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_remote);
/*