Disable signals in URCU background threads
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Fri, 23 Sep 2022 17:55:03 +0000 (13:55 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 27 Sep 2022 14:29:45 +0000 (10:29 -0400)
Applications using signalfd depend on signals being blocked in all
threads of the process, otherwise threads with unblocked signals
can receive them and starve the signalfd.

While some threads in URCU do block signals (e.g. workqueue
worker for rculfhash), the call_rcu, defer_rcu, and rculfhash
partition_resize_helper threads do not.

Always block all signals before creating threads, and only unblock
SIGRCU when registering a urcu-signal thread. Restore the SIGRCU
signal to its pre-registration blocked state on unregistration.

For rculfhash, cds_lfht_worker_init can be removed, because its only
effect is to block all signals except SIGRCU. Blocking all signals is
already done by the workqueue code, and unbloking SIGRCU is now done by
the urcu signal flavor thread regisration.

Co-developed-by: Eric Wong <normalperson@yhbt.net>
Signed-off-by: Eric Wong <normalperson@yhbt.net>
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: If78346b15bdc287417b992a8963098c6ea0dc7d2

src/rculfhash.c
src/urcu-call-rcu-impl.h
src/urcu-defer-impl.h
src/urcu.c
src/workqueue.c

index 7c0b9fb8081fbfd4db089def5665d01d7b6cda90..a41cac83a2f4fdc202b1b1821b98312f58d83018 100644 (file)
@@ -1251,6 +1251,7 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
        struct partition_resize_work *work;
        int ret;
        unsigned long thread, nr_threads;
+       sigset_t newmask, oldmask;
 
        urcu_posix_assert(nr_cpus_mask != -1);
        if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD)
@@ -1273,6 +1274,12 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
                dbg_printf("error allocating for resize, single-threading\n");
                goto fallback;
        }
+
+       ret = sigfillset(&newmask);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+       urcu_posix_assert(!ret);
+
        for (thread = 0; thread < nr_threads; thread++) {
                work[thread].ht = ht;
                work[thread].i = i;
@@ -1294,6 +1301,10 @@ void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
                }
                urcu_posix_assert(!ret);
        }
+
+       ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+       urcu_posix_assert(!ret);
+
        for (thread = 0; thread < nr_threads; thread++) {
                ret = pthread_join(work[thread].thread_id, NULL);
                urcu_posix_assert(!ret);
@@ -2163,29 +2174,6 @@ static struct urcu_atfork cds_lfht_atfork = {
        .after_fork_child = cds_lfht_after_fork_child,
 };
 
-/*
- * Block all signals for the workqueue worker thread to ensure we don't
- * disturb the application. The SIGRCU signal needs to be unblocked for
- * the urcu-signal flavor.
- */
-static void cds_lfht_worker_init(
-               struct urcu_workqueue *workqueue __attribute__((unused)),
-               void *priv __attribute__((unused)))
-{
-       int ret;
-       sigset_t mask;
-
-       ret = sigfillset(&mask);
-       if (ret)
-               urcu_die(errno);
-       ret = sigdelset(&mask, SIGRCU);
-       if (ret)
-               urcu_die(errno);
-       ret = pthread_sigmask(SIG_SETMASK, &mask, NULL);
-       if (ret)
-               urcu_die(ret);
-}
-
 static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
 {
        flavor->register_rculfhash_atfork(&cds_lfht_atfork);
@@ -2194,7 +2182,7 @@ static void cds_lfht_init_worker(const struct rcu_flavor_struct *flavor)
        if (cds_lfht_workqueue_user_count++)
                goto end;
        cds_lfht_workqueue = urcu_workqueue_create(0, -1, NULL,
-               NULL, cds_lfht_worker_init, NULL, NULL, NULL, NULL, NULL);
+               NULL, NULL, NULL, NULL, NULL, NULL, NULL);
 end:
        mutex_unlock(&cds_lfht_fork_mutex);
 }
