From a6352fd40a2090fd883a6c369144bf405c9e9ec4 Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Mon, 4 Jul 2011 13:03:45 -0400 Subject: [PATCH] Ring buffer: use shmp (shared-memory pointers) for per-channel shm structures Signed-off-by: Mathieu Desnoyers --- .gitignore | 3 +- include/ust/bug.h | 3 + include/ust/core.h | 1 + include/usterr_signal_safe.h | 2 +- libringbuffer/Makefile.am | 7 +- libringbuffer/backend.h | 36 +- libringbuffer/backend_internal.h | 46 +- libringbuffer/backend_types.h | 18 +- libringbuffer/config.h | 298 ++++++++++ libringbuffer/frontend.h | 10 +- libringbuffer/frontend_internal.h | 18 +- libringbuffer/frontend_types.h | 16 +- libringbuffer/iterator.h | 70 --- libringbuffer/ring_buffer_abi.c | 5 - libringbuffer/ring_buffer_backend.c | 445 ++++----------- libringbuffer/ring_buffer_frontend.c | 783 ++++++++------------------- libringbuffer/shm.h | 71 +++ libringbuffer/smp.c | 33 ++ libringbuffer/smp.h | 73 +++ libust/buffers.c | 25 - libust/tracer.h | 31 -- 21 files changed, 905 insertions(+), 1089 deletions(-) create mode 100644 libringbuffer/config.h delete mode 100644 libringbuffer/iterator.h create mode 100644 libringbuffer/shm.h create mode 100644 libringbuffer/smp.c create mode 100644 libringbuffer/smp.h diff --git a/.gitignore b/.gitignore index e9cac90..bea8aa3 100644 --- a/.gitignore +++ b/.gitignore @@ -15,7 +15,8 @@ Makefile.in configure aclocal.m4 autom4te.cache/ -config.h +/config.h +/include/ust/config.h /config.h.in config/ config.log diff --git a/include/ust/bug.h b/include/ust/bug.h index 8243cc9..96007c3 100644 --- a/include/ust/bug.h +++ b/include/ust/bug.h @@ -9,6 +9,9 @@ * Dual LGPL v2.1/GPL v2 license. */ +#define BUILD_BUG_ON(condition) \ + ((void) sizeof(char[-!!(condition)])) + /** * BUILD_RUNTIME_BUG_ON - check condition at build (if constant) or runtime * @condition: the condition which should be false. diff --git a/include/ust/core.h b/include/ust/core.h index 8c1c490..f54ea3f 100644 --- a/include/ust/core.h +++ b/include/ust/core.h @@ -20,6 +20,7 @@ #include #include +#include #define likely(x) __builtin_expect(!!(x), 1) #define unlikely(x) __builtin_expect(!!(x), 0) diff --git a/include/usterr_signal_safe.h b/include/usterr_signal_safe.h index f12c317..10355dc 100644 --- a/include/usterr_signal_safe.h +++ b/include/usterr_signal_safe.h @@ -62,7 +62,7 @@ static inline void __attribute__ ((format (printf, 1, 2))) ust_safe_snprintf(____buf, sizeof(____buf), fmt, ## args); \ \ /* Add end of string in case of buffer overflow. */ \ - ____buf[sizeof(____buf)-1] = 0; \ + ____buf[sizeof(____buf) - 1] = 0; \ \ patient_write(STDERR_FILENO, ____buf, strlen(____buf)); \ /* Can't print errors because we are in the error printing code path. */ \ diff --git a/libringbuffer/Makefile.am b/libringbuffer/Makefile.am index 2266053..ffa3701 100644 --- a/libringbuffer/Makefile.am +++ b/libringbuffer/Makefile.am @@ -3,10 +3,13 @@ AM_CFLAGS = -fno-strict-aliasing lib_LTLIBRARIES = libringbuffer.la +noinst_HEADERS = \ + smp.h + libringbuffer_la_SOURCES = \ + smp.c \ ring_buffer_backend.c \ - ring_buffer_frontend.c \ - ring_buffer_abi.c + ring_buffer_frontend.c libringbuffer_la_LDFLAGS = -no-undefined -version-info 0:0:0 diff --git a/libringbuffer/backend.h b/libringbuffer/backend.h index 61d2f32..1bd6110 100644 --- a/libringbuffer/backend.h +++ b/libringbuffer/backend.h @@ -32,10 +32,6 @@ extern size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, extern int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset, void *dest, size_t len); -extern struct page ** -lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb, size_t offset, - void ***virt); - /* * Return the address where a given offset is located. * Should be used to get the current subbuffer header pointer. Given we know @@ -68,29 +64,27 @@ void lib_ring_buffer_write(const struct lib_ring_buffer_config *config, { struct lib_ring_buffer_backend *bufb = &ctx->buf->backend; struct channel_backend *chanb = &ctx->chan->backend; - size_t sbidx, index; + size_t sbidx; size_t offset = ctx->buf_offset; - ssize_t pagecpy; struct lib_ring_buffer_backend_pages *rpages; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); - pagecpy = min_t(size_t, len, (-offset) & ~PAGE_MASK); - id = bufb->buf_wsb[sbidx].id; + id = shmp(bufb->buf_wsb)[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; + rpages = shmp(bufb->array)[sb_bindex]; CHAN_WARN_ON(ctx->chan, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - if (likely(pagecpy == len)) - lib_ring_buffer_do_copy(config, - rpages->p[index].virt - + (offset & ~PAGE_MASK), - src, len); - else - _lib_ring_buffer_write(bufb, offset, src, len, 0); + /* + * Underlying layer should never ask for writes across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + lib_ring_buffer_do_copy(config, + shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), + src, len); ctx->buf_offset += len; } @@ -109,16 +103,16 @@ unsigned long lib_ring_buffer_get_records_unread( unsigned long records_unread = 0, sb_bindex, id; unsigned int i; - for (i = 0; i < bufb->chan->backend.num_subbuf; i++) { - id = bufb->buf_wsb[i].id; + for (i = 0; i < shmp(bufb->chan)->backend.num_subbuf; i++) { + id = shmp(bufb->buf_wsb)[i].id; sb_bindex = subbuffer_id_get_index(config, id); - pages = bufb->array[sb_bindex]; + pages = shmp(bufb->array)[sb_bindex]; records_unread += v_read(config, &pages->records_unread); } if (config->mode == RING_BUFFER_OVERWRITE) { id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - pages = bufb->array[sb_bindex]; + pages = shmp(bufb->array)[sb_bindex]; records_unread += v_read(config, &pages->records_unread); } return records_unread; diff --git a/libringbuffer/backend_internal.h b/libringbuffer/backend_internal.h index c5f3362..182ac97 100644 --- a/libringbuffer/backend_internal.h +++ b/libringbuffer/backend_internal.h @@ -17,20 +17,22 @@ #include "config.h" #include "backend_types.h" #include "frontend_types.h" +#include "shm.h" /* Ring buffer backend API presented to the frontend */ /* Ring buffer and channel backend create/free */ int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb, - struct channel_backend *chan, int cpu); + struct channel_backend *chan, int cpu, + struct shm_header *shm_header); void channel_backend_unregister_notifiers(struct channel_backend *chanb); void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb); int channel_backend_init(struct channel_backend *chanb, const char *name, const struct lib_ring_buffer_config *config, void *priv, size_t subbuf_size, - size_t num_subbuf); + size_t num_subbuf, struct shm_header *shm_header); void channel_backend_free(struct channel_backend *chanb); void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb); @@ -185,8 +187,8 @@ void subbuffer_count_record(const struct lib_ring_buffer_config *config, { unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); - v_inc(config, &bufb->array[sb_bindex]->records_commit); + sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); + v_inc(config, &shmp(bufb->array)[sb_bindex]->records_commit); } /* @@ -201,9 +203,9 @@ void subbuffer_consume_record(const struct lib_ring_buffer_config *config, sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); CHAN_WARN_ON(bufb->chan, - !v_read(config, &bufb->array[sb_bindex]->records_unread)); + !v_read(config, &shmp(bufb->array)[sb_bindex]->records_unread)); /* Non-atomic decrement protected by exclusive subbuffer access */ - _v_dec(config, &bufb->array[sb_bindex]->records_unread); + _v_dec(config, &shmp(bufb->array)[sb_bindex]->records_unread); v_inc(config, &bufb->records_read); } @@ -215,8 +217,8 @@ unsigned long subbuffer_get_records_count( { unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); - return v_read(config, &bufb->array[sb_bindex]->records_commit); + sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); + return v_read(config, &shmp(bufb->array)[sb_bindex]->records_commit); } /* @@ -234,8 +236,8 @@ unsigned long subbuffer_count_records_overrun( struct lib_ring_buffer_backend_pages *pages; unsigned long overruns, sb_bindex; - sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); - pages = bufb->array[sb_bindex]; + sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); + pages = shmp(bufb->array)[sb_bindex]; overruns = v_read(config, &pages->records_unread); v_set(config, &pages->records_unread, v_read(config, &pages->records_commit)); @@ -253,8 +255,8 @@ void subbuffer_set_data_size(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend_pages *pages; unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); - pages = bufb->array[sb_bindex]; + sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); + pages = shmp(bufb->array)[sb_bindex]; pages->data_size = data_size; } @@ -267,7 +269,7 @@ unsigned long subbuffer_get_read_data_size( unsigned long sb_bindex; sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); - pages = bufb->array[sb_bindex]; + pages = shmp(bufb->array)[sb_bindex]; return pages->data_size; } @@ -280,8 +282,8 @@ unsigned long subbuffer_get_data_size( struct lib_ring_buffer_backend_pages *pages; unsigned long sb_bindex; - sb_bindex = subbuffer_id_get_index(config, bufb->buf_wsb[idx].id); - pages = bufb->array[sb_bindex]; + sb_bindex = subbuffer_id_get_index(config, shmp(bufb->buf_wsb)[idx].id); + pages = shmp(bufb->array)[sb_bindex]; return pages->data_size; } @@ -303,7 +305,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, * Performing a volatile access to read the sb_pages, because we want to * read a coherent version of the pointer and the associated noref flag. */ - id = CMM_ACCESS_ONCE(bufb->buf_wsb[idx].id); + id = CMM_ACCESS_ONCE(shmp(bufb->buf_wsb)[idx].id); for (;;) { /* This check is called on the fast path for each record. */ if (likely(!subbuffer_id_is_noref(config, id))) { @@ -317,7 +319,7 @@ void lib_ring_buffer_clear_noref(const struct lib_ring_buffer_config *config, } new_id = id; subbuffer_id_clear_noref(config, &new_id); - new_id = uatomic_cmpxchg(&bufb->buf_wsb[idx].id, id, new_id); + new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[idx].id, id, new_id); if (likely(new_id == id)) break; id = new_id; @@ -348,13 +350,13 @@ void lib_ring_buffer_set_noref_offset(const struct lib_ring_buffer_config *confi * readers of the noref flag. */ CHAN_WARN_ON(bufb->chan, - subbuffer_id_is_noref(config, bufb->buf_wsb[idx].id)); + subbuffer_id_is_noref(config, shmp(bufb->buf_wsb)[idx].id)); /* * Memory barrier that ensures counter stores are ordered before set * noref and offset. */ cmm_smp_mb(); - subbuffer_id_set_noref_offset(config, &bufb->buf_wsb[idx].id, offset); + subbuffer_id_set_noref_offset(config, &shmp(bufb->buf_wsb)[idx].id, offset); } /** @@ -376,7 +378,7 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, * old_wpage, because the value read will be confirmed by the * following cmpxchg(). */ - old_id = bufb->buf_wsb[consumed_idx].id; + old_id = shmp(bufb->buf_wsb)[consumed_idx].id; if (unlikely(!subbuffer_id_is_noref(config, old_id))) return -EAGAIN; /* @@ -390,14 +392,14 @@ int update_read_sb_index(const struct lib_ring_buffer_config *config, !subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref_offset(config, &bufb->buf_rsb.id, consumed_count); - new_id = uatomic_cmpxchg(&bufb->buf_wsb[consumed_idx].id, old_id, + new_id = uatomic_cmpxchg(&shmp(bufb->buf_wsb)[consumed_idx].id, old_id, bufb->buf_rsb.id); if (unlikely(old_id != new_id)) return -EAGAIN; bufb->buf_rsb.id = new_id; } else { /* No page exchange, use the writer page directly */ - bufb->buf_rsb.id = bufb->buf_wsb[consumed_idx].id; + bufb->buf_rsb.id = shmp(bufb->buf_wsb)[consumed_idx].id; } return 0; } diff --git a/libringbuffer/backend_types.h b/libringbuffer/backend_types.h index cfbe59c..3bc36ba 100644 --- a/libringbuffer/backend_types.h +++ b/libringbuffer/backend_types.h @@ -11,17 +11,14 @@ * Dual LGPL v2.1/GPL v2 license. */ -struct lib_ring_buffer_backend_page { - void *virt; /* page virtual address (cached) */ - struct page *page; /* pointer to page structure */ -}; +#include "shm.h" struct lib_ring_buffer_backend_pages { unsigned long mmap_offset; /* offset of the subbuffer in mmap */ union v_atomic records_commit; /* current records committed count */ union v_atomic records_unread; /* records to read */ unsigned long data_size; /* Amount of data to read from subbuf */ - struct lib_ring_buffer_backend_page p[]; + DECLARE_SHMP(char, p); /* Backing memory map */ }; struct lib_ring_buffer_backend_subbuffer { @@ -37,17 +34,17 @@ struct lib_ring_buffer; struct lib_ring_buffer_backend { /* Array of ring_buffer_backend_subbuffer for writer */ - struct lib_ring_buffer_backend_subbuffer *buf_wsb; + DECLARE_SHMP(struct lib_ring_buffer_backend_subbuffer, buf_wsb); /* ring_buffer_backend_subbuffer for reader */ struct lib_ring_buffer_backend_subbuffer buf_rsb; /* * Pointer array of backend pages, for whole buffer. * Indexed by ring_buffer_backend_subbuffer identifier (id) index. */ - struct lib_ring_buffer_backend_pages **array; - unsigned int num_pages_per_subbuf; + DECLARE_SHMP(struct lib_ring_buffer_backend_pages *, array); + DECLARE_SHMP(char, memory_map); /* memory mapping */ - struct channel *chan; /* Associated channel */ + DECLARE_SHMP(struct channel, chan); /* Associated channel */ int cpu; /* This buffer's cpu. -1 if global. */ union v_atomic records_read; /* Number of records read */ unsigned int allocated:1; /* Bool: is buffer allocated ? */ @@ -63,8 +60,7 @@ struct channel_backend { */ unsigned int buf_size_order; /* Order of buffer size */ int extra_reader_sb:1; /* Bool: has extra reader subbuffer */ - struct lib_ring_buffer *buf; /* Channel per-cpu buffers */ - + DECLARE_SHMP(struct lib_ring_buffer, buf); /* Channel per-cpu buffers */ unsigned long num_subbuf; /* Number of sub-buffers for writer */ u64 start_tsc; /* Channel creation TSC value */ void *priv; /* Client-specific information */ diff --git a/libringbuffer/config.h b/libringbuffer/config.h new file mode 100644 index 0000000..900208f --- /dev/null +++ b/libringbuffer/config.h @@ -0,0 +1,298 @@ +#ifndef _LINUX_RING_BUFFER_CONFIG_H +#define _LINUX_RING_BUFFER_CONFIG_H + +/* + * linux/ringbuffer/config.h + * + * Copyright (C) 2010 - Mathieu Desnoyers + * + * Ring buffer configuration header. Note: after declaring the standard inline + * functions, clients should also include linux/ringbuffer/api.h. + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include "ust/kcompat/kcompat.h" +#include "ust/align.h" + +struct lib_ring_buffer; +struct channel; +struct lib_ring_buffer_config; +struct lib_ring_buffer_ctx; + +/* + * Ring buffer client callbacks. Only used by slow path, never on fast path. + * For the fast path, record_header_size(), ring_buffer_clock_read() should be + * provided as inline functions too. These may simply return 0 if not used by + * the client. + */ +struct lib_ring_buffer_client_cb { + /* Mandatory callbacks */ + + /* A static inline version is also required for fast path */ + u64 (*ring_buffer_clock_read) (struct channel *chan); + size_t (*record_header_size) (const struct lib_ring_buffer_config *config, + struct channel *chan, size_t offset, + size_t *pre_header_padding, + struct lib_ring_buffer_ctx *ctx); + + /* Slow path only, at subbuffer switch */ + size_t (*subbuffer_header_size) (void); + void (*buffer_begin) (struct lib_ring_buffer *buf, u64 tsc, + unsigned int subbuf_idx); + void (*buffer_end) (struct lib_ring_buffer *buf, u64 tsc, + unsigned int subbuf_idx, unsigned long data_size); + + /* Optional callbacks (can be set to NULL) */ + + /* Called at buffer creation/finalize */ + int (*buffer_create) (struct lib_ring_buffer *buf, void *priv, + int cpu, const char *name); + /* + * Clients should guarantee that no new reader handle can be opened + * after finalize. + */ + void (*buffer_finalize) (struct lib_ring_buffer *buf, void *priv, int cpu); + + /* + * Extract header length, payload length and timestamp from event + * record. Used by buffer iterators. Timestamp is only used by channel + * iterator. + */ + void (*record_get) (const struct lib_ring_buffer_config *config, + struct channel *chan, struct lib_ring_buffer *buf, + size_t offset, size_t *header_len, + size_t *payload_len, u64 *timestamp); +}; + +/* + * Ring buffer instance configuration. + * + * Declare as "static const" within the client object to ensure the inline fast + * paths can be optimized. + * + * alloc/sync pairs: + * + * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_PER_CPU : + * Per-cpu buffers with per-cpu synchronization. Tracing must be performed + * with preemption disabled (lib_ring_buffer_get_cpu() and + * lib_ring_buffer_put_cpu()). + * + * RING_BUFFER_ALLOC_PER_CPU and RING_BUFFER_SYNC_GLOBAL : + * Per-cpu buffer with global synchronization. Tracing can be performed with + * preemption enabled, statistically stays on the local buffers. + * + * RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_PER_CPU : + * Should only be used for buffers belonging to a single thread or protected + * by mutual exclusion by the client. Note that periodical sub-buffer switch + * should be disabled in this kind of configuration. + * + * RING_BUFFER_ALLOC_GLOBAL and RING_BUFFER_SYNC_GLOBAL : + * Global shared buffer with global synchronization. + * + * wakeup: + * + * RING_BUFFER_WAKEUP_BY_TIMER uses per-cpu deferrable timers to poll the + * buffers and wake up readers if data is ready. Mainly useful for tracers which + * don't want to call into the wakeup code on the tracing path. Use in + * combination with "read_timer_interval" channel_create() argument. + * + * RING_BUFFER_WAKEUP_BY_WRITER directly wakes up readers when a subbuffer is + * ready to read. Lower latencies before the reader is woken up. Mainly suitable + * for drivers. + * + * RING_BUFFER_WAKEUP_NONE does not perform any wakeup whatsoever. The client + * has the responsibility to perform wakeups. + */ +struct lib_ring_buffer_config { + enum { + RING_BUFFER_ALLOC_PER_CPU, + RING_BUFFER_ALLOC_GLOBAL, + } alloc; + enum { + RING_BUFFER_SYNC_PER_CPU, /* Wait-free */ + RING_BUFFER_SYNC_GLOBAL, /* Lock-free */ + } sync; + enum { + RING_BUFFER_OVERWRITE, /* Overwrite when buffer full */ + RING_BUFFER_DISCARD, /* Discard when buffer full */ + } mode; + enum { + RING_BUFFER_SPLICE, + RING_BUFFER_MMAP, + RING_BUFFER_READ, /* TODO */ + RING_BUFFER_ITERATOR, + RING_BUFFER_NONE, + } output; + enum { + RING_BUFFER_PAGE, + RING_BUFFER_VMAP, /* TODO */ + RING_BUFFER_STATIC, /* TODO */ + } backend; + enum { + RING_BUFFER_NO_OOPS_CONSISTENCY, + RING_BUFFER_OOPS_CONSISTENCY, + } oops; + enum { + RING_BUFFER_IPI_BARRIER, + RING_BUFFER_NO_IPI_BARRIER, + } ipi; + enum { + RING_BUFFER_WAKEUP_BY_TIMER, /* wake up performed by timer */ + RING_BUFFER_WAKEUP_BY_WRITER, /* + * writer wakes up reader, + * not lock-free + * (takes spinlock). + */ + } wakeup; + /* + * tsc_bits: timestamp bits saved at each record. + * 0 and 64 disable the timestamp compression scheme. + */ + unsigned int tsc_bits; + struct lib_ring_buffer_client_cb cb; +}; + +/* + * ring buffer context + * + * Context passed to lib_ring_buffer_reserve(), lib_ring_buffer_commit(), + * lib_ring_buffer_try_discard_reserve(), lib_ring_buffer_align_ctx() and + * lib_ring_buffer_write(). + */ +struct lib_ring_buffer_ctx { + /* input received by lib_ring_buffer_reserve(), saved here. */ + struct channel *chan; /* channel */ + void *priv; /* client private data */ + size_t data_size; /* size of payload */ + int largest_align; /* + * alignment of the largest element + * in the payload + */ + int cpu; /* processor id */ + + /* output from lib_ring_buffer_reserve() */ + struct lib_ring_buffer *buf; /* + * buffer corresponding to processor id + * for this channel + */ + size_t slot_size; /* size of the reserved slot */ + unsigned long buf_offset; /* offset following the record header */ + unsigned long pre_offset; /* + * Initial offset position _before_ + * the record is written. Positioned + * prior to record header alignment + * padding. + */ + u64 tsc; /* time-stamp counter value */ + unsigned int rflags; /* reservation flags */ +}; + +/** + * lib_ring_buffer_ctx_init - initialize ring buffer context + * @ctx: ring buffer context to initialize + * @chan: channel + * @priv: client private data + * @data_size: size of record data payload + * @largest_align: largest alignment within data payload types + * @cpu: processor id + */ +static inline +void lib_ring_buffer_ctx_init(struct lib_ring_buffer_ctx *ctx, + struct channel *chan, void *priv, + size_t data_size, int largest_align, + int cpu) +{ + ctx->chan = chan; + ctx->priv = priv; + ctx->data_size = data_size; + ctx->largest_align = largest_align; + ctx->cpu = cpu; + ctx->rflags = 0; +} + +/* + * Reservation flags. + * + * RING_BUFFER_RFLAG_FULL_TSC + * + * This flag is passed to record_header_size() and to the primitive used to + * write the record header. It indicates that the full 64-bit time value is + * needed in the record header. If this flag is not set, the record header needs + * only to contain "tsc_bits" bit of time value. + * + * Reservation flags can be added by the client, starting from + * "(RING_BUFFER_FLAGS_END << 0)". It can be used to pass information from + * record_header_size() to lib_ring_buffer_write_record_header(). + */ +#define RING_BUFFER_RFLAG_FULL_TSC (1U << 0) +#define RING_BUFFER_RFLAG_END (1U << 1) + +/* + * We need to define RING_BUFFER_ALIGN_ATTR so it is known early at + * compile-time. We have to duplicate the "config->align" information and the + * definition here because config->align is used both in the slow and fast + * paths, but RING_BUFFER_ALIGN_ATTR is only available for the client code. + */ +#ifdef RING_BUFFER_ALIGN + +# define RING_BUFFER_ALIGN_ATTR /* Default arch alignment */ + +/* + * Calculate the offset needed to align the type. + * size_of_type must be non-zero. + */ +static inline +unsigned int lib_ring_buffer_align(size_t align_drift, size_t size_of_type) +{ + return offset_align(align_drift, size_of_type); +} + +#else + +# define RING_BUFFER_ALIGN_ATTR __attribute__((packed)) + +/* + * Calculate the offset needed to align the type. + * size_of_type must be non-zero. + */ +static inline +unsigned int lib_ring_buffer_align(size_t align_drift, size_t size_of_type) +{ + return 0; +} + +#endif + +/** + * lib_ring_buffer_align_ctx - Align context offset on "alignment" + * @ctx: ring buffer context. + */ +static inline +void lib_ring_buffer_align_ctx(struct lib_ring_buffer_ctx *ctx, + size_t alignment) +{ + ctx->buf_offset += lib_ring_buffer_align(ctx->buf_offset, + alignment); +} + +/* + * lib_ring_buffer_check_config() returns 0 on success. + * Used internally to check for valid configurations at channel creation. + */ +static inline +int lib_ring_buffer_check_config(const struct lib_ring_buffer_config *config, + unsigned int switch_timer_interval, + unsigned int read_timer_interval) +{ + if (config->alloc == RING_BUFFER_ALLOC_GLOBAL + && config->sync == RING_BUFFER_SYNC_PER_CPU + && switch_timer_interval) + return -EINVAL; + return 0; +} + +#include "vatomic.h" + +#endif /* _LINUX_RING_BUFFER_CONFIG_H */ diff --git a/libringbuffer/frontend.h b/libringbuffer/frontend.h index 9d73da1..fe301c1 100644 --- a/libringbuffer/frontend.h +++ b/libringbuffer/frontend.h @@ -19,6 +19,7 @@ #include #include +#include "smp.h" /* Internal helpers */ #include "frontend_internal.h" @@ -41,7 +42,8 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, - unsigned int read_timer_interval); + unsigned int read_timer_interval, + int *shmid); /* * channel_destroy returns the private data pointer. It finalizes all channel's @@ -61,9 +63,7 @@ void *channel_destroy(struct channel *chan); * only performed at channel destruction. */ #define for_each_channel_cpu(cpu, chan) \ - for ((cpu) = -1; \ - ({ (cpu) = cpumask_next(cpu, (chan)->backend.cpumask); \ - cmm_smp_read_barrier_depends(); (cpu) < nr_cpu_ids; });) + for_each_possible_cpu(cpu) extern struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, @@ -104,7 +104,7 @@ static inline void lib_ring_buffer_put_next_subbuf(struct lib_ring_buffer *buf) { lib_ring_buffer_put_subbuf(buf); lib_ring_buffer_move_consumer(buf, subbuf_align(buf->cons_snapshot, - buf->backend.chan)); + shmp(buf->backend.chan))); } extern void channel_reset(struct channel *chan); diff --git a/libringbuffer/frontend_internal.h b/libringbuffer/frontend_internal.h index f758a68..6a1d3a6 100644 --- a/libringbuffer/frontend_internal.h +++ b/libringbuffer/frontend_internal.h @@ -21,6 +21,7 @@ #include "config.h" #include "backend_types.h" #include "frontend_types.h" +#include "shm.h" /* Buffer offset macros */ @@ -182,7 +183,7 @@ void lib_ring_buffer_vmcore_check_deliver(const struct lib_ring_buffer_config *c unsigned long idx) { if (config->oops == RING_BUFFER_OOPS_CONSISTENCY) - v_set(config, &buf->commit_hot[idx].seq, commit_count); + v_set(config, &shmp(buf->commit_hot)[idx].seq, commit_count); } static inline @@ -194,7 +195,7 @@ int lib_ring_buffer_poll_deliver(const struct lib_ring_buffer_config *config, consumed_old = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed_old, chan); - commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); /* * No memory barrier here, since we are only interested * in a statistically correct polling result. The next poll will @@ -269,7 +270,7 @@ int lib_ring_buffer_reserve_committed(const struct lib_ring_buffer_config *confi do { offset = v_read(config, &buf->offset); idx = subbuf_index(offset, chan); - commit_count = v_read(config, &buf->commit_hot[idx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[idx].cc); } while (offset != v_read(config, &buf->offset)); return ((buf_trunc(offset, chan) >> chan->backend.num_subbuf_order) @@ -317,7 +318,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, * The subbuffer size is least 2 bytes (minimum size: 1 page). * This guarantees that old_commit_count + 1 != commit_count. */ - if (likely(v_cmpxchg(config, &buf->commit_cold[idx].cc_sb, + if (likely(v_cmpxchg(config, &shmp(buf->commit_cold)[idx].cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { /* @@ -357,7 +358,7 @@ void lib_ring_buffer_check_deliver(const struct lib_ring_buffer_config *config, */ cmm_smp_mb(); /* End of exclusive subbuffer access */ - v_set(config, &buf->commit_cold[idx].cc_sb, + v_set(config, &shmp(buf->commit_cold)[idx].cc_sb, commit_count); lib_ring_buffer_vmcore_check_deliver(config, buf, commit_count, idx); @@ -409,14 +410,15 @@ void lib_ring_buffer_write_commit_counter(const struct lib_ring_buffer_config *c if (unlikely(subbuf_offset(offset - commit_count, chan))) return; - commit_seq_old = v_read(config, &buf->commit_hot[idx].seq); + commit_seq_old = v_read(config, &shmp(buf->commit_hot)[idx].seq); while ((long) (commit_seq_old - commit_count) < 0) - commit_seq_old = v_cmpxchg(config, &buf->commit_hot[idx].seq, + commit_seq_old = v_cmpxchg(config, &shmp(buf->commit_hot)[idx].seq, commit_seq_old, commit_count); } extern int lib_ring_buffer_create(struct lib_ring_buffer *buf, - struct channel_backend *chanb, int cpu); + struct channel_backend *chanb, int cpu, + struct shm_header *shm_header); extern void lib_ring_buffer_free(struct lib_ring_buffer *buf); /* Keep track of trap nesting inside ring buffer code */ diff --git a/libringbuffer/frontend_types.h b/libringbuffer/frontend_types.h index c9f98cb..d1a3d20 100644 --- a/libringbuffer/frontend_types.h +++ b/libringbuffer/frontend_types.h @@ -16,6 +16,8 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include + #include #include #include @@ -25,6 +27,7 @@ #include "usterr_signal_safe.h" #include "config.h" #include "backend_types.h" +#include "shm.h" /* * A switch is done during tracing or as a final flush after tracing (so it @@ -49,24 +52,25 @@ struct channel { //wait_queue_head_t read_wait; /* reader wait queue */ int finalized; /* Has channel been finalized */ struct urcu_ref ref; /* Reference count */ -}; + DECLARE_SHMP(struct shm_header, shm_header); +} ____cacheline_aligned; /* Per-subbuffer commit counters used on the hot path */ struct commit_counters_hot { union v_atomic cc; /* Commit counter */ union v_atomic seq; /* Consecutive commits */ -}; +} ____cacheline_aligned; /* Per-subbuffer commit counters used only on cold paths */ struct commit_counters_cold { union v_atomic cc_sb; /* Incremented _once_ at sb switch */ -}; +} ____cacheline_aligned; /* ring buffer state */ struct lib_ring_buffer { /* First 32 bytes cache-hot cacheline */ union v_atomic offset; /* Current offset in the buffer */ - struct commit_counters_hot *commit_hot; + DECLARE_SHMP(struct commit_counters_hot, commit_hot); /* Commit count per sub-buffer */ long consumed; /* * Current offset in the buffer @@ -80,7 +84,7 @@ struct lib_ring_buffer { struct lib_ring_buffer_backend backend; /* Associated backend */ - struct commit_counters_cold *commit_cold; + DECLARE_SHMP(struct commit_counters_cold, commit_cold); /* Commit count per sub-buffer */ long active_readers; /* * Active readers count @@ -102,7 +106,7 @@ struct lib_ring_buffer { int get_subbuf:1; /* Sub-buffer being held by reader */ int switch_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */ int read_timer_enabled:1; /* Protected by ring_buffer_nohz_lock */ -}; +} ____cacheline_aligned; static inline void *channel_get_private(struct channel *chan) diff --git a/libringbuffer/iterator.h b/libringbuffer/iterator.h deleted file mode 100644 index 4914929..0000000 --- a/libringbuffer/iterator.h +++ /dev/null @@ -1,70 +0,0 @@ -#ifndef _LINUX_RING_BUFFER_ITERATOR_H -#define _LINUX_RING_BUFFER_ITERATOR_H - -/* - * linux/ringbuffer/iterator.h - * - * (C) Copyright 2010 - Mathieu Desnoyers - * - * Ring buffer and channel iterators. - * - * Author: - * Mathieu Desnoyers - * - * Dual LGPL v2.1/GPL v2 license. - */ - -#include "backend.h" -#include "frontend.h" - -/* - * lib_ring_buffer_get_next_record advances the buffer read position to the next - * record. It returns either the size of the next record, -EAGAIN if there is - * currently no data available, or -ENODATA if no data is available and buffer - * is finalized. - */ -extern ssize_t lib_ring_buffer_get_next_record(struct channel *chan, - struct lib_ring_buffer *buf); - -/* - * channel_get_next_record advances the buffer read position to the next record. - * It returns either the size of the next record, -EAGAIN if there is currently - * no data available, or -ENODATA if no data is available and buffer is - * finalized. - * Returns the current buffer in ret_buf. - */ -extern ssize_t channel_get_next_record(struct channel *chan, - struct lib_ring_buffer **ret_buf); - -/** - * read_current_record - copy the buffer current record into dest. - * @buf: ring buffer - * @dest: destination where the record should be copied - * - * dest should be large enough to contain the record. Returns the number of - * bytes copied. - */ -static inline size_t read_current_record(struct lib_ring_buffer *buf, void *dest) -{ - return lib_ring_buffer_read(&buf->backend, buf->iter.read_offset, - dest, buf->iter.payload_len); -} - -extern int lib_ring_buffer_iterator_open(struct lib_ring_buffer *buf); -extern void lib_ring_buffer_iterator_release(struct lib_ring_buffer *buf); -extern int channel_iterator_open(struct channel *chan); -extern void channel_iterator_release(struct channel *chan); - -extern const struct file_operations channel_payload_file_operations; -extern const struct file_operations lib_ring_buffer_payload_file_operations; - -/* - * Used internally. - */ -int channel_iterator_init(struct channel *chan); -void channel_iterator_unregister_notifiers(struct channel *chan); -void channel_iterator_free(struct channel *chan); -void channel_iterator_reset(struct channel *chan); -void lib_ring_buffer_iterator_reset(struct lib_ring_buffer *buf); - -#endif /* _LINUX_RING_BUFFER_ITERATOR_H */ diff --git a/libringbuffer/ring_buffer_abi.c b/libringbuffer/ring_buffer_abi.c index c105fe0..5642327 100644 --- a/libringbuffer/ring_buffer_abi.c +++ b/libringbuffer/ring_buffer_abi.c @@ -374,8 +374,3 @@ const struct file_operations lib_ring_buffer_file_operations = { .compat_ioctl = lib_ring_buffer_compat_ioctl, #endif }; -EXPORT_SYMBOL_GPL(lib_ring_buffer_file_operations); - -MODULE_LICENSE("GPL and additional rights"); -MODULE_AUTHOR("Mathieu Desnoyers"); -MODULE_DESCRIPTION("Ring Buffer Library VFS"); diff --git a/libringbuffer/ring_buffer_backend.c b/libringbuffer/ring_buffer_backend.c index 861acf7..8aa8c86 100644 --- a/libringbuffer/ring_buffer_backend.c +++ b/libringbuffer/ring_buffer_backend.c @@ -13,6 +13,7 @@ #include "config.h" #include "backend.h" #include "frontend.h" +#include "smp.h" /** * lib_ring_buffer_backend_allocate - allocate a channel buffer @@ -26,67 +27,49 @@ static int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config, struct lib_ring_buffer_backend *bufb, size_t size, size_t num_subbuf, - int extra_reader_sb) + int extra_reader_sb, + struct shm_header *shm_header) { - struct channel_backend *chanb = &bufb->chan->backend; - unsigned long j, num_pages, num_pages_per_subbuf, page_idx = 0; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; unsigned long subbuf_size, mmap_offset = 0; unsigned long num_subbuf_alloc; - struct page **pages; - void **virt; unsigned long i; - num_pages = size >> get_count_order(PAGE_SIZE); - num_pages_per_subbuf = num_pages >> get_count_order(num_subbuf); subbuf_size = chanb->subbuf_size; num_subbuf_alloc = num_subbuf; - if (extra_reader_sb) { - num_pages += num_pages_per_subbuf; /* Add pages for reader */ + if (extra_reader_sb) num_subbuf_alloc++; - } - - pages = malloc_align(sizeof(*pages) * num_pages); - if (unlikely(!pages)) - goto pages_error; - virt = malloc_align(sizeof(*virt) * num_pages); - if (unlikely(!virt)) - goto virt_error; - - bufb->array = malloc_align(sizeof(*bufb->array) * num_subbuf_alloc); - if (unlikely(!bufb->array)) + set_shmp(bufb->array, zalloc_shm(shm_header, + sizeof(*bufb->array) * num_subbuf_alloc)); + if (unlikely(!shmp(bufb->array))) goto array_error; - for (i = 0; i < num_pages; i++) { - pages[i] = alloc_pages_node(cpu_to_node(max(bufb->cpu, 0)), - GFP_KERNEL | __GFP_ZERO, 0); - if (unlikely(!pages[i])) - goto depopulate; - virt[i] = page_address(pages[i]); - } - bufb->num_pages_per_subbuf = num_pages_per_subbuf; + set_shmp(bufb->memory_map, zalloc_shm(shm_header, + subbuf_size * num_subbuf_alloc)); + if (unlikely(!shmp(bufb->memory_map))) + goto memory_map_error; /* Allocate backend pages array elements */ for (i = 0; i < num_subbuf_alloc; i++) { - bufb->array[i] = - zmalloc_align( + set_shmp(bufb->array[i], + zalloc_shm(shm_header, sizeof(struct lib_ring_buffer_backend_pages) + - sizeof(struct lib_ring_buffer_backend_page) - * num_pages_per_subbuf); - if (!bufb->array[i]) + subbuf_size)); + if (!shmp(bufb->array[i])) goto free_array; } /* Allocate write-side subbuffer table */ - bufb->buf_wsb = zmalloc_align( + bufb->buf_wsb = zalloc_shm(shm_header, sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf); - if (unlikely(!bufb->buf_wsb)) + if (unlikely(!shmp(bufb->buf_wsb))) goto free_array; for (i = 0; i < num_subbuf; i++) - bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i); + shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); /* Assign read-side subbuffer table */ if (extra_reader_sb) @@ -97,73 +80,50 @@ int lib_ring_buffer_backend_allocate(const struct lib_ring_buffer_config *config /* Assign pages to page index */ for (i = 0; i < num_subbuf_alloc; i++) { - for (j = 0; j < num_pages_per_subbuf; j++) { - CHAN_WARN_ON(chanb, page_idx > num_pages); - bufb->array[i]->p[j].virt = virt[page_idx]; - bufb->array[i]->p[j].page = pages[page_idx]; - page_idx++; - } + set_shmp(shmp(bufb->array)[i]->p, + &shmp(bufb->memory_map)[i * subbuf_size]); if (config->output == RING_BUFFER_MMAP) { - bufb->array[i]->mmap_offset = mmap_offset; + shmp(bufb->array)[i]->mmap_offset = mmap_offset; mmap_offset += subbuf_size; } } - kfree(virt); - kfree(pages); return 0; free_array: - for (i = 0; (i < num_subbuf_alloc && bufb->array[i]); i++) - kfree(bufb->array[i]); -depopulate: - /* Free all allocated pages */ - for (i = 0; (i < num_pages && pages[i]); i++) - __free_page(pages[i]); - kfree(bufb->array); + /* bufb->array[i] will be freed by shm teardown */ +memory_map_error: + /* bufb->array will be freed by shm teardown */ array_error: - kfree(virt); -virt_error: - kfree(pages); -pages_error: return -ENOMEM; } int lib_ring_buffer_backend_create(struct lib_ring_buffer_backend *bufb, - struct channel_backend *chanb, int cpu) + struct channel_backend *chanb, int cpu, + struct shm_header *shm_header) { const struct lib_ring_buffer_config *config = chanb->config; - bufb->chan = caa_container_of(chanb, struct channel, backend); + set_shmp(&bufb->chan, caa_container_of(chanb, struct channel, backend)); bufb->cpu = cpu; return lib_ring_buffer_backend_allocate(config, bufb, chanb->buf_size, chanb->num_subbuf, - chanb->extra_reader_sb); + chanb->extra_reader_sb, + shm_header); } void lib_ring_buffer_backend_free(struct lib_ring_buffer_backend *bufb) { - struct channel_backend *chanb = &bufb->chan->backend; - unsigned long i, j, num_subbuf_alloc; - - num_subbuf_alloc = chanb->num_subbuf; - if (chanb->extra_reader_sb) - num_subbuf_alloc++; - - kfree(bufb->buf_wsb); - for (i = 0; i < num_subbuf_alloc; i++) { - for (j = 0; j < bufb->num_pages_per_subbuf; j++) - __free_page(bufb->array[i]->p[j].page); - kfree(bufb->array[i]); - } - kfree(bufb->array); + /* bufb->buf_wsb will be freed by shm teardown */ + /* bufb->array[i] will be freed by shm teardown */ + /* bufb->array will be freed by shm teardown */ bufb->allocated = 0; } void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) { - struct channel_backend *chanb = &bufb->chan->backend; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long num_subbuf_alloc; unsigned int i; @@ -173,7 +133,7 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) num_subbuf_alloc++; for (i = 0; i < chanb->num_subbuf; i++) - bufb->buf_wsb[i].id = subbuffer_id(config, 0, 1, i); + shmp(bufb->buf_wsb)[i].id = subbuffer_id(config, 0, 1, i); if (chanb->extra_reader_sb) bufb->buf_rsb.id = subbuffer_id(config, 0, 1, num_subbuf_alloc - 1); @@ -182,9 +142,9 @@ void lib_ring_buffer_backend_reset(struct lib_ring_buffer_backend *bufb) for (i = 0; i < num_subbuf_alloc; i++) { /* Don't reset mmap_offset */ - v_set(config, &bufb->array[i]->records_commit, 0); - v_set(config, &bufb->array[i]->records_unread, 0); - bufb->array[i]->data_size = 0; + v_set(config, &shmp(bufb->array)[i]->records_commit, 0); + v_set(config, &shmp(bufb->array)[i]->records_unread, 0); + shmp(bufb->array)[i]->data_size = 0; /* Don't reset backend page and virt addresses */ } /* Don't reset num_pages_per_subbuf, cpu, allocated */ @@ -208,52 +168,6 @@ void channel_backend_reset(struct channel_backend *chanb) chanb->start_tsc = config->cb.ring_buffer_clock_read(chan); } -#ifdef CONFIG_HOTPLUG_CPU -/** - * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback - * @nb: notifier block - * @action: hotplug action to take - * @hcpu: CPU number - * - * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) - */ -static -int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - struct channel_backend *chanb = caa_container_of(nb, struct channel_backend, - cpu_hp_notifier); - const struct lib_ring_buffer_config *config = chanb->config; - struct lib_ring_buffer *buf; - int ret; - - CHAN_WARN_ON(chanb, config->alloc == RING_BUFFER_ALLOC_GLOBAL); - - switch (action) { - case CPU_UP_PREPARE: - case CPU_UP_PREPARE_FROZEN: - buf = per_cpu_ptr(chanb->buf, cpu); - ret = lib_ring_buffer_create(buf, chanb, cpu); - if (ret) { - printk(KERN_ERR - "ring_buffer_cpu_hp_callback: cpu %d " - "buffer creation failed\n", cpu); - return NOTIFY_BAD; - } - break; - case CPU_DEAD: - case CPU_DEAD_FROZEN: - /* No need to do a buffer switch here, because it will happen - * when tracing is stopped, or will be done by switch timer CPU - * DEAD callback. */ - break; - } - return NOTIFY_OK; -} -#endif - /** * channel_backend_init - initialize a channel backend * @chanb: channel backend @@ -263,6 +177,7 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, * @parent: dentry of parent directory, %NULL for root directory * @subbuf_size: size of sub-buffers (> PAGE_SIZE, power of 2) * @num_subbuf: number of sub-buffers (power of 2) + * @shm_header: shared memory header * * Returns channel pointer if successful, %NULL otherwise. * @@ -275,7 +190,8 @@ int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, int channel_backend_init(struct channel_backend *chanb, const char *name, const struct lib_ring_buffer_config *config, - void *priv, size_t subbuf_size, size_t num_subbuf) + void *priv, size_t subbuf_size, size_t num_subbuf, + struct shm_header *shm_header) { struct channel *chan = caa_container_of(chanb, struct channel, backend); unsigned int i; @@ -310,58 +226,42 @@ int channel_backend_init(struct channel_backend *chanb, chanb->extra_reader_sb = (config->mode == RING_BUFFER_OVERWRITE) ? 1 : 0; chanb->num_subbuf = num_subbuf; - strlcpy(chanb->name, name, NAME_MAX); + strncpy(chanb->name, name, NAME_MAX); + chanb->name[NAME_MAX - 1] = '\0'; chanb->config = config; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - if (!zalloc_cpumask_var(&chanb->cpumask, GFP_KERNEL)) - return -ENOMEM; - } + struct lib_ring_buffer *buf; + size_t alloc_size; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { /* Allocating the buffer per-cpu structures */ - chanb->buf = alloc_percpu(struct lib_ring_buffer); - if (!chanb->buf) - goto free_cpumask; + alloc_size = sizeof(struct lib_ring_buffer); + buf = zalloc_shm(shm_header, alloc_size * num_possible_cpus()); + if (!buf) + goto end; + set_shmp(chanb->buf, buf); /* - * In case of non-hotplug cpu, if the ring-buffer is allocated - * in early initcall, it will not be notified of secondary cpus. - * In that off case, we need to allocate for all possible cpus. - */ -#ifdef CONFIG_HOTPLUG_CPU - /* - * buf->backend.allocated test takes care of concurrent CPU - * hotplug. - * Priority higher than frontend, so we create the ring buffer - * before we start the timer. + * We need to allocate for all possible cpus. */ - chanb->cpu_hp_notifier.notifier_call = - lib_ring_buffer_cpu_hp_callback; - chanb->cpu_hp_notifier.priority = 5; - register_hotcpu_notifier(&chanb->cpu_hp_notifier); - - get_online_cpus(); - for_each_online_cpu(i) { - ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i), - chanb, i); - if (ret) - goto free_bufs; /* cpu hotplug locked */ - } - put_online_cpus(); -#else for_each_possible_cpu(i) { - ret = lib_ring_buffer_create(per_cpu_ptr(chanb->buf, i), - chanb, i); + ret = lib_ring_buffer_create(&shmp(chanb->buf)[i], + chanb, i, shm_header); if (ret) goto free_bufs; /* cpu hotplug locked */ } -#endif } else { - chanb->buf = kzalloc(sizeof(struct lib_ring_buffer), GFP_KERNEL); - if (!chanb->buf) - goto free_cpumask; - ret = lib_ring_buffer_create(chanb->buf, chanb, -1); + struct lib_ring_buffer *buf; + size_t alloc_size; + + alloc_size = sizeof(struct lib_ring_buffer); + chanb->buf = zmalloc(sizeof(struct lib_ring_buffer)); + buf = zalloc_shm(shm_header, alloc_size); + if (!buf) + goto end; + set_shmp(chanb->buf, buf); + ret = lib_ring_buffer_create(shmp(chanb->buf), chanb, -1, + shm_header); if (ret) goto free_bufs; } @@ -372,38 +272,18 @@ int channel_backend_init(struct channel_backend *chanb, free_bufs: if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(i) { - struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i); + struct lib_ring_buffer *buf = &shmp(chanb->buf)[i]; if (!buf->backend.allocated) continue; lib_ring_buffer_free(buf); } -#ifdef CONFIG_HOTPLUG_CPU - put_online_cpus(); -#endif - free_percpu(chanb->buf); - } else - kfree(chanb->buf); -free_cpumask: - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - free_cpumask_var(chanb->cpumask); + } + /* We only free the buffer data upon shm teardown */ +end: return -ENOMEM; } -/** - * channel_backend_unregister_notifiers - unregister notifiers - * @chan: the channel - * - * Holds CPU hotplug. - */ -void channel_backend_unregister_notifiers(struct channel_backend *chanb) -{ - const struct lib_ring_buffer_config *config = chanb->config; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - unregister_hotcpu_notifier(&chanb->cpu_hp_notifier); -} - /** * channel_backend_free - destroy the channel * @chan: the channel @@ -417,67 +297,21 @@ void channel_backend_free(struct channel_backend *chanb) if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { for_each_possible_cpu(i) { - struct lib_ring_buffer *buf = per_cpu_ptr(chanb->buf, i); + struct lib_ring_buffer *buf = &shmp(chanb->buf)[i]; if (!buf->backend.allocated) continue; lib_ring_buffer_free(buf); } - free_cpumask_var(chanb->cpumask); - free_percpu(chanb->buf); } else { - struct lib_ring_buffer *buf = chanb->buf; + struct lib_ring_buffer *buf = shmp(chanb->buf); CHAN_WARN_ON(chanb, !buf->backend.allocated); lib_ring_buffer_free(buf); - kfree(buf); } + /* We only free the buffer data upon shm teardown */ } -/** - * lib_ring_buffer_write - write data to a ring_buffer buffer. - * @bufb : buffer backend - * @offset : offset within the buffer - * @src : source address - * @len : length to write - * @pagecpy : page size copied so far - */ -void _lib_ring_buffer_write(struct lib_ring_buffer_backend *bufb, size_t offset, - const void *src, size_t len, ssize_t pagecpy) -{ - struct channel_backend *chanb = &bufb->chan->backend; - const struct lib_ring_buffer_config *config = chanb->config; - size_t sbidx, index; - struct lib_ring_buffer_backend_pages *rpages; - unsigned long sb_bindex, id; - - do { - len -= pagecpy; - src += pagecpy; - offset += pagecpy; - sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); - - /* - * Underlying layer should never ask for writes across - * subbuffers. - */ - CHAN_WARN_ON(chanb, offset >= chanb->buf_size); - - pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); - id = bufb->buf_wsb[sbidx].id; - sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; - CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE - && subbuffer_id_is_noref(config, id)); - lib_ring_buffer_do_copy(config, - rpages->p[index].virt - + (offset & ~PAGE_MASK), - src, pagecpy); - } while (unlikely(len != pagecpy)); -} -EXPORT_SYMBOL_GPL(_lib_ring_buffer_write); - /** * lib_ring_buffer_read - read data from ring_buffer_buffer. * @bufb : buffer backend @@ -491,42 +325,30 @@ EXPORT_SYMBOL_GPL(_lib_ring_buffer_write); size_t lib_ring_buffer_read(struct lib_ring_buffer_backend *bufb, size_t offset, void *dest, size_t len) { - struct channel_backend *chanb = &bufb->chan->backend; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; - size_t index; - ssize_t pagecpy, orig_len; + ssize_t orig_len; struct lib_ring_buffer_backend_pages *rpages; unsigned long sb_bindex, id; orig_len = len; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); + if (unlikely(!len)) return 0; - for (;;) { - pagecpy = min_t(size_t, len, PAGE_SIZE - (offset & ~PAGE_MASK)); - id = bufb->buf_rsb.id; - sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; - CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE - && subbuffer_id_is_noref(config, id)); - memcpy(dest, rpages->p[index].virt + (offset & ~PAGE_MASK), - pagecpy); - len -= pagecpy; - if (likely(!len)) - break; - dest += pagecpy; - offset += pagecpy; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); - /* - * Underlying layer should never ask for reads across - * subbuffers. - */ - CHAN_WARN_ON(chanb, offset >= chanb->buf_size); - } + id = bufb->buf_rsb.id; + sb_bindex = subbuffer_id_get_index(config, id); + rpages = shmp(bufb->array)[sb_bindex]; + /* + * Underlying layer should never ask for reads across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); + CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE + && subbuffer_id_is_noref(config, id)); + memcpy(dest, shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)), len); return orig_len; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_read); /** * lib_ring_buffer_read_cstr - read a C-style string from ring_buffer. @@ -541,79 +363,33 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_read); int lib_ring_buffer_read_cstr(struct lib_ring_buffer_backend *bufb, size_t offset, void *dest, size_t len) { - struct channel_backend *chanb = &bufb->chan->backend; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; - size_t index; - ssize_t pagecpy, pagelen, strpagelen, orig_offset; + ssize_t string_len, orig_offset; char *str; struct lib_ring_buffer_backend_pages *rpages; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); orig_offset = offset; - for (;;) { - id = bufb->buf_rsb.id; - sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; - CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE - && subbuffer_id_is_noref(config, id)); - str = (char *)rpages->p[index].virt + (offset & ~PAGE_MASK); - pagelen = PAGE_SIZE - (offset & ~PAGE_MASK); - strpagelen = strnlen(str, pagelen); - if (len) { - pagecpy = min_t(size_t, len, strpagelen); - if (dest) { - memcpy(dest, str, pagecpy); - dest += pagecpy; - } - len -= pagecpy; - } - offset += strpagelen; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); - if (strpagelen < pagelen) - break; - /* - * Underlying layer should never ask for reads across - * subbuffers. - */ - CHAN_WARN_ON(chanb, offset >= chanb->buf_size); - } - if (dest && len) - ((char *)dest)[0] = 0; - return offset - orig_offset; -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_read_cstr); - -/** - * lib_ring_buffer_read_get_page - Get a whole page to read from - * @bufb : buffer backend - * @offset : offset within the buffer - * @virt : pointer to page address (output) - * - * Should be protected by get_subbuf/put_subbuf. - * Returns the pointer to the page struct pointer. - */ -struct page **lib_ring_buffer_read_get_page(struct lib_ring_buffer_backend *bufb, - size_t offset, void ***virt) -{ - size_t index; - struct lib_ring_buffer_backend_pages *rpages; - struct channel_backend *chanb = &bufb->chan->backend; - const struct lib_ring_buffer_config *config = chanb->config; - unsigned long sb_bindex, id; - - offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; + rpages = shmp(bufb->array)[sb_bindex]; + /* + * Underlying layer should never ask for reads across + * subbuffers. + */ + CHAN_WARN_ON(chanb, offset >= chanb->buf_size); CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - *virt = &rpages->p[index].virt; - return &rpages->p[index].page; + str = (char *)shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); + string_len = strnlen(str, len); + if (dest && len) { + memcpy(dest, str, string_len); + ((char *)dest)[0] = 0; + } + return offset - orig_offset; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page); /** * lib_ring_buffer_read_offset_address - get address of a buffer location @@ -628,22 +404,19 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_read_get_page); void *lib_ring_buffer_read_offset_address(struct lib_ring_buffer_backend *bufb, size_t offset) { - size_t index; struct lib_ring_buffer_backend_pages *rpages; - struct channel_backend *chanb = &bufb->chan->backend; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); id = bufb->buf_rsb.id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; + rpages = shmp(bufb->array)[sb_bindex]; CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - return rpages->p[index].virt + (offset & ~PAGE_MASK); + return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address); /** * lib_ring_buffer_offset_address - get address of a location within the buffer @@ -658,20 +431,18 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_read_offset_address); void *lib_ring_buffer_offset_address(struct lib_ring_buffer_backend *bufb, size_t offset) { - size_t sbidx, index; + size_t sbidx; struct lib_ring_buffer_backend_pages *rpages; - struct channel_backend *chanb = &bufb->chan->backend; + struct channel_backend *chanb = &shmp(bufb->chan)->backend; const struct lib_ring_buffer_config *config = chanb->config; unsigned long sb_bindex, id; offset &= chanb->buf_size - 1; sbidx = offset >> chanb->subbuf_size_order; - index = (offset & (chanb->subbuf_size - 1)) >> get_count_order(PAGE_SIZE); - id = bufb->buf_wsb[sbidx].id; + id = shmp(bufb->buf_wsb)[sbidx].id; sb_bindex = subbuffer_id_get_index(config, id); - rpages = bufb->array[sb_bindex]; + rpages = shmp(bufb->array)[sb_bindex]; CHAN_WARN_ON(chanb, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, id)); - return rpages->p[index].virt + (offset & ~PAGE_MASK); + return shmp(rpages->p) + (offset & ~(chanb->subbuf_size - 1)); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_offset_address); diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 5ceda87..2b4ecc2 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -38,13 +38,16 @@ * Dual LGPL v2.1/GPL v2 license. */ +#include +#include #include +#include +#include "smp.h" #include "config.h" #include "backend.h" #include "frontend.h" -#include "iterator.h" -#include "nohz.h" +#include "shm.h" /* * Internal structure representing offsets to use at a sub-buffer switch. @@ -56,20 +59,7 @@ struct switch_offsets { switch_old_end:1; }; -#ifdef CONFIG_NO_HZ -enum tick_nohz_val { - TICK_NOHZ_STOP, - TICK_NOHZ_FLUSH, - TICK_NOHZ_RESTART, -}; - -static ATOMIC_NOTIFIER_HEAD(tick_nohz_notifier); -#endif /* CONFIG_NO_HZ */ - -static DEFINE_PER_CPU(spinlock_t, ring_buffer_nohz_lock); - -DEFINE_PER_CPU(unsigned int, lib_ring_buffer_nesting); -EXPORT_PER_CPU_SYMBOL(lib_ring_buffer_nesting); +__thread unsigned int lib_ring_buffer_nesting; static void lib_ring_buffer_print_errors(struct channel *chan, @@ -80,11 +70,11 @@ void lib_ring_buffer_print_errors(struct channel *chan, */ void lib_ring_buffer_free(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu); - kfree(buf->commit_hot); - kfree(buf->commit_cold); + free(shmp(buf->commit_hot)); + free(shmp(buf->commit_cold)); lib_ring_buffer_backend_free(&buf->backend); } @@ -100,7 +90,7 @@ void lib_ring_buffer_free(struct lib_ring_buffer *buf) */ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned int i; @@ -108,15 +98,14 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) * Reset iterator first. It will put the subbuffer if it currently holds * it. */ - lib_ring_buffer_iterator_reset(buf); v_set(config, &buf->offset, 0); for (i = 0; i < chan->backend.num_subbuf; i++) { - v_set(config, &buf->commit_hot[i].cc, 0); - v_set(config, &buf->commit_hot[i].seq, 0); - v_set(config, &buf->commit_cold[i].cc_sb, 0); + v_set(config, &shmp(buf->commit_hot)[i].cc, 0); + v_set(config, &shmp(buf->commit_hot)[i].seq, 0); + v_set(config, &shmp(buf->commit_cold)[i].cc_sb, 0); } - atomic_long_set(&buf->consumed, 0); - atomic_set(&buf->record_disabled, 0); + uatomic_set(&buf->consumed, 0); + uatomic_set(&buf->record_disabled, 0); v_set(config, &buf->last_tsc, 0); lib_ring_buffer_backend_reset(&buf->backend); /* Don't reset number of active readers */ @@ -127,7 +116,6 @@ void lib_ring_buffer_reset(struct lib_ring_buffer *buf) v_set(config, &buf->records_overrun, 0); buf->finalized = 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_reset); /** * channel_reset - Reset channel to initial values. @@ -143,21 +131,20 @@ void channel_reset(struct channel *chan) /* * Reset iterators first. Will put the subbuffer if held for reading. */ - channel_iterator_reset(chan); - atomic_set(&chan->record_disabled, 0); + uatomic_set(&chan->record_disabled, 0); /* Don't reset commit_count_mask, still valid */ channel_backend_reset(&chan->backend); /* Don't reset switch/read timer interval */ /* Don't reset notifiers and notifier enable bits */ /* Don't reset reader reference count */ } -EXPORT_SYMBOL_GPL(channel_reset); /* * Must be called under cpu hotplug protection. */ int lib_ring_buffer_create(struct lib_ring_buffer *buf, - struct channel_backend *chanb, int cpu) + struct channel_backend *chanb, int cpu, + struct shm_header *shm_header) { const struct lib_ring_buffer_config *config = chanb->config; struct channel *chan = caa_container_of(chanb, struct channel, backend); @@ -171,39 +158,29 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, if (buf->backend.allocated) return 0; - /* - * Paranoia: per cpu dynamic allocation is not officially documented as - * zeroing the memory, so let's do it here too, just in case. - */ - memset(buf, 0, sizeof(*buf)); - - ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu); + ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, + cpu, shm_header); if (ret) return ret; - buf->commit_hot = - kzalloc_node(ALIGN(sizeof(*buf->commit_hot) - * chan->backend.num_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); - if (!buf->commit_hot) { + set_shmp(&buf->commit_hot, + zalloc_shm(shm_header, + sizeof(*buf->commit_hot) * chan->backend.num_subbuf)); + if (!shmp(buf->commit_hot)) { ret = -ENOMEM; goto free_chanbuf; } - buf->commit_cold = - kzalloc_node(ALIGN(sizeof(*buf->commit_cold) - * chan->backend.num_subbuf, - 1 << INTERNODE_CACHE_SHIFT), - GFP_KERNEL, cpu_to_node(max(cpu, 0))); - if (!buf->commit_cold) { + set_shmp(&buf->commit_cold, + zalloc_shm(shm_header, + sizeof(*buf->commit_cold) * chan->backend.num_subbuf)); + if (!shmp(buf->commit_cold)) { ret = -ENOMEM; goto free_commit; } num_subbuf = chan->backend.num_subbuf; - init_waitqueue_head(&buf->read_wait); - raw_spin_lock_init(&buf->raw_tick_nohz_spinlock); + //init_waitqueue_head(&buf->read_wait); /* * Write the subbuffer header for first subbuffer so we know the total @@ -211,38 +188,24 @@ int lib_ring_buffer_create(struct lib_ring_buffer *buf, */ subbuf_header_size = config->cb.subbuffer_header_size(); v_set(config, &buf->offset, subbuf_header_size); - subbuffer_id_clear_noref(config, &buf->backend.buf_wsb[0].id); - tsc = config->cb.ring_buffer_clock_read(buf->backend.chan); + subbuffer_id_clear_noref(config, &shmp(buf->backend.buf_wsb)[0].id); + tsc = config->cb.ring_buffer_clock_read(shmp(buf->backend.chan)); config->cb.buffer_begin(buf, tsc, 0); - v_add(config, subbuf_header_size, &buf->commit_hot[0].cc); + v_add(config, subbuf_header_size, &shmp(buf->commit_hot)[0].cc); if (config->cb.buffer_create) { ret = config->cb.buffer_create(buf, priv, cpu, chanb->name); if (ret) goto free_init; } - - /* - * Ensure the buffer is ready before setting it to allocated and setting - * the cpumask. - * Used for cpu hotplug vs cpumask iteration. - */ - smp_wmb(); buf->backend.allocated = 1; - - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - CHAN_WARN_ON(chan, cpumask_test_cpu(cpu, - chan->backend.cpumask)); - cpumask_set_cpu(cpu, chan->backend.cpumask); - } - return 0; /* Error handling */ free_init: - kfree(buf->commit_cold); + /* commit_cold will be freed by shm teardown */ free_commit: - kfree(buf->commit_hot); + /* commit_hot will be freed by shm teardown */ free_chanbuf: lib_ring_buffer_backend_free(&buf->backend); return ret; @@ -251,55 +214,52 @@ free_chanbuf: static void switch_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; /* * Only flush buffers periodically if readers are active. */ - if (atomic_long_read(&buf->active_readers)) + if (uatomic_read(&buf->active_readers)) lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->switch_timer, - jiffies + chan->switch_timer_interval); - else - mod_timer(&buf->switch_timer, - jiffies + chan->switch_timer_interval); + //TODO timers + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // mod_timer_pinned(&buf->switch_timer, + // jiffies + chan->switch_timer_interval); + //else + // mod_timer(&buf->switch_timer, + // jiffies + chan->switch_timer_interval); } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_start_switch_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (!chan->switch_timer_interval || buf->switch_timer_enabled) return; - init_timer(&buf->switch_timer); - buf->switch_timer.function = switch_buffer_timer; - buf->switch_timer.expires = jiffies + chan->switch_timer_interval; - buf->switch_timer.data = (unsigned long)buf; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - add_timer_on(&buf->switch_timer, buf->backend.cpu); - else - add_timer(&buf->switch_timer); + //TODO + //init_timer(&buf->switch_timer); + //buf->switch_timer.function = switch_buffer_timer; + //buf->switch_timer.expires = jiffies + chan->switch_timer_interval; + //buf->switch_timer.data = (unsigned long)buf; + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // add_timer_on(&buf->switch_timer, buf->backend.cpu); + //else + // add_timer(&buf->switch_timer); buf->switch_timer_enabled = 1; } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); if (!chan->switch_timer_interval || !buf->switch_timer_enabled) return; - del_timer_sync(&buf->switch_timer); + //TODO + //del_timer_sync(&buf->switch_timer); buf->switch_timer_enabled = 0; } @@ -309,31 +269,30 @@ static void lib_ring_buffer_stop_switch_timer(struct lib_ring_buffer *buf) static void read_buffer_timer(unsigned long data) { struct lib_ring_buffer *buf = (struct lib_ring_buffer *)data; - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; CHAN_WARN_ON(chan, !buf->backend.allocated); - if (atomic_long_read(&buf->active_readers) + if (uatomic_read(&buf->active_readers) && lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + //TODO + //wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&chan->read_wait); } - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - mod_timer_pinned(&buf->read_timer, - jiffies + chan->read_timer_interval); - else - mod_timer(&buf->read_timer, - jiffies + chan->read_timer_interval); + //TODO + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // mod_timer_pinned(&buf->read_timer, + // jiffies + chan->read_timer_interval); + //else + // mod_timer(&buf->read_timer, + // jiffies + chan->read_timer_interval); } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -341,24 +300,22 @@ static void lib_ring_buffer_start_read_timer(struct lib_ring_buffer *buf) || buf->read_timer_enabled) return; - init_timer(&buf->read_timer); - buf->read_timer.function = read_buffer_timer; - buf->read_timer.expires = jiffies + chan->read_timer_interval; - buf->read_timer.data = (unsigned long)buf; + //TODO + //init_timer(&buf->read_timer); + //buf->read_timer.function = read_buffer_timer; + //buf->read_timer.expires = jiffies + chan->read_timer_interval; + //buf->read_timer.data = (unsigned long)buf; - if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - add_timer_on(&buf->read_timer, buf->backend.cpu); - else - add_timer(&buf->read_timer); + //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) + // add_timer_on(&buf->read_timer, buf->backend.cpu); + //else + // add_timer(&buf->read_timer); buf->read_timer_enabled = 1; } -/* - * Called with ring_buffer_nohz_lock held for per-cpu buffers. - */ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER @@ -366,202 +323,34 @@ static void lib_ring_buffer_stop_read_timer(struct lib_ring_buffer *buf) || !buf->read_timer_enabled) return; - del_timer_sync(&buf->read_timer); + //TODO + //del_timer_sync(&buf->read_timer); /* * do one more check to catch data that has been written in the last * timer period. */ if (lib_ring_buffer_poll_deliver(config, buf, chan)) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); + //TODO + //wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&chan->read_wait); } buf->read_timer_enabled = 0; } -#ifdef CONFIG_HOTPLUG_CPU -/** - * lib_ring_buffer_cpu_hp_callback - CPU hotplug callback - * @nb: notifier block - * @action: hotplug action to take - * @hcpu: CPU number - * - * Returns the success/failure of the operation. (%NOTIFY_OK, %NOTIFY_BAD) - */ -static -int __cpuinit lib_ring_buffer_cpu_hp_callback(struct notifier_block *nb, - unsigned long action, - void *hcpu) -{ - unsigned int cpu = (unsigned long)hcpu; - struct channel *chan = caa_container_of(nb, struct channel, - cpu_hp_notifier); - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, cpu); - const struct lib_ring_buffer_config *config = chan->backend.config; - - if (!chan->cpu_hp_enable) - return NOTIFY_DONE; - - CHAN_WARN_ON(chan, config->alloc == RING_BUFFER_ALLOC_GLOBAL); - - switch (action) { - case CPU_DOWN_FAILED: - case CPU_DOWN_FAILED_FROZEN: - case CPU_ONLINE: - case CPU_ONLINE_FROZEN: - wake_up_interruptible(&chan->hp_wait); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - return NOTIFY_OK; - - case CPU_DOWN_PREPARE: - case CPU_DOWN_PREPARE_FROZEN: - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - return NOTIFY_OK; - - case CPU_DEAD: - case CPU_DEAD_FROZEN: - /* - * Performing a buffer switch on a remote CPU. Performed by - * the CPU responsible for doing the hotunplug after the target - * CPU stopped running completely. Ensures that all data - * from that remote CPU is flushed. - */ - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - return NOTIFY_OK; - - default: - return NOTIFY_DONE; - } -} -#endif - -#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) -/* - * For per-cpu buffers, call the reader wakeups before switching the buffer, so - * that wake-up-tracing generated events are flushed before going idle (in - * tick_nohz). We test if the spinlock is locked to deal with the race where - * readers try to sample the ring buffer before we perform the switch. We let - * the readers retry in that case. If there is data in the buffer, the wake up - * is going to forbid the CPU running the reader thread from going idle. - */ -static int notrace ring_buffer_tick_nohz_callback(struct notifier_block *nb, - unsigned long val, - void *data) -{ - struct channel *chan = caa_container_of(nb, struct channel, - tick_nohz_notifier); - const struct lib_ring_buffer_config *config = chan->backend.config; - struct lib_ring_buffer *buf; - int cpu = smp_processor_id(); - - if (config->alloc != RING_BUFFER_ALLOC_PER_CPU) { - /* - * We don't support keeping the system idle with global buffers - * and streaming active. In order to do so, we would need to - * sample a non-nohz-cpumask racelessly with the nohz updates - * without adding synchronization overhead to nohz. Leave this - * use-case out for now. - */ - return 0; - } - - buf = channel_get_ring_buffer(config, chan, cpu); - switch (val) { - case TICK_NOHZ_FLUSH: - raw_spin_lock(&buf->raw_tick_nohz_spinlock); - if (config->wakeup == RING_BUFFER_WAKEUP_BY_TIMER - && chan->read_timer_interval - && atomic_long_read(&buf->active_readers) - && (lib_ring_buffer_poll_deliver(config, buf, chan) - || lib_ring_buffer_pending_data(config, buf, chan))) { - wake_up_interruptible(&buf->read_wait); - wake_up_interruptible(&chan->read_wait); - } - if (chan->switch_timer_interval) - lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE); - raw_spin_unlock(&buf->raw_tick_nohz_spinlock); - break; - case TICK_NOHZ_STOP: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); - break; - case TICK_NOHZ_RESTART: - spin_lock(&__get_cpu_var(ring_buffer_nohz_lock)); - lib_ring_buffer_start_read_timer(buf); - lib_ring_buffer_start_switch_timer(buf); - spin_unlock(&__get_cpu_var(ring_buffer_nohz_lock)); - break; - } - - return 0; -} - -void notrace lib_ring_buffer_tick_nohz_flush(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_FLUSH, - NULL); -} - -void notrace lib_ring_buffer_tick_nohz_stop(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_STOP, - NULL); -} - -void notrace lib_ring_buffer_tick_nohz_restart(void) -{ - atomic_notifier_call_chain(&tick_nohz_notifier, TICK_NOHZ_RESTART, - NULL); -} -#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ - -/* - * Holds CPU hotplug. - */ static void channel_unregister_notifiers(struct channel *chan) { const struct lib_ring_buffer_config *config = chan->backend.config; int cpu; - channel_iterator_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#ifdef CONFIG_NO_HZ - /* - * Remove the nohz notifier first, so we are certain we stop - * the timers. - */ - atomic_notifier_chain_unregister(&tick_nohz_notifier, - &chan->tick_nohz_notifier); - /* - * ring_buffer_nohz_lock will not be needed below, because - * we just removed the notifiers, which were the only source of - * concurrency. - */ -#endif /* CONFIG_NO_HZ */ -#ifdef CONFIG_HOTPLUG_CPU - get_online_cpus(); - chan->cpu_hp_enable = 0; - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - lib_ring_buffer_stop_switch_timer(buf); - lib_ring_buffer_stop_read_timer(buf); - } - put_online_cpus(); - unregister_cpu_notifier(&chan->cpu_hp_notifier); -#else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; + lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); } -#endif } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); lib_ring_buffer_stop_switch_timer(buf); lib_ring_buffer_stop_read_timer(buf); @@ -571,9 +360,8 @@ static void channel_unregister_notifiers(struct channel *chan) static void channel_free(struct channel *chan) { - channel_iterator_free(chan); channel_backend_free(&chan->backend); - kfree(chan); + free(chan); } /** @@ -590,6 +378,7 @@ static void channel_free(struct channel *chan) * padding to let readers get those sub-buffers. * Used for live streaming. * @read_timer_interval: Time interval (in us) to wake up pending readers. + * @shmid: shared memory ID (output) * * Holds cpu hotplug. * Returns NULL on failure. @@ -598,79 +387,104 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, const char *name, void *priv, void *buf_addr, size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, - unsigned int read_timer_interval) + unsigned int read_timer_interval, + int *shmid) { int ret, cpu; struct channel *chan; + size_t shmsize, bufshmsize; + struct shm_header *shm_header; + unsigned long num_subbuf_alloc; if (lib_ring_buffer_check_config(config, switch_timer_interval, read_timer_interval)) return NULL; - chan = kzalloc(sizeof(struct channel), GFP_KERNEL); - if (!chan) + /* Calculate the shm allocation layout */ + shmsize = sizeof(struct shm_header); + shmsize += sizeof(struct channel); + + /* Per-cpu buffer size: control (prior to backend) */ + bufshmsize = sizeof(struct lib_ring_buffer); + shmsize += bufshmsize * num_possible_cpus(); + + /* Per-cpu buffer size: backend */ + /* num_subbuf + 1 is the worse case */ + num_subbuf_alloc = num_subbuf + 1; + bufshmsize = sizeof(struct lib_ring_buffer_backend_pages *) * num_subbuf_alloc; + bufshmsize += subbuf_size * (num_subbuf_alloc); + bufshmsize += (sizeof(struct lib_ring_buffer_backend_pages) + subbuf_size) * num_subbuf_alloc; + bufshmsize += sizeof(struct lib_ring_buffer_backend_subbuffer) * num_subbuf; + shmsize += bufshmsize * num_possible_cpus(); + + /* Per-cpu buffer size: control (after backend) */ + bufshmsize += sizeof(struct commit_counters_hot) * num_subbuf; + bufshmsize += sizeof(struct commit_counters_cold) * num_subbuf; + + /* Allocate shm */ + *shmid = shmget(getpid(), shmsize, IPC_CREAT | IPC_EXCL | 0700); + if (*shmid < 0) { + if (errno == EINVAL) + ERR("shmget() returned EINVAL; maybe /proc/sys/kernel/shmmax should be increased."); + else + PERROR("shmget"); return NULL; + } - ret = channel_backend_init(&chan->backend, name, config, priv, - subbuf_size, num_subbuf); - if (ret) - goto error; + shm_header = shmat(*shmid, NULL, 0); + if (shm_header == (void *) -1) { + perror("shmat"); + goto destroy_shmem; + } - ret = channel_iterator_init(chan); + /* Already mark the shared memory for destruction. This will occur only + * when all users have detached. + */ + ret = shmctl(*shmid, IPC_RMID, NULL); + if (ret == -1) { + perror("shmctl"); + goto destroy_shmem; + } + + shm_header->magic = SHM_MAGIC; + shm_header->major = SHM_MAJOR; + shm_header->major = SHM_MINOR; + shm_header->bits_per_long = CAA_BITS_PER_LONG; + shm_header->shm_size = shmsize; + shm_header->shm_allocated = sizeof(struct shm_header); + + chan = zalloc_shm(shm_header, sizeof(struct channel)); + if (!chan) + goto destroy_shmem; + set_shmp(shm_header->chan, chan); + + ret = channel_backend_init(&chan->backend, name, config, priv, + subbuf_size, num_subbuf, shm_header); if (ret) - goto error_free_backend; + goto destroy_shmem; chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order); - chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); - chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); - kref_init(&chan->ref); - init_waitqueue_head(&chan->read_wait); - init_waitqueue_head(&chan->hp_wait); + //TODO + //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval); + //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval); + urcu_ref_init(&chan->ref); + //TODO + //init_waitqueue_head(&chan->read_wait); + //init_waitqueue_head(&chan->hp_wait); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { -#if defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) - /* Only benefit from NO_HZ idle with per-cpu buffers for now. */ - chan->tick_nohz_notifier.notifier_call = - ring_buffer_tick_nohz_callback; - chan->tick_nohz_notifier.priority = ~0U; - atomic_notifier_chain_register(&tick_nohz_notifier, - &chan->tick_nohz_notifier); -#endif /* defined(CONFIG_NO_HZ) && defined(CONFIG_LIB_RING_BUFFER) */ - /* * In case of non-hotplug cpu, if the ring-buffer is allocated * in early initcall, it will not be notified of secondary cpus. * In that off case, we need to allocate for all possible cpus. */ -#ifdef CONFIG_HOTPLUG_CPU - chan->cpu_hp_notifier.notifier_call = - lib_ring_buffer_cpu_hp_callback; - chan->cpu_hp_notifier.priority = 6; - register_cpu_notifier(&chan->cpu_hp_notifier); - - get_online_cpus(); - for_each_online_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); - lib_ring_buffer_start_switch_timer(buf); - lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); - } - chan->cpu_hp_enable = 1; - put_online_cpus(); -#else for_each_possible_cpu(cpu) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); - spin_lock(&per_cpu(ring_buffer_nohz_lock, cpu)); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); - spin_unlock(&per_cpu(ring_buffer_nohz_lock, cpu)); } -#endif } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); lib_ring_buffer_start_switch_timer(buf); lib_ring_buffer_start_read_timer(buf); @@ -678,18 +492,18 @@ struct channel *channel_create(const struct lib_ring_buffer_config *config, return chan; -error_free_backend: - channel_backend_free(&chan->backend); -error: - kfree(chan); +destroy_shmem: + ret = shmctl(*shmid, IPC_RMID, NULL); + if (ret == -1) { + perror("shmctl"); + } return NULL; } -EXPORT_SYMBOL_GPL(channel_create); static -void channel_release(struct kref *kref) +void channel_release(struct urcu_ref *ref) { - struct channel *chan = caa_container_of(kref, struct channel, ref); + struct channel *chan = caa_container_of(ref, struct channel, ref); channel_free(chan); } @@ -713,13 +527,8 @@ void *channel_destroy(struct channel *chan) channel_unregister_notifiers(chan); if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - /* - * No need to hold cpu hotplug, because all notifiers have been - * unregistered. - */ for_each_channel_cpu(cpu, chan) { - struct lib_ring_buffer *buf = per_cpu_ptr(chan->backend.buf, - cpu); + struct lib_ring_buffer *buf = &shmp(chan->backend.buf)[cpu]; if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, @@ -730,12 +539,12 @@ void *channel_destroy(struct channel *chan) /* * Perform flush before writing to finalized. */ - smp_wmb(); + cmm_smp_wmb(); CMM_ACCESS_ONCE(buf->finalized) = 1; - wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&buf->read_wait); } } else { - struct lib_ring_buffer *buf = chan->backend.buf; + struct lib_ring_buffer *buf = shmp(chan->backend.buf); if (config->cb.buffer_finalize) config->cb.buffer_finalize(buf, chan->backend.priv, -1); @@ -744,62 +553,47 @@ void *channel_destroy(struct channel *chan) /* * Perform flush before writing to finalized. */ - smp_wmb(); + cmm_smp_wmb(); CMM_ACCESS_ONCE(buf->finalized) = 1; - wake_up_interruptible(&buf->read_wait); + //wake_up_interruptible(&buf->read_wait); } CMM_ACCESS_ONCE(chan->finalized) = 1; - wake_up_interruptible(&chan->hp_wait); - wake_up_interruptible(&chan->read_wait); - kref_put(&chan->ref, channel_release); + //wake_up_interruptible(&chan->hp_wait); + //wake_up_interruptible(&chan->read_wait); + urcu_ref_put(&chan->ref, channel_release); priv = chan->backend.priv; return priv; } -EXPORT_SYMBOL_GPL(channel_destroy); struct lib_ring_buffer *channel_get_ring_buffer( const struct lib_ring_buffer_config *config, struct channel *chan, int cpu) { if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) - return chan->backend.buf; + return shmp(chan->backend.buf); else - return per_cpu_ptr(chan->backend.buf, cpu); + return &shmp(chan->backend.buf)[cpu]; } -EXPORT_SYMBOL_GPL(channel_get_ring_buffer); int lib_ring_buffer_open_read(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); - if (!atomic_long_add_unless(&buf->active_readers, 1, 1)) + if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0) return -EBUSY; - kref_get(&chan->ref); - smp_mb__after_atomic_inc(); + urcu_ref_get(&chan->ref); + cmm_smp_mb(); return 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_open_read); void lib_ring_buffer_release_read(struct lib_ring_buffer *buf) { - struct channel *chan = buf->backend.chan; - - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); - smp_mb__before_atomic_dec(); - atomic_long_dec(&buf->active_readers); - kref_put(&chan->ref, channel_release); -} -EXPORT_SYMBOL_GPL(lib_ring_buffer_release_read); + struct channel *chan = shmp(buf->backend.chan); -/* - * Promote compiler barrier to a smp_mb(). - * For the specific ring buffer case, this IPI call should be removed if the - * architecture does not reorder writes. This should eventually be provided by - * a separate architecture-specific infrastructure. - */ -static void remote_mb(void *info) -{ - smp_mb(); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); + cmm_smp_mb(); + uatomic_dec(&buf->active_readers); + urcu_ref_put(&chan->ref, channel_release); } /** @@ -810,24 +604,22 @@ static void remote_mb(void *info) * * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no * data to read at consumed position, or 0 if the get operation succeeds. - * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ int lib_ring_buffer_snapshot(struct lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, write_offset; int finalized; -retry: finalized = CMM_ACCESS_ONCE(buf->finalized); /* * Read finalized before counters. */ - smp_rmb(); - consumed_cur = atomic_long_read(&buf->consumed); + cmm_smp_rmb(); + consumed_cur = uatomic_read(&buf->consumed); /* * No need to issue a memory barrier between consumed count read and * write offset read, because consumed count can only change @@ -858,12 +650,9 @@ nodata: */ if (finalized) return -ENODATA; - else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock)) - goto retry; else return -EAGAIN; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_snapshot); /** * lib_ring_buffer_put_snapshot - move consumed counter forward @@ -874,22 +663,21 @@ void lib_ring_buffer_move_consumer(struct lib_ring_buffer *buf, unsigned long consumed_new) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = bufb->chan; + struct channel *chan = shmp(bufb->chan); unsigned long consumed; - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); /* * Only push the consumed value forward. * If the consumed cmpxchg fails, this is because we have been pushed by * the writer in flight recorder mode. */ - consumed = atomic_long_read(&buf->consumed); + consumed = uatomic_read(&buf->consumed); while ((long) consumed - (long) consumed_new < 0) - consumed = atomic_long_cmpxchg(&buf->consumed, consumed, - consumed_new); + consumed = uatomic_cmpxchg(&buf->consumed, consumed, + consumed_new); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); /** * lib_ring_buffer_get_subbuf - get exclusive access to subbuffer for reading @@ -898,12 +686,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_move_consumer); * * Returns -ENODATA if buffer is finalized, -EAGAIN if there is currently no * data to read at consumed position, or 0 if the get operation succeeds. - * Busy-loop trying to get data if the tick_nohz sequence lock is held. */ int lib_ring_buffer_get_subbuf(struct lib_ring_buffer *buf, unsigned long consumed) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret; @@ -914,72 +701,21 @@ retry: /* * Read finalized before counters. */ - smp_rmb(); - consumed_cur = atomic_long_read(&buf->consumed); + cmm_smp_rmb(); + consumed_cur = uatomic_read(&buf->consumed); consumed_idx = subbuf_index(consumed, chan); - commit_count = v_read(config, &buf->commit_cold[consumed_idx].cc_sb); + commit_count = v_read(config, &shmp(buf->commit_cold)[consumed_idx].cc_sb); /* * Make sure we read the commit count before reading the buffer * data and the write offset. Correct consumed offset ordering * wrt commit count is insured by the use of cmpxchg to update * the consumed offset. - * smp_call_function_single can fail if the remote CPU is offline, - * this is OK because then there is no wmb to execute there. - * If our thread is executing on the same CPU as the on the buffers - * belongs to, we don't have to synchronize it at all. If we are - * migrated, the scheduler will take care of the memory barriers. - * Normally, smp_call_function_single() should ensure program order when - * executing the remote function, which implies that it surrounds the - * function execution with : - * smp_mb() - * send IPI - * csd_lock_wait - * recv IPI - * smp_mb() - * exec. function - * smp_mb() - * csd unlock - * smp_mb() - * - * However, smp_call_function_single() does not seem to clearly execute - * such barriers. It depends on spinlock semantic to provide the barrier - * before executing the IPI and, when busy-looping, csd_lock_wait only - * executes smp_mb() when it has to wait for the other CPU. - * - * I don't trust this code. Therefore, let's add the smp_mb() sequence - * required ourself, even if duplicated. It has no performance impact - * anyway. - * - * smp_mb() is needed because smp_rmb() and smp_wmb() only order read vs - * read and write vs write. They do not ensure core synchronization. We - * really have to ensure total order between the 3 barriers running on - * the 2 CPUs. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - if (config->sync == RING_BUFFER_SYNC_PER_CPU - && config->alloc == RING_BUFFER_ALLOC_PER_CPU) { - if (raw_smp_processor_id() != buf->backend.cpu) { - /* Total order with IPI handler smp_mb() */ - smp_mb(); - smp_call_function_single(buf->backend.cpu, - remote_mb, NULL, 1); - /* Total order with IPI handler smp_mb() */ - smp_mb(); - } - } else { - /* Total order with IPI handler smp_mb() */ - smp_mb(); - smp_call_function(remote_mb, NULL, 1); - /* Total order with IPI handler smp_mb() */ - smp_mb(); - } - } else { - /* - * Local rmb to match the remote wmb to read the commit count - * before the buffer data and the write offset. - */ - smp_rmb(); - } + /* + * Local rmb to match the remote wmb to read the commit count + * before the buffer data and the write offset. + */ + cmm_smp_rmb(); write_offset = v_read(config, &buf->offset); @@ -1035,12 +771,9 @@ nodata: */ if (finalized) return -ENODATA; - else if (raw_spin_is_locked(&buf->raw_tick_nohz_spinlock)) - goto retry; else return -EAGAIN; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf); /** * lib_ring_buffer_put_subbuf - release exclusive subbuffer access @@ -1049,11 +782,11 @@ EXPORT_SYMBOL_GPL(lib_ring_buffer_get_subbuf); void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) { struct lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan = bufb->chan; + struct channel *chan = shmp(bufb->chan); const struct lib_ring_buffer_config *config = chan->backend.config; unsigned long read_sb_bindex, consumed_idx, consumed; - CHAN_WARN_ON(chan, atomic_long_read(&buf->active_readers) != 1); + CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1); if (!buf->get_subbuf) { /* @@ -1074,9 +807,9 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) */ read_sb_bindex = subbuffer_id_get_index(config, bufb->buf_rsb.id); v_add(config, v_read(config, - &bufb->array[read_sb_bindex]->records_unread), + &shmp(bufb->array)[read_sb_bindex]->records_unread), &bufb->records_read); - v_set(config, &bufb->array[read_sb_bindex]->records_unread, 0); + v_set(config, &shmp(bufb->array)[read_sb_bindex]->records_unread, 0); CHAN_WARN_ON(chan, config->mode == RING_BUFFER_OVERWRITE && subbuffer_id_is_noref(config, bufb->buf_rsb.id)); subbuffer_id_set_noref(config, &bufb->buf_rsb.id); @@ -1097,7 +830,6 @@ void lib_ring_buffer_put_subbuf(struct lib_ring_buffer *buf) * if the writer concurrently updated it. */ } -EXPORT_SYMBOL_GPL(lib_ring_buffer_put_subbuf); /* * cons_offset is an iterator on all subbuffer offsets between the reader @@ -1113,12 +845,11 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, unsigned long cons_idx, commit_count, commit_count_sb; cons_idx = subbuf_index(cons_offset, chan); - commit_count = v_read(config, &buf->commit_hot[cons_idx].cc); - commit_count_sb = v_read(config, &buf->commit_cold[cons_idx].cc_sb); + commit_count = v_read(config, &shmp(buf->commit_hot)[cons_idx].cc); + commit_count_sb = v_read(config, &shmp(buf->commit_cold)[cons_idx].cc_sb); if (subbuf_offset(commit_count, chan) != 0) - printk(KERN_WARNING - "ring buffer %s, cpu %d: " + ERRMSG("ring buffer %s, cpu %d: " "commit count in subbuffer %lu,\n" "expecting multiples of %lu bytes\n" " [ %lu bytes committed, %lu bytes reader-visible ]\n", @@ -1126,7 +857,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lib_ring_buffer *buf, chan->backend.subbuf_size, commit_count, commit_count_sb); - printk(KERN_DEBUG "ring buffer: %s, cpu %d: %lu bytes committed\n", + ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n", chan->backend.name, cpu, commit_count); } @@ -1150,15 +881,14 @@ void lib_ring_buffer_print_buffer_errors(struct lib_ring_buffer *buf, * references are left. */ write_offset = v_read(config, &buf->offset); - cons_offset = atomic_long_read(&buf->consumed); + cons_offset = uatomic_read(&buf->consumed); if (write_offset != cons_offset) - printk(KERN_WARNING - "ring buffer %s, cpu %d: " + ERRMSG("ring buffer %s, cpu %d: " "non-consumed data\n" " [ %lu bytes written, %lu bytes read ]\n", chan->backend.name, cpu, write_offset, cons_offset); - for (cons_offset = atomic_long_read(&buf->consumed); + for (cons_offset = uatomic_read(&buf->consumed); (long) (subbuf_trunc((unsigned long) v_read(config, &buf->offset), chan) - cons_offset) > 0; @@ -1174,7 +904,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, const struct lib_ring_buffer_config *config = chan->backend.config; void *priv = chan->backend.priv; - printk(KERN_DEBUG "ring buffer %s, cpu %d: %lu records written, " + ERRMSG("ring buffer %s, cpu %d: %lu records written, " "%lu records overrun\n", chan->backend.name, cpu, v_read(config, &buf->records_count), @@ -1183,8 +913,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, if (v_read(config, &buf->records_lost_full) || v_read(config, &buf->records_lost_wrap) || v_read(config, &buf->records_lost_big)) - printk(KERN_WARNING - "ring buffer %s, cpu %d: records were lost. Caused by:\n" + ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n" " [ %lu buffer full, %lu nest buffer wrap-around, " "%lu event too big ]\n", chan->backend.name, cpu, @@ -1216,18 +945,10 @@ void lib_ring_buffer_switch_old_start(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); + cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + &shmp(buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->old, commit_count, oldidx); @@ -1262,17 +983,9 @@ void lib_ring_buffer_switch_old_end(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[oldidx].cc); - commit_count = v_read(config, &buf->commit_hot[oldidx].cc); + cmm_smp_wmb(); + v_add(config, padding_size, &shmp(buf->commit_hot)[oldidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[oldidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1, commit_count, oldidx); lib_ring_buffer_write_commit_counter(config, buf, chan, oldidx, @@ -1303,18 +1016,10 @@ void lib_ring_buffer_switch_new_start(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); + cmm_smp_wmb(); v_add(config, config->cb.subbuffer_header_size(), - &buf->commit_hot[beginidx].cc); - commit_count = v_read(config, &buf->commit_hot[beginidx].cc); + &shmp(buf->commit_hot)[beginidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[beginidx].cc); /* Check if the written buffer has to be delivered */ lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin, commit_count, beginidx); @@ -1347,17 +1052,9 @@ void lib_ring_buffer_switch_new_end(struct lib_ring_buffer *buf, * Order all writes to buffer before the commit count update that will * determine that the subbuffer is full. */ - if (config->ipi == RING_BUFFER_IPI_BARRIER) { - /* - * Must write slot data before incrementing commit count. This - * compiler barrier is upgraded into a smp_mb() by the IPI sent - * by get_subbuf(). - */ - barrier(); - } else - smp_wmb(); - v_add(config, padding_size, &buf->commit_hot[endidx].cc); - commit_count = v_read(config, &buf->commit_hot[endidx].cc); + cmm_smp_wmb(); + v_add(config, padding_size, &shmp(buf->commit_hot)[endidx].cc); + commit_count = v_read(config, &shmp(buf->commit_hot)[endidx].cc); lib_ring_buffer_check_deliver(config, buf, chan, offsets->end - 1, commit_count, endidx); lib_ring_buffer_write_commit_counter(config, buf, chan, endidx, @@ -1398,10 +1095,10 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * The next record that reserves space will be responsible for * populating the following subbuffer header. We choose not to populate * the next subbuffer header here because we want to be able to use - * SWITCH_ACTIVE for periodical buffer flush and CPU tick_nohz stop - * buffer flush, which must guarantee that all the buffer content - * (records and header timestamps) are visible to the reader. This is - * required for quiescence guarantees for the fusion merge. + * SWITCH_ACTIVE for periodical buffer flush, which must + * guarantee that all the buffer content (records and header + * timestamps) are visible to the reader. This is required for + * quiescence guarantees for the fusion merge. */ if (mode == SWITCH_FLUSH || off > 0) { if (unlikely(off == 0)) { @@ -1435,7 +1132,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, */ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode mode) { - struct channel *chan = buf->backend.chan; + struct channel *chan = shmp(buf->backend.chan); const struct lib_ring_buffer_config *config = chan->backend.config; struct switch_offsets offsets; unsigned long oldidx; @@ -1482,7 +1179,6 @@ void lib_ring_buffer_switch_slow(struct lib_ring_buffer *buf, enum switch_mode m */ lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc); } -EXPORT_SYMBOL_GPL(lib_ring_buffer_switch_slow); /* * Returns : @@ -1547,14 +1243,14 @@ int lib_ring_buffer_try_reserve_slow(struct lib_ring_buffer *buf, (buf_trunc(offsets->begin, chan) >> chan->backend.num_subbuf_order) - ((unsigned long) v_read(config, - &buf->commit_cold[sb_index].cc_sb) + &shmp(buf->commit_cold)[sb_index].cc_sb) & chan->commit_count_mask); if (likely(reserve_commit_diff == 0)) { /* Next subbuffer not being written to. */ if (unlikely(config->mode != RING_BUFFER_OVERWRITE && subbuf_trunc(offsets->begin, chan) - subbuf_trunc((unsigned long) - atomic_long_read(&buf->consumed), chan) + uatomic_read(&buf->consumed), chan) >= chan->backend.buf_size)) { /* * We do not overwrite non consumed buffers @@ -1638,9 +1334,9 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) int ret; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = per_cpu_ptr(chan->backend.buf, ctx->cpu); + buf = &shmp(chan->backend.buf)[ctx->cpu]; else - buf = chan->backend.buf; + buf = shmp(chan->backend.buf); ctx->buf = buf; offsets.size = 0; @@ -1696,4 +1392,3 @@ int lib_ring_buffer_reserve_slow(struct lib_ring_buffer_ctx *ctx) ctx->buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } -EXPORT_SYMBOL_GPL(lib_ring_buffer_reserve_slow); diff --git a/libringbuffer/shm.h b/libringbuffer/shm.h new file mode 100644 index 0000000..390a5b2 --- /dev/null +++ b/libringbuffer/shm.h @@ -0,0 +1,71 @@ +#ifndef _LIBRINGBUFFER_SHM_H +#define _LIBRINGBUFFER_SHM_H + +/* + * libringbuffer/shm.h + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include "ust/core.h" + +#define SHM_MAGIC 0x54335433 +#define SHM_MAJOR 0 +#define SHM_MINOR 1 + +/* + * Defining a max shm offset, for debugging purposes. + */ +#if (CAA_BITS_PER_LONG == 32) +/* Define the maximum shared memory size to 128MB on 32-bit machines */ +#define MAX_SHM_SIZE 134217728 +#else +/* Define the maximum shared memory size to 8GB on 64-bit machines */ +#define MAX_SHM_SIZE 8589934592 +#endif + +#define DECLARE_SHMP(type, name) type *****name + +struct shm_header { + uint32_t magic; + uint8_t major; + uint8_t minor; + uint8_t bits_per_long; + size_t shm_size, shm_allocated; + + DECLARE_SHMP(struct channel, chan); +}; + +#define shmp(shm_offset) \ + ((__typeof__(****(shm_offset))) (((char *) &(shm_offset)) + (ptrdiff_t) (shm_offset))) + +#define _shmp_abs(a) ((a < 0) ? -(a) : (a)) + +static inline +void _set_shmp(ptrdiff_t *shm_offset, void *ptr) +{ + *shm_offset = (((char *) ptr) - ((char *) shm_offset)); + assert(_shmp_abs(*shm_offset) < MAX_SHM_SIZE); +} + +#define set_shmp(shm_offset, ptr) \ + _set_shmp((ptrdiff_t *) ****(shm_offset), ptr) + +/* Shared memory is already zeroed by shmget */ +/* *NOT* multithread-safe (should be protected by mutex) */ +static inline +void *zalloc_shm(struct shm_header *shm_header, size_t len) +{ + void *ret; + + if (shm_header->shm_size - shm_header->shm_allocated < len) + return NULL; + ret = (char *) shm_header + shm_header->shm_allocated; + shm_header->shm_allocated += len; + return ret; +} + +#endif /* _LIBRINGBUFFER_SHM_H */ diff --git a/libringbuffer/smp.c b/libringbuffer/smp.c new file mode 100644 index 0000000..3f86ac8 --- /dev/null +++ b/libringbuffer/smp.c @@ -0,0 +1,33 @@ +/* + * libust/smp.c + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include +#include "ust/core.h" +#include "usterr.h" +#include +#include "smp.h" + +int __num_possible_cpus; + +void _get_num_possible_cpus(void) +{ + int result; + + /* On Linux, when some processors are offline + * _SC_NPROCESSORS_CONF counts the offline + * processors, whereas _SC_NPROCESSORS_ONLN + * does not. If we used _SC_NPROCESSORS_ONLN, + * getcpu() could return a value greater than + * this sysconf, in which case the arrays + * indexed by processor would overflow. + */ + result = sysconf(_SC_NPROCESSORS_CONF); + if (result == -1) + return; + __num_possible_cpus = result; +} diff --git a/libringbuffer/smp.h b/libringbuffer/smp.h new file mode 100644 index 0000000..3d138a9 --- /dev/null +++ b/libringbuffer/smp.h @@ -0,0 +1,73 @@ +#ifndef _LIBRINGBUFFER_SMP_H +#define _LIBRINGBUFFER_SMP_H + +/* + * libringbuffer/smp.h + * + * Copyright 2011 (c) - Mathieu Desnoyers + * + * Dual LGPL v2.1/GPL v2 license. + */ + +#include + +/* + * 4kB of per-cpu data available. Enough to hold the control structures, + * but not ring buffers. + */ +#define PER_CPU_MEM_SIZE 4096 + +extern int __num_possible_cpus; +extern void _get_num_possible_cpus(void); + +static inline +int num_possible_cpus(void) +{ + if (!__num_possible_cpus) + _get_num_possible_cpus(); + return __num_possible_cpus; +} + +/* + * get_cpu() returns the current CPU number. It may change due to + * migration, so it is only statistically accurate. + */ +#ifndef UST_VALGRIND +static inline +int get_cpu(void) +{ + int cpu; + + cpu = sched_getcpu(); + if (likely(cpu >= 0)) + return cpu; + /* + * If getcpu(2) is not implemented in the Kernel use CPU 0 as fallback. + */ + return 0; +} + +#else /* #else #ifndef UST_VALGRIND */ +static inline +int get_cpu(void) +{ + /* + * Valgrind does not support the sched_getcpu() vsyscall. + * It causes it to detect a segfault in the program and stop it. + * So if we want to check libust with valgrind, we have to refrain + * from using this call. TODO: it would probably be better to return + * other values too, to better test it. + */ + return 0; +} +#endif /* #else #ifndef UST_VALGRIND */ + +static inline +void put_cpu(void) +{ +} + +#define for_each_possible_cpu(cpu) \ + for ((cpu) = 0; (cpu) < num_possible_cpus(); (cpu)++) + +#endif /* _LIBRINGBUFFER_SMP_H */ diff --git a/libust/buffers.c b/libust/buffers.c index 8ecebb9..0e198da 100644 --- a/libust/buffers.c +++ b/libust/buffers.c @@ -55,31 +55,6 @@ static CDS_LIST_HEAD(ust_buffers_channels); static void ltt_force_switch(struct ust_buffer *buf, enum force_switch_mode mode); -static int get_n_cpus(void) -{ - int result; - static int n_cpus = 0; - - if(!n_cpus) { - /* On Linux, when some processors are offline - * _SC_NPROCESSORS_CONF counts the offline - * processors, whereas _SC_NPROCESSORS_ONLN - * does not. If we used _SC_NPROCESSORS_ONLN, - * getcpu() could return a value greater than - * this sysconf, in which case the arrays - * indexed by processor would overflow. - */ - result = sysconf(_SC_NPROCESSORS_CONF); - if(result == -1) { - return -1; - } - - n_cpus = result; - } - - return n_cpus; -} - /** * _ust_buffers_strncpy_fixup - Fix an incomplete string in a ltt_relay buffer. * @buf : buffer diff --git a/libust/tracer.h b/libust/tracer.h index 4f72d7a..9fd626e 100644 --- a/libust/tracer.h +++ b/libust/tracer.h @@ -345,37 +345,6 @@ static __inline__ void ltt_write_trace_header(struct ust_trace *trace, header->freq_scale = trace->freq_scale; } -#ifndef UST_VALGRIND - -static __inline__ int ust_get_cpu(void) -{ - int cpu; - - cpu = sched_getcpu(); - if (likely(cpu >= 0)) - return cpu; - /* - * If getcpu(2) is not implemented in the Kernel use CPU 0 as fallback. - */ - return 0; -} - -#else /* #else #ifndef UST_VALGRIND */ - -static __inline__ int ust_get_cpu(void) -{ - /* - * Valgrind does not support the sched_getcpu() vsyscall. - * It causes it to detect a segfault in the program and stop it. - * So if we want to check libust with valgrind, we have to refrain - * from using this call. TODO: it would probably be better to return - * other values too, to better test it. - */ - return 0; -} - -#endif /* #else #ifndef UST_VALGRIND */ - /* * Size reserved for high priority events (interrupts, NMI, BH) at the end of a * nearly full buffer. User space won't use this last amount of space when in -- 2.34.1