Add mising include in ust.h
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index d0e9466810cc0cb45b7ef51e40a67c08e367cf33..c177f337da01aa4f457fbea66add969a837d691f 100644 (file)
@@ -80,8 +80,9 @@
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
 
-#define LTTNG_UST_RB_SIG               SIGRTMIN
-#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_FLUSH         SIGRTMIN
+#define LTTNG_UST_RB_SIG_READ          SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 2
 #define CLOCKID                CLOCK_MONOTONIC
 
 /*
@@ -127,15 +128,21 @@ void lib_ring_buffer_print_errors(struct channel *chan,
  * Handle timer teardown race wrt memory free of private data by
  * ring buffer signals are handled by a single thread, which permits
  * a synchronization point between handling of each signal.
- * Protected by the ust mutex.
+ * Protected by the lock within the structure.
  */
 struct timer_signal_data {
        pthread_t tid;  /* thread id managing signals */
        int setup_done;
        int qs_done;
+       pthread_mutex_t lock;
 };
 
-static struct timer_signal_data timer_signal;
+static struct timer_signal_data timer_signal = {
+       .tid = 0,
+       .setup_done = 0,
+       .qs_done = 0,
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
 
 /**
  * lib_ring_buffer_reset - Reset ring buffer to initial values.
@@ -282,7 +289,7 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
        handle = chan->handle;
        config = &chan->backend.config;
 
-       DBG("Timer for channel %p\n", chan);
+       DBG("Switch timer for channel %p\n", chan);
 
        /*
         * Only flush buffers periodically if readers are active.
@@ -300,11 +307,61 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
                struct lttng_ust_lib_ring_buffer *buf =
                        shmp(handle, chan->backend.buf[0].shmp);
 
-                       if (uatomic_read(&buf->active_readers))
-                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                                       chan->handle);
+               if (uatomic_read(&buf->active_readers))
+                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                               chan->handle);
+       }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+       return;
+}
+
+static
+void lib_ring_buffer_channel_do_read(struct channel *chan)
+{
+       const struct lttng_ust_lib_ring_buffer_config *config;
+       struct lttng_ust_shm_handle *handle;
+       int cpu;
+
+       handle = chan->handle;
+       config = &chan->backend.config;
+
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               for_each_possible_cpu(cpu) {
+                       struct lttng_ust_lib_ring_buffer *buf =
+                               shmp(handle, chan->backend.buf[cpu].shmp);
+
+                       if (uatomic_read(&buf->active_readers)
+                           && lib_ring_buffer_poll_deliver(config, buf,
+                                       chan, handle)) {
+                               lib_ring_buffer_wakeup(buf, handle);
+                       }
+               }
+       } else {
+               struct lttng_ust_lib_ring_buffer *buf =
+                       shmp(handle, chan->backend.buf[0].shmp);
+
+               if (uatomic_read(&buf->active_readers)
+                   && lib_ring_buffer_poll_deliver(config, buf,
+                               chan, handle)) {
+                       lib_ring_buffer_wakeup(buf, handle);
+               }
        }
        pthread_mutex_unlock(&wakeup_fd_mutex);
+}
+
+static
+void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+{
+       struct channel *chan;
+
+       assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+       chan = si->si_value.sival_ptr;
+       DBG("Read timer for channel %p\n", chan);
+       lib_ring_buffer_channel_do_read(chan);
        return;
 }
 
@@ -317,7 +374,11 @@ void rb_setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigemptyset");
        }
-       ret = sigaddset(mask, LTTNG_UST_RB_SIG);
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
        if (ret) {
                PERROR("sigaddset");
        }
@@ -341,12 +402,16 @@ void *sig_thread(void *arg)
        for (;;) {
                signr = sigwaitinfo(&mask, &info);
                if (signr == -1) {
-                       PERROR("sigwaitinfo");
+                       if (errno != EINTR)
+                               PERROR("sigwaitinfo");
                        continue;
                }
-               if (signr == LTTNG_UST_RB_SIG) {
+               if (signr == LTTNG_UST_RB_SIG_FLUSH) {
                        lib_ring_buffer_channel_switch_timer(info.si_signo,
                                        &info, NULL);
+               } else if (signr == LTTNG_UST_RB_SIG_READ) {
+                       lib_ring_buffer_channel_read_timer(info.si_signo,
+                                       &info, NULL);
                } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
@@ -359,7 +424,6 @@ void *sig_thread(void *arg)
 }
 
 /*
- * Called with ust_lock() held.
  * Ensure only a single thread listens on the timer signal.
  */
 static
@@ -368,8 +432,9 @@ void lib_ring_buffer_setup_timer_thread(void)
        pthread_t thread;
        int ret;
 
+       pthread_mutex_lock(&timer_signal.lock);
        if (timer_signal.setup_done)
-               return;
+               goto end;
 
        ret = pthread_create(&thread, NULL, &sig_thread, NULL);
        if (ret) {
@@ -382,11 +447,64 @@ void lib_ring_buffer_setup_timer_thread(void)
                PERROR("pthread_detach");
        }
        timer_signal.setup_done = 1;
+end:
+       pthread_mutex_unlock(&timer_signal.lock);
 }
 
 /*
- * Called with ust_lock() held.
+ * Wait for signal-handling thread quiescent state.
  */
