X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=899071a076e714a220ac29feaa808cb73f9e3934;hb=54f59b3573baa1386d774462d946a6d861b14feb;hp=ea73bcc676351ac76f25eeae85282c1aa17bc25a;hpb=d1a996f5524540ebc8caa7c3c1bd3049018d212c;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index ea73bcc6..899071a0 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -128,15 +128,21 @@ void lib_ring_buffer_print_errors(struct channel *chan, * Handle timer teardown race wrt memory free of private data by * ring buffer signals are handled by a single thread, which permits * a synchronization point between handling of each signal. - * Protected by the ust mutex. + * Protected by the lock within the structure. */ struct timer_signal_data { pthread_t tid; /* thread id managing signals */ int setup_done; int qs_done; + pthread_mutex_t lock; }; -static struct timer_signal_data timer_signal; +static struct timer_signal_data timer_signal = { + .tid = 0, + .setup_done = 0, + .qs_done = 0, + .lock = PTHREAD_MUTEX_INITIALIZER, +}; /** * lib_ring_buffer_reset - Reset ring buffer to initial values. @@ -418,7 +424,6 @@ void *sig_thread(void *arg) } /* - * Called with ust_lock() held. * Ensure only a single thread listens on the timer signal. */ static @@ -427,8 +432,9 @@ void lib_ring_buffer_setup_timer_thread(void) pthread_t thread; int ret; + pthread_mutex_lock(&timer_signal.lock); if (timer_signal.setup_done) - return; + goto end; ret = pthread_create(&thread, NULL, &sig_thread, NULL); if (ret) { @@ -441,11 +447,64 @@ void lib_ring_buffer_setup_timer_thread(void) PERROR("pthread_detach"); } timer_signal.setup_done = 1; +end: + pthread_mutex_unlock(&timer_signal.lock); } /* - * Called with ust_lock() held. + * Wait for signal-handling thread quiescent state. */ +static +void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr) +{ + sigset_t pending_set; + int ret; + + /* + * We need to be the only thread interacting with the thread + * that manages signals for teardown synchronization. + */ + pthread_mutex_lock(&timer_signal.lock); + + /* + * Ensure we don't have any signal queued for this channel. + */ + for (;;) { + ret = sigemptyset(&pending_set); + if (ret == -1) { + PERROR("sigemptyset"); + } + ret = sigpending(&pending_set); + if (ret == -1) { + PERROR("sigpending"); + } + if (!sigismember(&pending_set, signr)) + break; + caa_cpu_relax(); + } + + /* + * From this point, no new signal handler will be fired that + * would try to access "chan". However, we still need to wait + * for any currently executing handler to complete. + */ + cmm_smp_mb(); + CMM_STORE_SHARED(timer_signal.qs_done, 0); + cmm_smp_mb(); + + /* + * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management + * thread wakes up. + */ + kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); + + while (!CMM_LOAD_SHARED(timer_signal.qs_done)) + caa_cpu_relax(); + cmm_smp_mb(); + + pthread_mutex_unlock(&timer_signal.lock); +} + static void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) { @@ -479,13 +538,9 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) { - sigset_t pending_set; int ret; if (!chan->switch_timer_interval || !chan->switch_timer_enabled) @@ -496,49 +551,12 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) PERROR("timer_delete"); } - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_FLUSH)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH); chan->switch_timer = 0; chan->switch_timer_enabled = 0; } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_start(struct channel *chan) { @@ -574,14 +592,10 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan) } } -/* - * Called with ust_lock() held. - */ static void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - sigset_t pending_set; int ret; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -599,42 +613,7 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) */ lib_ring_buffer_channel_do_read(chan); - - /* - * Ensure we don't have any signal queued for this channel. - */ - for (;;) { - ret = sigemptyset(&pending_set); - if (ret == -1) { - PERROR("sigemptyset"); - } - ret = sigpending(&pending_set); - if (ret == -1) { - PERROR("sigpending"); - } - if (!sigismember(&pending_set, LTTNG_UST_RB_SIG_READ)) - break; - caa_cpu_relax(); - } - - /* - * From this point, no new signal handler will be fired that - * would try to access "chan". However, we still need to wait - * for any currently executing handler to complete. - */ - cmm_smp_mb(); - CMM_STORE_SHARED(timer_signal.qs_done, 0); - cmm_smp_mb(); - - /* - * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management - * thread wakes up. - */ - kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN); - - while (!CMM_LOAD_SHARED(timer_signal.qs_done)) - caa_cpu_relax(); - cmm_smp_mb(); + lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ); chan->read_timer = 0; chan->read_timer_enabled = 0; @@ -1128,7 +1107,7 @@ retry: */ if (((commit_count - chan->backend.subbuf_size) & chan->commit_count_mask) - - (buf_trunc(consumed_cur, chan) + - (buf_trunc(consumed, chan) >> chan->backend.num_subbuf_order) != 0) goto nodata; @@ -1137,7 +1116,7 @@ retry: * Check that we are not about to read the same subbuffer in * which the writer head is. */ - if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed_cur, chan) + if (subbuf_trunc(write_offset, chan) - subbuf_trunc(consumed, chan) == 0) goto nodata; @@ -1354,7 +1333,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, - commit_count, oldidx, handle); + commit_count, oldidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, offsets->old, commit_count, config->cb.subbuffer_header_size(), @@ -1393,7 +1372,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, oldidx)->cc); commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, oldidx)->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, - commit_count, oldidx, handle); + commit_count, oldidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, offsets->old, commit_count, padding_size, handle); @@ -1429,7 +1408,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, beginidx)->cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, - commit_count, beginidx, handle); + commit_count, beginidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, beginidx, offsets->begin, commit_count, config->cb.subbuffer_header_size(), @@ -1466,7 +1445,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, v_add(config, padding_size, &shmp_index(handle, buf->commit_hot, endidx)->cc); commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, endidx)->cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, - commit_count, endidx, handle); + commit_count, endidx, handle, tsc); lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, offsets->end, commit_count, padding_size, handle); @@ -1482,10 +1461,11 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, struct switch_offsets *offsets, - uint64_t *tsc) + uint64_t *tsc, + struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - unsigned long off; + unsigned long off, reserve_commit_diff; offsets->begin = v_read(config, &buf->offset); offsets->old = offsets->begin; @@ -1510,36 +1490,69 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * timestamps) are visible to the reader. This is required for * quiescence guarantees for the fusion merge. */ - if (mode == SWITCH_FLUSH || off > 0) { - if (caa_unlikely(off == 0)) { - /* - * A final flush that encounters an empty - * sub-buffer cannot switch buffer if a - * reader is located within this sub-buffer. - * Anyway, the purpose of final flushing of a - * sub-buffer at offset 0 is to handle the case - * of entirely empty stream. - */ - if (caa_unlikely(subbuf_trunc(offsets->begin, chan) - - subbuf_trunc((unsigned long) - uatomic_read(&buf->consumed), chan) - >= chan->backend.buf_size)) - return -1; - /* - * The client does not save any header information. - * Don't switch empty subbuffer on finalize, because it - * is invalid to deliver a completely empty subbuffer. - */ - if (!config->cb.subbuffer_header_size()) + if (mode != SWITCH_FLUSH && !off) + return -1; /* we do not have to switch : buffer is empty */ + + if (caa_unlikely(off == 0)) { + unsigned long sb_index, commit_count; + + /* + * We are performing a SWITCH_FLUSH. At this stage, there are no + * concurrent writes into the buffer. + * + * The client does not save any header information. Don't + * switch empty subbuffer on finalize, because it is invalid to + * deliver a completely empty subbuffer. + */ + if (!config->cb.subbuffer_header_size()) + return -1; + + /* Test new buffer integrity */ + sb_index = subbuf_index(offsets->begin, chan); + commit_count = v_read(config, + &shmp_index(handle, buf->commit_cold, + sb_index)->cc_sb); + reserve_commit_diff = + (buf_trunc(offsets->begin, chan) + >> chan->backend.num_subbuf_order) + - (commit_count & chan->commit_count_mask); + if (caa_likely(reserve_commit_diff == 0)) { + /* Next subbuffer not being written to. */ + if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && + subbuf_trunc(offsets->begin, chan) + - subbuf_trunc((unsigned long) + uatomic_read(&buf->consumed), chan) + >= chan->backend.buf_size)) { + /* + * We do not overwrite non consumed buffers + * and we are full : don't switch. + */ return -1; + } else { + /* + * Next subbuffer not being written to, and we + * are either in overwrite mode or the buffer is + * not full. It's safe to write in this new + * subbuffer. + */ + } + } else { /* - * Need to write the subbuffer start header on finalize. + * Next subbuffer reserve offset does not match the + * commit offset. Don't perform switch in + * producer-consumer and overwrite mode. Caused by + * either a writer OOPS or too many nested writes over a + * reserve/commit pair. */ - offsets->switch_old_start = 1; + return -1; } - offsets->begin = subbuf_align(offsets->begin, chan); - } else - return -1; /* we do not have to switch : buffer is empty */ + + /* + * Need to write the subbuffer start header on finalize. + */ + offsets->switch_old_start = 1; + } + offsets->begin = subbuf_align(offsets->begin, chan); /* Note: old points to the next subbuf at offset 0 */ offsets->end = offsets->begin; return 0; @@ -1569,7 +1582,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi */ do { if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets, - &tsc)) + &tsc, handle)) return; /* Switch not needed */ } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end) != offsets.old); @@ -1619,9 +1632,10 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct lttng_ust_shm_handle *handle = ctx->handle; - unsigned long reserve_commit_diff; + unsigned long reserve_commit_diff, offset_cmp; - offsets->begin = v_read(config, &buf->offset); +retry: + offsets->begin = offset_cmp = v_read(config, &buf->offset); offsets->old = offsets->begin; offsets->switch_new_start = 0; offsets->switch_new_end = 0; @@ -1653,7 +1667,7 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, } } if (caa_unlikely(offsets->switch_new_start)) { - unsigned long sb_index; + unsigned long sb_index, commit_count; /* * We are typically not filling the previous buffer completely. @@ -1664,12 +1678,32 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, + config->cb.subbuffer_header_size(); /* Test new buffer integrity */ sb_index = subbuf_index(offsets->begin, chan); + /* + * Read buf->offset before buf->commit_cold[sb_index].cc_sb. + * lib_ring_buffer_check_deliver() has the matching + * memory barriers required around commit_cold cc_sb + * updates to ensure reserve and commit counter updates + * are not seen reordered when updated by another CPU. + */ + cmm_smp_rmb(); + commit_count = v_read(config, + &shmp_index(handle, buf->commit_cold, + sb_index)->cc_sb); + /* Read buf->commit_cold[sb_index].cc_sb before buf->offset. */ + cmm_smp_rmb(); + if (caa_unlikely(offset_cmp != v_read(config, &buf->offset))) { + /* + * The reserve counter have been concurrently updated + * while we read the commit counter. This means the + * commit counter we read might not match buf->offset + * due to concurrent update. We therefore need to retry. + */ + goto retry; + } reserve_commit_diff = (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - - ((unsigned long) v_read(config, - &shmp_index(handle, buf->commit_cold, sb_index)->cc_sb) - & chan->commit_count_mask); + - (commit_count & chan->commit_count_mask); if (caa_likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ if (caa_unlikely(config->mode != RING_BUFFER_OVERWRITE && @@ -1704,7 +1738,8 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, /* * Next subbuffer reserve offset does not match the - * commit offset. Drop record in producer-consumer and + * commit offset, and this did not involve update to the + * reserve counter. Drop record in producer-consumer and * overwrite mode. Caused by either a writer OOPS or too * many nested writes over a reserve/commit pair. */