/*
* buffers.h
+ * LTTng userspace tracer buffering system
*
* Copyright (C) 2009 - Pierre-Marc Fournier (pierre-marc dot fournier at polymtl dot ca)
* Copyright (C) 2008 - Mathieu Desnoyers (mathieu.desnoyers@polymtl.ca)
*
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
*/
#ifndef _UST_BUFFERS_H
#define _UST_BUFFERS_H
-#include <kcompat/kref.h>
#include <assert.h>
+
+#include <ust/core.h>
+
+#include "usterr.h"
#include "channels.h"
#include "tracerconst.h"
#include "tracercore.h"
#include "header-inline.h"
-#include <usterr.h>
-/***** SHOULD BE REMOVED ***** */
+/***** FIXME: SHOULD BE REMOVED ***** */
/*
* BUFFER_TRUNC zeroes the subbuffer offset and the subbuffer number parts of
/**************************************/
struct commit_counters {
- local_t cc;
- local_t cc_sb; /* Incremented _once_ at sb switch */
+ long cc; /* ATOMIC */
+ long cc_sb; /* ATOMIC - Incremented _once_ at sb switch */
};
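+
+/*
+ * Both counters are plain longs that must only be accessed through
+ * liburcu's uatomic_*() primitives, never directly. A minimal sketch of
+ * the convention, assuming the urcu/uatomic.h API (`cc' being a
+ * struct commit_counters *):
+ *
+ *	uatomic_add(&cc->cc, slot_size);
+ *	old = uatomic_read(&cc->cc_sb);
+ *	if (uatomic_cmpxchg(&cc->cc_sb, old, new) == old)
+ *		... this thread performed the sub-buffer switch ...
+ */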
struct ust_buffer {
/* First 32 bytes cache-hot cacheline */
- local_t offset; /* Current offset in the buffer */
+ long offset; /* ATOMIC - Current offset in the buffer */
struct commit_counters *commit_count; /* Commit count per sub-buffer */
- atomic_long_t consumed; /*
- * Current offset in the buffer
- * standard atomic access (shared)
- */
+ long consumed; /* ATOMIC - Current offset in the buffer, standard atomic access (shared) */
unsigned long last_tsc; /*
* Last timestamp written in the buffer.
*/
/* End of first 32 bytes cacheline */
- atomic_long_t active_readers; /*
- * Active readers count
- * standard atomic access (shared)
- */
- local_t events_lost;
- local_t corrupted_subbuffers;
+ long active_readers; /* ATOMIC - Active readers count, standard atomic access (shared) */
+ long events_lost; /* ATOMIC */
+ long corrupted_subbuffers; /* ATOMIC */
/* one byte is written to this pipe when data is available, in order
to wake the consumer */
/* portability: Single byte writes must be as quick as possible. The kernel-side
unsigned int cpu;
/* commit count per subbuffer; must be at end of struct */
- local_t commit_seq[0] ____cacheline_aligned;
+ long commit_seq[0] ____cacheline_aligned; /* ATOMIC */
} ____cacheline_aligned;
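+
+/*
+ * commit_seq is a zero-length (flexible) array member: one ATOMIC slot
+ * per sub-buffer, allocated together with the struct itself. A sizing
+ * sketch (the allocator call is illustrative, not one this file
+ * mandates):
+ *
+ *	buf = zmalloc(sizeof(*buf)
+ *		      + chan->subbuf_cnt * sizeof(buf->commit_seq[0]));
+ */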
/*
*/
enum force_switch_mode { FORCE_ACTIVE, FORCE_FLUSH };
-extern int ltt_reserve_slot_lockless_slow(struct ust_trace *trace,
- struct ust_channel *ltt_channel, void **transport_data,
- size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
- unsigned int *rflags, int largest_align, int cpu);
+extern int ltt_reserve_slot_lockless_slow(struct ust_channel *chan,
+ struct ust_trace *trace, size_t data_size,
+ int largest_align, int cpu,
+ struct ust_buffer **ret_buf,
+ size_t *slot_size, long *buf_offset,
+ u64 *tsc, unsigned int *rflags);
extern void ltt_force_switch_lockless_slow(struct ust_buffer *buf,
enum force_switch_mode mode);
long consumed_old, consumed_new;
do {
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
/*
* If buffer is in overwrite mode, push the reader consumed
* count if the write position has reached it and we are not
consumed_new = SUBBUF_ALIGN(consumed_old, buf->chan);
else
return;
- } while (unlikely(atomic_long_cmpxchg(&buf->consumed, consumed_old,
+ } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old,
consumed_new) != consumed_old));
}
struct ust_buffer *buf,
long commit_count, long idx)
{
- local_set(&buf->commit_seq[idx], commit_count);
+ uatomic_set(&buf->commit_seq[idx], commit_count);
}
static __inline__ void ltt_check_deliver(struct ust_channel *chan,
* value without adding an add_return atomic operation to the
* fast path.
*/
- if (likely(local_cmpxchg(&buf->commit_count[idx].cc_sb,
+ if (likely(uatomic_cmpxchg(&buf->commit_count[idx].cc_sb,
old_commit_count, commit_count)
== old_commit_count)) {
int result;
{
long consumed_old, consumed_idx, commit_count, write_offset;
- consumed_old = atomic_long_read(&buf->consumed);
+ consumed_old = uatomic_read(&buf->consumed);
consumed_idx = SUBBUF_INDEX(consumed_old, buf->chan);
- commit_count = local_read(&buf->commit_count[consumed_idx].cc_sb);
+ commit_count = uatomic_read(&buf->commit_count[consumed_idx].cc_sb);
/*
* No memory barrier here, since we are only interested
* in a statistically correct polling result. The next poll will
* get the data if we are racing. The mb() that ensures correct
* memory order is in get_subbuf.
*/
- write_offset = local_read(&buf->offset);
+ write_offset = uatomic_read(&buf->offset);
/*
* Check that the subbuffer we are trying to consume has been
long *o_begin, long *o_end, long *o_old,
size_t *before_hdr_pad, size_t *size)
{
- *o_begin = local_read(&buf->offset);
+ *o_begin = uatomic_read(&buf->offset);
*o_old = *o_begin;
*tsc = trace_clock_read64();
return 0;
}
-static __inline__ int ltt_reserve_slot(struct ust_trace *trace,
- struct ust_channel *chan, void **transport_data,
- size_t data_size, size_t *slot_size, long *buf_offset, u64 *tsc,
- unsigned int *rflags, int largest_align, int cpu)
+static __inline__ int ltt_reserve_slot(struct ust_channel *chan,
+ struct ust_trace *trace, size_t data_size,
+ int largest_align, int cpu,
+ struct ust_buffer **ret_buf,
+ size_t *slot_size, long *buf_offset, u64 *tsc,
+ unsigned int *rflags)
{
- struct ust_buffer *buf = chan->buf[cpu];
+ struct ust_buffer *buf = *ret_buf = chan->buf[cpu];
long o_begin, o_end, o_old;
size_t before_hdr_pad;
/*
* Perform retryable operations.
*/
- /* FIXME: make this rellay per cpu? */
+ /* FIXME: make this really per cpu? */
if (unlikely(LOAD_SHARED(ltt_nesting) > 4)) {
DBG("Dropping event because nesting is too deep.");
- local_inc(&buf->events_lost);
+ uatomic_inc(&buf->events_lost);
return -EPERM;
}
&before_hdr_pad, slot_size)))
goto slow_path;
- if (unlikely(local_cmpxchg(&buf->offset, o_old, o_end) != o_old))
+ if (unlikely(uatomic_cmpxchg(&buf->offset, o_old, o_end) != o_old))
goto slow_path;
/*
*buf_offset = o_begin + before_hdr_pad;
return 0;
slow_path:
- return ltt_reserve_slot_lockless_slow(trace, chan,
- transport_data, data_size, slot_size, buf_offset, tsc,
- rflags, largest_align, cpu);
+ return ltt_reserve_slot_lockless_slow(chan, trace, data_size,
+ largest_align, cpu, ret_buf,
+ slot_size, buf_offset, tsc,
+ rflags);
}
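+
+/*
+ * Sketch of the intended fast-path call sequence (illustrative only:
+ * the probe-side variables are hypothetical):
+ *
+ *	struct ust_buffer *buf;
+ *	size_t slot_size;
+ *	long buf_offset;
+ *	u64 tsc;
+ *	unsigned int rflags;
+ *
+ *	if (ltt_reserve_slot(chan, trace, data_size, largest_align, cpu,
+ *			     &buf, &slot_size, &buf_offset, &tsc, &rflags))
+ *		return;		... event dropped or reservation failed ...
+ *	ust_buffers_write(buf, buf_offset, payload, data_size);
+ *	... then commit the slot via the commit helper in this file ...
+ */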
/*
* Force a sub-buffer switch for a per-cpu buffer. This operation is
* completely reentrant : can be called while tracing is active with
* absolutely no lock held.
- *
- * Note, however, that as a local_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
*/
static __inline__ void ltt_force_switch(struct ust_buffer *buf,
enum force_switch_mode mode)
if (unlikely(SUBBUF_OFFSET(offset - commit_count, buf->chan)))
return;
- commit_seq_old = local_read(&buf->commit_seq[idx]);
+ commit_seq_old = uatomic_read(&buf->commit_seq[idx]);
while (commit_seq_old < commit_count)
- commit_seq_old = local_cmpxchg(&buf->commit_seq[idx],
+ commit_seq_old = uatomic_cmpxchg(&buf->commit_seq[idx],
commit_seq_old, commit_count);
DBG("commit_seq for channel %s_%d, subbuf %ld is now %ld", buf->chan->channel_name, buf->cpu, idx, commit_count);
long endidx = SUBBUF_INDEX(offset_end - 1, chan);
long commit_count;
-#ifdef LTT_NO_IPI_BARRIER
smp_wmb();
-#else
- /*
- * Must write slot data before incrementing commit count.
- * This compiler barrier is upgraded into a smp_mb() by the IPI
- * sent by get_subbuf().
- */
- barrier();
-#endif
- local_add(slot_size, &buf->commit_count[endidx].cc);
+
+ uatomic_add(&buf->commit_count[endidx].cc, slot_size);
/*
* commit count read can race with concurrent OOO commit count updates.
* This is only needed for ltt_check_deliver (for non-polling delivery
* - Multiple delivery for the same sub-buffer (which is handled
* gracefully by the reader code) if the value is for a full
* sub-buffer. It's important that we can never miss a sub-buffer
- * delivery. Re-reading the value after the local_add ensures this.
+ * delivery. Re-reading the value after the uatomic_add ensures this.
* - Reading a commit_count with a higher value than what was actually
* added to it for the ltt_write_commit_counter call (again caused by
* a concurrent committer). It does not matter, because this function
* reserve offset for a specific sub-buffer, which is completely
* independent of the order.
*/
- commit_count = local_read(&buf->commit_count[endidx].cc);
+ commit_count = uatomic_read(&buf->commit_count[endidx].cc);
ltt_check_deliver(chan, buf, offset_end - 1, commit_count, endidx);
/*
ltt_write_commit_counter(chan, buf, endidx, buf_offset, commit_count, data_size);
}
-void _ust_buffers_write(struct ust_buffer *buf, size_t offset,
- const void *src, size_t len, ssize_t cpy);
+void _ust_buffers_strncpy_fixup(struct ust_buffer *buf, size_t offset,
+ size_t len, size_t copied, int terminated);
static __inline__ int ust_buffers_write(struct ust_buffer *buf, size_t offset,
const void *src, size_t len)
{
- size_t cpy;
size_t buf_offset = BUFFER_OFFSET(offset, buf->chan);
assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+ assert(buf_offset + len <= buf->chan->subbuf_size*buf->chan->subbuf_cnt);
- cpy = min_t(size_t, len, buf->buf_size - buf_offset);
- ust_buffers_do_copy(buf->buf_data + buf_offset, src, cpy);
+ ust_buffers_do_copy(buf->buf_data + buf_offset, src, len);
- if (unlikely(len != cpy))
- _ust_buffers_write(buf, buf_offset, src, len, cpy);
return len;
}
-int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed);
-int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old);
+/*
+ * ust_buffers_do_memset - fill dest with a character
+ * @dest: destination
+ * @src: source character
+ * @len: number of bytes to write
+ */
+static __inline__
+void ust_buffers_do_memset(void *dest, char src, size_t len)
+{
+ /*
+ * What we really want here is an __inline__ memset, but the
+ * length is not constant, so gcc generally emits a function call.
+ */
+ for (; len > 0; len--)
+ *(u8 *)dest++ = src;
+}
+
+/*
+ * ust_buffers_do_strncpy - copy a string up to a certain number of bytes
+ * @dest: destination
+ * @src: source
+ * @len: max. length to copy
+ * @terminated: set to 1 if the copied string ends with \0 (output)
+ *
+ * Returns the number of bytes copied. Does not append a \0 if len is
+ * reached before the end of the source string.
+ */
+static __inline__
+size_t ust_buffers_do_strncpy(void *dest, const void *src, size_t len,
+ int *terminated)
+{
+ size_t orig_len = len;
+
+ *terminated = 0;
+ /*
+ * What we really want here is an __inline__ strncpy, but the
+ * length is not constant, so gcc generally emits a function call.
+ */
+ for (; len > 0; len--) {
+ *(u8 *)dest = LOAD_SHARED(*(const u8 *)src);
+ /* Check with dest, because src may be modified concurrently */
+ if (*(const u8 *)dest == '\0') {
+ /* Count the terminating \0 as copied, then stop */
+ len--;
+ *terminated = 1;
+ break;
+ }
+ dest++;
+ src++;
+ }
+ return orig_len - len;
+}
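+
+/*
+ * Behavior example (not part of the API; dst assumed large enough):
+ *
+ *	ust_buffers_do_strncpy(dst, "hi", 8, &t)     returns 3, t == 1
+ *	ust_buffers_do_strncpy(dst, "hello!", 4, &t) returns 4, t == 0
+ *
+ * In the first case the terminating \0 is copied and counted; in the
+ * second, len is exhausted before the end of the string, so the copy
+ * is left unterminated.
+ */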
+
+static __inline__
+int ust_buffers_strncpy(struct ust_buffer *buf, size_t offset, const void *src,
+ size_t len)
+{
+ size_t buf_offset = BUFFER_OFFSET(offset, buf->chan);
+ size_t copied;
+ int terminated;
+
+ assert(buf_offset < buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+ assert(buf_offset + len <= buf->chan->subbuf_size*buf->chan->subbuf_cnt);
+
+ copied = ust_buffers_do_strncpy(buf->buf_data + buf_offset,
+ src, len, &terminated);
+ if (unlikely(copied < len || !terminated))
+ _ust_buffers_strncpy_fixup(buf, offset, len, copied,
+ terminated);
+ return len;
+}
+
+extern int ust_buffers_get_subbuf(struct ust_buffer *buf, long *consumed);
+extern int ust_buffers_put_subbuf(struct ust_buffer *buf, unsigned long uconsumed_old);
+
+extern void init_ustrelay_transport(void);
#endif /* _UST_BUFFERS_H */