Fix: pass private data to context callbacks
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index 331639f1e86826b2c67d186dfa8d0907df70f0d8..1b9ec40d3cc9b001a5bd951457438cf2445c53e3 100644 (file)
@@ -1,23 +1,8 @@
 /*
- * ring_buffer_frontend.c
+ * SPDX-License-Identifier: LGPL-2.1-only
  *
  * Copyright (C) 2005-2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; only
- * version 2.1 of the License.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- *
- *
  * Ring buffer wait-free buffer synchronization. Producer-consumer and flight
  * recorder (overwrite) modes. See thesis:
  *
@@ -51,7 +36,6 @@
  *   - put_subbuf
  */
 
-#define _GNU_SOURCE
 #define _LGPL_SOURCE
 #include <sys/types.h>
 #include <sys/mman.h>
 #include <signal.h>
 #include <time.h>
 #include <stdbool.h>
+#include <stdint.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 #include <urcu/tls-compat.h>
 #include <poll.h>
-#include <helper.h>
+#include <ust-helper.h>
+
+#include <lttng/ust-utils.h>
+#include <lttng/ringbuffer-context.h>
 
 #include "smp.h"
-#include <lttng/ringbuffer-config.h>
+#include "ringbuffer-config.h"
 #include "vatomic.h"
 #include "backend.h"
 #include "frontend.h"
@@ -127,7 +115,7 @@ DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
 
 static
-void lib_ring_buffer_print_errors(struct channel *chan,
+void lib_ring_buffer_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan,
                                struct lttng_ust_lib_ring_buffer *buf, int cpu,
                                struct lttng_ust_shm_handle *handle);
 
@@ -159,7 +147,7 @@ void lttng_ust_ringbuffer_set_allow_blocking(void)
 }
 
 /* Get blocking timeout, in ms */
