Fix: add internal mutex for timer
[lttng-ust.git] / libringbuffer / ring_buffer_frontend.c
index a01ebbbf8ae8931a937dac9c4a7ec19ba29ea4c8..c177f337da01aa4f457fbea66add969a837d691f 100644 (file)
@@ -80,8 +80,9 @@
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
 
-#define LTTNG_UST_RB_SIG               SIGRTMIN
-#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_FLUSH         SIGRTMIN
+#define LTTNG_UST_RB_SIG_READ          SIGRTMIN + 1
+#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 2
 #define CLOCKID                CLOCK_MONOTONIC
 
 /*
@@ -112,6 +113,12 @@ struct switch_offsets {
 
 DEFINE_URCU_TLS(unsigned int, lib_ring_buffer_nesting);
 
+/*
+ * wakeup_fd_mutex protects wakeup fd use by timer from concurrent
+ * close.
+ */
+static pthread_mutex_t wakeup_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
+
 static
 void lib_ring_buffer_print_errors(struct channel *chan,
                                struct lttng_ust_lib_ring_buffer *buf, int cpu,
@@ -121,15 +128,21 @@ void lib_ring_buffer_print_errors(struct channel *chan,
  * Handle timer teardown race wrt memory free of private data by
  * ring buffer signals are handled by a single thread, which permits
  * a synchronization point between handling of each signal.
- * Protected by the ust mutex.
+ * Protected by the lock within the structure.
  */
 struct timer_signal_data {
        pthread_t tid;  /* thread id managing signals */
        int setup_done;
        int qs_done;
+       pthread_mutex_t lock;
 };
 
-static struct timer_signal_data timer_signal;
+static struct timer_signal_data timer_signal = {
+       .tid = 0,
+       .setup_done = 0,
+       .qs_done = 0,
+       .lock = PTHREAD_MUTEX_INITIALIZER,
+};
 
 /**
  * lib_ring_buffer_reset - Reset ring buffer to initial values.
@@ -262,29 +275,6 @@ free_chanbuf:
        return ret;
 }
 
-#if 0
-static void switch_buffer_timer(unsigned long data)
-{
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-
-       /*
-        * Only flush buffers periodically if readers are active.
-        */
-       if (uatomic_read(&buf->active_readers))
-               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE, handle);
-
-       //TODO timers
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->switch_timer,
-       //                       jiffies + chan->switch_timer_interval);
-       //else
-       //      mod_timer(&buf->switch_timer,
-       //                jiffies + chan->switch_timer_interval);
-}
-#endif //0
-
 static
 void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
 {
@@ -299,23 +289,79 @@ void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
        handle = chan->handle;
        config = &chan->backend.config;
 
-       DBG("Timer for channel %p\n", chan);
+       DBG("Switch timer for channel %p\n", chan);
 
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(cpu) {
                        struct lttng_ust_lib_ring_buffer *buf =
                                shmp(handle, chan->backend.buf[cpu].shmp);
+                       if (uatomic_read(&buf->active_readers))
+                               lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                                       chan->handle);
+               }
+       } else {
+               struct lttng_ust_lib_ring_buffer *buf =
+                       shmp(handle, chan->backend.buf[0].shmp);
 
+               if (uatomic_read(&buf->active_readers))
                        lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
                                chan->handle);
+       }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+       return;
+}
+
+static
+void lib_ring_buffer_channel_do_read(struct channel *chan)
+{
+       const struct lttng_ust_lib_ring_buffer_config *config;
+       struct lttng_ust_shm_handle *handle;
+       int cpu;
+
+       handle = chan->handle;
+       config = &chan->backend.config;
+
+       /*
+        * Only flush buffers periodically if readers are active.
+        */
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               for_each_possible_cpu(cpu) {
+                       struct lttng_ust_lib_ring_buffer *buf =
+                               shmp(handle, chan->backend.buf[cpu].shmp);
+
+                       if (uatomic_read(&buf->active_readers)
+                           && lib_ring_buffer_poll_deliver(config, buf,
+                                       chan, handle)) {
+                               lib_ring_buffer_wakeup(buf, handle);
+                       }
                }
        } else {
                struct lttng_ust_lib_ring_buffer *buf =
                        shmp(handle, chan->backend.buf[0].shmp);
 
-                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
-                               chan->handle);
+               if (uatomic_read(&buf->active_readers)
+                   && lib_ring_buffer_poll_deliver(config, buf,
+                               chan, handle)) {
+                       lib_ring_buffer_wakeup(buf, handle);
+               }
        }
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+}
+
+static
+void lib_ring_buffer_channel_read_timer(int sig, siginfo_t *si, void *uc)
+{
+       struct channel *chan;
+
+       assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+       chan = si->si_value.sival_ptr;
+       DBG("Read timer for channel %p\n", chan);
+       lib_ring_buffer_channel_do_read(chan);
        return;
 }
 
