ust-fd: Add close_range declaration
[lttng-ust.git] src/common/ringbuffer/ring_buffer_frontend.c
index d6f5365c8eb1e3629d4a4c829414d13bb00ea8da..ab1fc0ff878b749779d9ce3e09d1d46ed70fc9ea 100644
@@ -63,6 +63,7 @@
 #include "shm.h"
 #include "rb-init.h"
 #include "common/compat/errno.h"       /* For ENODATA */
+#include "common/populate.h"
 
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
@@ -78,6 +79,8 @@
 /*
  * Non-static to ensure the compiler does not optimize away the xor.
  */
+uint8_t lttng_crash_magic_xor[]
+       __attribute__((visibility("hidden")));
 uint8_t lttng_crash_magic_xor[] = RB_CRASH_DUMP_ABI_MAGIC_XOR;
 
 /*
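The added declaration uses a C tentative-declaration idiom: declaring
lttng_crash_magic_xor with __attribute__((visibility("hidden"))) immediately
before its definition merges the attribute into the definition, so the symbol
stays non-static (the comment above explains why: the compiler must not
optimize the xor away) while no longer being exported from the shared object.
A standalone sketch of the same pattern, with illustrative names:

    #include <stdint.h>

    /*
     * Tentative declaration carrying the visibility attribute; it is
     * merged with the definition below, so the array is non-static
     * yet hidden from the shared object's dynamic symbol table.
     */
    uint8_t demo_magic_xor[]
            __attribute__((visibility("hidden")));
    uint8_t demo_magic_xor[] = { 0x17, 0x7b, 0xf1, 0x77 };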
@@ -200,7 +203,7 @@ void lib_ring_buffer_reset(struct lttng_ust_ring_buffer *buf,
        }
        uatomic_set(&buf->consumed, 0);
        uatomic_set(&buf->record_disabled, 0);
-       v_set(config, &buf->last_tsc, 0);
+       v_set(config, &buf->last_timestamp, 0);
        lib_ring_buffer_backend_reset(&buf->backend, handle);
        /* Don't reset number of active readers */
        v_set(config, &buf->records_lost_full, 0);
@@ -338,7 +341,7 @@ int lib_ring_buffer_create(struct lttng_ust_ring_buffer *buf,
        struct commit_counters_hot *cc_hot;
        void *priv = channel_get_private_config(chan);
        size_t subbuf_header_size;
-       uint64_t tsc;
+       uint64_t timestamp;
        int ret;
 
        /* Test for cpu hotplug */
@@ -395,8 +398,8 @@ int lib_ring_buffer_create(struct lttng_ust_ring_buffer *buf,
                ret = -EPERM;
                goto free_chanbuf;
        }
-       tsc = config->cb.ring_buffer_clock_read(shmp_chan);
-       config->cb.buffer_begin(buf, tsc, 0, handle);
+       timestamp = config->cb.ring_buffer_clock_read(shmp_chan);
+       config->cb.buffer_begin(buf, timestamp, 0, handle);
        cc_hot = shmp_index(handle, buf->commit_hot, 0);
        if (!cc_hot) {
                ret = -EPERM;
@@ -978,9 +981,10 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_ring_buffer_c
        struct shm_object *shmobj;
        unsigned int nr_streams;
        int64_t blocking_timeout_ms;
+       bool populate = lttng_ust_map_populate_is_enabled();
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-               nr_streams = num_possible_cpus();
+               nr_streams = get_possible_cpus_array_len();
        else
                nr_streams = 1;
 
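num_possible_cpus() returns a count of possible CPUs, but the per-CPU buffer
table is indexed by CPU id, so its length must be the highest possible CPU id
plus one; on systems with sparse CPU numbering the two values differ, hence
the switch to get_possible_cpus_array_len(). A hedged sketch of how such a
length can be derived from sysfs (the real helper lives in lttng-ust's smp
compat layer; this parsing is illustrative only):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /*
     * Illustrative sketch: parse "/sys/devices/system/cpu/possible"
     * (e.g. "0-7" or "0-3,5-7") and return the last CPU id plus one,
     * or -1 on error.
     */
    static int possible_cpus_array_len_sketch(void)
    {
            char buf[128], *last;
            FILE *f = fopen("/sys/devices/system/cpu/possible", "r");

            if (!f)
                    return -1;
            if (!fgets(buf, sizeof(buf), f)) {
                    fclose(f);
                    return -1;
            }
            fclose(f);
            buf[strcspn(buf, "\n")] = '\0';
            /* The highest id is the last number on the line. */
            last = strrchr(buf, '-');
            if (!last)
                    last = strrchr(buf, ',');
            return atoi(last ? last + 1 : buf) + 1;
    }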
@@ -1004,12 +1008,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_ring_buffer_c
                                         read_timer_interval))
                return NULL;
 
-       handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
+       handle = zmalloc_populate(sizeof(struct lttng_ust_shm_handle), populate);
        if (!handle)
                return NULL;
 
        /* Allocate table for channel + per-cpu buffers */
-       handle->table = shm_object_table_create(1 + num_possible_cpus());
+       handle->table = shm_object_table_create(1 + get_possible_cpus_array_len(), populate);
        if (!handle->table)
                goto error_table_alloc;
 
@@ -1024,7 +1028,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_ring_buffer_c
 
        /* Allocate normal memory for channel (not shared) */
        shmobj = shm_object_table_alloc(handle->table, shmsize, SHM_OBJECT_MEM,
-                       -1, -1);
+                       -1, -1, populate);
        if (!shmobj)
                goto error_append;
        /* struct lttng_ust_ring_buffer_channel is at object 0, offset 0 (hardcoded) */
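The new populate flag threads the map-populate policy through every
allocation backing the channel, so pages are faulted in at map time rather
than in the tracing fast path. A minimal sketch of the underlying mechanism,
assuming the helper boils down to toggling MAP_POPULATE on an anonymous
mapping (zmalloc_populate() itself is lttng-ust internal):

    #include <stddef.h>
    #include <sys/mman.h>

    /*
     * Illustrative sketch: zero-filled allocation, optionally
     * pre-faulted. MAP_POPULATE asks the kernel to populate the page
     * tables up front, trading slower setup for fewer page faults on
     * first access.
     */
    static void *zmalloc_populate_sketch(size_t len, int populate)
    {
            int flags = MAP_PRIVATE | MAP_ANONYMOUS
                            | (populate ? MAP_POPULATE : 0);
            void *p = mmap(NULL, len, PROT_READ | PROT_WRITE,
                            flags, -1, 0);

            return p == MAP_FAILED ? NULL : p;
    }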
@@ -1087,13 +1091,14 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data,
 {
        struct lttng_ust_shm_handle *handle;
        struct shm_object *object;
+       bool populate = lttng_ust_map_populate_is_enabled();
 
-       handle = zmalloc(sizeof(struct lttng_ust_shm_handle));
+       handle = zmalloc_populate(sizeof(struct lttng_ust_shm_handle), populate);
        if (!handle)
                return NULL;
 
        /* Allocate table for channel + per-cpu buffers */
-       handle->table = shm_object_table_create(1 + num_possible_cpus());
+       handle->table = shm_object_table_create(1 + get_possible_cpus_array_len(), populate);
        if (!handle->table)
                goto error_table_alloc;
        /* Add channel object */
@@ -1122,7 +1127,7 @@ int channel_handle_add_stream(struct lttng_ust_shm_handle *handle,
        /* Add stream object */
        object = shm_object_table_append_shm(handle->table,
                        shm_fd, wakeup_fd, stream_nr,
-                       memory_map_size);
+                       memory_map_size, lttng_ust_map_populate_cpu_is_enabled(stream_nr));
        if (!object)
                return -EINVAL;
        return 0;
@@ -1180,14 +1185,15 @@ struct lttng_ust_ring_buffer *channel_get_ring_buffer(
                                        struct lttng_ust_shm_handle *handle,
                                        int *shm_fd, int *wait_fd,
                                        int *wakeup_fd,
-                                       uint64_t *memory_map_size)
+                                       uint64_t *memory_map_size,
+                                       void **memory_map_addr)
 {
        struct shm_ref *ref;
 
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                cpu = 0;
        } else {
-               if (cpu >= num_possible_cpus())
+               if (cpu >= get_possible_cpus_array_len())
                        return NULL;
        }
        ref = &chan->backend.buf[cpu].shmp._ref;
@@ -1196,6 +1202,7 @@ struct lttng_ust_ring_buffer *channel_get_ring_buffer(
        *wakeup_fd = shm_get_wakeup_fd(handle, ref);
        if (shm_get_shm_size(handle, ref, memory_map_size))
                return NULL;
+       *memory_map_addr = handle->table->objects[ref->index].memory_map;
        return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
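Returning the mapping address alongside its size lets a caller hand a
complete description of the stream to a consumer in one call. A hypothetical
call-site fragment (the leading config/chan/cpu parameters are inferred from
the function body; error handling elided):

    int shm_fd, wait_fd, wakeup_fd;
    uint64_t memory_map_size;
    void *memory_map_addr;
    struct lttng_ust_ring_buffer *buf;

    buf = channel_get_ring_buffer(config, chan, cpu, handle,
                    &shm_fd, &wait_fd, &wakeup_fd,
                    &memory_map_size, &memory_map_addr);
    if (!buf)
            return -1;      /* bad cpu, or size/lookup failure */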
@@ -1231,7 +1238,7 @@ int ring_buffer_stream_close_wait_fd(const struct lttng_ust_ring_buffer_config *
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                cpu = 0;
        } else {
-               if (cpu >= num_possible_cpus())
+               if (cpu >= get_possible_cpus_array_len())
                        return -EINVAL;
        }
        ref = &chan->backend.buf[cpu].shmp._ref;
@@ -1249,7 +1256,7 @@ int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_ring_buffer_config
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                cpu = 0;
        } else {
-               if (cpu >= num_possible_cpus())
+               if (cpu >= get_possible_cpus_array_len())
                        return -EINVAL;
        }
        ref = &chan->backend.buf[cpu].shmp._ref;
@@ -1759,7 +1766,7 @@ static
 void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
                                      struct lttng_ust_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     uint64_t tsc,
+                                     const struct lttng_ust_ring_buffer_ctx *ctx,
                                      struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1767,7 +1774,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, oldidx, handle);
+       config->cb.buffer_begin(buf, ctx->priv->timestamp, oldidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1782,7 +1789,7 @@ void lib_ring_buffer_switch_old_start(struct lttng_ust_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old,
-                                     commit_count, oldidx, handle, tsc);
+                                     commit_count, oldidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + config->cb.subbuffer_header_size(),
                        commit_count, handle, cc_hot);
@@ -1800,7 +1807,7 @@ static
 void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t tsc,
+                                   const struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1825,7 +1832,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv->timestamp;
 
        /*
         * Order all writes to buffer and store to ts_end before the commit
@@ -1838,7 +1845,7 @@ void lib_ring_buffer_switch_old_end(struct lttng_ust_ring_buffer *buf,
        v_add(config, padding_size, &cc_hot->cc);
        commit_count = v_read(config, &cc_hot->cc);
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->old - 1,
-                                     commit_count, oldidx, handle, tsc);
+                                     commit_count, oldidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->old + padding_size, commit_count, handle,
                        cc_hot);
@@ -1855,7 +1862,7 @@ static
 void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
                                      struct lttng_ust_ring_buffer_channel *chan,
                                      struct switch_offsets *offsets,
-                                     uint64_t tsc,
+                                     const struct lttng_ust_ring_buffer_ctx *ctx,
                                      struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1863,7 +1870,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
        unsigned long commit_count;
        struct commit_counters_hot *cc_hot;
 
-       config->cb.buffer_begin(buf, tsc, beginidx, handle);
+       config->cb.buffer_begin(buf, ctx->priv->timestamp, beginidx, handle);
 
        /*
         * Order all writes to buffer before the commit count update that will
@@ -1877,7 +1884,7 @@ void lib_ring_buffer_switch_new_start(struct lttng_ust_ring_buffer *buf,
        commit_count = v_read(config, &cc_hot->cc);
        /* Check if the written buffer has to be delivered */
        lib_ring_buffer_check_deliver(config, buf, chan, offsets->begin,
-                                     commit_count, beginidx, handle, tsc);
+                                     commit_count, beginidx, handle, ctx);
        lib_ring_buffer_write_commit_counter(config, buf, chan,
                        offsets->begin + config->cb.subbuffer_header_size(),
                        commit_count, handle, cc_hot);
@@ -1895,7 +1902,7 @@ static
 void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t tsc,
+                                   const struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1917,7 +1924,7 @@ void lib_ring_buffer_switch_new_end(struct lttng_ust_ring_buffer *buf,
         * postponed until the commit counter is incremented for the
         * current space reservation.
         */
-       *ts_end = tsc;
+       *ts_end = ctx->priv->timestamp;
 }
 
 /*
@@ -1930,7 +1937,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
                                    struct lttng_ust_ring_buffer *buf,
                                    struct lttng_ust_ring_buffer_channel *chan,
                                    struct switch_offsets *offsets,
-                                   uint64_t *tsc,
+                                   struct lttng_ust_ring_buffer_ctx *ctx,
                                    struct lttng_ust_shm_handle *handle)
 {
        const struct lttng_ust_ring_buffer_config *config = &chan->backend.config;
@@ -1941,7 +1948,7 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->switch_old_start = 0;
        off = subbuf_offset(offsets->begin, chan);
 
-       *tsc = config->cb.ring_buffer_clock_read(chan);
+       ctx->priv->timestamp = config->cb.ring_buffer_clock_read(chan);
 
        /*
         * Ensure we flush the header of an empty subbuffer when doing the
@@ -2028,6 +2035,13 @@ int lib_ring_buffer_try_switch_slow(enum switch_mode mode,
        offsets->begin = subbuf_align(offsets->begin, chan);
        /* Note: old points to the next subbuf at offset 0 */
        offsets->end = offsets->begin;
+       /*
+        * Populate the records lost counters prior to performing a
+        * sub-buffer switch.
+        */
+       ctx->priv->records_lost_full = v_read(config, &buf->records_lost_full);
+       ctx->priv->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+       ctx->priv->records_lost_big = v_read(config, &buf->records_lost_big);
        return 0;
 }
 
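The three records_lost_* counters are cumulative totals, so snapshotting them
into the context just before a sub-buffer switch lets the packet being closed
record how many events had been dropped up to that point. A sketch of the
arithmetic a reader could apply to successive snapshots (structure names are
illustrative):

    #include <stdint.h>

    /* Running totals carried along with successive packets. */
    struct lost_counters {
            uint64_t full, wrap, big;
    };

    /*
     * Illustrative sketch: since the counters only grow, the number
     * of events lost between two packets is the difference between
     * their snapshots.
     */
    static uint64_t lost_between(const struct lost_counters *prev,
                    const struct lost_counters *cur)
    {
            return (cur->full - prev->full)
                    + (cur->wrap - prev->wrap)
                    + (cur->big - prev->big);
    }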
@@ -2046,10 +2060,12 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
 {
        struct lttng_ust_ring_buffer_channel *chan;
        const struct lttng_ust_ring_buffer_config *config;
+       struct lttng_ust_ring_buffer_ctx_private ctx_priv;
+       struct lttng_ust_ring_buffer_ctx ctx;
        struct switch_offsets offsets;
        unsigned long oldidx;
-       uint64_t tsc;
 
+       ctx.priv = &ctx_priv;
        chan = shmp(handle, buf->backend.chan);
        if (!chan)
                return;
@@ -2062,18 +2078,18 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
         */
        do {
                if (lib_ring_buffer_try_switch_slow(mode, buf, chan, &offsets,
-                                                   &tsc, handle))
+                                                   &ctx, handle))
                        return; /* Switch not needed */
        } while (v_cmpxchg(config, &buf->offset, offsets.old, offsets.end)
                 != offsets.old);
 
        /*
-        * Atomically update last_tsc. This update races against concurrent
-        * atomic updates, but the race will always cause supplementary full TSC
-        * records, never the opposite (missing a full TSC record when it would
-        * be needed).
+        * Atomically update last_timestamp. This update races against concurrent
+        * atomic updates, but the race will always cause supplementary full
+        * timestamp records, never the opposite (missing a full timestamp
+        * record when it would be needed).
         */
-       save_last_tsc(config, buf, tsc);
+       save_last_timestamp(config, buf, ctx.priv->timestamp);
 
        /*
         * Push the reader if necessary
@@ -2087,14 +2103,14 @@ void lib_ring_buffer_switch_slow(struct lttng_ust_ring_buffer *buf, enum switch_
         * May need to populate header start on SWITCH_FLUSH.
         */
        if (offsets.switch_old_start) {
-               lib_ring_buffer_switch_old_start(buf, chan, &offsets, tsc, handle);
+               lib_ring_buffer_switch_old_start(buf, chan, &offsets, &ctx, handle);
                offsets.old += config->cb.subbuffer_header_size();
        }
 
        /*
         * Switch old subbuffer.
         */
-       lib_ring_buffer_switch_old_end(buf, chan, &offsets, tsc, handle);
+       lib_ring_buffer_switch_old_end(buf, chan, &offsets, &ctx, handle);
 }
 
 static
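The switch path now allocates both the context and its private part on the
stack and links them before use, so helpers that previously took a bare tsc
receive the timestamp and lost-record counters through a single pointer. A
reduced sketch of the pattern, with the structures abbreviated to the fields
used here:

    #include <stdint.h>

    /* Abbreviated stand-ins for the lttng-ust context structures. */
    struct ctx_private_sketch {
            uint64_t timestamp;
            uint64_t records_lost_full;
            uint64_t records_lost_wrap;
            uint64_t records_lost_big;
    };

    struct ctx_sketch {
            struct ctx_private_sketch *priv;
    };

    static void switch_slow_sketch(void)
    {
            struct ctx_private_sketch ctx_priv = { 0 };
            struct ctx_sketch ctx;

            /* Both live on this stack frame for the switch duration. */
            ctx.priv = &ctx_priv;
            /*
             * ... the clock read fills ctx.priv->timestamp, then the
             * switch helpers receive &ctx instead of a bare value ...
             */
            (void)ctx;
    }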
@@ -2142,12 +2158,12 @@ retry:
        offsets->switch_old_end = 0;
        offsets->pre_header_padding = 0;
 
-       ctx_private->tsc = config->cb.ring_buffer_clock_read(chan);
-       if ((int64_t) ctx_private->tsc == -EIO)
+       ctx_private->timestamp = config->cb.ring_buffer_clock_read(chan);
+       if ((int64_t) ctx_private->timestamp == -EIO)
                return -EIO;
 
-       if (last_tsc_overflow(config, buf, ctx_private->tsc))
-               ctx_private->rflags |= RING_BUFFER_RFLAG_FULL_TSC;
+       if (last_timestamp_overflow(config, buf, ctx_private->timestamp))
+               ctx_private->rflags |= RING_BUFFER_RFLAG_FULL_TIMESTAMP;
 
        if (caa_unlikely(subbuf_offset(offsets->begin, chan) == 0)) {
                offsets->switch_new_start = 1;          /* For offsets->begin */
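last_timestamp_overflow() (the renamed last_tsc_overflow()) decides whether
an event may carry a compact timestamp: when the bits that do not fit in the
compact field have changed since the last recorded timestamp, the
RING_BUFFER_RFLAG_FULL_TIMESTAMP flag forces a full timestamp record. A
hedged sketch of that style of check, assuming a 27-bit compact field and
folding the save step in for brevity:

    #include <stdint.h>

    #define COMPACT_TS_BITS 27  /* assumed width of the compact field */

    /*
     * Illustrative sketch: returns nonzero when the high-order bits
     * changed since *last, i.e. when the event must carry a full
     * timestamp. The real code separates the check from the save.
     */
    static int timestamp_overflow_sketch(uint64_t *last, uint64_t now)
    {
            uint64_t high_mask = ~(uint64_t)0 << COMPACT_TS_BITS;
            int overflow = ((*last ^ now) & high_mask) != 0;

            *last = now;
            return overflow;
    }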
@@ -2304,6 +2320,15 @@ retry:
                 */
                offsets->switch_new_end = 1;    /* For offsets->begin */
        }
+       /*
+        * Populate the records lost counters when the space reservation
+        * may cause a sub-buffer switch.
+        */
+       if (offsets->switch_new_end || offsets->switch_old_end) {
+               ctx_private->records_lost_full = v_read(config, &buf->records_lost_full);
+               ctx_private->records_lost_wrap = v_read(config, &buf->records_lost_wrap);
+               ctx_private->records_lost_big = v_read(config, &buf->records_lost_big);
+       }
        return 0;
 }
 
@@ -2346,12 +2371,12 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_ring_buffer_ctx *ctx,
                          != offsets.old));
 
        /*
-        * Atomically update last_tsc. This update races against concurrent
-        * atomic updates, but the race will always cause supplementary full TSC
-        * records, never the opposite (missing a full TSC record when it would
-        * be needed).
+        * Atomically update last_timestamp. This update races against concurrent
+        * atomic updates, but the race will always cause supplementary full
+        * timestamp records, never the opposite (missing a full timestamp
+        * record when it would be needed).
         */
-       save_last_tsc(config, buf, ctx_private->tsc);
+       save_last_timestamp(config, buf, ctx_private->timestamp);
 
        /*
         * Push the reader if necessary
@@ -2372,17 +2397,17 @@ int lib_ring_buffer_reserve_slow(struct lttng_ust_ring_buffer_ctx *ctx,
                lib_ring_buffer_clear_noref(config, &buf->backend,
                                            subbuf_index(offsets.old - 1, chan),
                                            handle);
-               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_old_end(buf, chan, &offsets, ctx, handle);
        }
 
        /*
         * Populate new subbuffer.
         */
        if (caa_unlikely(offsets.switch_new_start))
-               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_new_start(buf, chan, &offsets, ctx, handle);
 
        if (caa_unlikely(offsets.switch_new_end))
-               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx_private->tsc, handle);
+               lib_ring_buffer_switch_new_end(buf, chan, &offsets, ctx, handle);
 
        ctx_private->slot_size = offsets.size;
        ctx_private->pre_offset = offsets.begin;
@@ -2443,7 +2468,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
                                   unsigned long commit_count,
                                   unsigned long idx,
                                   struct lttng_ust_shm_handle *handle,
-                                  uint64_t tsc __attribute__((unused)))
+                                  const struct lttng_ust_ring_buffer_ctx *ctx)
 {
        unsigned long old_commit_count = commit_count
                                         - chan->backend.subbuf_size;
@@ -2511,7 +2536,7 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
                                                                buf,
                                                                idx,
                                                                handle),
-                                     handle);
+                                     handle, ctx);
 
                /*
                 * Increment the packet counter while we have exclusive
@@ -2556,11 +2581,11 @@ void lib_ring_buffer_check_deliver_slow(const struct lttng_ust_ring_buffer_confi
 }
 
 /*
- * Force a read (imply TLS fixup for dlopen) of TLS variables.
+ * Force a read (imply TLS allocation for dlopen) of TLS variables.
  */
-void lttng_fixup_ringbuffer_tls(void)
+void lttng_ringbuffer_alloc_tls(void)
 {
-       asm volatile ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting)));
+       __asm__ __volatile__ ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting)));
 }
 
 void lib_ringbuffer_signal_init(void)
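The renamed lttng_ringbuffer_alloc_tls() keeps the same idiom: an empty asm
statement with the TLS variable as a "m" (memory) input forces the compiler
to materialize the variable's address, which makes the C runtime allocate
its TLS block eagerly, before async-signal or dlopen contexts where a lazy
allocation would be unsafe. A standalone sketch of the idiom:

    /* Illustrative only: force early allocation of a TLS variable. */
    static __thread int tls_nesting_sketch;

    static void alloc_tls_sketch(void)
    {
            /*
             * The empty asm with a memory input makes the compiler
             * take the address of the TLS variable without emitting
             * any real instruction, triggering its allocation as a
             * side effect.
             */
            __asm__ __volatile__ ("" : : "m" (tls_nesting_sketch));
    }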