index e9366b42355b8b0db313cca083b121017192852b..9f85d55b403c5fa1f882947af021c6cae460d712 100644 (file)
@@ -434,6 +434,7 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
 {
        struct call_rcu_data *crdp;
        int ret;
+       sigset_t newmask, oldmask;
 
        crdp = malloc(sizeof(*crdp));
        if (crdp == NULL)
@@ -448,9 +449,18 @@ static void call_rcu_data_init(struct call_rcu_data **crdpp,
        crdp->gp_count = 0;
        cmm_smp_mb();  /* Structure initialized before pointer is planted. */
        *crdpp = crdp;
+
+       ret = sigfillset(&newmask);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+       urcu_posix_assert(!ret);
+
        ret = pthread_create(&crdp->tid, NULL, call_rcu_thread, crdp);
        if (ret)
                urcu_die(ret);
+
+       ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+       urcu_posix_assert(!ret);
 }
 
 /*
index b5d79262c03bb88a28cd376d78c4189ca3c578cd..1c9628792ac494db61c870c204811b3f1df61d72 100644 (file)
@@ -409,9 +409,18 @@ void defer_rcu(void (*fct)(void *p), void *p)
 static void start_defer_thread(void)
 {
        int ret;
+       sigset_t newmask, oldmask;
+
+       ret = sigfillset(&newmask);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+       urcu_posix_assert(!ret);
 
        ret = pthread_create(&tid_defer, NULL, thr_defer, NULL);
        urcu_posix_assert(!ret);
+
+       ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+       urcu_posix_assert(!ret);
 }
 
 static void stop_defer_thread(void)
index 59f2e8f18fdddc5a781e25d2478ca86e18a9fee5..cf4d6d03f711750c05e91991f2a64687640b4438 100644 (file)
@@ -110,6 +110,8 @@ static int init_done;
 
 void __attribute__((constructor)) rcu_init(void);
 void __attribute__((destructor)) rcu_exit(void);
+
+static DEFINE_URCU_TLS(int, rcu_signal_was_blocked);
 #endif
 
 /*
@@ -537,8 +539,52 @@ int rcu_read_ongoing(void)
        return _rcu_read_ongoing();
 }
 
+#ifdef RCU_SIGNAL
+/*
+ * Make sure the signal used by the urcu-signal flavor is unblocked
+ * while the thread is registered.
+ */
+static
+void urcu_signal_unblock(void)
+{
+       sigset_t mask, oldmask;
+       int ret;
+
+       ret = sigemptyset(&mask);
+       urcu_posix_assert(!ret);
+       ret = sigaddset(&mask, SIGRCU);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_UNBLOCK, &mask, &oldmask);
+       urcu_posix_assert(!ret);
+       URCU_TLS(rcu_signal_was_blocked) = sigismember(&oldmask, SIGRCU);
+}
+
+static
+void urcu_signal_restore(void)
+{
+       sigset_t mask;
+       int ret;
+
+       if (!URCU_TLS(rcu_signal_was_blocked))
+               return;
+       ret = sigemptyset(&mask);
+       urcu_posix_assert(!ret);
+       ret = sigaddset(&mask, SIGRCU);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
+       urcu_posix_assert(!ret);
+}
+#else
+static
+void urcu_signal_unblock(void) { }
+static
+void urcu_signal_restore(void) { }
+#endif
+
 void rcu_register_thread(void)
 {
+       urcu_signal_unblock();
+
        URCU_TLS(rcu_reader).tid = pthread_self();
        urcu_posix_assert(URCU_TLS(rcu_reader).need_mb == 0);
        urcu_posix_assert(!(URCU_TLS(rcu_reader).ctr & URCU_GP_CTR_NEST_MASK));
@@ -558,6 +604,8 @@ void rcu_unregister_thread(void)
        URCU_TLS(rcu_reader).registered = 0;
        cds_list_del(&URCU_TLS(rcu_reader).node);
        mutex_unlock(&rcu_registry_lock);
+
+       urcu_signal_restore();
 }
 
 #ifdef RCU_MEMBARRIER
index b6361adafb5b97f2116ac9b864111c8cc2a55d92..1039d7297b9ce13446c4b81aee86fa217a005eab 100644 (file)
@@ -284,6 +284,7 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
 {
        struct urcu_workqueue *workqueue;
        int ret;
+       sigset_t newmask, oldmask;
 
        workqueue = malloc(sizeof(*workqueue));
        if (workqueue == NULL)
@@ -304,10 +305,20 @@ struct urcu_workqueue *urcu_workqueue_create(unsigned long flags,
        workqueue->cpu_affinity = cpu_affinity;
        workqueue->loop_count = 0;
        cmm_smp_mb();  /* Structure initialized before pointer is planted. */
+
+       ret = sigfillset(&newmask);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+       urcu_posix_assert(!ret);
+
        ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
        if (ret) {
                urcu_die(ret);
        }
+
+       ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+       urcu_posix_assert(!ret);
+
        return workqueue;
 }
 
@@ -464,13 +475,23 @@ void urcu_workqueue_resume_worker(struct urcu_workqueue *workqueue)
 void urcu_workqueue_create_worker(struct urcu_workqueue *workqueue)
 {
        int ret;
+       sigset_t newmask, oldmask;
 
        /* Clear workqueue state from parent. */
        workqueue->flags &= ~URCU_WORKQUEUE_PAUSED;
        workqueue->flags &= ~URCU_WORKQUEUE_PAUSE;
        workqueue->tid = 0;
+
+       ret = sigfillset(&newmask);
+       urcu_posix_assert(!ret);
+       ret = pthread_sigmask(SIG_BLOCK, &newmask, &oldmask);
+       urcu_posix_assert(!ret);
+
        ret = pthread_create(&workqueue->tid, NULL, workqueue_thread, workqueue);
        if (ret) {
                urcu_die(ret);
        }
+
+       ret = pthread_sigmask(SIG_SETMASK, &oldmask, NULL);
+       urcu_posix_assert(!ret);
 }
This page took 0.033123 seconds and 4 git commands to generate.