X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Ffrontend_internal.h;h=4ada183791c2a4f4253bb10baf1cff36b4d97214;hb=1793b0b5fd18ce7ccb5bdab19745cffdf55756e3;hp=7c28788a5758468e86b72db958dfaed3f09e2bee;hpb=8d8a24c8565ce6dce7e7bd1045a95d1d2af5e536;p=lttng-ust.git diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index 7c28788a..4ada1837 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -1,13 +1,28 @@ -#ifndef _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H -#define _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H +#ifndef _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H +#define _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H /* - * linux/ringbuffer/frontend_internal.h - * - * (C) Copyright 2005-2010 - Mathieu Desnoyers + * libringbuffer/frontend_internal.h * * Ring Buffer Library Synchronization Header (internal helpers). * + * Copyright (C) 2005-2012 Mathieu Desnoyers + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Lesser General Public + * License as published by the Free Software Foundation; only + * version 2.1 of the License. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Lesser General Public License for more details. + * + * You should have received a copy of the GNU Lesser General Public + * License along with this library; if not, write to the Free Software + * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + * + * * Author: * Mathieu Desnoyers * @@ -17,8 +32,11 @@ */ #include +#include +#include +#include -#include +#include #include "backend_types.h" #include "frontend_types.h" #include "shm.h" @@ -85,8 +103,8 @@ unsigned long subbuf_index(unsigned long offset, struct channel *chan) #if (CAA_BITS_PER_LONG == 32) static inline -void save_last_tsc(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, u64 tsc) +void save_last_tsc(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc) { if (config->tsc_bits == 0 || config->tsc_bits == 64) return; @@ -98,8 +116,8 @@ void save_last_tsc(const struct lib_ring_buffer_config *config, } static inline -int last_tsc_overflow(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, u64 tsc) +int last_tsc_overflow(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc) { unsigned long tsc_shifted; @@ -107,7 +125,7 @@ int last_tsc_overflow(const struct lib_ring_buffer_config *config, return 0; tsc_shifted = (unsigned long)(tsc >> config->tsc_bits); - if (unlikely(tsc_shifted + if (caa_unlikely(tsc_shifted - (unsigned long)v_read(config, &buf->last_tsc))) return 1; else @@ -115,8 +133,8 @@ int last_tsc_overflow(const struct lib_ring_buffer_config *config, } #else static inline -void save_last_tsc(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, u64 tsc) +void save_last_tsc(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc) { if (config->tsc_bits == 0 || config->tsc_bits == 64) return; @@ -125,13 +143,13 @@ void save_last_tsc(const struct lib_ring_buffer_config *config, } static inline -int last_tsc_overflow(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, u64 tsc) +int last_tsc_overflow(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, uint64_t tsc) { if (config->tsc_bits == 0 || config->tsc_bits == 64) return 0; - if (unlikely((tsc - v_read(config, &buf->last_tsc)) + if (caa_unlikely((tsc - v_read(config, &buf->last_tsc)) >> config->tsc_bits)) return 1; else @@ -140,16 +158,17 @@ int last_tsc_overflow(const struct lib_ring_buffer_config *config, #endif extern -int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx); +int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx); extern -void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, - enum switch_mode mode); +void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, + enum switch_mode mode, + struct lttng_ust_shm_handle *handle); /* Buffer write helpers */ static inline -void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf, +void lib_ring_buffer_reserve_push_reader(struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, unsigned long offset) { @@ -166,36 +185,38 @@ void lib_ring_buffer_reserve_push_reader(struct lib_ring_buffer *buf, * write position sub-buffer index in the buffer being the one * which will win this loop. */ - if (unlikely(subbuf_trunc(offset, chan) + if (caa_unlikely(subbuf_trunc(offset, chan) - subbuf_trunc(consumed_old, chan) >= chan->backend.buf_size)) consumed_new = subbuf_align(consumed_old, chan); else return; - } while (unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old, + } while (caa_unlikely(uatomic_cmpxchg(&buf->consumed, consumed_old, consumed_new) != consumed_old)); } static inline -void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void lib_ring_buffer_vmcore_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, unsigned long commit_count, - unsigned long idx) + unsigned long idx, + struct lttng_ust_shm_handle *handle) { if (config->oops == RING_BUFFER_OOPS_CONSISTENCY) - v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count); + v_set(config, &shmp_index(handle, buf->commit_hot, idx)->seq, commit_count); } static inline -int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, - struct channel *chan) +int lib_ring_buffer_poll_deliver(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, + struct channel *chan, + struct lttng_ust_shm_handle *handle) { unsigned long consumed_old, consumed_idx, commit_count, write_offset; consumed_old = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed_old, chan); - commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp_index(handle, buf->commit_cold, consumed_idx)->cc_sb); /* * No memory barrier here, since we are only interested * in a statistically correct polling result. The next poll will @@ -229,19 +250,20 @@ int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, } static inline -int lib_ring_buffer_pending_data(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +int lib_ring_buffer_pending_data(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, struct channel *chan) { return !!subbuf_offset(v_read(config, &buf->offset), chan); } static inline -unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, - unsigned long idx) +unsigned long lib_ring_buffer_get_data_size(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, + unsigned long idx, + struct lttng_ust_shm_handle *handle) { - return subbuffer_get_data_size(config, &buf->backend, idx); + return subbuffer_get_data_size(config, &buf->backend, idx, handle); } /* @@ -250,9 +272,10 @@ unsigned long lib_ring_buffer_get_data_size(const struct lib_ring_buffer_config * This is a very specific ftrace use-case, so we keep this as "internal" API. */ static inline -int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, - struct channel *chan) +int lib_ring_buffer_reserve_committed(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, + struct channel *chan, + struct lttng_ust_shm_handle *handle) { unsigned long offset, idx, commit_count; @@ -270,7 +293,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi do { offset = v_read(config, &buf->offset); idx = subbuf_index(offset, chan); - commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc); + commit_count = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->cc); } while (offset != v_read(config, &buf->offset)); return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) @@ -278,19 +301,85 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi } static inline -void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle) +{ + int wakeup_fd = shm_get_wakeup_fd(handle, &buf->self._ref); + sigset_t sigpipe_set, pending_set, old_set; + int ret, sigpipe_was_pending = 0; + + if (wakeup_fd < 0) + return; + + /* + * Wake-up the other end by writing a null byte in the pipe + * (non-blocking). Important note: Because writing into the + * pipe is non-blocking (and therefore we allow dropping wakeup + * data, as long as there is wakeup data present in the pipe + * buffer to wake up the consumer), the consumer should perform + * the following sequence for waiting: + * 1) empty the pipe (reads). + * 2) check if there is data in the buffer. + * 3) wait on the pipe (poll). + * + * Discard the SIGPIPE from write(), not disturbing any SIGPIPE + * that might be already pending. If a bogus SIGPIPE is sent to + * the entire process concurrently by a malicious user, it may + * be simply discarded. + */ + ret = sigemptyset(&pending_set); + assert(!ret); + /* + * sigpending returns the mask of signals that are _both_ + * blocked for the thread _and_ pending for either the thread or + * the entire process. + */ + ret = sigpending(&pending_set); + assert(!ret); + sigpipe_was_pending = sigismember(&pending_set, SIGPIPE); + /* + * If sigpipe was pending, it means it was already blocked, so + * no need to block it. + */ + if (!sigpipe_was_pending) { + ret = sigemptyset(&sigpipe_set); + assert(!ret); + ret = sigaddset(&sigpipe_set, SIGPIPE); + assert(!ret); + ret = pthread_sigmask(SIG_BLOCK, &sigpipe_set, &old_set); + assert(!ret); + } + do { + ret = write(wakeup_fd, "", 1); + } while (ret == -1L && errno == EINTR); + if (ret == -1L && errno == EPIPE && !sigpipe_was_pending) { + struct timespec timeout = { 0, 0 }; + do { + ret = sigtimedwait(&sigpipe_set, NULL, + &timeout); + } while (ret == -1L && errno == EINTR); + } + if (!sigpipe_was_pending) { + ret = pthread_sigmask(SIG_SETMASK, &old_set, NULL); + assert(!ret); + } +} + +static inline +void lib_ring_buffer_check_deliver(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, unsigned long offset, unsigned long commit_count, - unsigned long idx) + unsigned long idx, + struct lttng_ust_shm_handle *handle) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; - u64 tsc; + uint64_t tsc; /* Check if all commits have been done */ - if (unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) + if (caa_unlikely((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) - (old_commit_count & chan->commit_count_mask) == 0)) { /* * If we succeeded at updating cc_sb below, we are the subbuffer @@ -318,7 +407,13 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * The subbuffer size is least 2 bytes (minimum size: 1 page). * This guarantees that old_commit_count + 1 != commit_count. */ - if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb, + + /* + * Order prior updates to reserve count prior to the + * commit_cold cc_sb update. + */ + cmm_smp_wmb(); + if (caa_likely(v_cmpxchg(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { /* @@ -330,17 +425,20 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, tsc = config->cb.ring_buffer_clock_read(chan); v_add(config, subbuffer_get_records_count(config, - &buf->backend, idx), + &buf->backend, + idx, handle), &buf->records_count); v_add(config, subbuffer_count_records_overrun(config, &buf->backend, - idx), + idx, handle), &buf->records_overrun); config->cb.buffer_end(buf, tsc, idx, lib_ring_buffer_get_data_size(config, buf, - idx)); + idx, + handle), + handle); /* * Set noref flag and offset for this subbuffer id. @@ -348,7 +446,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * are ordered before set noref and offset. */ lib_ring_buffer_set_noref_offset(config, &buf->backend, idx, - buf_trunc_val(offset, chan)); + buf_trunc_val(offset, chan), handle); /* * Order set_noref and record counter updates before the @@ -358,21 +456,24 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, */ cmm_smp_mb(); /* End of exclusive subbuffer access */ - v_set(config, &shmp(buf->commit_cold)[idx].cc_sb, + v_set(config, &shmp_index(handle, buf->commit_cold, idx)->cc_sb, commit_count); + /* + * Order later updates to reserve count after + * the commit cold cc_sb update. + */ + cmm_smp_wmb(); lib_ring_buffer_vmcore_check_deliver(config, buf, - commit_count, idx); + commit_count, idx, handle); /* * RING_BUFFER_WAKEUP_BY_WRITER wakeup is not lock-free. */ if (config->wakeup == RING_BUFFER_WAKEUP_BY_WRITER && uatomic_read(&buf->active_readers) - && lib_ring_buffer_poll_deliver(config, buf, chan)) { - //wake_up_interruptible(&buf->read_wait); - //wake_up_interruptible(&chan->read_wait); + && lib_ring_buffer_poll_deliver(config, buf, chan, handle)) { + lib_ring_buffer_wakeup(buf, handle); } - } } } @@ -386,13 +487,14 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * useful for crash dump. */ static inline -void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *config, - struct lib_ring_buffer *buf, +void lib_ring_buffer_write_commit_counter(const struct lttng_ust_lib_ring_buffer_config *config, + struct lttng_ust_lib_ring_buffer *buf, struct channel *chan, unsigned long idx, unsigned long buf_offset, unsigned long commit_count, - size_t slot_size) + size_t slot_size, + struct lttng_ust_shm_handle *handle) { unsigned long offset, commit_seq_old; @@ -407,21 +509,23 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c * buffer full/empty mismatch because offset is never zero here * (subbuffer header and record headers have non-zero length). */ - if (unlikely(subbuf_offset(offset - commit_count, chan))) + if (caa_unlikely(subbuf_offset(offset - commit_count, chan))) return; - commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq); + commit_seq_old = v_read(config, &shmp_index(handle, buf->commit_hot, idx)->seq); while ((long) (commit_seq_old - commit_count) < 0) - commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq, + commit_seq_old = v_cmpxchg(config, &shmp_index(handle, buf->commit_hot, idx)->seq, commit_seq_old, commit_count); } -extern int lib_ring_buffer_create(struct lib_ring_buffer *buf, +extern int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, struct channel_backend *chanb, int cpu, - struct shm_header *shm_header); -extern void lib_ring_buffer_free(struct lib_ring_buffer *buf); + struct lttng_ust_shm_handle *handle, + struct shm_object *shmobj); +extern void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf, + struct lttng_ust_shm_handle *handle); /* Keep track of trap nesting inside ring buffer code */ -extern __thread unsigned int lib_ring_buffer_nesting; +extern DECLARE_URCU_TLS(unsigned int, lib_ring_buffer_nesting); -#endif /* _LINUX_RING_BUFFER_FRONTEND_INTERNAL_H */ +#endif /* _LTTNG_RING_BUFFER_FRONTEND_INTERNAL_H */