Implement ring buffer periodic buffer switch timer
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 6 Mar 2013 01:03:37 +0000 (20:03 -0500)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Wed, 6 Mar 2013 01:03:37 +0000 (20:03 -0500)
The tricky part was the teardown. It must not race against neither
pending signals nor in-flight signal handler execution, otherwise the
timer handler could access freed channel data. Use a dedicated thread to
handle those signals, with a synchronization point between each handler
execution.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
liblttng-ust-ctl/ustctl.c
liblttng-ust/lttng-ust-abi.c
libringbuffer/frontend.h
libringbuffer/frontend_types.h
libringbuffer/ring_buffer_frontend.c

index 54456c8e9712dc21ffd64360b602d3fe8c3f49ba..fb4330e4da8b139645348cbaf1d5ae6a445ab910 100644 (file)
@@ -1569,6 +1569,7 @@ void ustctl_init(void)
        lttng_ring_buffer_metadata_client_init();
        lttng_ring_buffer_client_overwrite_init();
        lttng_ring_buffer_client_discard_init();
+       lib_ringbuffer_signal_init();
 }
 
 static __attribute__((destructor))
index 9085b04049fc48f6467785348120485bf1551479..94c059684422e5039e3a4e88fc461541ff8d2d54 100644 (file)
@@ -412,6 +412,7 @@ int lttng_abi_map_channel(int session_objd,
 
        chan = shmp(channel_handle, channel_handle->chan);
        assert(chan);
+       chan->handle = channel_handle;
        config = &chan->backend.config;
        lttng_chan = channel_get_private(chan);
        if (!lttng_chan) {
index b59948df7b42e8b7d62c61c1c97429fb92504998..2eda6e945b218c8b557886084202feb4b5151716 100644 (file)
@@ -109,6 +109,12 @@ extern int lib_ring_buffer_open_read(struct lttng_ust_lib_ring_buffer *buf,
 extern void lib_ring_buffer_release_read(struct lttng_ust_lib_ring_buffer *buf,
                                         struct lttng_ust_shm_handle *handle);
 
+/*
+ * Initialize signals for ring buffer. Should be called early e.g. by
+ * main() in the program to affect all threads.
+ */
+void lib_ringbuffer_signal_init(void);
+
 /*
  * Read sequence: snapshot, many get_subbuf/put_subbuf, move_consumer.
  */
index 9d3932d8978d51b17f64c87af472c675e6472824..c23fdd2841e85f808e6505455205c67231be1aac 100644 (file)
@@ -29,6 +29,7 @@
  */
 
 #include <string.h>
+#include <time.h>      /* for timer_t */
 
 #include <urcu/list.h>
 #include <urcu/uatomic.h>
@@ -37,6 +38,7 @@
 #include <usterr-signal-safe.h>
 #include "backend_types.h"
 #include "shm_internal.h"
+#include "shm_types.h"
 #include "vatomic.h"
 
 /*
@@ -56,12 +58,17 @@ struct channel {
                                                 * subbuffer index.
                                                 */
 
-       unsigned long switch_timer_interval;    /* Buffer flush (jiffies) */
-       unsigned long read_timer_interval;      /* Reader wakeup (jiffies) */
+       unsigned long switch_timer_interval;    /* Buffer flush (us) */
+       timer_t switch_timer;
+       int switch_timer_enabled;
+
+       unsigned long read_timer_interval;      /* Reader wakeup (us) */
+       //timer_t read_timer;
        //wait_queue_head_t read_wait;          /* reader wait queue */
        int finalized;                          /* Has channel been finalized */
        size_t priv_data_offset;
        unsigned int nr_streams;                /* Number of streams */
+       struct lttng_ust_shm_handle *handle;
        char padding[RB_CHANNEL_PADDING];
        /*
         * Associated backend contains a variable-length array. Needs to
@@ -118,8 +125,6 @@ struct lttng_ust_lib_ring_buffer {
        union v_atomic records_overrun; /* Number of overwritten records */
        //wait_queue_head_t read_wait;  /* reader buffer-level wait queue */
        int finalized;                  /* buffer has been finalized */
-       //struct timer_list switch_timer;       /* timer for periodical switch */
-       //struct timer_list read_timer; /* timer for read poll */
        unsigned long get_subbuf_consumed;      /* Read-side consumed */
        unsigned long prod_snapshot;    /* Producer count snapshot */
        unsigned long cons_snapshot;    /* Consumer count snapshot */
index 393aa9f0d30d50331a453d1435f3b0e2eacaf24c..76f369e9bc7bbee03faf55627c254855ddf54f40 100644 (file)
 #include <sys/types.h>
 #include <sys/mman.h>
 #include <sys/stat.h>
+#include <unistd.h>
 #include <fcntl.h>
+#include <signal.h>
+#include <time.h>
 #include <urcu/compiler.h>
 #include <urcu/ref.h>
 #include <urcu/tls-compat.h>
 /* Print DBG() messages about events lost only every 1048576 hits */
 #define DBG_PRINT_NR_LOST      (1UL << 20)
 
+#define LTTNG_UST_RB_SIG               SIGRTMIN
+#define LTTNG_UST_RB_SIG_TEARDOWN      SIGRTMIN + 1
+#define CLOCKID                CLOCK_MONOTONIC
+
 /*
  * Use POSIX SHM: shm_open(3) and shm_unlink(3).
  * close(2) to close the fd returned by shm_open.
@@ -110,6 +117,20 @@ void lib_ring_buffer_print_errors(struct channel *chan,
                                struct lttng_ust_lib_ring_buffer *buf, int cpu,
                                struct lttng_ust_shm_handle *handle);
 
+/*
+ * Handle timer teardown race wrt memory free of private data by
+ * ring buffer signals are handled by a single thread, which permits
+ * a synchronization point between handling of each signal.
+ * Protected by the ust mutex.
+ */
+struct timer_signal_data {
+       pthread_t tid;  /* thread id managing signals */
+       int setup_done;
+       int qs_done;
+};
+
+static struct timer_signal_data timer_signal;
+
 /**
  * lib_ring_buffer_reset - Reset ring buffer to initial values.
  * @buf: Ring buffer.
@@ -264,37 +285,208 @@ static void switch_buffer_timer(unsigned long data)
 }
 #endif //0
 
-static void lib_ring_buffer_start_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+static
+void lib_ring_buffer_channel_switch_timer(int sig, siginfo_t *si, void *uc)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
-       //const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
+       const struct lttng_ust_lib_ring_buffer_config *config;
+       struct lttng_ust_shm_handle *handle;
+       struct channel *chan;
+       int cpu;
+
+       assert(CMM_LOAD_SHARED(timer_signal.tid) == pthread_self());
+
+       chan = si->si_value.sival_ptr;
+       handle = chan->handle;
+       config = &chan->backend.config;
 
-       if (!chan->switch_timer_interval || buf->switch_timer_enabled)
+       DBG("Timer for channel %p\n", chan);
+
+       if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
+               for_each_possible_cpu(cpu) {
+                       struct lttng_ust_lib_ring_buffer *buf =
+                               shmp(handle, chan->backend.buf[cpu].shmp);
+
+                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                               chan->handle);
+               }
+       } else {
+               struct lttng_ust_lib_ring_buffer *buf =
+                       shmp(handle, chan->backend.buf[0].shmp);
+
+                       lib_ring_buffer_switch_slow(buf, SWITCH_ACTIVE,
+                               chan->handle);
+       }
+       return;
+}
+
+static
+void rb_setmask(sigset_t *mask)
+{
+       int ret;
+
+       ret = sigemptyset(mask);
+       if (ret) {
+               PERROR("sigemptyset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+       ret = sigaddset(mask, LTTNG_UST_RB_SIG_TEARDOWN);
+       if (ret) {
+               PERROR("sigaddset");
+       }
+}
+
+static
+void *sig_thread(void *arg)
+{
+       sigset_t mask;
+       siginfo_t info;
+       int signr;
+
+       /* Only self thread will receive signal mask. */
+       rb_setmask(&mask);
+       CMM_STORE_SHARED(timer_signal.tid, pthread_self());
+
+       for (;;) {
+               signr = sigwaitinfo(&mask, &info);
+               if (signr == -1) {
+                       PERROR("sigwaitinfo");
+                       continue;
+               }
+               if (signr == LTTNG_UST_RB_SIG) {
+                       lib_ring_buffer_channel_switch_timer(info.si_signo,
+                                       &info, NULL);
+               } else if (signr == LTTNG_UST_RB_SIG_TEARDOWN) {
+                       cmm_smp_mb();
+                       CMM_STORE_SHARED(timer_signal.qs_done, 1);
+                       cmm_smp_mb();
+               } else {
+                       ERR("Unexptected signal %d\n", info.si_signo);
+               }
+       }
+       return NULL;
+}
+
+/*
+ * Called with ust_lock() held.
+ * Ensure only a single thread listens on the timer signal.
+ */
+static
+void lib_ring_buffer_setup_timer_thread(void)
+{
+       pthread_t thread;
+       int ret;
+
+       if (timer_signal.setup_done)
                return;
-       //TODO
-       //init_timer(&buf->switch_timer);
-       //buf->switch_timer.function = switch_buffer_timer;
-       //buf->switch_timer.expires = jiffies + chan->switch_timer_interval;
-       //buf->switch_timer.data = (unsigned long)buf;
-       //if (config->alloc == RING_BUFFER_ALLOC_PER_CPU)
-       //      add_timer_on(&buf->switch_timer, buf->backend.cpu);
-       //else
-       //      add_timer(&buf->switch_timer);
-       buf->switch_timer_enabled = 1;
+
+       ret = pthread_create(&thread, NULL, &sig_thread, NULL);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_create");
+       }
+       ret = pthread_detach(thread);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_detach");
+       }
+       timer_signal.setup_done = 1;
 }
 
