X-Git-Url: http://git.liburcu.org/?p=lttng-modules.git;a=blobdiff_plain;f=lib%2Fringbuffer%2Fring_buffer_frontend.c;h=19b0036546c87723ebdc7027f0696f06d6fcbaa2;hp=4a8919b354ea1599ce9299d6b8f184d10d9f8dd9;hb=a1817dcb28265d0f8db10c6bbe98fe8db2c53a8a;hpb=2230bf699820d6cc70492353600fcca409ea366c diff --git a/lib/ringbuffer/ring_buffer_frontend.c b/lib/ringbuffer/ring_buffer_frontend.c index 4a8919b3..19b00365 100644 --- a/lib/ringbuffer/ring_buffer_frontend.c +++ b/lib/ringbuffer/ring_buffer_frontend.c @@ -150,6 +150,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); lttng_kvfree(buf->commit_hot); lttng_kvfree(buf->commit_cold); + lttng_kvfree(buf->ts_end); lib_ring_buffer_backend_free(&buf->backend); } @@ -179,6 +180,7 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) v_set(config, &buf->commit_hot[i].cc, 0); v_set(config, &buf->commit_hot[i].seq, 0); v_set(config, &buf->commit_cold[i].cc_sb, 0); + buf->ts_end[i] = 0; } atomic_long_set(&buf->consumed, 0); atomic_set(&buf->record_disabled, 0); @@ -267,6 +269,17 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, goto free_commit; } + buf->ts_end = + lttng_kvzalloc_node(ALIGN(sizeof(*buf->ts_end) + * chan->backend.num_subbuf, + 1 << INTERNODE_CACHE_SHIFT), + GFP_KERNEL | __GFP_NOWARN, + cpu_to_node(max(cpu, 0))); + if (!buf->ts_end) { + ret = -ENOMEM; + goto free_commit_cold; + } + init_waitqueue_head(&buf->read_wait); init_waitqueue_head(&buf->write_wait); raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); @@ -306,6 +319,8 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, /* Error handling */ free_init: + lttng_kvfree(buf->ts_end); +free_commit_cold: lttng_kvfree(buf->commit_cold); free_commit: lttng_kvfree(buf->commit_hot); @@ -1596,14 +1611,26 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; struct commit_counters_hot *cc_hot; + u64 *ts_end; data_size = subbuf_offset(offsets->old - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; subbuffer_set_data_size(config, &buf->backend, oldidx, data_size); + ts_end = &buf->ts_end[oldidx]; /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; + + /* + * Order all writes to buffer and store to ts_end before the commit + * count update that will determine that the subbuffer is full. */ if (config->ipi == RING_BUFFER_IPI_BARRIER) { /* @@ -1684,10 +1711,21 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, { const struct lib_ring_buffer_config *config = &chan->backend.config; unsigned long endidx, data_size; + u64 *ts_end; endidx = subbuf_index(offsets->end - 1, chan); data_size = subbuf_offset(offsets->end - 1, chan) + 1; subbuffer_set_data_size(config, &buf->backend, endidx, data_size); + ts_end = &buf->ts_end[endidx]; + /* + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; } /* @@ -2249,14 +2287,24 @@ void lib_ring_buffer_check_deliver_slow(const struct lib_ring_buffer_config *con if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { + u64 *ts_end; + /* * Start of exclusive subbuffer access. We are * guaranteed to be the last writer in this subbuffer * and any other writer trying to access this subbuffer * in this state is required to drop records. + * + * We can read the ts_end for the current sub-buffer + * which has been saved by the very last space + * reservation for the current sub-buffer. + * + * Order increment of commit counter before reading ts_end. */ + smp_mb(); + ts_end = &buf->ts_end[idx]; deliver_count_events(config, buf, idx); - config->cb.buffer_end(buf, tsc, idx, + config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, idx));