summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
bee7cf7)
Sampling the discarded events count in the buffer_end callback is done
out of order, and may therefore include increments performed by following
events (in following packets) if the thread doing the end-of-packet
event write is preempted for a long time.
Sampling the discarded event counts before reserving space for the last
event in a packet, and keeping these counts as part of the private ring
buffer context, should fix this race.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: Ib59b634bbaefd2444751547d20a891c9dd93cd73
static void client_buffer_end(struct lttng_ust_ring_buffer *buf,
uint64_t tsc __attribute__((unused)),
unsigned int subbuf_idx, unsigned long data_size,
static void client_buffer_end(struct lttng_ust_ring_buffer *buf,
uint64_t tsc __attribute__((unused)),
unsigned int subbuf_idx, unsigned long data_size,
- struct lttng_ust_shm_handle *handle)
+ struct lttng_ust_shm_handle *handle,
+ const struct lttng_ust_ring_buffer_ctx *ctx)
{
struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
struct metadata_packet_header *header =
{
struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
struct metadata_packet_header *header =
* We do not care about the records lost count, because the metadata
* channel waits and retry.
*/
* We do not care about the records lost count, because the metadata
* channel waits and retry.
*/
- (void) lib_ring_buffer_get_records_lost_full(&client_config, buf);
- records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
- records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+ (void) lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+ records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+ records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
WARN_ON_ONCE(records_lost != 0);
}
WARN_ON_ONCE(records_lost != 0);
}
*/
static void client_buffer_end(struct lttng_ust_ring_buffer *buf, uint64_t tsc,
unsigned int subbuf_idx, unsigned long data_size,
*/
static void client_buffer_end(struct lttng_ust_ring_buffer *buf, uint64_t tsc,
unsigned int subbuf_idx, unsigned long data_size,
- struct lttng_ust_shm_handle *handle)
+ struct lttng_ust_shm_handle *handle,
+ const struct lttng_ust_ring_buffer_ctx *ctx)
{
struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
struct packet_header *header =
{
struct lttng_ust_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
struct packet_header *header =
header->ctx.packet_size =
(uint64_t) LTTNG_UST_PAGE_ALIGN(data_size) * CHAR_BIT; /* in bits */
header->ctx.packet_size =
(uint64_t) LTTNG_UST_PAGE_ALIGN(data_size) * CHAR_BIT; /* in bits */
- records_lost += lib_ring_buffer_get_records_lost_full(&client_config, buf);
- records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, buf);
- records_lost += lib_ring_buffer_get_records_lost_big(&client_config, buf);
+ records_lost += lib_ring_buffer_get_records_lost_full(&client_config, ctx);
+ records_lost += lib_ring_buffer_get_records_lost_wrap(&client_config, ctx);
+ records_lost += lib_ring_buffer_get_records_lost_big(&client_config, ctx);
header->ctx.events_discarded = records_lost;
}
header->ctx.events_discarded = records_lost;
}
static inline
unsigned long lib_ring_buffer_get_records_lost_full(
static inline
unsigned long lib_ring_buffer_get_records_lost_full(
- const struct lttng_ust_ring_buffer_config *config,
- struct lttng_ust_ring_buffer *buf)
+ const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+ const struct lttng_ust_ring_buffer_ctx *ctx)
- return v_read(config, &buf->records_lost_full);
+ return ctx->priv->records_lost_full;
}
static inline
unsigned long lib_ring_buffer_get_records_lost_wrap(
}
static inline
unsigned long lib_ring_buffer_get_records_lost_wrap(
- const struct lttng_ust_ring_buffer_config *config,
- struct lttng_ust_ring_buffer *buf)
+ const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+ const struct lttng_ust_ring_buffer_ctx *ctx)
- return v_read(config, &buf->records_lost_wrap);
+ return ctx->priv->records_lost_wrap;
}
static inline
unsigned long lib_ring_buffer_get_records_lost_big(
}
static inline
unsigned long lib_ring_buffer_get_records_lost_big(
- const struct lttng_ust_ring_buffer_config *config,
- struct lttng_ust_ring_buffer *buf)
+ const struct lttng_ust_ring_buffer_config *config __attribute__((unused)),
+ const struct lttng_ust_ring_buffer_ctx *ctx)
- return v_read(config, &buf->records_lost_big);
+ return ctx->priv->records_lost_big;
commit_count = v_read(config, &cc_hot->cc);
lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
commit_count = v_read(config, &cc_hot->cc);
lib_ring_buffer_check_deliver(config, buf, chan, offset_end - 1,
- commit_count, endidx, handle, ctx_private->tsc);
+ commit_count, endidx, handle, ctx);
/*
* Update used size at each commit. It's needed only for extracting
* ring_buffer buffers from vmcore, after crash.
/*
* Update used size at each commit. It's needed only for extracting
* ring_buffer buffers from vmcore, after crash.
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
+ const struct lttng_ust_ring_buffer_ctx *ctx)
__attribute__((visibility("hidden")));
/* Buffer write helpers */
__attribute__((visibility("hidden")));
/* Buffer write helpers */
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
+ const struct lttng_ust_ring_buffer_ctx *ctx)
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
- (old_commit_count & chan->commit_count_mask) == 0))
lib_ring_buffer_check_deliver_slow(config, buf, chan, offset,
if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
- (old_commit_count & chan->commit_count_mask) == 0))
lib_ring_buffer_check_deliver_slow(config, buf, chan, offset,
- commit_count, idx, handle, tsc);
+ commit_count, idx, handle, ctx);
*/
uint64_t tsc; /* time-stamp counter value */
unsigned int rflags; /* reservation flags */
*/
uint64_t tsc; /* time-stamp counter value */
unsigned int rflags; /* reservation flags */
struct lttng_ust_ring_buffer *buf; /*
* buffer corresponding to processor id
* for this channel
*/
struct lttng_ust_ring_buffer_backend_pages *backend_pages;
struct lttng_ust_ring_buffer *buf; /*
* buffer corresponding to processor id
* for this channel
*/
struct lttng_ust_ring_buffer_backend_pages *backend_pages;
+
+ /*
+ * Records lost counts are only loaded into these fields before
+ * reserving the last bytes from the ring buffer.
+ */
+ unsigned long records_lost_full;
+ unsigned long records_lost_wrap;
+ unsigned long records_lost_big;
void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
+ const struct lttng_ust_ring_buffer_ctx *ctx,
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
- config->cb.buffer_begin(buf, tsc, oldidx, handle);
+ config->cb.buffer_begin(buf, ctx->priv->tsc, oldidx, handle);
/*
* Order all writes to buffer before the commit count update that will
/*
* Order all writes to buffer before the commit count update that will
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
- commit_count, oldidx, handle, tsc);
+ commit_count, oldidx, handle, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + config->cb.subbuffer_header_size(),
commit_count, handle, cc_hot);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + config->cb.subbuffer_header_size(),
commit_count, handle, cc_hot);
void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
+ const struct lttng_ust_ring_buffer_ctx *ctx,
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
* postponed until the commit counter is incremented for the
* current space reservation.
*/
* postponed until the commit counter is incremented for the
* current space reservation.
*/
+ *ts_end = ctx->priv->tsc;
/*
* Order all writes to buffer and store to ts_end before the commit
/*
* Order all writes to buffer and store to ts_end before the commit
v_add(config, padding_size, &cc_hot->cc);
commit_count = v_read(config, &cc_hot->cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
v_add(config, padding_size, &cc_hot->cc);
commit_count = v_read(config, &cc_hot->cc);
lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
- commit_count, oldidx, handle, tsc);
+ commit_count, oldidx, handle, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + padding_size, commit_count, handle,
cc_hot);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->old + padding_size, commit_count, handle,
cc_hot);
void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
+ const struct lttng_ust_ring_buffer_ctx *ctx,
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
unsigned long commit_count;
struct commit_counters_hot *cc_hot;
- config->cb.buffer_begin(buf, tsc, beginidx, handle);
+ config->cb.buffer_begin(buf, ctx->priv->tsc, beginidx, handle);
/*
* Order all writes to buffer before the commit count update that will
/*
* Order all writes to buffer before the commit count update that will
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
commit_count = v_read(config, &cc_hot->cc);
/* Check if the written buffer has to be delivered */
lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
- commit_count, beginidx, handle, tsc);
+ commit_count, beginidx, handle, ctx);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->begin + config->cb.subbuffer_header_size(),
commit_count, handle, cc_hot);
lib_ring_buffer_write_commit_counter(config, buf, chan,
offsets->begin + config->cb.subbuffer_header_size(),
commit_count, handle, cc_hot);
void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
+ const struct lttng_ust_ring_buffer_ctx *ctx,
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
* postponed until the commit counter is incremented for the
* current space reservation.
*/
* postponed until the commit counter is incremented for the
* current space reservation.
*/
+ *ts_end = ctx->priv->tsc;
struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
struct lttng_ust_ring_buffer *buf,
struct lttng_ust_ring_buffer_channel *chan,
struct switch_offsets *offsets,
+ struct lttng_ust_ring_buffer_ctx *ctx,
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
struct lttng_ust_shm_handle *handle)
{
const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
offsets->switch_old_start = 0;
off = subbuf_offset(offsets->begin, chan);
offsets->switch_old_start = 0;
off = subbuf_offset(offsets->begin, chan);
- *tsc = config->cb.ring_buffer_clock_read(chan);
+ ctx->priv->tsc = config->cb.ring_buffer_clock_read(chan);
/*
* Ensure we flush the header of an empty subbuffer when doing the
/*
* Ensure we flush the header of an empty subbuffer when doing the
offsets->begin = subbuf_align(offsets->begin, chan);
/* Note: old points to the next subbuf at offset 0 */
offsets->end = offsets->begin;
offsets->begin = subbuf_align(offsets->begin, chan);
/* Note: old points to the next subbuf at offset 0 */
offsets->end = offsets->begin;
+ /*
+ * Populate the records lost counters prior to performing a
+ * sub-buffer switch.
+ */
+ ctx->priv->records_lost_full = v_read(config, &buf->records_lost_full);
+ ctx->priv->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+ ctx->priv->records_lost_big = v_read(config, &buf->records_lost_big);
{
struct lttng_ust_ring_buffer_channel *chan;
const struct lttng_ust_ring_buffer_config *config;
{
struct lttng_ust_ring_buffer_channel *chan;
const struct lttng_ust_ring_buffer_config *config;
+ struct lttng_ust_ring_buffer_ctx_private ctx_priv;
+ struct lttng_ust_ring_buffer_ctx ctx;
struct switch_offsets offsets;
unsigned long oldidx;
struct switch_offsets offsets;
unsigned long oldidx;
chan = shmp(handle, buf->backend.chan);
if (!chan)
return;
chan = shmp(handle, buf->backend.chan);
if (!chan)
return;
*/
do {
if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
*/
do {
if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
return; /* Switch not needed */
} while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
!= offsets.old);
return; /* Switch not needed */
} while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
!= offsets.old);
* records, never the opposite (missing a full TSC record when it would
* be needed).
*/
* records, never the opposite (missing a full TSC record when it would
* be needed).
*/
- save_last_tsc(config, buf, tsc);
+ save_last_tsc(config, buf, ctx.priv->tsc);
/*
* Push the reader if necessary
/*
* Push the reader if necessary
* May need to populate header start on SWITCH_FLUSH.
*/
if (offsets.switch_old_start) {
* May need to populate header start on SWITCH_FLUSH.
*/
if (offsets.switch_old_start) {
- lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
+ lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx, handle);
offsets.old += config->cb.subbuffer_header_size();
}
/*
* Switch old subbuffer.
*/
offsets.old += config->cb.subbuffer_header_size();
}
/*
* Switch old subbuffer.
*/
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx, handle);
*/
offsets->switch_new_end = 1; /* For offsets->begin */
}
*/
offsets->switch_new_end = 1; /* For offsets->begin */
}
+ /*
+ * Populate the records lost counters when the space reservation
+ * may cause a sub-buffer switch.
+ */
+ if (offsets->switch_new_end || offsets->switch_old_end) {
+ ctx_private->records_lost_full = v_read(config, &buf->records_lost_full);
+ ctx_private->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+ ctx_private->records_lost_big = v_read(config, &buf->records_lost_big);
+ }
lib_ring_buffer_clear_noref(config, &buf->backend,
subbuf_index(offsets.old - 1, chan),
handle);
lib_ring_buffer_clear_noref(config, &buf->backend,
subbuf_index(offsets.old - 1, chan),
handle);
- lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx_private->tsc, handle);
+ lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx, handle);
}
/*
* Populate new subbuffer.
*/
if (caa_unlikely(offsets.switch_new_start))
}
/*
* Populate new subbuffer.
*/
if (caa_unlikely(offsets.switch_new_start))
- lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx_private->tsc, handle);
+ lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx, handle);
if (caa_unlikely(offsets.switch_new_end))
if (caa_unlikely(offsets.switch_new_end))
- lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx_private->tsc, handle);
+ lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx, handle);
ctx_private->slot_size = offsets.size;
ctx_private->pre_offset = offsets.begin;
ctx_private->slot_size = offsets.size;
ctx_private->pre_offset = offsets.begin;
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
unsigned long commit_count,
unsigned long idx,
struct lttng_ust_shm_handle *handle,
- uint64_t tsc __attribute__((unused)))
+ const struct lttng_ust_ring_buffer_ctx *ctx)
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
{
unsigned long old_commit_count = commit_count
- chan->backend.subbuf_size;
/*
* Increment the packet counter while we have exclusive
/*
* Increment the packet counter while we have exclusive
struct lttng_ust_shm_handle *handle);
void (*buffer_end) (struct lttng_ust_ring_buffer *buf, uint64_t tsc,
unsigned int subbuf_idx, unsigned long data_size,
struct lttng_ust_shm_handle *handle);
void (*buffer_end) (struct lttng_ust_ring_buffer *buf, uint64_t tsc,
unsigned int subbuf_idx, unsigned long data_size,
- struct lttng_ust_shm_handle *handle);
+ struct lttng_ust_shm_handle *handle,
+ const struct lttng_ust_ring_buffer_ctx *ctx);
/* Optional callbacks (can be set to NULL) */
/* Optional callbacks (can be set to NULL) */