X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=libringbuffer%2Fring_buffer_frontend.c;h=1b9ec40d3cc9b001a5bd951457438cf2445c53e3;hb=6ba6fd60507f8e045bdc4f1be14e9d99c6a15f7f;hp=331639f1e86826b2c67d186dfa8d0907df70f0d8;hpb=b2c5f61a9cb94a79c35167450666e540b3e0fffe;p=lttng-ust.git diff --git a/libringbuffer/ring_buffer_frontend.c b/libringbuffer/ring_buffer_frontend.c index 331639f1..1b9ec40d 100644 --- a/libringbuffer/ring_buffer_frontend.c +++ b/libringbuffer/ring_buffer_frontend.c @@ -1,23 +1,8 @@ /* - * ring_buffer_frontend.c + * SPDX-License-Identifier: LGPL-2.1-only * * Copyright (C) 2005-2012 Mathieu Desnoyers * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Lesser General Public - * License as published by the Free Software Foundation; only - * version 2.1 of the License. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Lesser General Public License for more details. - * - * You should have received a copy of the GNU Lesser General Public - * License along with this library; if not, write to the Free Software - * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA - * - * * Ring buffer wait-free buffer synchronization. Producer-consumer and flight * recorder (overwrite) modes. See thesis: * @@ -51,7 +36,6 @@ * - put_subbuf */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -61,14 +45,18 @@ #include #include #include +#include #include #include #include #include -#include +#include + +#include +#include #include "smp.h" -#include +#include "ringbuffer-config.h" #include "vatomic.h" #include "backend.h" #include "frontend.h" @@ -127,7 +115,7 @@ DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting); static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER; static -void lib_ring_buffer_print_errors(struct channel *chan, +void lib_ring_buffer_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_lib_ring_buffer *buf, int cpu, struct lttng_ust_shm_handle *handle); @@ -159,7 +147,7 @@ void lttng_ust_ringbuffer_set_allow_blocking(void) } /* Get blocking timeout, in ms */ -static int lttng_ust_ringbuffer_get_timeout(struct channel *chan) +static int lttng_ust_ringbuffer_get_timeout(struct lttng_ust_lib_ring_buffer_channel *chan) { if (!lttng_ust_allow_blocking) return 0; @@ -178,7 +166,7 @@ static int lttng_ust_ringbuffer_get_timeout(struct channel *chan) void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; unsigned int i; @@ -194,6 +182,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, for (i = 0; i < chan->backend.num_subbuf; i++) { struct commit_counters_hot *cc_hot; struct commit_counters_cold *cc_cold; + uint64_t *ts_end; cc_hot = shmp_index(handle, buf->commit_hot, i); if (!cc_hot) @@ -201,9 +190,13 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, cc_cold = shmp_index(handle, buf->commit_cold, i); if (!cc_cold) return; + ts_end = shmp_index(handle, buf->ts_end, i); + if (!ts_end) + return; v_set(config, &cc_hot->cc, 0); v_set(config, &cc_hot->seq, 0); v_set(config, &cc_cold->cc_sb, 0); + *ts_end = 0; } uatomic_set(&buf->consumed, 0); uatomic_set(&buf->record_disabled, 0); @@ -227,7 +220,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf, * be using the iterator concurrently with reset. The previous current iterator * record is reset. */ -void channel_reset(struct channel *chan) +void channel_reset(struct lttng_ust_lib_ring_buffer_channel *chan) { /* * Reset iterators first. Will put the subbuffer if held for reading. @@ -338,11 +331,12 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, struct shm_object *shmobj) { const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config; - struct channel *chan = caa_container_of(chanb, struct channel, backend); + struct lttng_ust_lib_ring_buffer_channel *chan = caa_container_of(chanb, + struct lttng_ust_lib_ring_buffer_channel, backend); struct lttng_ust_lib_ring_buffer_backend_subbuffer *wsb; - struct channel *shmp_chan; + struct lttng_ust_lib_ring_buffer_channel *shmp_chan; struct commit_counters_hot *cc_hot; - void *priv = channel_get_private(chan); + void *priv = channel_get_private_config(chan); size_t subbuf_header_size; uint64_t tsc; int ret; @@ -368,6 +362,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, goto free_commit; } + align_shm(shmobj, __alignof__(uint64_t)); + set_shmp(buf->ts_end, + zalloc_shm(shmobj, + sizeof(uint64_t) * chan->backend.num_subbuf)); + if (!shmp(handle, buf->ts_end)) { + ret = -ENOMEM; + goto free_commit_cold; + } + + ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend, cpu, handle, shmobj); if (ret) { @@ -414,6 +418,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf, /* Error handling */ free_init: + /* ts_end will be freed by shm teardown */ +free_commit_cold: /* commit_cold will be freed by shm teardown */ free_commit: /* commit_hot will be freed by shm teardown */ @@ -422,11 +428,12 @@ free_chanbuf: } static -void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc) +void lib_ring_buffer_channel_switch_timer(int sig __attribute__((unused)), + siginfo_t *si, void *uc __attribute__((unused))) { const struct lttng_ust_lib_ring_buffer_config *config; struct lttng_ust_shm_handle *handle; - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; int cpu; assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self()); @@ -470,7 +477,7 @@ end: static int lib_ring_buffer_poll_deliver(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle) { unsigned long consumed_old, consumed_idx, commit_count, write_offset; @@ -579,7 +586,7 @@ void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf, } static -void lib_ring_buffer_channel_do_read(struct channel *chan) +void lib_ring_buffer_channel_do_read(struct lttng_ust_lib_ring_buffer_channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config; struct lttng_ust_shm_handle *handle; @@ -622,9 +629,10 @@ end: } static -void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc) +void lib_ring_buffer_channel_read_timer(int sig __attribute__((unused)), + siginfo_t *si, void *uc __attribute__((unused))) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self()); chan = si->si_value.sival_ptr; @@ -657,7 +665,7 @@ void rb_setmask(sigset_t *mask) } static -void *sig_thread(void *arg) +void *sig_thread(void *arg __attribute__((unused))) { sigset_t mask; siginfo_t info; @@ -774,7 +782,7 @@ void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr) } static -void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) +void lib_ring_buffer_channel_switch_timer_start(struct lttng_ust_lib_ring_buffer_channel *chan) { struct sigevent sev; struct itimerspec its; @@ -787,6 +795,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) lib_ring_buffer_setup_timer_thread(); + memset(&sev, 0, sizeof(sev)); sev.sigev_notify = SIGEV_SIGNAL; sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH; sev.sigev_value.sival_ptr = chan; @@ -807,7 +816,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan) } static -void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) +void lib_ring_buffer_channel_switch_timer_stop(struct lttng_ust_lib_ring_buffer_channel *chan) { int ret; @@ -826,7 +835,7 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan) } static -void lib_ring_buffer_channel_read_timer_start(struct channel *chan) +void lib_ring_buffer_channel_read_timer_start(struct lttng_ust_lib_ring_buffer_channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct sigevent sev; @@ -861,7 +870,7 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan) } static -void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) +void lib_ring_buffer_channel_read_timer_stop(struct lttng_ust_lib_ring_buffer_channel *chan) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; int ret; @@ -887,14 +896,14 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan) chan->read_timer_enabled = 0; } -static void channel_unregister_notifiers(struct channel *chan, - struct lttng_ust_shm_handle *handle) +static void channel_unregister_notifiers(struct lttng_ust_lib_ring_buffer_channel *chan, + struct lttng_ust_shm_handle *handle __attribute__((unused))) { lib_ring_buffer_channel_switch_timer_stop(chan); lib_ring_buffer_channel_read_timer_stop(chan); } -static void channel_print_errors(struct channel *chan, +static void channel_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = @@ -917,7 +926,7 @@ static void channel_print_errors(struct channel *chan, } } -static void channel_free(struct channel *chan, +static void channel_free(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle, int consumer) { @@ -931,9 +940,10 @@ static void channel_free(struct channel *chan, * channel_create - Create channel. * @config: ring buffer instance configuration * @name: name of the channel - * @priv_data: ring buffer client private data area pointer (output) - * @priv_data_size: length, in bytes, of the private data area. - * @priv_data_init: initialization data for private data. + * @priv_data_align: alignment, in bytes, of the private data area. (config) + * @priv_data_size: length, in bytes, of the private data area. (config) + * @priv_data_init: initialization data for private data. (config) + * @priv: local private data (memory owner by caller) * @buf_addr: pointer the the beginning of the preallocated buffer contiguous * address mapping. It is used only by RING_BUFFER_STATIC * configuration. It can be set to NULL for other backends. @@ -951,11 +961,11 @@ static void channel_free(struct channel *chan, */ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config, const char *name, - void **priv_data, size_t priv_data_align, size_t priv_data_size, void *priv_data_init, - void *buf_addr, size_t subbuf_size, + void *priv, + void *buf_addr __attribute__((unused)), size_t subbuf_size, size_t num_subbuf, unsigned int switch_timer_interval, unsigned int read_timer_interval, const int *stream_fds, int nr_stream_fds, @@ -963,7 +973,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff { int ret; size_t shmsize, chansize; - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; struct lttng_ust_shm_handle *handle; struct shm_object *shmobj; unsigned int nr_streams; @@ -1004,20 +1014,20 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff goto error_table_alloc; /* Calculate the shm allocation layout */ - shmsize = sizeof(struct channel); - shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp)); + shmsize = sizeof(struct lttng_ust_lib_ring_buffer_channel); + shmsize += lttng_ust_offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp)); shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams; chansize = shmsize; if (priv_data_align) - shmsize += offset_align(shmsize, priv_data_align); + shmsize += lttng_ust_offset_align(shmsize, priv_data_align); shmsize += priv_data_size; /* Allocate normal memory for channel (not shared) */ shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM, - -1); + -1, -1); if (!shmobj) goto error_append; - /* struct channel is at object 0, offset 0 (hardcoded) */ + /* struct lttng_ust_lib_ring_buffer_channel is at object 0, offset 0 (hardcoded) */ set_shmp(handle->chan, zalloc_shm(shmobj, chansize)); assert(handle->chan._ref.index == 0); assert(handle->chan._ref.offset == 0); @@ -1028,6 +1038,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff /* space for private data */ if (priv_data_size) { + void *priv_config; + DECLARE_SHMP(void, priv_data_alloc); align_shm(shmobj, priv_data_align); @@ -1035,16 +1047,16 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size)); if (!shmp(handle, priv_data_alloc)) goto error_append; - *priv_data = channel_get_private(chan); - memcpy(*priv_data, priv_data_init, priv_data_size); + priv_config = channel_get_private_config(chan); + memcpy(priv_config, priv_data_init, priv_data_size); } else { chan->priv_data_offset = -1; - if (priv_data) - *priv_data = NULL; } chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms; + channel_set_private(chan, priv); + ret = channel_backend_init(&chan->backend, name, config, subbuf_size, num_subbuf, handle, stream_fds); @@ -1089,7 +1101,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data, memory_map_size, wakeup_fd); if (!object) goto error_table_object; - /* struct channel is at object 0, offset 0 (hardcoded) */ + /* struct lttng_ust_lib_ring_buffer_channel is at object 0, offset 0 (hardcoded) */ handle->chan._ref.index = 0; handle->chan._ref.offset = 0; return handle; @@ -1123,7 +1135,7 @@ unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle) } static -void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, +void channel_release(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle, int consumer) { channel_free(chan, handle, consumer); @@ -1137,9 +1149,9 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle, * Call "destroy" callback, finalize channels, decrement the channel * reference count. Note that when readers have completed data * consumption of finalized channels, get_subbuf() will return -ENODATA. - * They should release their handle at that point. + * They should release their handle at that point. */ -void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, +void channel_destroy(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle, int consumer) { if (consumer) { @@ -1164,7 +1176,7 @@ void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle, struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( const struct lttng_ust_lib_ring_buffer_config *config, - struct channel *chan, int cpu, + struct lttng_ust_lib_ring_buffer_channel *chan, int cpu, struct lttng_ust_shm_handle *handle, int *shm_fd, int *wait_fd, int *wakeup_fd, @@ -1187,9 +1199,10 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer( return shmp(handle, chan->backend.buf[cpu].shmp); } -int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, - struct channel *chan, - struct lttng_ust_shm_handle *handle) +int ring_buffer_channel_close_wait_fd( + const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)), + struct lttng_ust_lib_ring_buffer_channel *chan __attribute__((unused)), + struct lttng_ust_shm_handle *handle) { struct shm_ref *ref; @@ -1197,9 +1210,10 @@ int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_con return shm_close_wait_fd(handle, ref); } -int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, - struct channel *chan, - struct lttng_ust_shm_handle *handle) +int ring_buffer_channel_close_wakeup_fd( + const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)), + struct lttng_ust_lib_ring_buffer_channel *chan __attribute__((unused)), + struct lttng_ust_shm_handle *handle) { struct shm_ref *ref; @@ -1208,7 +1222,7 @@ int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_c } int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle, int cpu) { @@ -1225,7 +1239,7 @@ int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_conf } int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle, int cpu) { @@ -1246,7 +1260,7 @@ int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_co } int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf, - struct lttng_ust_shm_handle *handle) + struct lttng_ust_shm_handle *handle __attribute__((unused))) { if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0) return -EBUSY; @@ -1257,7 +1271,7 @@ int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf, void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { - struct channel *chan = shmp(handle, buf->backend.chan); + struct lttng_ust_lib_ring_buffer_channel *chan = shmp(handle, buf->backend.chan); if (!chan) return; @@ -1280,7 +1294,7 @@ int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf, unsigned long *consumed, unsigned long *produced, struct lttng_ust_shm_handle *handle) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; unsigned long consumed_cur, write_offset; int finalized; @@ -1345,7 +1359,7 @@ int lib_ring_buffer_snapshot_sample_positions( unsigned long *consumed, unsigned long *produced, struct lttng_ust_shm_handle *handle) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; chan = shmp(handle, buf->backend.chan); @@ -1376,7 +1390,7 @@ void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; unsigned long consumed; chan = shmp(handle, bufb->chan); @@ -1407,7 +1421,7 @@ int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf, unsigned long consumed, struct lttng_ust_shm_handle *handle) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; unsigned long consumed_cur, consumed_idx, commit_count, write_offset; int ret, finalized, nr_retry = LTTNG_UST_RING_BUFFER_GET_RETRY; @@ -1571,7 +1585,7 @@ void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf, struct lttng_ust_shm_handle *handle) { struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend; - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; unsigned long sb_bindex, consumed_idx, consumed; struct lttng_ust_lib_ring_buffer_backend_pages_shmp *rpages; @@ -1638,7 +1652,7 @@ void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf, */ static void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, unsigned long cons_offset, int cpu, struct lttng_ust_shm_handle *handle) @@ -1673,9 +1687,8 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu static void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, - void *priv, int cpu, - struct lttng_ust_shm_handle *handle) + struct lttng_ust_lib_ring_buffer_channel *chan, + int cpu, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long write_offset, cons_offset; @@ -1703,12 +1716,11 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf, } static -void lib_ring_buffer_print_errors(struct channel *chan, +void lib_ring_buffer_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_lib_ring_buffer *buf, int cpu, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - void *priv = channel_get_private(chan); if (!strcmp(chan->backend.name, "relay-metadata-mmap")) { DBG("ring buffer %s: %lu records written, " @@ -1734,7 +1746,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, v_read(config, &buf->records_lost_wrap), v_read(config, &buf->records_lost_big)); } - lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle); + lib_ring_buffer_print_buffer_errors(buf, chan, cpu, handle); } /* @@ -1745,7 +1757,7 @@ void lib_ring_buffer_print_errors(struct channel *chan, */ static void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, uint64_t tsc, struct lttng_ust_shm_handle *handle) @@ -1786,7 +1798,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf, */ static void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, uint64_t tsc, struct lttng_ust_shm_handle *handle) @@ -1795,15 +1807,29 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, unsigned long oldidx = subbuf_index(offsets->old - 1, chan); unsigned long commit_count, padding_size, data_size; struct commit_counters_hot *cc_hot; + uint64_t *ts_end; data_size = subbuf_offset(offsets->old - 1, chan) + 1; padding_size = chan->backend.subbuf_size - data_size; subbuffer_set_data_size(config, &buf->backend, oldidx, data_size, handle); + ts_end = shmp_index(handle, buf->ts_end, oldidx); + if (!ts_end) + return; /* - * Order all writes to buffer before the commit count update that will - * determine that the subbuffer is full. + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; + + /* + * Order all writes to buffer and store to ts_end before the commit + * count update that will determine that the subbuffer is full. */ cmm_smp_wmb(); cc_hot = shmp_index(handle, buf->commit_hot, oldidx); @@ -1827,7 +1853,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf, */ static void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, uint64_t tsc, struct lttng_ust_shm_handle *handle) @@ -1867,18 +1893,31 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf, */ static void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, uint64_t tsc, struct lttng_ust_shm_handle *handle) { const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; unsigned long endidx, data_size; + uint64_t *ts_end; endidx = subbuf_index(offsets->end - 1, chan); data_size = subbuf_offset(offsets->end - 1, chan) + 1; subbuffer_set_data_size(config, &buf->backend, endidx, data_size, handle); + ts_end = shmp_index(handle, buf->ts_end, endidx); + if (!ts_end) + return; + /* + * This is the last space reservation in that sub-buffer before + * it gets delivered. This provides exclusive access to write to + * this sub-buffer's ts_end. There are also no concurrent + * readers of that ts_end because delivery of that sub-buffer is + * postponed until the commit counter is incremented for the + * current space reservation. + */ + *ts_end = tsc; } /* @@ -1889,7 +1928,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf, static int lib_ring_buffer_try_switch_slow(enum switch_mode mode, struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, uint64_t *tsc, struct lttng_ust_shm_handle *handle) @@ -1996,14 +2035,16 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode, * Force a sub-buffer switch. This operation is completely reentrant : can be * called while tracing is active with absolutely no lock held. * - * Note, however, that as a v_cmpxchg is used for some atomic - * operations, this function must be called from the CPU which owns the buffer - * for a ACTIVE flush. + * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for + * some atomic operations, this function must be called from the CPU + * which owns the buffer for a ACTIVE flush. However, for + * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called + * from any CPU. */ void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode, struct lttng_ust_shm_handle *handle) { - struct channel *chan; + struct lttng_ust_lib_ring_buffer_channel *chan; const struct lttng_ust_lib_ring_buffer_config *config; struct switch_offsets offsets; unsigned long oldidx; @@ -2082,13 +2123,14 @@ bool handle_blocking_retry(int *timeout_left_ms) */ static int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, struct switch_offsets *offsets, struct lttng_ust_lib_ring_buffer_ctx *ctx, void *client_ctx) { + struct lttng_ust_lib_ring_buffer_ctx_private *ctx_private = ctx->priv; const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; - struct lttng_ust_shm_handle *handle = ctx->handle; + struct lttng_ust_shm_handle *handle = chan->handle; unsigned long reserve_commit_diff, offset_cmp; int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan); @@ -2100,14 +2142,14 @@ retry: offsets->switch_old_end = 0; offsets->pre_header_padding = 0; - ctx->tsc = config->cb.ring_buffer_clock_read(chan); - if ((int64_t) ctx->tsc == -EIO) + ctx_private->tsc = config->cb.ring_buffer_clock_read(chan); + if ((int64_t) ctx_private->tsc == -EIO) return -EIO; - if (last_tsc_overflow(config, buf, ctx->tsc)) - ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC; + if (last_tsc_overflow(config, buf, ctx_private->tsc)) + ctx_private->rflags |= RING_BUFFER_RFLAG_FULL_TSC; - if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) { + if (caa_unlikely(subbuf_offset(offsets->begin, chan) == 0)) { offsets->switch_new_start = 1; /* For offsets->begin */ } else { offsets->size = config->cb.record_header_size(config, chan, @@ -2115,8 +2157,8 @@ retry: &offsets->pre_header_padding, ctx, client_ctx); offsets->size += - lib_ring_buffer_align(offsets->begin + offsets->size, - ctx->largest_align) + lttng_ust_lib_ring_buffer_align(offsets->begin + offsets->size, + ctx->largest_align) + ctx->data_size; if (caa_unlikely(subbuf_offset(offsets->begin, chan) + offsets->size > chan->backend.subbuf_size)) { @@ -2221,7 +2263,7 @@ retry: &offsets->pre_header_padding, ctx, client_ctx); offsets->size += - lib_ring_buffer_align(offsets->begin + offsets->size, + lttng_ust_lib_ring_buffer_align(offsets->begin + offsets->size, ctx->largest_align) + ctx->data_size; if (caa_unlikely(subbuf_offset(offsets->begin, chan) @@ -2276,20 +2318,21 @@ retry: int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx, void *client_ctx) { - struct channel *chan = ctx->chan; - struct lttng_ust_shm_handle *handle = ctx->handle; + struct lttng_ust_lib_ring_buffer_ctx_private *ctx_private = ctx->priv; + struct lttng_ust_lib_ring_buffer_channel *chan = ctx_private->chan; + struct lttng_ust_shm_handle *handle = chan->handle; const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config; struct lttng_ust_lib_ring_buffer *buf; struct switch_offsets offsets; int ret; if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) - buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp); + buf = shmp(handle, chan->backend.buf[ctx_private->reserve_cpu].shmp); else buf = shmp(handle, chan->backend.buf[0].shmp); if (!buf) return -EIO; - ctx->buf = buf; + ctx_private->buf = buf; offsets.size = 0; @@ -2308,7 +2351,7 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx, * records, never the opposite (missing a full TSC record when it would * be needed). */ - save_last_tsc(config, buf, ctx->tsc); + save_last_tsc(config, buf, ctx_private->tsc); /* * Push the reader if necessary @@ -2329,21 +2372,21 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx, lib_ring_buffer_clear_noref(config, &buf->backend, subbuf_index(offsets.old - 1, chan), handle); - lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle); + lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx_private->tsc, handle); } /* * Populate new subbuffer. */ if (caa_unlikely(offsets.switch_new_start)) - lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle); + lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx_private->tsc, handle); if (caa_unlikely(offsets.switch_new_end)) - lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle); + lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx_private->tsc, handle); - ctx->slot_size = offsets.size; - ctx->pre_offset = offsets.begin; - ctx->buf_offset = offsets.begin + offsets.pre_header_padding; + ctx_private->slot_size = offsets.size; + ctx_private->pre_offset = offsets.begin; + ctx_private->buf_offset = offsets.begin + offsets.pre_header_padding; return 0; } @@ -2384,22 +2427,23 @@ void deliver_count_events(const struct lttng_ust_lib_ring_buffer_config *config, } #else /* LTTNG_RING_BUFFER_COUNT_EVENTS */ static -void deliver_count_events(const struct lttng_ust_lib_ring_buffer_config *config, - struct lttng_ust_lib_ring_buffer *buf, - unsigned long idx, - struct lttng_ust_shm_handle *handle) +void deliver_count_events( + const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)), + struct lttng_ust_lib_ring_buffer *buf __attribute__((unused)), + unsigned long idx __attribute__((unused)), + struct lttng_ust_shm_handle *handle __attribute__((unused))) { } #endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_config *config, struct lttng_ust_lib_ring_buffer *buf, - struct channel *chan, + struct lttng_ust_lib_ring_buffer_channel *chan, unsigned long offset, unsigned long commit_count, unsigned long idx, struct lttng_ust_shm_handle *handle, - uint64_t tsc) + uint64_t tsc __attribute__((unused))) { unsigned long old_commit_count = commit_count - chan->backend.subbuf_size; @@ -2443,14 +2487,26 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_c if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb, old_commit_count, old_commit_count + 1) == old_commit_count)) { + uint64_t *ts_end; + /* * Start of exclusive subbuffer access. We are * guaranteed to be the last writer in this subbuffer * and any other writer trying to access this subbuffer * in this state is required to drop records. + * + * We can read the ts_end for the current sub-buffer + * which has been saved by the very last space + * reservation for the current sub-buffer. + * + * Order increment of commit counter before reading ts_end. */ + cmm_smp_mb(); + ts_end = shmp_index(handle, buf->ts_end, idx); + if (!ts_end) + return; deliver_count_events(config, buf, idx, handle); - config->cb.buffer_end(buf, tsc, idx, + config->cb.buffer_end(buf, *ts_end, idx, lib_ring_buffer_get_data_size(config, buf, idx,