X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Ffrontend_internal.h;h=00b9508d4df54768767fbe0aaff401cb0f965b0d;hb=8f62fbe3204b1f7c6c91401c38f2d30cfc644906;hp=de705d5b1226d4caaddaeed57b4a2f6f2103e064;hpb=74d81a6cca2cd4a7718bba9368f382f9f2fbba84;p=lttng-ust.git

diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h
index de705d5b..00b9508d 100644
--- a/libringbuffer/frontend_internal.h
+++ b/libringbuffer/frontend_internal.h
@@ -32,6 +32,7 @@
  */
 
 #include <urcu/compiler.h>
+#include <urcu/tls-compat.h>
 #include <signal.h>
 #include <pthread.h>
 
@@ -299,6 +300,78 @@ int lib_ring_buffer_reserve_committed(const struct lttng_ust_lib_ring_buffer_con
 		     - (commit_count & chan->commit_count_mask) == 0);
 }
 
+static inline
+void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf,
+			    struct lttng_ust_shm_handle *handle)
+{
+	int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
+	sigset_t sigpipe_set, pending_set, old_set;
+	int ret, sigpipe_was_pending = 0;
+
+	if (wakeup_fd < 0)
+		return;
+
+	/*
+	 * Wake-up the other end by writing a null byte in the pipe
+	 * (non-blocking). Important note: Because writing into the
+	 * pipe is non-blocking (and therefore we allow dropping wakeup
+	 * data, as long as there is wakeup data present in the pipe
+	 * buffer to wake up the consumer), the consumer should perform
+	 * the following sequence for waiting:
+	 * 1) empty the pipe (reads).
+	 * 2) check if there is data in the buffer.
+	 * 3) wait on the pipe (poll).
+	 *
+	 * Discard the SIGPIPE from write(), not disturbing any SIGPIPE
+	 * that might be already pending. If a bogus SIGPIPE is sent to
+	 * the entire process concurrently by a malicious user, it may
+	 * be simply discarded.
+	 */
+	ret = sigemptyset(&pending_set);
+	assert(!ret);
+	/*
+	 * sigpending returns the mask of signals that are _both_
+	 * blocked for the thread _and_ pending for either the thread or
+	 * the entire process.
+	 */
+	ret = sigpending(&pending_set);
+	assert(!ret);
+	sigpipe_was_pending = sigismember(&pending_set, SIGPIPE);
+	/*
+	 * If sigpipe was pending, it means it was already blocked, so
+	 * no need to block it.
+	 */
+	if (!sigpipe_was_pending) {
+		ret = sigemptyset(&sigpipe_set);
+		assert(!ret);
+		ret = sigaddset(&sigpipe_set, SIGPIPE);
+		assert(!ret);
+		ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set);
+		assert(!ret);
+	}
+	do {
+		ret = write(wakeup_fd, "", 1);
+	} while (ret == -1L && errno == EINTR);
+	if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) {
+		struct timespec timeout = { 0, 0 };
+		do {
+			ret = sigtimedwait(&sigpipe_set, NULL,
+				&timeout);
+		} while (ret == -1L && errno == EINTR);
+	}
+	if (!sigpipe_was_pending) {
+		ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL);
+		assert(!ret);
+	}
+}
+
+/*
+ * Receive end of subbuffer TSC as parameter. It has been read in the
+ * space reservation loop of either reserve or switch, which ensures it
+ * progresses monotonically with event records in the buffer. Therefore,
+ * it ensures that the end timestamp of a subbuffer is <= begin
+ * timestamp of the following subbuffers.
+ */
 static inline
 void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config,
 				   struct lttng_ust_lib_ring_buffer *buf,
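For reference, the drain/check/poll wait sequence prescribed by the comment in lib_ring_buffer_wakeup() could look like the following on the consumer side. This is a minimal sketch, not lttng-ust code: the data_available() helper is hypothetical, and it assumes the read end of the pipe was opened with O_NONBLOCK so that the drain loop terminates on EAGAIN.

#include <errno.h>
#include <poll.h>
#include <unistd.h>

/* Hypothetical helper: returns nonzero when a full subbuffer is readable. */
extern int data_available(void);

static void consumer_wait(int wakeup_fd)	/* read end, opened O_NONBLOCK */
{
	struct pollfd pfd = { .fd = wakeup_fd, .events = POLLIN };
	char drain[64];
	ssize_t len;

	for (;;) {
		/*
		 * 1) Empty the pipe. With O_NONBLOCK, read() returns
		 *    -1/EAGAIN once the pipe is drained.
		 */
		do {
			len = read(wakeup_fd, drain, sizeof(drain));
		} while (len > 0 || (len < 0 && errno == EINTR));

		/*
		 * 2) Check for buffer data only after draining, so any
		 *    wakeup byte written from this point on is still
		 *    sitting in the pipe for poll() to see.
		 */
		if (data_available())
			return;

		/* 3) Sleep until the writer's non-blocking write() lands. */
		while (poll(&pfd, 1, -1) < 0 && errno == EINTR)
			continue;
	}
}

The ordering matters: draining before checking guarantees that a wakeup racing with the check leaves at least one byte in the pipe, so the poll() cannot miss it.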
@@ -306,11 +379,11 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 				   unsigned long offset,
 				   unsigned long commit_count,
 				   unsigned long idx,
-				   struct lttng_ust_shm_handle *handle)
+				   struct lttng_ust_shm_handle *handle,
+				   uint64_t tsc)
 {
 	unsigned long old_commit_count = commit_count
 				- chan->backend.subbuf_size;
-	uint64_t tsc;
 
 	/* Check if all commits have been done */
 	if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order)
@@ -341,6 +414,12 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 	 * The subbuffer size is least 2 bytes (minimum size: 1 page).
 	 * This guarantees that old_commit_count + 1 != commit_count.
 	 */
+
+	/*
+	 * Order prior updates to reserve count prior to the
+	 * commit_cold cc_sb update.
+	 */
+	cmm_smp_wmb();
 	if (caa_likely(v_cmpxchg(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 				 old_commit_count, old_commit_count + 1)
 		       == old_commit_count)) {
@@ -350,7 +429,6 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 		 * and any other writer trying to access this subbuffer
 		 * in this state is required to drop records.
 		 */
-		tsc = config->cb.ring_buffer_clock_read(chan);
 		v_add(config,
 		      subbuffer_get_records_count(config,
 						  &buf->backend,
@@ -368,6 +446,12 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 							    handle),
 				      handle);
 
+		/*
+		 * Increment the packet counter while we have exclusive
+		 * access.
+		 */
+		subbuffer_inc_packet_count(config, &buf->backend, idx, handle);
+
 		/*
 		 * Set noref flag and offset for this subbuffer id.
 		 * Contains a memory barrier that ensures counter stores
@@ -386,6 +470,11 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 		/* End of exclusive subbuffer access */
 		v_set(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb,
 		      commit_count);
+		/*
+		 * Order later updates to reserve count after
+		 * the commit cold cc_sb update.
+		 */
+		cmm_smp_wmb();
 		lib_ring_buffer_vmcore_check_deliver(config, buf,
 						     commit_count, idx, handle);
 
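The two cmm_smp_wmb() calls added above bracket the writer's stores around the cc_sb word, so that a concurrent reader observing a given cc_sb value also observes the counter updates published before it. This follows the classic publish pattern paired by a read barrier on the consumer side; the sketch below uses generic stand-in variables and userspace-rcu primitives, not the actual ring-buffer fields:

#include <urcu/arch.h>		/* cmm_smp_wmb(), cmm_smp_rmb() */
#include <urcu/system.h>	/* CMM_STORE_SHARED(), CMM_LOAD_SHARED() */

static unsigned long counters;	/* stands in for the reserve/commit counters */
static unsigned long cc_sb;	/* stands in for commit_cold cc_sb */

static void writer_publish(void)
{
	counters = 42;			/* counter updates to publish */
	cmm_smp_wmb();			/* order counter stores before cc_sb store */
	CMM_STORE_SHARED(cc_sb, 1);
}

static void reader_poll(void)
{
	if (CMM_LOAD_SHARED(cc_sb)) {
		cmm_smp_rmb();		/* pairs with the writer's cmm_smp_wmb() */
		/* counters is now guaranteed to be observed as 42 */
	}
}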
@@ -395,78 +484,7 @@ void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config
 		if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER
 		    && uatomic_read(&buf->active_readers)
 		    && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-			int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref);
-
-			if (wakeup_fd >= 0) {
-				sigset_t sigpipe_set, pending_set, old_set;
-				int ret, sigpipe_was_pending = 0;
-
-				/*
-				 * Wake-up the other end by
-				 * writing a null byte in the
-				 * pipe (non-blocking).
-				 * Important note: Because
-				 * writing into the pipe is
-				 * non-blocking (and therefore
-				 * we allow dropping wakeup
-				 * data, as long as there is
-				 * wakeup data present in the
-				 * pipe buffer to wake up the
-				 * consumer), the consumer
-				 * should perform the following
-				 * sequence for waiting:
-				 * 1) empty the pipe (reads).
-				 * 2) check if there is data in
-				 *    the buffer.
-				 * 3) wait on the pipe (poll).
-				 *
-				 * Discard the SIGPIPE from write(), not
-				 * disturbing any SIGPIPE that might be
-				 * already pending. If a bogus SIGPIPE
-				 * is sent to the entire process
-				 * concurrently by a malicious user, it
-				 * may be simply discarded.
-				 */
-				ret = sigemptyset(&pending_set);
-				assert(!ret);
-				/*
-				 * sigpending returns the mask
-				 * of signals that are _both_
-				 * blocked for the thread _and_
-				 * pending for either the thread
-				 * or the entire process.
-				 */
-				ret = sigpending(&pending_set);
-				assert(!ret);
-				sigpipe_was_pending = sigismember(&pending_set, SIGPIPE);
-				/*
-				 * If sigpipe was pending, it
-				 * means it was already blocked,
-				 * so no need to block it.
-				 */
-				if (!sigpipe_was_pending) {
-					ret = sigemptyset(&sigpipe_set);
-					assert(!ret);
-					ret = sigaddset(&sigpipe_set, SIGPIPE);
-					assert(!ret);
-					ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set);
-					assert(!ret);
-				}
-				do {
-					ret = write(wakeup_fd, "", 1);
-				} while (ret == -1L && errno == EINTR);
-				if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) {
-					struct timespec timeout = { 0, 0 };
-					do {
-						ret = sigtimedwait(&sigpipe_set, NULL,
-								&timeout);
-					} while (ret == -1L && errno == EINTR);
-				}
-				if (!sigpipe_was_pending) {
-					ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL);
-					assert(!ret);
-				}
-			}
+			lib_ring_buffer_wakeup(buf, handle);
 		}
 	}
 }
@@ -487,23 +505,20 @@ void lib_ring_buffer_write_commit_counter(const struct lttng_ust_lib_ring_buffer
 					    unsigned long idx,
 					    unsigned long buf_offset,
 					    unsigned long commit_count,
-					    size_t slot_size,
 					    struct lttng_ust_shm_handle *handle)
 {
-	unsigned long offset, commit_seq_old;
+	unsigned long commit_seq_old;
 
 	if (config->oops != RING_BUFFER_OOPS_CONSISTENCY)
 		return;
 
-	offset = buf_offset + slot_size;
-
 	/*
 	 * subbuf_offset includes commit_count_mask. We can simply
 	 * compare the offsets within the subbuffer without caring about
 	 * buffer full/empty mismatch because offset is never zero here
 	 * (subbuffer header and record headers have non-zero length).
 	 */
-	if (caa_unlikely(subbuf_offset(offset - commit_count, chan)))
+	if (caa_unlikely(subbuf_offset(buf_offset - commit_count, chan)))
 		return;
 
 	commit_seq_old = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->seq);
@@ -520,6 +535,6 @@ extern void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf,
 				 struct lttng_ust_shm_handle *handle);
 
 /* Keep track of trap nesting inside ring buffer code */
-extern __thread unsigned int lib_ring_buffer_nesting;
+extern DECLARE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 
 #endif /* _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H */
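The final hunk switches the nesting counter from compiler __thread storage to the urcu/tls-compat.h wrappers, which fall back to pthread keys on platforms without compiler TLS. For reference, the matching definition and access sites would follow the standard tls-compat pattern; this sketch shows the pattern, not the actual lttng-ust sources:

#include <urcu/tls-compat.h>

/* In exactly one .c file: define the variable declared in the header. */
DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);

/* Every access site goes through the URCU_TLS() accessor. */
static void ring_buffer_nesting_inc(void)
{
	URCU_TLS(lib_ring_buffer_nesting)++;
}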