X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=include%2Fringbuffer%2Fbackend_internal.h;h=895dbd9bb6d5b2e43c47f06326b61e2ead752a5a;hb=refs%2Ftags%2Ffor-upstreaming-review-1;hp=3bb1586ea388e486ef0cf318635ebb9f34d6894e;hpb=369708f464bedc0682151df9308cebfa14dbdb2b;p=lttng-modules.git

diff --git a/include/ringbuffer/backend_internal.h b/include/ringbuffer/backend_internal.h
index 3bb1586e..895dbd9b 100644
--- a/include/ringbuffer/backend_internal.h
+++ b/include/ringbuffer/backend_internal.h
@@ -1,2 +1,531 @@
-/* SPDX-License-Identifier: (GPL-2.0-only or LGPL-2.1-only) */
-#include
+/* SPDX-License-Identifier: (GPL-2.0-only OR LGPL-2.1-only)
+ *
+ * ringbuffer/backend_internal.h
+ *
+ * Ring buffer backend (internal helpers).
+ *
+ * Copyright (C) 2008-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ */
+
+#ifndef _LIB_RING_BUFFER_BACKEND_INTERNAL_H
+#define _LIB_RING_BUFFER_BACKEND_INTERNAL_H
+
+#include <ringbuffer/config.h>
+#include <ringbuffer/backend_types.h>
+#include <ringbuffer/frontend_types.h>
+#include <linux/string.h>
+#include <linux/uaccess.h>
+
+/* Ring buffer backend API presented to the frontend */
+
+/* Ring buffer and channel backend create/free */
+
+int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb,
+		struct channel_backend *chan, int cpu);
+void channel_backend_unregister_notifiers(struct channel_backend *chanb);
+void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb);
+int channel_backend_init(struct channel_backend *chanb,
+		const char *name,
+		const struct lib_ring_buffer_config *config,
+		void *priv, size_t subbuf_size,
+		size_t num_subbuf);
+void channel_backend_free(struct channel_backend *chanb);
+
+void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb);
+void channel_backend_reset(struct channel_backend *chanb);
+
+int lib_ring_buffer_backend_init(void);
+void lib_ring_buffer_backend_exit(void);
+
+extern void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb,
+		size_t offset, const void *src, size_t len,
+		size_t pagecpy);
+extern void _lib_ring_buffer_memset(struct lib_ring_buffer_backend *bufb,
+		size_t offset, int c, size_t len,
+		size_t pagecpy);
+extern void _lib_ring_buffer_strcpy(struct lib_ring_buffer_backend *bufb,
+		size_t offset, const char *src, size_t len,
+		size_t pagecpy, int pad);
+extern void _lib_ring_buffer_copy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
+		size_t offset, const void *src,
+		size_t len, size_t pagecpy);
+extern void _lib_ring_buffer_strcpy_from_user_inatomic(struct lib_ring_buffer_backend *bufb,
+		size_t offset, const char __user *src, size_t len,
+		size_t pagecpy, int pad);
+
+/*
+ * Subbuffer ID bits for overwrite mode. They need to fit within a single word
+ * so they can be exchanged atomically.
+ *
+ * The top half of the word, except its lowest bit, belongs to "offset", which
+ * is used to count the produced subbuffers. For overwrite mode, this provides
+ * the consumer with the ability to read subbuffers in order, handling the
+ * situation where producers would write up to 2^15 buffers (or 2^31 for 64-bit
+ * systems) concurrently with a single execution of get_subbuf (between offset
+ * sampling and subbuffer ID exchange).
+ */
+
+#define HALF_ULONG_BITS		(BITS_PER_LONG >> 1)
+
+#define SB_ID_OFFSET_SHIFT	(HALF_ULONG_BITS + 1)
+#define SB_ID_OFFSET_COUNT	(1UL << SB_ID_OFFSET_SHIFT)
+#define SB_ID_OFFSET_MASK	(~(SB_ID_OFFSET_COUNT - 1))
+/*
+ * The lowest bit of the top half of the word belongs to noref. It is used
+ * only for overwrite mode.
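+ *
+ * For illustration (derived from the SB_ID_*_SHIFT definitions in this
+ * header), on a 32-bit system where BITS_PER_LONG == 32 the ID word
+ * decomposes as:
+ *
+ *   bits 31..17 : offset (HALF_ULONG_BITS - 1 = 15 bits)
+ *   bit  16     : noref  (1 bit)
+ *   bits 15..0  : index  (HALF_ULONG_BITS = 16 bits)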
+ */
+#define SB_ID_NOREF_SHIFT	(SB_ID_OFFSET_SHIFT - 1)
+#define SB_ID_NOREF_COUNT	(1UL << SB_ID_NOREF_SHIFT)
+#define SB_ID_NOREF_MASK	SB_ID_NOREF_COUNT
+/*
+ * In overwrite mode: the lowest half of the word is used for the index.
+ * Limit of 2^16 subbuffers per buffer on 32-bit, 2^32 on 64-bit.
+ * In producer-consumer mode: the whole word is used for the index.
+ */
+#define SB_ID_INDEX_SHIFT	0
+#define SB_ID_INDEX_COUNT	(1UL << SB_ID_INDEX_SHIFT)
+#define SB_ID_INDEX_MASK	(SB_ID_NOREF_COUNT - 1)
+
+/*
+ * Construct the subbuffer id from offset, index and noref. Use only the index
+ * for producer-consumer mode (offset and noref are only used in overwrite
+ * mode).
+ */
+static inline
+unsigned long subbuffer_id(const struct lib_ring_buffer_config *config,
+		unsigned long offset, unsigned long noref,
+		unsigned long index)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return (offset << SB_ID_OFFSET_SHIFT)
+		       | (noref << SB_ID_NOREF_SHIFT)
+		       | index;
+	else
+		return index;
+}
+
+/*
+ * Compare offset with the offset contained within id. Return 1 if the offset
+ * bits are identical, else 0.
+ */
+static inline
+int subbuffer_id_compare_offset(const struct lib_ring_buffer_config *config,
+		unsigned long id, unsigned long offset)
+{
+	return (id & SB_ID_OFFSET_MASK) == (offset << SB_ID_OFFSET_SHIFT);
+}
+
+static inline
+unsigned long subbuffer_id_get_index(const struct lib_ring_buffer_config *config,
+		unsigned long id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return id & SB_ID_INDEX_MASK;
+	else
+		return id;
+}
+
+static inline
+unsigned long subbuffer_id_is_noref(const struct lib_ring_buffer_config *config,
+		unsigned long id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return !!(id & SB_ID_NOREF_MASK);
+	else
+		return 1;
+}
+
+/*
+ * Only used by the reader on a subbuffer ID it has exclusive access to. No
+ * volatile access needed.
+ */
+static inline
+void subbuffer_id_set_noref(const struct lib_ring_buffer_config *config,
+		unsigned long *id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		*id |= SB_ID_NOREF_MASK;
+}
+
+static inline
+void subbuffer_id_set_noref_offset(const struct lib_ring_buffer_config *config,
+		unsigned long *id, unsigned long offset)
+{
+	unsigned long tmp;
+
+	if (config->mode == RING_BUFFER_OVERWRITE) {
+		tmp = *id;
+		tmp &= ~SB_ID_OFFSET_MASK;
+		tmp |= offset << SB_ID_OFFSET_SHIFT;
+		tmp |= SB_ID_NOREF_MASK;
+		/* Volatile store, read concurrently by readers. */
+		WRITE_ONCE(*id, tmp);
+	}
+}
+
+/* No volatile access needed, since the id is already used locally. */
+static inline
+void subbuffer_id_clear_noref(const struct lib_ring_buffer_config *config,
+		unsigned long *id)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		*id &= ~SB_ID_NOREF_MASK;
+}
+
+/*
+ * For overwrite mode, cap the number of subbuffers per buffer to:
+ *   2^16 on 32-bit architectures
+ *   2^32 on 64-bit architectures
+ * This is required so that the index fits in the index part of the ID.
+ * Return 0 on success, -EPERM on failure.
+ */
+static inline
+int subbuffer_id_check_index(const struct lib_ring_buffer_config *config,
+		unsigned long num_subbuf)
+{
+	if (config->mode == RING_BUFFER_OVERWRITE)
+		return (num_subbuf > (1UL << HALF_ULONG_BITS)) ? -EPERM : 0;
+	else
+		return 0;
+}
+
+static inline
+void lib_ring_buffer_backend_get_pages(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_ctx *ctx,
+		struct lib_ring_buffer_backend_pages **backend_pages)
+{
+	struct lib_ring_buffer_backend *bufb = &ctx->buf->backend;
+	struct channel_backend *chanb = &ctx->chan->backend;
+	size_t sbidx, offset = ctx->buf_offset;
+	unsigned long sb_bindex, id;
+	struct lib_ring_buffer_backend_pages *rpages;
+
+	offset &= chanb->buf_size - 1;
+	sbidx = offset >> chanb->subbuf_size_order;
+	id = bufb->buf_wsb[sbidx].id;
+	sb_bindex = subbuffer_id_get_index(config, id);
+	rpages = bufb->array[sb_bindex];
+	CHAN_WARN_ON(ctx->chan,
+		     config->mode == RING_BUFFER_OVERWRITE
+		     && subbuffer_id_is_noref(config, id));
+	*backend_pages = rpages;
+}
+
+/* Get the backend pages cached in the ring buffer context. */
+static inline
+struct lib_ring_buffer_backend_pages *
+	lib_ring_buffer_get_backend_pages_from_ctx(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_ctx *ctx)
+{
+	return ctx->backend_pages;
+}
+
+/*
+ * The ring buffer can count events recorded and overwritten per buffer, but
+ * this counting is disabled by default due to its performance overhead.
+ */
+#ifdef LTTNG_RING_BUFFER_COUNT_EVENTS
+static inline
+void subbuffer_count_record(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	v_inc(config, &bufb->array[sb_bindex]->records_commit);
+}
+#else /* LTTNG_RING_BUFFER_COUNT_EVENTS */
+static inline
+void subbuffer_count_record(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+}
+#endif /* LTTNG_RING_BUFFER_COUNT_EVENTS */
+
+/*
+ * The reader has exclusive subbuffer access for record consumption. No need
+ * to perform the decrement atomically.
+ */
+static inline
+void subbuffer_consume_record(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+	CHAN_WARN_ON(bufb->chan,
+		     !v_read(config, &bufb->array[sb_bindex]->records_unread));
+	/* Non-atomic decrement protected by exclusive subbuffer access. */
+	_v_dec(config, &bufb->array[sb_bindex]->records_unread);
+	v_inc(config, &bufb->records_read);
+}
+
+static inline
+unsigned long subbuffer_get_records_count(
+		const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	return v_read(config, &bufb->array[sb_bindex]->records_commit);
+}
+
+/*
+ * Must be executed at subbuffer delivery when the writer has _exclusive_
+ * subbuffer access. See lib_ring_buffer_check_deliver() for details.
+ * subbuffer_get_records_count() must be called to get the records count
+ * before this function, because this function resets the records_commit
+ * count.
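+ *
+ * For illustration, a delivery-time caller would use the two helpers in
+ * this order (sketch only):
+ *
+ *   records = subbuffer_get_records_count(config, bufb, idx);
+ *   overruns = subbuffer_count_records_overrun(config, bufb, idx);
+ *
+ * so that the commit count is sampled before it is reset to 0.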
+ */
+static inline
+unsigned long subbuffer_count_records_overrun(
+		const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long overruns, sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	overruns = v_read(config, &pages->records_unread);
+	v_set(config, &pages->records_unread,
+	      v_read(config, &pages->records_commit));
+	v_set(config, &pages->records_commit, 0);
+
+	return overruns;
+}
+
+static inline
+void subbuffer_set_data_size(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx,
+		unsigned long data_size)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	pages->data_size = data_size;
+}
+
+static inline
+unsigned long subbuffer_get_read_data_size(
+		const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id);
+	pages = bufb->array[sb_bindex];
+	return pages->data_size;
+}
+
+static inline
+unsigned long subbuffer_get_data_size(
+		const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	struct lib_ring_buffer_backend_pages *pages;
+	unsigned long sb_bindex;
+
+	sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id);
+	pages = bufb->array[sb_bindex];
+	return pages->data_size;
+}
+
+static inline
+void subbuffer_inc_packet_count(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	bufb->buf_cnt[idx].seq_cnt++;
+}
+
+/**
+ * lib_ring_buffer_clear_noref - Clear the noref subbuffer flag, called by
+ * writer.
+ */
+static inline
+void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx)
+{
+	unsigned long id, new_id;
+
+	if (config->mode != RING_BUFFER_OVERWRITE)
+		return;
+
+	/*
+	 * Perform a volatile access to read the subbuffer id, because we want
+	 * to read a coherent version of the id and the associated noref flag.
+	 */
+	id = READ_ONCE(bufb->buf_wsb[idx].id);
+	for (;;) {
+		/* This check is called on the fast path for each record. */
+		if (likely(!subbuffer_id_is_noref(config, id))) {
+			/*
+			 * The store-after-load dependency ordering the writes
+			 * to the subbuffer after the load and test of the
+			 * noref flag matches the memory barrier implied by the
+			 * cmpxchg() in update_read_sb_index().
+			 */
+			return;	/* Already writing to this buffer. */
+		}
+		new_id = id;
+		subbuffer_id_clear_noref(config, &new_id);
+		new_id = cmpxchg(&bufb->buf_wsb[idx].id, id, new_id);
+		if (likely(new_id == id))
+			break;
+		id = new_id;
+	}
+}
+
+/**
+ * lib_ring_buffer_set_noref_offset - Set the noref subbuffer flag and offset,
+ * called by writer.
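+ *
+ * For illustration, the writer-side lifetime of the noref flag is
+ * (sketch only):
+ *
+ *   lib_ring_buffer_clear_noref(config, bufb, idx);
+ *       ... records are written into the subbuffer ...
+ *   lib_ring_buffer_set_noref_offset(config, bufb, idx, offset);
+ *
+ * after which the reader may claim the subbuffer by exchanging it against
+ * its own spare subbuffer in update_read_sb_index().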
+ */
+static inline
+void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		unsigned long idx, unsigned long offset)
+{
+	if (config->mode != RING_BUFFER_OVERWRITE)
+		return;
+
+	/*
+	 * Because lib_ring_buffer_set_noref_offset() is only called by a
+	 * single thread (the one which updated the cc_sb value), there are no
+	 * concurrent updates to take care of: other writers have not updated
+	 * cc_sb, so they cannot set the noref flag, and concurrent readers
+	 * cannot modify the pointer because the noref flag is not set yet.
+	 * The smp_wmb() in the commit path takes care of ordering writes to
+	 * the subbuffer before this set noref operation.
+	 * subbuffer_id_set_noref_offset() uses a volatile store to deal with
+	 * concurrent readers of the noref flag.
+	 */
+	CHAN_WARN_ON(bufb->chan,
+		     subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id));
+	/*
+	 * Memory barrier that ensures counter stores are ordered before the
+	 * set noref and offset.
+	 */
+	smp_mb();
+	subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset);
+}
+
+/**
+ * update_read_sb_index - Read-side subbuffer index update.
+ */
+static inline
+int update_read_sb_index(const struct lib_ring_buffer_config *config,
+		struct lib_ring_buffer_backend *bufb,
+		struct channel_backend *chanb,
+		unsigned long consumed_idx,
+		unsigned long consumed_count)
+{
+	unsigned long old_id, new_id;
+
+	if (config->mode == RING_BUFFER_OVERWRITE) {
+		/*
+		 * Exchange the target writer subbuffer with our own unused
+		 * subbuffer. No need to use READ_ONCE() here to read the old
+		 * id, because the value read will be confirmed by the
+		 * following cmpxchg().
+		 */
+		old_id = bufb->buf_wsb[consumed_idx].id;
+		if (unlikely(!subbuffer_id_is_noref(config, old_id)))
+			return -EAGAIN;
+		/*
+		 * Make sure the offset count we are expecting matches the one
+		 * indicated by the writer.
+		 */
+		if (unlikely(!subbuffer_id_compare_offset(config, old_id,
+							  consumed_count)))
+			return -EAGAIN;
+		CHAN_WARN_ON(bufb->chan,
+			     !subbuffer_id_is_noref(config, bufb->buf_rsb.id));
+		subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id,
+					      consumed_count);
+		new_id = cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id,
+				 bufb->buf_rsb.id);
+		if (unlikely(old_id != new_id))
+			return -EAGAIN;
+		bufb->buf_rsb.id = new_id;
+	} else {
+		/* No page exchange, use the writer page directly. */
+		bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id;
+	}
+	return 0;
+}
+
+static inline __attribute__((always_inline))
+void lttng_inline_memcpy(void *dest, const void *src,
+		unsigned long len)
+{
+	switch (len) {
+	case 1:
+		*(uint8_t *) dest = *(const uint8_t *) src;
+		break;
+	case 2:
+		*(uint16_t *) dest = *(const uint16_t *) src;
+		break;
+	case 4:
+		*(uint32_t *) dest = *(const uint32_t *) src;
+		break;
+	case 8:
+		*(uint64_t *) dest = *(const uint64_t *) src;
+		break;
+	default:
+		memcpy(dest, src, len);
+	}
+}
+
+/*
+ * Use the architecture-specific memcpy implementation for constant-sized
+ * inputs, but fall back to lttng_inline_memcpy() when the length is not
+ * statically known: a function call to memcpy is just way too expensive
+ * for the fast path.
+ */
+#define lib_ring_buffer_do_copy(config, dest, src, len)		\
+do {								\
+	size_t __len = (len);					\
+	if (__builtin_constant_p(len))				\
+		memcpy(dest, src, __len);			\
+	else							\
+		lttng_inline_memcpy(dest, src, __len);		\
+} while (0)
+
+/*
+ * We use __copy_from_user_inatomic to copy userspace data since we already
+ * did the access_ok for the whole range.
+ *
+ * Return 0 if OK; a nonzero return value is the number of bytes that could
+ * not be copied.
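+ *
+ * For illustration, a caller can zero-fill the destination when the copy
+ * faults part-way through (sketch only, not necessarily the exact caller
+ * in this tree):
+ *
+ *   if (lib_ring_buffer_do_copy_from_user_inatomic(dest, src, len))
+ *           lib_ring_buffer_do_memset(dest, 0, len);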
+ */
+static inline
+unsigned long lib_ring_buffer_do_copy_from_user_inatomic(void *dest,
+		const void __user *src,
+		unsigned long len)
+{
+	return __copy_from_user_inatomic(dest, src, len);
+}
+
+/*
+ * Write len bytes of value c to dest.
+ */
+static inline
+void lib_ring_buffer_do_memset(char *dest, int c,
+		unsigned long len)
+{
+	unsigned long i;
+
+	for (i = 0; i < len; i++)
+		dest[i] = c;
+}
+
+#endif /* _LIB_RING_BUFFER_BACKEND_INTERNAL_H */