+static
+void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr)
+{
+       sigset_t pending_set;
+       int ret;
+
+       /*
+        * We need to be the only thread interacting with the thread
+        * that manages signals for teardown synchronization.
+        */
+       pthread_mutex_lock(&timer_signal.lock);
+
+       /*
+        * Ensure we don't have any signal queued for this channel.
+        */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, signr))
+                       break;
+               caa_cpu_relax();
+       }
+
+       /*
+        * From this point, no new signal handler will be fired that
+        * would try to access "chan". However, we still need to wait
+        * for any currently executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+        * thread wakes up.
+        */
+       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+               caa_cpu_relax();
+       cmm_smp_mb();
+
+       pthread_mutex_unlock(&timer_signal.lock);
+}
+
 static
 void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 {
@@ -402,7 +520,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        lib_ring_buffer_setup_timer_thread();
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_UST_RB_SIG;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
        ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
        if (ret == -1) {
@@ -420,14 +538,10 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        }
 }
 
-/*
- * Called with ust_lock() held.
- */
 static
 void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
 {
-       sigset_t pending_set;
-       int sig_is_pending, ret;
+       int ret;
 
        if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
                return;
@@ -437,144 +551,79 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
                PERROR("timer_delete");
        }
 
-       /*
-        * Ensure we don't have any signal queued for this channel.
-        */
-       for (;;) {
-               ret = sigemptyset(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigemptyset");
-               }
-               ret = sigpending(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigpending");
-               }
-               sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG);
-               if (!sig_is_pending)
-                       break;
-               caa_cpu_relax();
-       }
-
-       /*
-        * From this point, no new signal handler will be fired that
-        * would try to access "chan". However, we still need to wait
-        * for any currently executing handler to complete.
-        */
-       cmm_smp_mb();
-       CMM_STORE_SHARED(timer_signal.qs_done, 0);
-       cmm_smp_mb();
-
-       /*
-        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
-        * thread wakes up.
-        */
-       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
-
-       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
-               caa_cpu_relax();
-       cmm_smp_mb();
+       lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH);
 
        chan->switch_timer = 0;
        chan->switch_timer_enabled = 0;
 }
 
-#if 0
-/*
- * Polling timer to check the channels for data.
- */
-static void read_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
 {
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       struct sigevent sev;
+       struct itimerspec its;
+       int ret;
 
-       CHAN_WARN_ON(chan, !buf->backend.allocated);
+       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
+                       || !chan->read_timer_interval || chan->read_timer_enabled)
+               return;
 
-       if (uatomic_read(&buf->active_readers))
-           && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
+       chan->read_timer_enabled = 1;
 
-       //TODO
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->read_timer,
-       //                       jiffies + chan->read_timer_interval);
-       //else
-       //      mod_timer(&buf->read_timer,
-       //                jiffies + chan->read_timer_interval);
-}
-#endif //0
+       lib_ring_buffer_setup_timer_thread();
 
-static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
-{
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
+       sev.sigev_value.sival_ptr = chan;
+       ret = timer_create(CLOCKID, &sev, &chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
 
-       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || buf->read_timer_enabled)
-               return;
+       its.it_value.tv_sec = chan->read_timer_interval / 1000000;
+       its.it_value.tv_nsec = chan->read_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       //TODO
-       //init_timer(&buf->read_timer);
-       //buf->read_timer.function = read_buffer_timer;
-       //buf->read_timer.expires = jiffies + chan->read_timer_interval;
-       //buf->read_timer.data = (unsigned long)buf;
-
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      add_timer_on(&buf->read_timer, buf->backend.cpu);
-       //else
-       //      add_timer(&buf->read_timer);
-       buf->read_timer_enabled = 1;
+       ret = timer_settime(chan->read_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+       }
 }
 
-static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+static
+void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       int ret;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || !buf->read_timer_enabled)
+                       || !chan->read_timer_interval || !chan->read_timer_enabled)
                return;
 
-       //TODO
-       //del_timer_sync(&buf->read_timer);
+       ret = timer_delete(chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
        /*
         * do one more check to catch data that has been written in the last
         * timer period.
         */
-       if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
-       buf->read_timer_enabled = 0;
+       lib_ring_buffer_channel_do_read(chan);
+
+       lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ);
+
+       chan->read_timer = 0;
+       chan->read_timer_enabled = 0;
 }
 
 static void channel_unregister_notifiers(struct channel *chan,
                           struct lttng_ust_shm_handle *handle)
 {
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-       int cpu;
-
        lib_ring_buffer_channel_switch_timer_stop(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
-                       lib_ring_buffer_stop_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
-               lib_ring_buffer_stop_read_timer(buf, handle);
-       }
-       //channel_backend_unregister_notifiers(&chan->backend);
+       lib_ring_buffer_channel_read_timer_stop(chan);
 }
 
 static void channel_print_errors(struct channel *chan,
@@ -637,7 +686,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval)
 {
-       int ret, cpu;
+       int ret;
        size_t shmsize, chansize;
        struct channel *chan;
        struct lttng_ust_shm_handle *handle;
@@ -708,29 +757,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        chan->handle = handle;
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
-       chan->switch_timer_interval = switch_timer_interval;
-
-       //TODO
-       //chan->read_timer_interval = read_timer_interval;
-       //init_waitqueue_head(&chan->read_wait);
-       //init_waitqueue_head(&chan->hp_wait);
 
+       chan->switch_timer_interval = switch_timer_interval;
+       chan->read_timer_interval = read_timer_interval;
        lib_ring_buffer_channel_switch_timer_start(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-                       lib_ring_buffer_start_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
+       lib_ring_buffer_channel_read_timer_start(chan);
 
-               lib_ring_buffer_start_read_timer(buf, handle);
-       }
        return handle;
 
 error_backend_init:
@@ -995,7 +1027,7 @@ nodata:
 }
 
 /**
- * lib_ring_buffer_put_snapshot - move consumed counter forward
+ * lib_ring_buffer_move_consumer - move consumed counter forward
  * @buf: ring buffer
  * @consumed_new: new consumed count value
  */
This page took 0.028518 seconds and 4 git commands to generate.