-static void lib_ring_buffer_stop_switch_timer(struct lttng_ust_lib_ring_buffer *buf,
-                          struct lttng_ust_shm_handle *handle)
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_switch_timer_start(struct channel *chan)
 {
-       struct channel *chan = shmp(handle, buf->backend.chan);
+       struct sigevent sev;
+       struct itimerspec its;
+       int ret;
 
-       if (!chan->switch_timer_interval || !buf->switch_timer_enabled)
+       if (!chan->switch_timer_interval || chan->switch_timer_enabled)
                return;
 
-       //TODO
-       //del_timer_sync(&buf->switch_timer);
-       buf->switch_timer_enabled = 0;
+       chan->switch_timer_enabled = 1;
+
+       lib_ring_buffer_setup_timer_thread();
+
+       sev.sigev_notify = SIGEV_SIGNAL;
+       sev.sigev_signo = LTTNG_UST_RB_SIG;
+       sev.sigev_value.sival_ptr = chan;
+       ret = timer_create(CLOCKID, &sev, &chan->switch_timer);
+       if (ret == -1) {
+               PERROR("timer_create");
+       }
+
+       its.it_value.tv_sec = chan->switch_timer_interval / 1000000;
+       its.it_value.tv_nsec = chan->switch_timer_interval % 1000000;
+       its.it_interval.tv_sec = its.it_value.tv_sec;
+       its.it_interval.tv_nsec = its.it_value.tv_nsec;
+
+       ret = timer_settime(chan->switch_timer, 0, &its, NULL);
+       if (ret == -1) {
+               PERROR("timer_settime");
+       }
+}
+
+/*
+ * Called with ust_lock() held.
+ */
+static
+void lib_ring_buffer_channel_switch_timer_stop(struct channel *chan)
+{
+       sigset_t pending_set;
+       int sig_is_pending, ret;
+
+       if (!chan->switch_timer_interval || !chan->switch_timer_enabled)
+               return;
+
+       ret = timer_delete(chan->switch_timer);
+       if (ret == -1) {
+               PERROR("timer_delete");
+       }
+
+       /*
+        * Ensure we don't have any signal queued for this channel.
+        */
+       for (;;) {
+               ret = sigemptyset(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigemptyset");
+               }
+               ret = sigpending(&pending_set);
+               if (ret == -1) {
+                       PERROR("sigpending");
+               }
+               sig_is_pending = sigismember(&pending_set, LTTNG_UST_RB_SIG);
+               if (!sig_is_pending)
+                       break;
+               caa_cpu_relax();
+       }
+
+       /*
+        * From this point, no new signal handler will be fired that
+        * would try to access "chan". However, we still need to wait
+        * for any currently executing handler to complete.
+        */
+       cmm_smp_mb();
+       CMM_STORE_SHARED(timer_signal.qs_done, 0);
+       cmm_smp_mb();
+
+       /*
+        * Kill with LTTNG_UST_RB_SIG_TEARDOWN, so signal management
+        * thread wakes up.
+        */
+       kill(getpid(), LTTNG_UST_RB_SIG_TEARDOWN);
+
+       while (!CMM_LOAD_SHARED(timer_signal.qs_done))
+               caa_cpu_relax();
+       cmm_smp_mb();
+
+       chan->switch_timer = 0;
+       chan->switch_timer_enabled = 0;
 }
 
 #if 0
