X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=6f94040e9b186ca9a2788c3875d9d453bddde8c6;hb=74d81a6cca2cd4a7718bba9368f382f9f2fbba84;hp=fdb4bdfcc9b4f0a289a2dac09facd5aefdb501a9;hpb=a3f61e7f689a5fc60b833a773f462989dc6cc78f;p=lttng-ust.git

diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c
index fdb4bdfc..6f94040e 100644
--- a/libringbuffer/ring_buffer_frontend.c
+++ b/libringbuffer/ring_buffer_frontend.c
@@ -1,7 +1,22 @@
 /*
  * ring_buffer_frontend.c
  *
- * (C) Copyright 2005-2010 - Mathieu Desnoyers
+ * Copyright (C) 2005-2012 Mathieu Desnoyers
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; only
+ * version 2.1 of the License.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ *
  *
  * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
  * recorder (overwrite) modes. See thesis:
@@ -34,27 +49,33 @@
  *   - splice one subbuffer worth of data to a pipe
  *   - splice the data from pipe to disk/network
  *   - put_subbuf
- *
- * Dual LGPL v2.1/GPL v2 license.
  */
 
+#define _GNU_SOURCE
 #include 
 #include 
 #include 
 #include 
 #include 
 #include 
+#include 
 
 #include "smp.h"
 #include 
+#include "vatomic.h"
 #include "backend.h"
 #include "frontend.h"
 #include "shm.h"
+#include "tlsfixup.h"
+#include "../liblttng-ust/compat.h"	/* For ENODATA */
 
 #ifndef max
 #define max(a, b)	((a) > (b) ? (a) : (b))
 #endif
 
+/* Print DBG() messages about events lost only every 1048576 hits */
+#define DBG_PRINT_NR_LOST	(1UL << 20)
+
 /*
  * Use POSIX SHM: shm_open(3) and shm_unlink(3).
  * close(2) to close the fd returned by shm_open.
@@ -83,25 +104,15 @@ struct switch_offsets {
 
 __thread unsigned int lib_ring_buffer_nesting;
 
+/*
+ * TODO: this is unused. Errors are saved within the ring buffer.
+ * Eventually, allow consumerd to print these errors.
+ */
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
 				  struct lttng_ust_lib_ring_buffer *buf, int cpu,
-				  struct lttng_ust_shm_handle *handle);
-
-/*
- * Must be called under cpu hotplug protection.
- */
-void lib_ring_buffer_free(struct lttng_ust_lib_ring_buffer *buf,
-			  struct lttng_ust_shm_handle *handle)
-{
-	struct channel *chan = shmp(handle, buf->backend.chan);
-
-	lib_ring_buffer_print_errors(chan, buf, buf->backend.cpu, handle);
-	/* buf->commit_hot will be freed by shm teardown */
-	/* buf->commit_cold will be freed by shm teardown */
-
-	lib_ring_buffer_backend_free(&buf->backend);
-}
+				  struct lttng_ust_shm_handle *handle)
+	__attribute__((unused));
 
 /**
  * lib_ring_buffer_reset - Reset ring buffer to initial values.
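A note on the new DBG_PRINT_NR_LOST constant: because it is a power of two, masking a running counter with DBG_PRINT_NR_LOST - 1 is a division-free way to act once every 2^20 events, which is exactly how the reserve slow path later in this patch rate-limits its "records lost" messages. A minimal standalone sketch of the idiom (the function name and fprintf reporting are illustrative, not part of the patch):

    #include <stdio.h>

    #define DBG_PRINT_NR_LOST	(1UL << 20)

    static unsigned long lost_count;

    /* Called once per lost record; reports at most once per 1048576
     * losses, since (x & (2^20 - 1)) == 0 iff x % 2^20 == 0. */
    static void count_lost_record(void)
    {
    	if ((lost_count++ & (DBG_PRINT_NR_LOST - 1)) == 0)
    		fprintf(stderr, "%lu or more records lost\n", lost_count);
    }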
@@ -175,9 +186,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 	const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
 	struct channel *chan = caa_container_of(chanb, struct channel, backend);
 	void *priv = channel_get_private(chan);
-	unsigned int num_subbuf;
 	size_t subbuf_header_size;
-	u64 tsc;
+	uint64_t tsc;
 	int ret;
 
 	/* Test for cpu hotplug */
@@ -207,9 +217,6 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 		goto free_commit;
 	}
 
-	num_subbuf = chan->backend.num_subbuf;
-	//init_waitqueue_head(&buf->read_wait);
-
 	/*
 	 * Write the subbuffer header for first subbuffer so we know the total
 	 * duration of data gathering.
@@ -235,7 +242,6 @@ free_init:
 free_commit:
 	/* commit_hot will be freed by shm teardown */
 free_chanbuf:
-	lib_ring_buffer_backend_free(&buf->backend);
 	return ret;
 }
 
@@ -249,7 +255,7 @@ static void switch_buffer_timer(unsigned long data)
 	/*
 	 * Only flush buffers periodically if readers are active.
 	 */
-	if (uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
+	if (uatomic_read(&buf->active_readers))
 		lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
 
 	//TODO timers
@@ -307,7 +313,7 @@ static void read_buffer_timer(unsigned long data)
 
 	CHAN_WARN_ON(chan, !buf->backend.allocated);
 
-	if ((uatomic_read(&buf->active_readers) || uatomic_read(&buf->active_shadow_readers))
+	if (uatomic_read(&buf->active_readers)
 	    && lib_ring_buffer_poll_deliver(config, buf, chan)) {
 		//TODO
 		//wake_up_interruptible(&buf->read_wait);
@@ -395,11 +401,9 @@ static void channel_unregister_notifiers(struct channel *chan,
 	//channel_backend_unregister_notifiers(&chan->backend);
 }
 
-static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle,
-		int shadow)
+static void channel_free(struct channel *chan, struct lttng_ust_shm_handle *handle)
 {
-	if (!shadow)
-		channel_backend_free(&chan->backend, handle);
+	channel_backend_free(&chan->backend, handle);
 	/* chan is freed by shm teardown */
 	shm_object_table_destroy(handle->table);
 	free(handle);
@@ -411,6 +415,7 @@ static void channel_free(struct channel *chan, struct lttng_ust_shm_hand
  * @name: name of the channel
  * @priv_data: ring buffer client private data area pointer (output)
  * @priv_data_size: length, in bytes, of the private data area.
+ * @priv_data_init: initialization data for private data.
  * @buf_addr: pointer to the beginning of the preallocated buffer contiguous
  *            address mapping. It is used only by RING_BUFFER_STATIC
  *            configuration. It can be set to NULL for other backends.
@@ -429,17 +434,22 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 		void **priv_data,
 		size_t priv_data_align,
 		size_t priv_data_size,
+		void *priv_data_init,
 		void *buf_addr, size_t subbuf_size,
 		size_t num_subbuf, unsigned int switch_timer_interval,
-		unsigned int read_timer_interval,
-		int *shm_fd, int *wait_fd, uint64_t *memory_map_size)
+		unsigned int read_timer_interval)
 {
 	int ret, cpu;
 	size_t shmsize, chansize;
 	struct channel *chan;
 	struct lttng_ust_shm_handle *handle;
 	struct shm_object *shmobj;
-	struct shm_ref *ref;
+	unsigned int nr_streams;
+
+	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
+		nr_streams = num_possible_cpus();
+	else
+		nr_streams = 1;
 
 	if (lib_ring_buffer_check_config(config, switch_timer_interval,
 					 read_timer_interval))
@@ -457,15 +467,14 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
 	/* Calculate the shm allocation layout */
 	shmsize = sizeof(struct channel);
 	shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
-	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-		shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * num_possible_cpus();
-	else
-		shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp);
+	shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
 	chansize = shmsize;
-	shmsize += offset_align(shmsize, priv_data_align);
+	if (priv_data_align)
+		shmsize += offset_align(shmsize, priv_data_align);
 	shmsize += priv_data_size;
 
-	shmobj = shm_object_table_append(handle->table, shmsize);
+	/* Allocate normal memory for channel (not shared) */
+	shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM);
 	if (!shmobj)
 		goto error_append;
 	/* struct channel is at object 0, offset 0 (hardcoded) */
@@ -475,6 +484,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 	chan = shmp(handle, handle->chan);
 	if (!chan)
 		goto error_append;
+	chan->nr_streams = nr_streams;
 
 	/* space for private data */
 	if (priv_data_size) {
@@ -486,9 +496,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 		if (!shmp(handle, priv_data_alloc))
 			goto error_append;
 		*priv_data = channel_get_private(chan);
+		memcpy(*priv_data, priv_data_init, priv_data_size);
 	} else {
 		chan->priv_data_offset = -1;
-		*priv_data = NULL;
+		if (priv_data)
+			*priv_data = NULL;
 	}
 
 	ret = channel_backend_init(&chan->backend, name, config,
@@ -521,8 +533,6 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 			lib_ring_buffer_start_switch_timer(buf, handle);
 			lib_ring_buffer_start_read_timer(buf, handle);
 		}
-		ref = &handle->chan._ref;
-		shm_get_object_data(handle, ref, shm_fd, wait_fd, memory_map_size);
 		return handle;
 
 error_backend_init:
@@ -533,7 +543,7 @@ error_table_alloc:
 	return NULL;
 }
 
-struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
+struct lttng_ust_shm_handle *channel_handle_create(void *data,
 					uint64_t memory_map_size)
 {
 	struct lttng_ust_shm_handle *handle;
@@ -548,8 +558,8 @@ struct lttng_ust_shm_handle *channel_handle_create(int shm_fd, int wait_fd,
 	if (!handle->table)
 		goto error_table_alloc;
 	/* Add channel object */
-	object = shm_object_table_append_shadow(handle->table,
-			shm_fd, wait_fd, memory_map_size);
+	object = shm_object_table_append_mem(handle->table, data,
+			memory_map_size);
 	if (!object)
 		goto error_table_object;
 	/* struct channel is at object 0, offset 0 (hardcoded) */
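For orientation, a hypothetical call site for the new channel_create() signature above, now that priv_data_init is copied into the channel's private area and the shm_fd/wait_fd/memory_map_size output parameters are gone. struct my_priv, my_priv_init, client_config and the buffer geometry are stand-ins, not part of the patch:

    struct my_priv { int foo; };
    static const struct my_priv my_priv_init = { .foo = 42 };

    struct lttng_ust_shm_handle *handle;
    void *priv;

    handle = channel_create(&client_config, "chan0",
    		&priv, __alignof__(struct my_priv), sizeof(struct my_priv),
    		(void *) &my_priv_init,	/* memcpy'd into the private area */
    		NULL,			/* buf_addr: static backend only */
    		4096, 8,		/* subbuf_size, num_subbuf */
    		0, 0);			/* switch/read timers disabled */
    if (!handle)
    	abort();
    /* priv now points at the initialized private area inside the channel */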
@@ -565,23 +575,30 @@ error_table_alloc:
 }
 
 int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
-		int shm_fd, int wait_fd, uint64_t memory_map_size)
+		int shm_fd, int wakeup_fd, uint32_t stream_nr,
+		uint64_t memory_map_size)
 {
 	struct shm_object *object;
 
 	/* Add stream object */
-	object = shm_object_table_append_shadow(handle->table,
-			shm_fd, wait_fd, memory_map_size);
+	object = shm_object_table_append_shm(handle->table,
+			shm_fd, wakeup_fd, stream_nr,
+			memory_map_size);
 	if (!object)
-		return -1;
+		return -EINVAL;
 	return 0;
 }
 
+unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle)
+{
+	assert(handle->table);
+	return handle->table->allocated_len - 1;
+}
+
 static
-void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
-		int shadow)
+void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle)
 {
-	channel_free(chan, handle, shadow);
+	channel_free(chan, handle);
 }
 
 /**
@@ -595,59 +612,21 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
  * They should release their handle at that point.
  */
 void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
-		int shadow)
+		int consumer)
 {
-	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-	int cpu;
-
-	if (shadow) {
-		channel_release(chan, handle, shadow);
-		return;
-	}
-
-	channel_unregister_notifiers(chan, handle);
-
-	if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-		for_each_channel_cpu(cpu, chan) {
-			struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
-			if (config->cb.buffer_finalize)
-				config->cb.buffer_finalize(buf,
-						channel_get_private(chan),
-						cpu, handle);
-			if (buf->backend.allocated)
-				lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
-						handle);
-			/*
-			 * Perform flush before writing to finalized.
-			 */
-			cmm_smp_wmb();
-			CMM_ACCESS_ONCE(buf->finalized) = 1;
-			//wake_up_interruptible(&buf->read_wait);
-		}
-	} else {
-		struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
-		if (config->cb.buffer_finalize)
-			config->cb.buffer_finalize(buf, channel_get_private(chan), -1, handle);
-		if (buf->backend.allocated)
-			lib_ring_buffer_switch_slow(buf, SWITCH_FLUSH,
-					handle);
+	if (consumer) {
 		/*
-		 * Perform flush before writing to finalized.
+		 * Note: the consumer takes care of finalizing and
+		 * switching the buffers.
 		 */
-		cmm_smp_wmb();
-		CMM_ACCESS_ONCE(buf->finalized) = 1;
-		//wake_up_interruptible(&buf->read_wait);
+		channel_unregister_notifiers(chan, handle);
 	}
-	CMM_ACCESS_ONCE(chan->finalized) = 1;
-	//wake_up_interruptible(&chan->hp_wait);
-	//wake_up_interruptible(&chan->read_wait);
+
 	/*
 	 * sessiond/consumer are keeping a reference on the shm file
 	 * descriptor directly. No need to refcount.
 	 */
-	channel_release(chan, handle, shadow);
+	channel_release(chan, handle);
 	return;
 }
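The consumer-side counterpart to the changes above, sketched under the assumption that the channel memory and one shm/wakeup fd pair per stream were already received from the application (all variable names here are invented for illustration):

    struct lttng_ust_shm_handle *handle;
    uint32_t i;

    handle = channel_handle_create(chan_data, chan_map_size);
    if (!handle)
    	return -1;
    for (i = 0; i < nr_streams; i++) {
    	if (channel_handle_add_stream(handle, stream_shm_fd[i],
    			stream_wakeup_fd[i], i, stream_map_size[i]))
    		return -1;	/* now -EINVAL rather than -1 */
    }
    /* Object 0 of the table is the channel itself, hence the "- 1"
     * in channel_handle_get_nr_streams(). */
    assert(channel_handle_get_nr_streams(handle) == nr_streams);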
@@ -656,35 +635,63 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
 					struct channel *chan, int cpu,
 					struct lttng_ust_shm_handle *handle,
 					int *shm_fd, int *wait_fd,
+					int *wakeup_fd,
 					uint64_t *memory_map_size)
 {
 	struct shm_ref *ref;
 
 	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
-		ref = &chan->backend.buf[0].shmp._ref;
-		shm_get_object_data(handle, ref, shm_fd, wait_fd,
-				    memory_map_size);
-		return shmp(handle, chan->backend.buf[0].shmp);
+		cpu = 0;
 	} else {
 		if (cpu >= num_possible_cpus())
 			return NULL;
-		ref = &chan->backend.buf[cpu].shmp._ref;
-		shm_get_object_data(handle, ref, shm_fd, wait_fd,
-				    memory_map_size);
-		return shmp(handle, chan->backend.buf[cpu].shmp);
 	}
+	ref = &chan->backend.buf[cpu].shmp._ref;
+	*shm_fd = shm_get_shm_fd(handle, ref);
+	*wait_fd = shm_get_wait_fd(handle, ref);
+	*wakeup_fd = shm_get_wakeup_fd(handle, ref);
+	if (shm_get_shm_size(handle, ref, memory_map_size))
+		return NULL;
+	return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
-			      struct lttng_ust_shm_handle *handle,
-			      int shadow)
+int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+			struct channel *chan,
+			struct lttng_ust_shm_handle *handle,
+			int cpu)
 {
-	if (shadow) {
-		if (uatomic_cmpxchg(&buf->active_shadow_readers, 0, 1) != 0)
-			return -EBUSY;
-		cmm_smp_mb();
-		return 0;
+	struct shm_ref *ref;
+
+	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+		cpu = 0;
+	} else {
+		if (cpu >= num_possible_cpus())
+			return -EINVAL;
 	}
+	ref = &chan->backend.buf[cpu].shmp._ref;
+	return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+			struct channel *chan,
+			struct lttng_ust_shm_handle *handle,
+			int cpu)
+{
+	struct shm_ref *ref;
+
+	if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
+		cpu = 0;
+	} else {
+		if (cpu >= num_possible_cpus())
+			return -EINVAL;
+	}
+	ref = &chan->backend.buf[cpu].shmp._ref;
+	return shm_close_wakeup_fd(handle, ref);
+}
+
+int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
+			      struct lttng_ust_shm_handle *handle)
+{
 	if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
 		return -EBUSY;
 	cmm_smp_mb();
@@ -692,17 +699,10 @@ int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
 }
 
 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
-				  struct lttng_ust_shm_handle *handle,
-				  int shadow)
+				  struct lttng_ust_shm_handle *handle)
 {
 	struct channel *chan = shmp(handle, buf->backend.chan);
 
-	if (shadow) {
-		CHAN_WARN_ON(chan, uatomic_read(&buf->active_shadow_readers) != 1);
-		cmm_smp_mb();
-		uatomic_dec(&buf->active_shadow_readers);
-		return;
-	}
 	CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
 	cmm_smp_mb();
 	uatomic_dec(&buf->active_readers);
@@ -780,8 +780,7 @@ void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
 	struct channel *chan = shmp(handle, bufb->chan);
 	unsigned long consumed;
 
-	CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
-		     && uatomic_read(&buf->active_shadow_readers) != 1);
+	CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
 
 	/*
 	 * Only push the consumed value forward.
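With the shadow-reader path gone, lib_ring_buffer_open_read() admits exactly one reader per buffer: the cmpxchg on active_readers either moves 0 to 1 or fails with -EBUSY. A hypothetical consumption loop built on the get/put protocol from this file's header comment (assuming the get_next/put_next helpers from frontend.h; error handling trimmed):

    if (lib_ring_buffer_open_read(buf, handle))
    	return;		/* -EBUSY: another reader won the cmpxchg */
    while (!lib_ring_buffer_get_next_subbuf(buf, handle)) {
    	/* ... copy or splice one subbuffer worth of data ... */
    	lib_ring_buffer_put_next_subbuf(buf, handle);
    }
    lib_ring_buffer_release_read(buf, handle);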
@@ -904,8 +903,7 @@ void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	unsigned long read_sb_bindex, consumed_idx, consumed;
 
-	CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1
-		     && uatomic_read(&buf->active_shadow_readers) != 1);
+	CHAN_WARN_ON(chan, uatomic_read(&buf->active_readers) != 1);
 
 	if (!buf->get_subbuf) {
 		/*
@@ -970,7 +968,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu
 	commit_count_sb = v_read(config, &shmp_index(handle, buf->commit_cold, cons_idx)->cc_sb);
 
 	if (subbuf_offset(commit_count, chan) != 0)
-		ERRMSG("ring buffer %s, cpu %d: "
+		DBG("ring buffer %s, cpu %d: "
 		       "commit count in subbuffer %lu,\n"
 		       "expecting multiples of %lu bytes\n"
 		       "  [ %lu bytes committed, %lu bytes reader-visible ]\n",
@@ -978,7 +976,7 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu
 	       chan->backend.subbuf_size,
 	       commit_count, commit_count_sb);
 
-	ERRMSG("ring buffer: %s, cpu %d: %lu bytes committed\n",
+	DBG("ring buffer: %s, cpu %d: %lu bytes committed\n",
 		chan->backend.name, cpu, commit_count);
 }
 
@@ -991,12 +989,6 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	unsigned long write_offset, cons_offset;
 
-	/*
-	 * Can be called in the error path of allocation when
-	 * trans_channel_data is not yet set.
-	 */
-	if (!chan)
-		return;
 	/*
 	 * No need to order commit_count, write_offset and cons_offset reads
 	 * because we execute at teardown when no more writer nor reader
@@ -1005,7 +997,7 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
 	write_offset = v_read(config, &buf->offset);
 	cons_offset = uatomic_read(&buf->consumed);
 	if (write_offset != cons_offset)
-		ERRMSG("ring buffer %s, cpu %d: "
+		DBG("ring buffer %s, cpu %d: "
 		       "non-consumed data\n"
 		       "  [ %lu bytes written, %lu bytes read ]\n",
 		       chan->backend.name, cpu, write_offset, cons_offset);
@@ -1027,23 +1019,30 @@ void lib_ring_buffer_print_errors(struct channel *chan,
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	void *priv = channel_get_private(chan);
 
-	ERRMSG("ring buffer %s, cpu %d: %lu records written, "
-	       "%lu records overrun\n",
-	       chan->backend.name, cpu,
-	       v_read(config, &buf->records_count),
-	       v_read(config, &buf->records_overrun));
-
-	if (v_read(config, &buf->records_lost_full)
-	    || v_read(config, &buf->records_lost_wrap)
-	    || v_read(config, &buf->records_lost_big))
-		ERRMSG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
-		       "  [ %lu buffer full, %lu nest buffer wrap-around, "
-		       "%lu event too big ]\n",
-		       chan->backend.name, cpu,
-		       v_read(config, &buf->records_lost_full),
-		       v_read(config, &buf->records_lost_wrap),
-		       v_read(config, &buf->records_lost_big));
-
+	if (!strcmp(chan->backend.name, "relay-metadata-mmap")) {
+		DBG("ring buffer %s: %lu records written, "
+			"%lu records overrun\n",
+			chan->backend.name,
+			v_read(config, &buf->records_count),
+			v_read(config, &buf->records_overrun));
+	} else {
+		DBG("ring buffer %s, cpu %d: %lu records written, "
+			"%lu records overrun\n",
+			chan->backend.name, cpu,
+			v_read(config, &buf->records_count),
+			v_read(config, &buf->records_overrun));
+
+		if (v_read(config, &buf->records_lost_full)
+		    || v_read(config, &buf->records_lost_wrap)
+		    || v_read(config, &buf->records_lost_big))
+			DBG("ring buffer %s, cpu %d: records were lost. Caused by:\n"
+				"  [ %lu buffer full, %lu nest buffer wrap-around, "
+				"%lu event too big ]\n",
+				chan->backend.name, cpu,
+				v_read(config, &buf->records_lost_full),
+				v_read(config, &buf->records_lost_wrap),
+				v_read(config, &buf->records_lost_big));
+	}
 	lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
 }
 
@@ -1056,7 +1055,7 @@ static
 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
 				      struct channel *chan,
 				      struct switch_offsets *offsets,
-				      u64 tsc,
+				      uint64_t tsc,
 				      struct lttng_ust_shm_handle *handle)
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
@@ -1094,7 +1093,7 @@ static
 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
 				    struct channel *chan,
 				    struct switch_offsets *offsets,
-				    u64 tsc,
+				    uint64_t tsc,
 				    struct lttng_ust_shm_handle *handle)
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
@@ -1131,7 +1130,7 @@ static
 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
 				      struct channel *chan,
 				      struct switch_offsets *offsets,
-				      u64 tsc,
+				      uint64_t tsc,
 				      struct lttng_ust_shm_handle *handle)
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
@@ -1167,7 +1166,7 @@ static
 void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
 				    struct channel *chan,
 				    struct switch_offsets *offsets,
-				    u64 tsc,
+				    uint64_t tsc,
 				    struct lttng_ust_shm_handle *handle)
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
@@ -1203,7 +1202,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
 				    struct lttng_ust_lib_ring_buffer *buf,
 				    struct channel *chan,
 				    struct switch_offsets *offsets,
-				    u64 *tsc)
+				    uint64_t *tsc)
 {
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	unsigned long off;
@@ -1268,7 +1267,7 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum swi
 	const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
 	struct switch_offsets offsets;
 	unsigned long oldidx;
-	u64 tsc;
+	uint64_t tsc;
 
 	offsets.size = 0;
 
@@ -1385,11 +1384,19 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
 		     - subbuf_trunc((unsigned long) uatomic_read(&buf->consumed), chan)
 		    >= chan->backend.buf_size)) {
+			unsigned long nr_lost;
+
 			/*
 			 * We do not overwrite non consumed buffers
 			 * and we are full : record is lost.
 			 */
+			nr_lost = v_read(config, &buf->records_lost_full);
 			v_inc(config, &buf->records_lost_full);
+			if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
+				DBG("%lu or more records lost in (%s:%d) (buffer full)\n",
+					nr_lost + 1, chan->backend.name,
+					buf->backend.cpu);
+			}
 			return -ENOBUFS;
 		} else {
 			/*
@@ -1400,13 +1407,21 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
 			 */
 		}
 	} else {
+		unsigned long nr_lost;
+
 		/*
 		 * Next subbuffer reserve offset does not match the
 		 * commit offset. Drop record in producer-consumer and
 		 * overwrite mode. Caused by either a writer OOPS or too
 		 * many nested writes over a reserve/commit pair.
 		 */
+		nr_lost = v_read(config, &buf->records_lost_wrap);
 		v_inc(config, &buf->records_lost_wrap);
+		if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
+			DBG("%lu or more records lost in (%s:%d) (wrap-around)\n",
+				nr_lost + 1, chan->backend.name,
+				buf->backend.cpu);
+		}
 		return -EIO;
 	}
 	offsets->size =
@@ -1420,11 +1435,20 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
 		+ ctx->data_size;
 	if (caa_unlikely(subbuf_offset(offsets->begin, chan)
 		     + offsets->size > chan->backend.subbuf_size)) {
+		unsigned long nr_lost;
+
 		/*
 		 * Record too big for subbuffers, report error, don't
 		 * complete the sub-buffer switch.
 		 */
+		nr_lost = v_read(config, &buf->records_lost_big);
 		v_inc(config, &buf->records_lost_big);
+		if ((nr_lost & (DBG_PRINT_NR_LOST - 1)) == 0) {
+			DBG("%lu or more records lost in (%s:%d) record size "
+				" of %zu bytes is too large for buffer\n",
+				nr_lost + 1, chan->backend.name,
+				buf->backend.cpu, offsets->size);
+		}
 		return -ENOSPC;
 	} else {
 		/*
@@ -1528,3 +1552,11 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx)
 	ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
 	return 0;
 }
+
+/*
+ * Force a read (imply TLS fixup for dlopen) of TLS variables.
+ */
+void lttng_fixup_ringbuffer_tls(void)
+{
+	asm volatile ("" : : "m" (lib_ring_buffer_nesting));
+}
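On the new lttng_fixup_ringbuffer_tls(): the empty asm with an "m" input constraint forces a read of the lib_ring_buffer_nesting __thread variable, so the library's TLS block gets materialized at fixup time rather than lazily on first access; lazy TLS allocation for dlopen'd libraries can allocate memory, which is unsafe from the restricted contexts in which tracepoints may fire. A sketch of how a client could wire it up (constructor name invented):

    static void __attribute__((constructor)) init_tracer(void)
    {
    	/* Touch the ring buffer TLS once, before any thread can
    	 * hit a tracepoint through this library. */
    	lttng_fixup_ringbuffer_tls();
    }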