-static int lttng_ust_ringbuffer_get_timeout(struct channel *chan)
+static int lttng_ust_ringbuffer_get_timeout(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        if (!lttng_ust_allow_blocking)
                return 0;
@@ -178,7 +166,7 @@ static int lttng_ust_ringbuffer_get_timeout(struct channel *chan)
 void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
                           struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
        unsigned int i;
 
@@ -194,6 +182,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
        for (i = 0; i < chan->backend.num_subbuf; i++) {
                struct commit_counters_hot *cc_hot;
                struct commit_counters_cold *cc_cold;
+               uint64_t *ts_end;
 
                cc_hot = shmp_index(handle, buf->commit_hot, i);
                if (!cc_hot)
@@ -201,9 +190,13 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
                cc_cold = shmp_index(handle, buf->commit_cold, i);
                if (!cc_cold)
                        return;
+               ts_end = shmp_index(handle, buf->ts_end, i);
+               if (!ts_end)
+                       return;
                v_set(config, &cc_hot->cc, 0);
                v_set(config, &cc_hot->seq, 0);
                v_set(config, &cc_cold->cc_sb, 0);
+               *ts_end = 0;
        }
        uatomic_set(&buf->consumed, 0);
        uatomic_set(&buf->record_disabled, 0);
@@ -227,7 +220,7 @@ void lib_ring_buffer_reset(struct lttng_ust_lib_ring_buffer *buf,
  * be using the iterator concurrently with reset. The previous current iterator
  * record is reset.
  */
-void channel_reset(struct channel *chan)
+void channel_reset(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        /*
         * Reset iterators first. Will put the subbuffer if held for reading.
@@ -338,11 +331,12 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
                           struct shm_object *shmobj)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chanb->config;
-       struct channel *chan = caa_container_of(chanb, struct channel, backend);
+       struct lttng_ust_lib_ring_buffer_channel *chan = caa_container_of(chanb,
+                       struct lttng_ust_lib_ring_buffer_channel, backend);
        struct lttng_ust_lib_ring_buffer_backend_subbuffer *wsb;
-       struct channel *shmp_chan;
+       struct lttng_ust_lib_ring_buffer_channel *shmp_chan;
        struct commit_counters_hot *cc_hot;
-       void *priv = channel_get_private(chan);
+       void *priv = channel_get_private_config(chan);
        size_t subbuf_header_size;
        uint64_t tsc;
        int ret;
@@ -368,6 +362,16 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
                goto free_commit;
        }
 
+       align_shm(shmobj, __alignof__(uint64_t));
+       set_shmp(buf->ts_end,
+                zalloc_shm(shmobj,
+                       sizeof(uint64_t) * chan->backend.num_subbuf));
+       if (!shmp(handle, buf->ts_end)) {
+               ret = -ENOMEM;
+               goto free_commit_cold;
+       }
+
+
        ret = lib_ring_buffer_backend_create(&buf->backend, &chan->backend,
                        cpu, handle, shmobj);
        if (ret) {
@@ -414,6 +418,8 @@ int lib_ring_buffer_create(struct lttng_ust_lib_ring_buffer *buf,
 
        /* Error handling */
 free_init:
+       /* ts_end will be freed by shm teardown */
+free_commit_cold:
        /* commit_cold will be freed by shm teardown */
 free_commit:
        /* commit_hot will be freed by shm teardown */
@@ -422,11 +428,12 @@ free_chanbuf:
 }
 
 static
-void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
+void lib_ring_buffer_channel_switch_timer(int sig __attribute__((unused)),
+               siginfo_t *si, void *uc __attribute__((unused)))
 {
        const struct lttng_ust_lib_ring_buffer_config *config;
        struct lttng_ust_shm_handle *handle;
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        int cpu;
 
        assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
@@ -470,7 +477,7 @@ end:
 static
 int lib_ring_buffer_poll_deliver(const struct lttng_ust_lib_ring_buffer_config *config,
                                 struct lttng_ust_lib_ring_buffer *buf,
-                                struct channel *chan,
+                                struct lttng_ust_lib_ring_buffer_channel *chan,
                                 struct lttng_ust_shm_handle *handle)
 {
        unsigned long consumed_old, consumed_idx, commit_count, write_offset;
@@ -579,7 +586,7 @@ void lib_ring_buffer_wakeup(struct lttng_ust_lib_ring_buffer *buf,
 }
 
 static
-void lib_ring_buffer_channel_do_read(struct channel *chan)
+void lib_ring_buffer_channel_do_read(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        const struct lttng_ust_lib_ring_buffer_config *config;
        struct lttng_ust_shm_handle *handle;
@@ -622,9 +629,10 @@ end:
 }
 
 static
-void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+void lib_ring_buffer_channel_read_timer(int sig __attribute__((unused)),
+               siginfo_t *si, void *uc __attribute__((unused)))
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
 
        assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
        chan = si->si_value.sival_ptr;
@@ -657,7 +665,7 @@ void rb_setmask(sigset_t *mask)
 }
 
 static
-void *sig_thread(void *arg)
+void *sig_thread(void *arg __attribute__((unused)))
 {
        sigset_t mask;
        siginfo_t info;
@@ -774,7 +782,7 @@ void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr)
 }
 
 static
-void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
+void lib_ring_buffer_channel_switch_timer_start(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        struct sigevent sev;
        struct itimerspec its;
@@ -787,6 +795,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 
        lib_ring_buffer_setup_timer_thread();
 
+       memset(&sev, 0, sizeof(sev));
        sev.sigev_notify = SIGEV_SIGNAL;
        sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
@@ -807,7 +816,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 }
 
 static
-void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
+void lib_ring_buffer_channel_switch_timer_stop(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        int ret;
 
@@ -826,7 +835,7 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
 }
 
 static
-void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
+void lib_ring_buffer_channel_read_timer_start(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        struct sigevent sev;
@@ -861,7 +870,7 @@ void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
 }
 
 static
-void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
+void lib_ring_buffer_channel_read_timer_stop(struct lttng_ust_lib_ring_buffer_channel *chan)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        int ret;
@@ -887,14 +896,14 @@ void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
        chan->read_timer_enabled = 0;
 }
 
-static void channel_unregister_notifiers(struct channel *chan,
-                          struct lttng_ust_shm_handle *handle)
+static void channel_unregister_notifiers(struct lttng_ust_lib_ring_buffer_channel *chan,
+                          struct lttng_ust_shm_handle *handle __attribute__((unused)))
 {
        lib_ring_buffer_channel_switch_timer_stop(chan);
        lib_ring_buffer_channel_read_timer_stop(chan);
 }
 
-static void channel_print_errors(struct channel *chan,
+static void channel_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan,
                struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_lib_ring_buffer_config *config =
@@ -917,7 +926,7 @@ static void channel_print_errors(struct channel *chan,
        }
 }
 
-static void channel_free(struct channel *chan,
+static void channel_free(struct lttng_ust_lib_ring_buffer_channel *chan,
                struct lttng_ust_shm_handle *handle,
                int consumer)
 {
@@ -931,9 +940,10 @@ static void channel_free(struct channel *chan,
  * channel_create - Create channel.
  * @config: ring buffer instance configuration
  * @name: name of the channel
- * @priv_data: ring buffer client private data area pointer (output)
- * @priv_data_size: length, in bytes, of the private data area.
- * @priv_data_init: initialization data for private data.
+ * @priv_data_align: alignment, in bytes, of the private data area. (config)
+ * @priv_data_size: length, in bytes, of the private data area. (config)
+ * @priv_data_init: initialization data for private data. (config)
+ * @priv: local private data (memory owned by caller)
  * @buf_addr: pointer the the beginning of the preallocated buffer contiguous
  *            address mapping. It is used only by RING_BUFFER_STATIC
  *            configuration. It can be set to NULL for other backends.
@@ -951,11 +961,11 @@ static void channel_free(struct channel *chan,
  */
 struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buffer_config *config,
                   const char *name,
-                  void **priv_data,
                   size_t priv_data_align,
                   size_t priv_data_size,
                   void *priv_data_init,
-                  void *buf_addr, size_t subbuf_size,
+                  void *priv,
+                  void *buf_addr __attribute__((unused)), size_t subbuf_size,
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval,
                   const int *stream_fds, int nr_stream_fds,
@@ -963,7 +973,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 {
        int ret;
        size_t shmsize, chansize;
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        struct lttng_ust_shm_handle *handle;
        struct shm_object *shmobj;
        unsigned int nr_streams;
@@ -1004,20 +1014,20 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                goto error_table_alloc;
 
        /* Calculate the shm allocation layout */
-       shmsize = sizeof(struct channel);
-       shmsize += offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
+       shmsize = sizeof(struct lttng_ust_lib_ring_buffer_channel);
+       shmsize += lttng_ust_offset_align(shmsize, __alignof__(struct lttng_ust_lib_ring_buffer_shmp));
        shmsize += sizeof(struct lttng_ust_lib_ring_buffer_shmp) * nr_streams;
        chansize = shmsize;
        if (priv_data_align)
-               shmsize += offset_align(shmsize, priv_data_align);
+               shmsize += lttng_ust_offset_align(shmsize, priv_data_align);
        shmsize += priv_data_size;
 
        /* Allocate normal memory for channel (not shared) */
        shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM,
-                       -1);
+                       -1, -1);
        if (!shmobj)
                goto error_append;
-       /* struct channel is at object 0, offset 0 (hardcoded) */
+       /* struct lttng_ust_lib_ring_buffer_channel is at object 0, offset 0 (hardcoded) */
        set_shmp(handle->chan, zalloc_shm(shmobj, chansize));
        assert(handle->chan._ref.index == 0);
        assert(handle->chan._ref.offset == 0);
@@ -1028,6 +1038,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        /* space for private data */
        if (priv_data_size) {
+               void *priv_config;
+
                DECLARE_SHMP(void, priv_data_alloc);
 
                align_shm(shmobj, priv_data_align);
@@ -1035,16 +1047,16 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                set_shmp(priv_data_alloc, zalloc_shm(shmobj, priv_data_size));
                if (!shmp(handle, priv_data_alloc))
                        goto error_append;
-               *priv_data = channel_get_private(chan);
-               memcpy(*priv_data, priv_data_init, priv_data_size);
+               priv_config = channel_get_private_config(chan);
+               memcpy(priv_config, priv_data_init, priv_data_size);
        } else {
                chan->priv_data_offset = -1;
-               if (priv_data)
-                       *priv_data = NULL;
        }
 
        chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms;
 
+       channel_set_private(chan, priv);
+
        ret = channel_backend_init(&chan->backend, name, config,
                                   subbuf_size, num_subbuf, handle,
                                   stream_fds);
@@ -1089,7 +1101,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data,
                        memory_map_size, wakeup_fd);
        if (!object)
                goto error_table_object;
-       /* struct channel is at object 0, offset 0 (hardcoded) */
+       /* struct lttng_ust_lib_ring_buffer_channel is at object 0, offset 0 (hardcoded) */
        handle->chan._ref.index = 0;
        handle->chan._ref.offset = 0;
        return handle;
@@ -1123,7 +1135,7 @@ unsigned int channel_handle_get_nr_streams(struct lttng_ust_shm_handle *handle)
 }
 
 static
-void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
+void channel_release(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle,
                int consumer)
 {
        channel_free(chan, handle, consumer);
@@ -1137,9 +1149,9 @@ void channel_release(struct channel *chan, struct lttng_ust_shm_handle *handle,
  * Call "destroy" callback, finalize channels, decrement the channel
  * reference count. Note that when readers have completed data
  * consumption of finalized channels, get_subbuf() will return -ENODATA.
- * They should release their handle at that point. 
+ * They should release their handle at that point.
  */
-void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
+void channel_destroy(struct lttng_ust_lib_ring_buffer_channel *chan, struct lttng_ust_shm_handle *handle,
                int consumer)
 {
        if (consumer) {
@@ -1164,7 +1176,7 @@ void channel_destroy(struct channel *chan, struct lttng_ust_shm_handle *handle,
 
 struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
                                        const struct lttng_ust_lib_ring_buffer_config *config,
-                                       struct channel *chan, int cpu,
+                                       struct lttng_ust_lib_ring_buffer_channel *chan, int cpu,
                                        struct lttng_ust_shm_handle *handle,
                                        int *shm_fd, int *wait_fd,
                                        int *wakeup_fd,
@@ -1187,9 +1199,10 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
        return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
-                       struct channel *chan,
-                       struct lttng_ust_shm_handle *handle)
+int ring_buffer_channel_close_wait_fd(
+               const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)),
+               struct lttng_ust_lib_ring_buffer_channel *chan __attribute__((unused)),
+               struct lttng_ust_shm_handle *handle)
 {
        struct shm_ref *ref;
 
@@ -1197,9 +1210,10 @@ int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_con
        return shm_close_wait_fd(handle, ref);
 }
 
-int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
-                       struct channel *chan,
-                       struct lttng_ust_shm_handle *handle)
+int ring_buffer_channel_close_wakeup_fd(
+               const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)),
+               struct lttng_ust_lib_ring_buffer_channel *chan __attribute__((unused)),
+               struct lttng_ust_shm_handle *handle)
 {
        struct shm_ref *ref;
 
@@ -1208,7 +1222,7 @@ int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_c
 }
 
 int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
-                       struct channel *chan,
+                       struct lttng_ust_lib_ring_buffer_channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
 {
@@ -1225,7 +1239,7 @@ int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_conf
 }
 
 int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
-                       struct channel *chan,
+                       struct lttng_ust_lib_ring_buffer_channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
 {
@@ -1246,7 +1260,7 @@ int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_co
 }
 
 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
-                             struct lttng_ust_shm_handle *handle)
+                             struct lttng_ust_shm_handle *handle __attribute__((unused)))
 {
        if (uatomic_cmpxchg(&buf->active_readers, 0, 1) != 0)
                return -EBUSY;
@@ -1257,7 +1271,7 @@ int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
 void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
                                  struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
+       struct lttng_ust_lib_ring_buffer_channel *chan = shmp(handle, buf->backend.chan);
 
        if (!chan)
                return;
@@ -1280,7 +1294,7 @@ int lib_ring_buffer_snapshot(struct lttng_ust_lib_ring_buffer *buf,
                             unsigned long *consumed, unsigned long *produced,
                             struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
        unsigned long consumed_cur, write_offset;
        int finalized;
@@ -1345,7 +1359,7 @@ int lib_ring_buffer_snapshot_sample_positions(
                             unsigned long *consumed, unsigned long *produced,
                             struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
 
        chan = shmp(handle, buf->backend.chan);
@@ -1376,7 +1390,7 @@ void lib_ring_buffer_move_consumer(struct lttng_ust_lib_ring_buffer *buf,
                                   struct lttng_ust_shm_handle *handle)
 {
        struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        unsigned long consumed;
 
        chan = shmp(handle, bufb->chan);
@@ -1407,7 +1421,7 @@ int lib_ring_buffer_get_subbuf(struct lttng_ust_lib_ring_buffer *buf,
                               unsigned long consumed,
                               struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
        unsigned long consumed_cur, consumed_idx, commit_count, write_offset;
        int ret, finalized, nr_retry = LTTNG_UST_RING_BUFFER_GET_RETRY;
@@ -1571,7 +1585,7 @@ void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
                                struct lttng_ust_shm_handle *handle)
 {
        struct lttng_ust_lib_ring_buffer_backend *bufb = &buf->backend;
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
        unsigned long sb_bindex, consumed_idx, consumed;
        struct lttng_ust_lib_ring_buffer_backend_pages_shmp *rpages;
@@ -1638,7 +1652,7 @@ void lib_ring_buffer_put_subbuf(struct lttng_ust_lib_ring_buffer *buf,
  */
 static
 void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *buf,
-                                           struct channel *chan,
+                                           struct lttng_ust_lib_ring_buffer_channel *chan,
                                            unsigned long cons_offset,
                                            int cpu,
                                            struct lttng_ust_shm_handle *handle)
@@ -1673,9 +1687,8 @@ void lib_ring_buffer_print_subbuffer_errors(struct lttng_ust_lib_ring_buffer *bu
 
 static
 void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
-                                        struct channel *chan,
-                                        void *priv, int cpu,
-                                        struct lttng_ust_shm_handle *handle)
+                                        struct lttng_ust_lib_ring_buffer_channel *chan,
+                                        int cpu, struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long write_offset, cons_offset;
@@ -1703,12 +1716,11 @@ void lib_ring_buffer_print_buffer_errors(struct lttng_ust_lib_ring_buffer *buf,
 }
 
 static
-void lib_ring_buffer_print_errors(struct channel *chan,
+void lib_ring_buffer_print_errors(struct lttng_ust_lib_ring_buffer_channel *chan,
                                struct lttng_ust_lib_ring_buffer *buf, int cpu,
                                struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-       void *priv = channel_get_private(chan);
 
        if (!strcmp(chan->backend.name, "relay-metadata-mmap")) {
                DBG("ring buffer %s: %lu records written, "
@@ -1734,7 +1746,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
                                v_read(config, &buf->records_lost_wrap),
                                v_read(config, &buf->records_lost_big));
        }
-       lib_ring_buffer_print_buffer_errors(buf, chan, priv, cpu, handle);
+       lib_ring_buffer_print_buffer_errors(buf, chan, cpu, handle);
 }
 
 /*
@@ -1745,7 +1757,7 @@ void lib_ring_buffer_print_errors(struct channel *chan,
  */
 static
 void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
-                                     struct channel *chan,
+                                     struct lttng_ust_lib_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
                                      uint64_t tsc,
                                      struct lttng_ust_shm_handle *handle)
@@ -1786,7 +1798,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_lib_ring_buffer *buf,
  */
 static
 void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
-                                   struct channel *chan,
+                                   struct lttng_ust_lib_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
                                    uint64_t tsc,
                                    struct lttng_ust_shm_handle *handle)
@@ -1795,15 +1807,29 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
        unsigned long oldidx = subbuf_index(offsets->old - 1, chan);
        unsigned long commit_count, padding_size, data_size;
        struct commit_counters_hot *cc_hot;
+       uint64_t *ts_end;
 
        data_size = subbuf_offset(offsets->old - 1, chan) + 1;
        padding_size = chan->backend.subbuf_size - data_size;
        subbuffer_set_data_size(config, &buf->backend, oldidx, data_size,
                                handle);
 
+       ts_end = shmp_index(handle, buf->ts_end, oldidx);
+       if (!ts_end)
+               return;
        /*
-        * Order all writes to buffer before the commit count update that will
-        * determine that the subbuffer is full.
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
+
+       /*
+        * Order all writes to buffer and store to ts_end before the commit
+        * count update that will determine that the subbuffer is full.
         */
        cmm_smp_wmb();
        cc_hot = shmp_index(handle, buf->commit_hot, oldidx);
@@ -1827,7 +1853,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_lib_ring_buffer *buf,
  */
 static
 void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
-                                     struct channel *chan,
+                                     struct lttng_ust_lib_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
                                      uint64_t tsc,
                                      struct lttng_ust_shm_handle *handle)
@@ -1867,18 +1893,31 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_lib_ring_buffer *buf,
  */
 static
 void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
-                                   struct channel *chan,
+                                   struct lttng_ust_lib_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
                                    uint64_t tsc,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        unsigned long endidx, data_size;
+       uint64_t *ts_end;
 
        endidx = subbuf_index(offsets->end - 1, chan);
        data_size = subbuf_offset(offsets->end - 1, chan) + 1;
        subbuffer_set_data_size(config, &buf->backend, endidx, data_size,
                                handle);
+       ts_end = shmp_index(handle, buf->ts_end, endidx);
+       if (!ts_end)
+               return;
+       /*
+        * This is the last space reservation in that sub-buffer before
+        * it gets delivered. This provides exclusive access to write to
+        * this sub-buffer's ts_end. There are also no concurrent
+        * readers of that ts_end because delivery of that sub-buffer is
+        * postponed until the commit counter is incremented for the
+        * current space reservation.
+        */
+       *ts_end = tsc;
 }
 
 /*
@@ -1889,7 +1928,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_lib_ring_buffer *buf,
 static
 int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                                    struct lttng_ust_lib_ring_buffer *buf,
-                                   struct channel *chan,
+                                   struct lttng_ust_lib_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
                                    uint64_t *tsc,
                                    struct lttng_ust_shm_handle *handle)
@@ -1996,14 +2035,16 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
  * Force a sub-buffer switch. This operation is completely reentrant : can be
  * called while tracing is active with absolutely no lock held.
  *
- * Note, however, that as a v_cmpxchg is used for some atomic
- * operations, this function must be called from the CPU which owns the buffer
- * for a ACTIVE flush.
+ * For RING_BUFFER_SYNC_PER_CPU ring buffers, as a v_cmpxchg is used for
+ * some atomic operations, this function must be called from the CPU
+ * which owns the buffer for a ACTIVE flush. However, for
+ * RING_BUFFER_SYNC_GLOBAL ring buffers, this function can be called
+ * from any CPU.
  */
 void lib_ring_buffer_switch_slow(struct lttng_ust_lib_ring_buffer *buf, enum switch_mode mode,
                                 struct lttng_ust_shm_handle *handle)
 {
-       struct channel *chan;
+       struct lttng_ust_lib_ring_buffer_channel *chan;
        const struct lttng_ust_lib_ring_buffer_config *config;
        struct switch_offsets offsets;
        unsigned long oldidx;
@@ -2082,13 +2123,14 @@ bool handle_blocking_retry(int *timeout_left_ms)
  */
 static
 int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
-                                    struct channel *chan,
+                                    struct lttng_ust_lib_ring_buffer_channel *chan,
                                     struct switch_offsets *offsets,
                                     struct lttng_ust_lib_ring_buffer_ctx *ctx,
                                     void *client_ctx)
 {
+       struct lttng_ust_lib_ring_buffer_ctx_private *ctx_private = ctx->priv;
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-       struct lttng_ust_shm_handle *handle = ctx->handle;
+       struct lttng_ust_shm_handle *handle = chan->handle;
        unsigned long reserve_commit_diff, offset_cmp;
        int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan);
 
@@ -2100,14 +2142,14 @@ retry:
        offsets->switch_old_end = 0;
        offsets->pre_header_padding = 0;
 
-       ctx->tsc = config->cb.ring_buffer_clock_read(chan);
-       if ((int64_t) ctx->tsc == -EIO)
+       ctx_private->tsc = config->cb.ring_buffer_clock_read(chan);
+       if ((int64_t) ctx_private->tsc == -EIO)
                return -EIO;
 
-       if (last_tsc_overflow(config, buf, ctx->tsc))
-               ctx->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+       if (last_tsc_overflow(config, buf, ctx_private->tsc))
+               ctx_private->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
 
-       if (caa_unlikely(subbuf_offset(offsets->begin, ctx->chan) == 0)) {
+       if (caa_unlikely(subbuf_offset(offsets->begin, chan) == 0)) {
                offsets->switch_new_start = 1;          /* For offsets->begin */
        } else {
                offsets->size = config->cb.record_header_size(config, chan,
@@ -2115,8 +2157,8 @@ retry:
                                                &offsets->pre_header_padding,
                                                ctx, client_ctx);
                offsets->size +=
-                       lib_ring_buffer_align(offsets->begin + offsets->size,
-                                             ctx->largest_align)
+                       lttng_ust_lib_ring_buffer_align(offsets->begin + offsets->size,
+                                            ctx->largest_align)
                        + ctx->data_size;
                if (caa_unlikely(subbuf_offset(offsets->begin, chan) +
                             offsets->size > chan->backend.subbuf_size)) {
@@ -2221,7 +2263,7 @@ retry:
                                                &offsets->pre_header_padding,
                                                ctx, client_ctx);
                offsets->size +=
-                       lib_ring_buffer_align(offsets->begin + offsets->size,
+                       lttng_ust_lib_ring_buffer_align(offsets->begin + offsets->size,
                                              ctx->largest_align)
                        + ctx->data_size;
                if (caa_unlikely(subbuf_offset(offsets->begin, chan)
@@ -2276,20 +2318,21 @@ retry:
 int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
                void *client_ctx)
 {
-       struct channel *chan = ctx->chan;
-       struct lttng_ust_shm_handle *handle = ctx->handle;
+       struct lttng_ust_lib_ring_buffer_ctx_private *ctx_private = ctx->priv;
+       struct lttng_ust_lib_ring_buffer_channel *chan = ctx_private->chan;
+       struct lttng_ust_shm_handle *handle = chan->handle;
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        struct lttng_ust_lib_ring_buffer *buf;
        struct switch_offsets offsets;
        int ret;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               buf = shmp(handle, chan->backend.buf[ctx->cpu].shmp);
+               buf = shmp(handle, chan->backend.buf[ctx_private->reserve_cpu].shmp);
        else
                buf = shmp(handle, chan->backend.buf[0].shmp);
        if (!buf)
                return -EIO;
-       ctx->buf = buf;
+       ctx_private->buf = buf;
 
        offsets.size = 0;
 
@@ -2308,7 +2351,7 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
         * records, never the opposite (missing a full TSC record when it would
         * be needed).
         */
-       save_last_tsc(config, buf, ctx->tsc);
+       save_last_tsc(config, buf, ctx_private->tsc);
 
        /*
         * Push the reader if necessary
@@ -2329,21 +2372,21 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_lib_ring_buffer_ctx *ctx,
                lib_ring_buffer_clear_noref(config, &buf->backend,
                                            subbuf_index(offsets.old - 1, chan),
                                            handle);
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx->tsc, handle);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx_private->tsc, handle);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (caa_unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx->tsc, handle);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx_private->tsc, handle);
 
        if (caa_unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx->tsc, handle);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx_private->tsc, handle);
 
-       ctx->slot_size = offsets.size;
-       ctx->pre_offset = offsets.begin;
-       ctx->buf_offset = offsets.begin + offsets.pre_header_padding;
+       ctx_private->slot_size = offsets.size;
+       ctx_private->pre_offset = offsets.begin;
+       ctx_private->buf_offset = offsets.begin + offsets.pre_header_padding;
        return 0;
 }
 
@@ -2384,22 +2427,23 @@ void deliver_count_events(const struct lttng_ust_lib_ring_buffer_config *config,
 }
 #else /* LTTNG_RING_BUFFER_COUNT_EVENTS */
 static
-void deliver_count_events(const struct lttng_ust_lib_ring_buffer_config *config,
-               struct lttng_ust_lib_ring_buffer *buf,
-               unsigned long idx,
-               struct lttng_ust_shm_handle *handle)
+void deliver_count_events(
+               const struct lttng_ust_lib_ring_buffer_config *config __attribute__((unused)),
+               struct lttng_ust_lib_ring_buffer *buf __attribute__((unused)),
+               unsigned long idx __attribute__((unused)),
+               struct lttng_ust_shm_handle *handle __attribute__((unused)))
 {
 }
 #endif /* #else LTTNG_RING_BUFFER_COUNT_EVENTS */
 
 void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_config *config,
                                   struct lttng_ust_lib_ring_buffer *buf,
-                                  struct channel *chan,
+                                  struct lttng_ust_lib_ring_buffer_channel *chan,
                                   unsigned long offset,
                                   unsigned long commit_count,
                                   unsigned long idx,
                                   struct lttng_ust_shm_handle *handle,
-                                  uint64_t tsc)
+                                  uint64_t tsc __attribute__((unused)))
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -2443,14 +2487,26 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_lib_ring_buffer_c
        if (caa_likely(v_cmpxchg(config, &cc_cold->cc_sb,
                                 old_commit_count, old_commit_count + 1)
                   == old_commit_count)) {
+               uint64_t *ts_end;
+
                /*
                 * Start of exclusive subbuffer access. We are
                 * guaranteed to be the last writer in this subbuffer
                 * and any other writer trying to access this subbuffer
                 * in this state is required to drop records.
+                *
+                * We can read the ts_end for the current sub-buffer
+                * which has been saved by the very last space
+                * reservation for the current sub-buffer.
+                *
+                * Order increment of commit counter before reading ts_end.
                 */
+               cmm_smp_mb();
+               ts_end = shmp_index(handle, buf->ts_end, idx);
+               if (!ts_end)
+                       return;
                deliver_count_events(config, buf, idx, handle);
-               config->cb.buffer_end(buf, tsc, idx,
+               config->cb.buffer_end(buf, *ts_end, idx,
                                      lib_ring_buffer_get_data_size(config,
                                                                buf,
                                                                idx,
This page took 0.03588 seconds and 4 git commands to generate.