@@ -328,7 +374,11 @@ void rb_setmask(sigset_t *mask)
        if (ret) {
                PERROR("sigemptyset");
        }
-       ret = sigaddset(mask, LTTNG_UST_RB_SIG);
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_FLUSH);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_READ);
        if (ret) {
                PERROR("sigaddset");
        }
@@ -352,12 +402,16 @@ void *sig_thread(void *arg)
        for (;;) {
                signr = sigwaitinfo(&mask, &info);
                if (signr == -1) {
-                       PERROR("sigwaitinfo");
+                       if (errno != EINTR)
+                               PERROR("sigwaitinfo");
                        continue;
                }
-               if (signr == LTTNG_UST_RB_SIG) {
+               if (signr == LTTNG_UST_RB_SIG_FLUSH) {
                        lib_ring_buffer_channel_switch_timer(info.si_signo,
                                        &info, NULL);
+               } else if (signr == LTTNG_UST_RB_SIG_READ) {
+                       lib_ring_buffer_channel_read_timer(info.si_signo,
+                                       &info, NULL);
                } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
                        cmm_smp_mb();
                        CMM_STORE_SHARED(timer_signal.qs_done, 1);
@@ -370,7 +424,6 @@ void *sig_thread(void *arg)
 }
 
 /*
- * Called with ust_lock() held.
  * Ensure only a single thread listens on the timer signal.
  */
 static
@@ -379,8 +432,9 @@ void lib_ring_buffer_setup_timer_thread(void)
        pthread_t thread;
        int ret;
 
+       pthread_mutex_lock(&timer_signal.lock);
        if (timer_signal.setup_done)
-               return;
+               goto end;
 
        ret = pthread_create(&thread, NULL, &sig_thread, NULL);
        if (ret) {
@@ -393,11 +447,64 @@ void lib_ring_buffer_setup_timer_thread(void)
                PERROR("pthread_detach");
        }
        timer_signal.setup_done = 1;
+end:
+       pthread_mutex_unlock(&timer_signal.lock);
 }
 
 /*
- * Called with ust_lock() held.
+ * Wait for signal-handling thread quiescent state.
  */
+static
+void lib_ring_buffer_wait_signal_thread_qs(unsigned int signr)
+{
+       sigset_t pending_set;
+       int ret;
+
+       /*
+        * We need to be the only thread interacting with the thread
+        * that manages signals for teardown synchronization.
+        */
+       pthread_mutex_lock(&timer_signal.lock);
+
+       /*
+        * Ensure we don't have any signal queued for this channel.
+        */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               if (!sigismember(&pending_set, signr))
+                       break;
+               caa_cpu_relax();
+       }
+
+       /*
+        * From this point, no new signal handler will be fired that
+        * would try to access "chan". However, we still need to wait
+        * for any currently executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+        * thread wakes up.
+        */
+       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+               caa_cpu_relax();
+       cmm_smp_mb();
+
+       pthread_mutex_unlock(&timer_signal.lock);
+}
+
 static
 void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 {
@@ -413,7 +520,7 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        lib_ring_buffer_setup_timer_thread();
 
        sev.sigev_notify = SIGEV_SIGNAL;
-       sev.sigev_signo = LTTNG_UST_RB_SIG;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_FLUSH;
        sev.sigev_value.sival_ptr = chan;
        ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
        if (ret == -1) {
@@ -431,14 +538,10 @@ void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
        }
 }
 
-/*
- * Called with ust_lock() held.
- */
 static
 void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
 {
-       sigset_t pending_set;
-       int sig_is_pending, ret;
+       int ret;
 
        if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
                return;
@@ -448,144 +551,79 @@ void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
                PERROR("timer_delete");
        }
 