@@ -381,17 +573,16 @@ static void channel_unregister_notifiers(struct channel *chan,
        const struct lttng_ust_lib_ring_buffer_config *config = &chan->backend.config;
        int cpu;
 
+       lib_ring_buffer_channel_switch_timer_stop(chan);
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                for_each_possible_cpu(cpu) {
                        struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
 
-                       lib_ring_buffer_stop_switch_timer(buf, handle);
                        lib_ring_buffer_stop_read_timer(buf, handle);
                }
        } else {
                struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
 
-               lib_ring_buffer_stop_switch_timer(buf, handle);
                lib_ring_buffer_stop_read_timer(buf, handle);
        }
        //channel_backend_unregister_notifiers(&chan->backend);
@@ -527,14 +718,16 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
        if (ret)
                goto error_backend_init;
 
+       chan->handle = handle;
        chan->commit_count_mask = (~0UL >> chan->backend.num_subbuf_order);
+       chan->switch_timer_interval = switch_timer_interval;
+
        //TODO
-       //chan->switch_timer_interval = usecs_to_jiffies(switch_timer_interval);
-       //chan->read_timer_interval = usecs_to_jiffies(read_timer_interval);
-       //TODO
+       //chan->read_timer_interval = read_timer_interval;
        //init_waitqueue_head(&chan->read_wait);
        //init_waitqueue_head(&chan->hp_wait);
 
