Fix: AM_CONDITIONAL should be outside AS_IF block
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index 6dd81e14b0bbb6e767b44687d725c210b43661fc..6ca57b501aa6d407d399d25f1aea02bab1261335 100644
@@ -151,12 +151,19 @@ static struct timer_signal_data timer_signal = {
        .lock = PTHREAD_MUTEX_INITIALIZER,
 };
 
-int lttng_ust_blocking_retry_timeout =
-               CONFIG_LTTNG_UST_DEFAULT_BLOCKING_RETRY_TIMEOUT_MS;
+static bool lttng_ust_allow_blocking;
 
-void lttng_ust_ringbuffer_set_retry_timeout(int timeout)
+void lttng_ust_ringbuffer_set_allow_blocking(void)
 {
-       lttng_ust_blocking_retry_timeout = timeout;
+       lttng_ust_allow_blocking = true;
+}
+
+/* Get blocking timeout, in ms */
+static int lttng_ust_ringbuffer_get_timeout(struct channel *chan)
+{
+       if (!lttng_ust_allow_blocking)
+               return 0;
+       return chan->u.s.blocking_timeout_ms;
 }
 
 /**
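
The first hunk replaces the old process-wide retry timeout with a one-way opt-in flag plus a per-channel timeout: until lttng_ust_ringbuffer_set_allow_blocking() is called, lttng_ust_ringbuffer_get_timeout() returns 0 and writers never block. The following self-contained sketch illustrates the same opt-in pattern; all names in it are invented for the example and are not lttng-ust API:

#include <stdbool.h>
#include <stdio.h>

static bool allow_blocking;             /* set once at startup, never cleared */

static void set_allow_blocking(void)
{
        allow_blocking = true;
}

/* Returns 0 (never block) unless blocking was explicitly enabled. */
static int get_timeout_ms(int per_channel_timeout_ms)
{
        if (!allow_blocking)
                return 0;
        return per_channel_timeout_ms;
}

int main(void)
{
        printf("%d\n", get_timeout_ms(500));    /* 0: blocking not enabled */
        set_allow_blocking();
        printf("%d\n", get_timeout_ms(500));    /* 500: per-channel value used */
        return 0;
}
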
@@ -951,7 +958,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                   void *buf_addr, size_t subbuf_size,
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval,
-                  const int *stream_fds, int nr_stream_fds)
+                  const int *stream_fds, int nr_stream_fds,
+                  int64_t blocking_timeout)
 {
        int ret;
        size_t shmsize, chansize;
@@ -959,6 +967,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
        struct lttng_ust_shm_handle *handle;
        struct shm_object *shmobj;
        unsigned int nr_streams;
+       int64_t blocking_timeout_ms;
 
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
                nr_streams = num_possible_cpus();
@@ -968,6 +977,19 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
        if (nr_stream_fds != nr_streams)
                return NULL;
 
+       if (blocking_timeout < -1) {
+               return NULL;
+       }
+       /*
+        * Convert blocking timeout from usec to msec. A value of -1 means
+        * "block forever".
+        */
+       if (blocking_timeout == -1) {
+               blocking_timeout_ms = -1;
+       } else {
+               blocking_timeout_ms = blocking_timeout / 1000;
+               /* Reject values that would overflow the int32_t channel field. */
+               if (blocking_timeout_ms != (int32_t) blocking_timeout_ms) {
+                       return NULL;
+               }
+       }
+
        if (lib_ring_buffer_check_config(config, switch_timer_interval,
                                         read_timer_interval))
                return NULL;
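
channel_create() now takes the blocking timeout in microseconds and stores it in milliseconds in a 32-bit channel field, so the validation above accepts -1 ("block forever"), rejects any other negative value, and rejects any value whose millisecond equivalent overflows int32_t. A standalone sketch of that same validation, with names invented for the example:

#include <stdint.h>
#include <stdio.h>

/* Returns 0 on success, -1 on invalid input; *ms receives the converted value. */
static int convert_blocking_timeout(int64_t usec, int64_t *ms)
{
        if (usec < -1)
                return -1;              /* only -1 is a valid negative value */
        if (usec == -1) {
                *ms = -1;               /* block forever */
                return 0;
        }
        *ms = usec / 1000;
        if (*ms != (int32_t) *ms)
                return -1;              /* would overflow the int32_t field */
        return 0;
}

int main(void)
{
        int64_t ms;

        printf("%d\n", convert_blocking_timeout(1500, &ms));    /* 0, ms == 1 */
        printf("%d\n", convert_blocking_timeout(-1, &ms));      /* 0, ms == -1 */
        printf("%d\n", convert_blocking_timeout(-2, &ms));      /* -1: rejected */
        return 0;
}
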
@@ -1021,6 +1043,8 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                        *priv_data = NULL;
        }
 
+       chan->u.s.blocking_timeout_ms = (int32_t) blocking_timeout_ms;
+
        ret = channel_backend_init(&chan->backend, name, config,
                                   subbuf_size, num_subbuf, handle,
                                   stream_fds);
@@ -1305,6 +1329,43 @@ nodata:
                return -EAGAIN;
 }
 
+/**
+ * lib_ring_buffer_snapshot_sample_positions - sample consumed/produced positions
+ * @buf: ring buffer
+ * @consumed: consumed byte count indicating the last position read
+ * @produced: produced byte count indicating the last position written
+ * @handle: ring buffer shm handle
+ *
+ * Performs the same function as lib_ring_buffer_snapshot(), but the positions
+ * are saved regardless of whether the consumed and produced positions are
+ * in the same sub-buffer.
+ *
+ * This function is meant to provide information on the exact producer and
+ * consumer positions without regard for the "snapshot" feature.
+ */
+int lib_ring_buffer_snapshot_sample_positions(
+                            struct lttng_ust_lib_ring_buffer *buf,
+                            unsigned long *consumed, unsigned long *produced,
+                            struct lttng_ust_shm_handle *handle)
+{
+       struct channel *chan;
+       const struct lttng_ust_lib_ring_buffer_config *config;
+
+       chan = shmp(handle, buf->backend.chan);
+       if (!chan)
+               return -EPERM;
+       config = &chan->backend.config;
+       cmm_smp_rmb();
+       *consumed = uatomic_read(&buf->consumed);
+       /*
+        * No need to issue a memory barrier between consumed count read and
+        * write offset read, because consumed count can only change
+        * concurrently in overwrite mode, and we keep a sequence counter
+        * identifier derived from the write offset to check we are getting
+        * the same sub-buffer we are expecting (the sub-buffers are atomically
+        * "tagged" upon writes, tags are checked upon read).
+        */
+       *produced = v_read(config, &buf->offset);
+       return 0;
+}
+
 /**
  * lib_ring_buffer_move_consumer - move consumed counter forward
  * @buf: ring buffer
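
A natural consumer of lib_ring_buffer_snapshot_sample_positions() is code that wants to know how far the producer has run ahead of the consumer. A minimal sketch of such a caller, assuming the free-running byte-counter semantics described in the comment above (the helper name and error convention are invented for the example):

/* Hypothetical helper: bytes currently pending between consumer and producer. */
static long pending_bytes(struct lttng_ust_lib_ring_buffer *buf,
                          struct lttng_ust_shm_handle *handle)
{
        unsigned long consumed, produced;

        if (lib_ring_buffer_snapshot_sample_positions(buf, &consumed,
                        &produced, handle))
                return -1;
        /* Positions are free-running byte counters; subtraction wraps safely. */
        return (long) (produced - consumed);
}
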
@@ -2028,7 +2089,7 @@ int lib_ring_buffer_try_reserve_slow(struct lttng_ust_lib_ring_buffer *buf,
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        struct lttng_ust_shm_handle *handle = ctx->handle;
        unsigned long reserve_commit_diff, offset_cmp;
-       int timeout_left_ms = lttng_ust_blocking_retry_timeout;
+       int timeout_left_ms = lttng_ust_ringbuffer_get_timeout(chan);
 
 retry:
        offsets->begin = offset_cmp = v_read(config, &buf->offset);
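
The slow path now seeds its retry budget from the per-channel timeout instead of the removed global, so a timeout of 0 fails immediately, -1 retries indefinitely, and a positive value bounds the total wait. A self-contained sketch of that retry-with-timeout pattern follows; the delay quantum, helper names, and retry condition are invented for the example, and the real loop lives in lib_ring_buffer_try_reserve_slow():

#include <stdio.h>
#include <unistd.h>

#define RETRY_DELAY_MS 100

/* Returns 0 once try_once() succeeds, -1 when the timeout budget runs out. */
static int reserve_with_timeout(int timeout_left_ms, int (*try_once)(void))
{
        for (;;) {
                if (!try_once())
                        return 0;
                if (timeout_left_ms == 0)
                        return -1;      /* non-blocking: fail immediately */
                if (timeout_left_ms > 0) {
                        timeout_left_ms -= RETRY_DELAY_MS;
                        if (timeout_left_ms < 0)
                                timeout_left_ms = 0;
                }
                /* timeout_left_ms == -1 keeps retrying forever. */
                usleep(RETRY_DELAY_MS * 1000);
        }
}

static int attempts;

static int try_once(void)
{
        return ++attempts < 3 ? -1 : 0;         /* succeed on third attempt */
}

int main(void)
{
        printf("%d\n", reserve_with_timeout(500, try_once));    /* 0 */
        return 0;
}
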