-       /*
-        * Ensure we don't have any signal queued for this channel.
-        */
-       for (;;) {
-               ret = sigemptyset(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigemptyset");
-               }
-               ret = sigpending(&pending_set);
-               if (ret == -1) {
-                       PERROR("sigpending");
-               }
-               sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG);
-               if (!sig_is_pending)
-                       break;
-               caa_cpu_relax();
-       }
-
-       /*
-        * From this point, no new signal handler will be fired that
-        * would try to access "chan". However, we still need to wait
-        * for any currently executing handler to complete.
-        */
-       cmm_smp_mb();
-       CMM_STORE_SHARED(timer_signal.qs_done, 0);
-       cmm_smp_mb();
-
-       /*
-        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
-        * thread wakes up.
-        */
-       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
-
-       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
-               caa_cpu_relax();
-       cmm_smp_mb();
+       lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_FLUSH);
 
        chan->switch_timer = 0;
        chan->switch_timer_enabled = 0;
 }
 
-#if 0
-/*
- * Polling timer to check the channels for data.
- */
-static void read_buffer_timer(unsigned long data)
+static
+void lib_ring_buffer_channel_read_timer_start(struct channel *chan)
 {
-       struct lttng_ust_lib_ring_buffer *buf = (struct lttng_ust_lib_ring_buffer *)data;
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       struct sigevent sev;
+       struct itimerspec its;
+       int ret;
 
-       CHAN_WARN_ON(chan, !buf->backend.allocated);
+       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
+                       || !chan->read_timer_interval || chan->read_timer_enabled)
+               return;
 
-       if (uatomic_read(&buf->active_readers))
-           && lib_ring_buffer_poll_deliver(config, buf, chan)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
+       chan->read_timer_enabled = 1;
 
-       //TODO
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      mod_timer_pinned(&buf->read_timer,
-       //                       jiffies + chan->read_timer_interval);
-       //else
-       //      mod_timer(&buf->read_timer,
-       //                jiffies + chan->read_timer_interval);
-}
-#endif //0
+       lib_ring_buffer_setup_timer_thread();
 
-static void lib_ring_buffer_start_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
-{
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_UST_RB_SIG_READ;
+       sev.sigev_value.sival_ptr = chan;
+       ret = timer_create(CLOCKID, &sev, &chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
 
-       if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || buf->read_timer_enabled)
-               return;
+       its.it_value.tv_sec = chan->read_timer_interval / 1000000;
+       its.it_value.tv_nsec = chan->read_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
 
-       //TODO
-       //init_timer(&buf->read_timer);
-       //buf->read_timer.function = read_buffer_timer;
-       //buf->read_timer.expires = jiffies + chan->read_timer_interval;
-       //buf->read_timer.data = (unsigned long)buf;
-
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      add_timer_on(&buf->read_timer, buf->backend.cpu);
-       //else
-       //      add_timer(&buf->read_timer);
-       buf->read_timer_enabled = 1;
+       ret = timer_settime(chan->read_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+       }
 }
 
-static void lib_ring_buffer_stop_read_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+static
+void lib_ring_buffer_channel_read_timer_stop(struct channel *chan)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       int ret;
 
        if (config->wakeup != RING_BUFFER_WAKEUP_BY_TIMER
-           || !chan->read_timer_interval
-           || !buf->read_timer_enabled)
+                       || !chan->read_timer_interval || !chan->read_timer_enabled)
                return;
 