+       lib_ring_buffer_channel_switch_timer_start(chan);
        if (config->alloc == RING_BUFFER_ALLOC_PER_CPU) {
                /*
                 * In case of non-hotplug cpu, if the ring-buffer is allocated
@@ -543,13 +736,11 @@ struct lttng_ust_shm_handle *channel_create(const struct lttng_ust_lib_ring_buff
                 */
                for_each_possible_cpu(cpu) {
                        struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[cpu].shmp);
-                       lib_ring_buffer_start_switch_timer(buf, handle);
                        lib_ring_buffer_start_read_timer(buf, handle);
                }
        } else {
                struct lttng_ust_lib_ring_buffer *buf = shmp(handle, chan->backend.buf[0].shmp);
 
-               lib_ring_buffer_start_switch_timer(buf, handle);
                lib_ring_buffer_start_read_timer(buf, handle);
        }
        return handle;
@@ -1592,3 +1783,20 @@ void lttng_fixup_ringbuffer_tls(void)
 {
        asm volatile ("" : : "m" (URCU_TLS(lib_ring_buffer_nesting)));
 }
+
+void lib_ringbuffer_signal_init(void)
+{
+       sigset_t mask;
+       int ret;
+
+       /*
+        * Block signal for entire process, so only our thread processes
+        * it.
+        */
+       rb_setmask(&mask);
+       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       if (ret) {
+               errno = ret;
+               PERROR("pthread_sigmask");
+       }
+}
This page took 0.032068 seconds and 4 git commands to generate.