.lock = PTHREAD_MUTEX_INITIALIZER,
};
-int lttng_ust_blocking_retry_timeout =
- CONFIG_LTTNG_UST_DEFAULT_BLOCKING_RETRY_TIMEOUT_MS;
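+/* When false, lttng_ust_ringbuffer_get_timeout() returns 0, so writers never block waiting for space. */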
+static bool lttng_ust_allow_blocking;
-void lttng_ust_ringbuffer_set_retry_timeout(int timeout)
+void lttng_ust_ringbuffer_set_allow_blocking(void)
{
- lttng_ust_blocking_retry_timeout = timeout;
+ lttng_ust_allow_blocking = true;
+}
+
+/* Get blocking timeout, in ms */
+static int lttng_ust_ringbuffer_get_timeout(struct channel *chan)
+{
+ if (!lttng_ust_allow_blocking)
+ return 0;
+ return chan->u.s.blocking_timeout_ms;
}
/**
for (i = 0; i < chan->backend.num_subbuf; i++) {
struct commit_counters_hot *cc_hot;
struct commit_counters_cold *cc_cold;
+ uint64_t *ts_end;
cc_hot = shmp_index(handle, buf->commit_hot, i);
if (!cc_hot)
return;
cc_cold = shmp_index(handle, buf->commit_cold, i);
if (!cc_cold)
return;
+ ts_end = shmp_index(handle, buf->ts_end, i);
+ if (!ts_end)
+ return;
v_set(config, &cc_hot->cc, 0);
v_set(config, &cc_hot->seq, 0);
v_set(config, &cc_cold->cc_sb, 0);
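+ /* Also reset the end timestamp saved for this sub-buffer. */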
+ *ts_end = 0;
}
uatomic_set(&buf->consumed, 0);
uatomic_set(&buf->record_disabled, 0);
goto free_commit;
}
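+ /* Allocate one 64-bit end-of-sub-buffer timestamp (ts_end) per sub-buffer. */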
+ align_shm(shmobj, __alignof__(uint64_t));
+ set_shmp(buf->ts_end,
+ zalloc_shm(shmobj,
+ sizeof(uint64_t) * chan->backend.num_subbuf));
+ if (!shmp(handle, buf->ts_end)) {
+ ret = -ENOMEM;
+ goto free_commit_cold;
+ }
+
ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
cpu, handle, shmobj);
if (ret) {
/* Error handling */
free_init:
+ /* ts_end will be freed by shm teardown */
+free_commit_cold:
/* commit_cold will be freed by shm teardown */
free_commit:
/* commit_hot will be freed by shm teardown */
void *buf_addr, size_t subbuf_size,
size_t num_subbuf, unsigned int switch_timer_interval,
unsigned int read_timer_interval,
- const int *stream_fds, int nr_stream_fds)
+ const int *stream_fds, int nr_stream_fds,
+ int64_t blocking_timeout)
{
int ret;
size_t shmsize, chansize;
struct lttng_ust_shm_handle *handle;
struct shm_object *shmobj;
unsigned int nr_streams;
+ int64_t blocking_timeout_ms;
if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
nr_streams = num_possible_cpus();
if (nr_stream_fds != nr_streams)
return NULL;
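+ /* Sanity-check the blocking timeout (in usec): -1 is passed through as-is; values below -1 are rejected. */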
+ if (blocking_timeout < -1) {
+ return NULL;
+ }
+ /* usec to msec */
+ if (blocking_timeout == -1) {
+ blocking_timeout_ms = -1;
+ } else {
+ blocking_timeout_ms = blocking_timeout / 1000;
+ if (blocking_timeout_ms != (int32_t) blocking_timeout_ms) {
+ return NULL;
+ }
+ }
+
if (lib_ring_buffer_check_config(config, switch_timer_interval,
read_timer_interval))
return NULL;
/* Allocate normal memory for channel (not shared) */
shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM,
- -1);
+ -1, -1);
if (!shmobj)
goto error_append;
/* struct channel is at object 0, offset 0 (hardcoded) */
*priv_data = NULL;
}
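+ /* Per-channel blocking timeout, in ms, returned by lttng_ust_ringbuffer_get_timeout(). */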
+ chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms;
+
ret = channel_backend_init(&chan->backend, name, config,
subbuf_size, num_subbuf, handle,
stream_fds);
{
struct channel *chan;
const struct lttng_ust_lib_ring_buffer_config *config;
- unsigned long consumed_cur, write_offset;
chan = shmp(handle, buf->backend.chan);
if (!chan)
unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
unsigned long commit_count, padding_size, data_size;
struct commit_counters_hot *cc_hot;
+ uint64_t *ts_end;
data_size = subbuf_offset(offsets->old - 1, chan) + 1;
padding_size = chan->backend.subbuf_size - data_size;
subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
handle);
+ ts_end = shmp_index(handle, buf->ts_end, oldidx);
+ if (!ts_end)
+ return;
/*
- * Order all writes to buffer before the commit count update that will
- * determine that the subbuffer is full.
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
+
+ /*
+ * Order all writes to buffer and store to ts_end before the commit
+ * count update that will determine that the subbuffer is full.
*/
cmm_smp_wmb();
cc_hot = shmp_index(handle, buf->commit_hot, oldidx);
{
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
unsigned long endidx, data_size;
+ uint64_t *ts_end;
endidx = subbuf_index(offsets->end - 1, chan);
data_size = subbuf_offset(offsets->end - 1, chan) + 1;
subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
handle);
+ ts_end = shmp_index(handle, buf->ts_end, endidx);
+ if (!ts_end)
+ return;
+ /*
+ * This is the last space reservation in that sub-buffer before
+ * it gets delivered. This provides exclusive access to write to
+ * this sub-buffer's ts_end. There are also no concurrent
+ * readers of that ts_end because delivery of that sub-buffer is
+ * postponed until the commit counter is incremented for the
+ * current space reservation.
+ */
+ *ts_end = tsc;
}
/*
* Force a sub-buffer switch. This operation is completely reentrant : can be
* called while tracing is active with absolutely no lock held.
*
- * Note, however, that as a v_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
+ * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for
+ * some atomic operations, this function must be called from the CPU
+ * which owns the buffer for an ACTIVE flush. However, for
+ * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called
+ * from any CPU.
*/
void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
struct lttng_ust_shm_handle *handle)
int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
struct channel *chan,
struct switch_offsets *offsets,
- struct lttng_ust_lib_ring_buffer_ctx *ctx)
+ struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle = ctx->handle;
unsigned long reserve_commit_diff, offset_cmp;
- int timeout_left_ms = lttng_ust_blocking_retry_timeout;
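+ /* 0 when blocking is not allowed for this channel. */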
+ int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan);
retry:
offsets->begin = offset_cmp = v_read(config, &buf->offset);
offsets->size = config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
config->cb.record_header_size(config, chan,
offsets->begin,
&offsets->pre_header_padding,
- ctx);
+ ctx, client_ctx);
offsets->size +=
lib_ring_buffer_align(offsets->begin + offsets->size,
ctx->largest_align)
* -EIO for other errors, else returns 0.
* It will take care of sub-buffer switching.
*/
-int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
+int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
+ void *client_ctx)
{
struct channel *chan = ctx->chan;
struct lttng_ust_shm_handle *handle = ctx->handle;
do {
ret = lib_ring_buffer_try_reserve_slow(buf, chan, &offsets,
- ctx);
+ ctx, client_ctx);
if (caa_unlikely(ret))
return ret;
} while (caa_unlikely(v_cmpxchg(config, &buf->offset, offsets.old,
if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb,
old_commit_count, old_commit_count + 1)
== old_commit_count)) {
+ uint64_t *ts_end;
+
/*
* Start of exclusive subbuffer access. We are
* guaranteed to be the last writer in this subbuffer
* and any other writer trying to access this subbuffer
* in this state is required to drop records.
+ *
+ * We can read the ts_end for the current sub-buffer
+ * which has been saved by the very last space
+ * reservation for the current sub-buffer.
+ *
+ * Order increment of commit counter before reading ts_end.
*/
+ cmm_smp_mb();
+ ts_end = shmp_index(handle, buf->ts_end, idx);
+ if (!ts_end)
+ return;
deliver_count_events(config, buf, idx, handle);
- config->cb.buffer_end(buf, tsc, idx,
+ config->cb.buffer_end(buf, *ts_end, idx,
lib_ring_buffer_get_data_size(config,
buf,
idx,