-       //TODO
-       //del_timer_sync(&buf->read_timer);
+       ret = timer_delete(chan->read_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
        /*
         * do one more check to catch data that has been written in the last
         * timer period.
         */
-       if (lib_ring_buffer_poll_deliver(config, buf, chan, handle)) {
-               //TODO
-               //wake_up_interruptible(&buf->read_wait);
-               //wake_up_interruptible(&chan->read_wait);
-       }
-       buf->read_timer_enabled = 0;
+       lib_ring_buffer_channel_do_read(chan);
+
+       lib_ring_buffer_wait_signal_thread_qs(LTTNG_UST_RB_SIG_READ);
+
+       chan->read_timer = 0;
+       chan->read_timer_enabled = 0;
 }
 
 static void channel_unregister_notifiers(struct channel *chan,
                           struct lttng_ust_shm_handle *handle)
 {
-       const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
-       int cpu;
-
        lib_ring_buffer_channel_switch_timer_stop(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-
-                       lib_ring_buffer_stop_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
-
-               lib_ring_buffer_stop_read_timer(buf, handle);
-       }
-       //channel_backend_unregister_notifiers(&chan->backend);
+       lib_ring_buffer_channel_read_timer_stop(chan);
 }
 
 static void channel_print_errors(struct channel *chan,
@@ -648,7 +686,7 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                   size_t num_subbuf, unsigned int switch_timer_interval,
                   unsigned int read_timer_interval)
 {
-       int ret, cpu;
+       int ret;
        size_t shmsize, chansize;
        struct channel *chan;
        struct lttng_ust_shm_handle *handle;
@@ -719,29 +757,12 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
 
        chan->handle = handle;
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
-       chan->switch_timer_interval = switch_timer_interval;
-
-       //TODO
-       //chan->read_timer_interval = read_timer_interval;
-       //init_waitqueue_head(&chan->read_wait);
-       //init_waitqueue_head(&chan->hp_wait);
 
+       chan->switch_timer_interval = switch_timer_interval;
+       chan->read_timer_interval = read_timer_interval;
        lib_ring_buffer_channel_switch_timer_start(chan);
-       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
-               /*
-                * In case of non-hotplug cpu, if the ring-buffer is allocated
-                * in early initcall, it will not be notified of secondary cpus.
-                * In that off case, we need to allocate for all possible cpus.
-                */
-               for_each_possible_cpu(cpu) {
-                       struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-                       lib_ring_buffer_start_read_timer(buf, handle);
-               }
-       } else {
-               struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
+       lib_ring_buffer_channel_read_timer_start(chan);
 
-               lib_ring_buffer_start_read_timer(buf, handle);
-       }
        return handle;
 
 error_backend_init:
@@ -753,7 +774,8 @@ error_table_alloc:
 }
 
 struct lttng_ust_shm_handle *channel_handle_create(void *data,
-                                       uint64_t memory_map_size)
+                                       uint64_t memory_map_size,
+                                       int wakeup_fd)
 {
        struct lttng_ust_shm_handle *handle;
        struct shm_object *object;
@@ -768,7 +790,7 @@ struct lttng_ust_shm_handle *channel_handle_create(void *data,
                goto error_table_alloc;
        /* Add channel object */
        object = shm_object_table_append_mem(handle->table, data,
-                       memory_map_size);
+                       memory_map_size, wakeup_fd);
        if (!object)
                goto error_table_object;
        /* struct channel is at object 0, offset 0 (hardcoded) */
@@ -868,7 +890,27 @@ struct lttng_ust_lib_ring_buffer *channel_get_ring_buffer(
        return shmp(handle, chan->backend.buf[cpu].shmp);
 }
 
-int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_channel_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wait_fd(handle, ref);
+}
+
+int ring_buffer_channel_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+                       struct channel *chan,
+                       struct lttng_ust_shm_handle *handle)
+{
+       struct shm_ref *ref;
+
+       ref = &handle->chan._ref;
+       return shm_close_wakeup_fd(handle, ref);
+}
+
+int ring_buffer_stream_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
@@ -885,12 +927,13 @@ int ring_buffer_close_wait_fd(const struct lttng_ust_lib_ring_buffer_config *con
        return shm_close_wait_fd(handle, ref);
 }
 
-int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
+int ring_buffer_stream_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *config,
                        struct channel *chan,
                        struct lttng_ust_shm_handle *handle,
                        int cpu)
 {
        struct shm_ref *ref;
+       int ret;
 
        if (config->alloc == RING_BUFFER_ALLOC_GLOBAL) {
                cpu = 0;
@@ -899,7 +942,10 @@ int ring_buffer_close_wakeup_fd(const struct lttng_ust_lib_ring_buffer_config *c
                        return -EINVAL;
        }
        ref = &chan->backend.buf[cpu].shmp._ref;
-       return shm_close_wakeup_fd(handle, ref);
+       pthread_mutex_lock(&wakeup_fd_mutex);
+       ret = shm_close_wakeup_fd(handle, ref);
+       pthread_mutex_unlock(&wakeup_fd_mutex);
+       return ret;
 }
 
 int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
@@ -981,7 +1027,7 @@ nodata:
 }
 
 /**
- * lib_ring_buffer_put_snapshot - move consumed counter forward
+ * lib_ring_buffer_move_consumer - move consumed counter forward
  * @buf: ring buffer
  * @consumed_new: new consumed count value
  */
This page took 0.029535 seconds and 4 git commands to generate.