workqueue: ensure worker is removed from waitqueue upon unregister
[userspace-rcu.git] / urcu / workqueue-fifo.h
index bc19967a6a062eebc5d9c2d33598caab3f2a8f64..1292e04d6a173f6a276d6fab70dfaca004f7f26d 100644 (file)
  */
 
 #include <urcu/uatomic.h>
-#include <urcu/wfstack.h>
+#include <urcu/lfstack.h>
 #include <urcu/waitqueue-lifo.h>
 #include <urcu/wfcqueue.h>
 #include <urcu/rculist.h>
 #include <pthread.h>
 #include <assert.h>
 
+enum urcu_accept_ret {
+       URCU_ACCEPT_WORK        = 0,
+       URCU_ACCEPT_SHUTDOWN    = 1,
+};
+
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
  * need to be included before this header. All worker that participate
@@ -53,6 +58,8 @@ struct urcu_workqueue {
        /* RCU linked list head of siblings for work stealing. */
        struct cds_list_head sibling_head;
        pthread_mutex_t sibling_lock;   /* Protect sibling list updates */
+
+       bool shutdown;                  /* Shutdown performed */
 };
 
 struct urcu_worker {
@@ -75,6 +82,7 @@ void urcu_workqueue_init(struct urcu_workqueue *queue)
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
        CDS_INIT_LIST_HEAD(&queue->sibling_head);
+       queue->shutdown = false;
 }
 
 static inline
@@ -100,16 +108,22 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
         * worker threads when threads are busy enough to still be
         * running when work is enqueued.
         */
-       if (was_empty)
+       if (was_empty) {
+               rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
+               rcu_read_unlock();      /* Protect stack dequeue */
+       }
 }
 
 static inline
-void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
+void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 {
        struct urcu_waiters waiters;
 
+       rcu_read_lock();        /* Protect stack dequeue */
        urcu_move_waiters(&waiters, &queue->waitqueue);
+       rcu_read_unlock();      /* Protect stack dequeue */
+
        (void) urcu_wake_all_waiters(&waiters);
 }
 
@@ -142,14 +156,14 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
                pthread_mutex_lock(&queue->sibling_lock);
                cds_list_del_rcu(&worker->sibling_node);
                pthread_mutex_unlock(&queue->sibling_lock);
-
-               /*
-                * Wait for grace period before freeing or reusing
-                * "worker" because used by RCU linked list.
-                */
-               synchronize_rcu();
        }
 
+       /*
+        * Make sure we are removed from waitqueue.
+        */
+       if (CMM_LOAD_SHARED(worker->wait_node.node.next))
+               __urcu_workqueue_wakeup_all(queue);
+
        /*
         * Put any local work we still have back into the workqueue.
         */
@@ -163,8 +177,19 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
                 * Wakeup worker thread if we have put work back into
                 * workqueue that was previously empty.
                 */
+               rcu_read_lock();        /* Protect stack dequeue */
                (void) urcu_dequeue_wake_single(&queue->waitqueue);
+               rcu_read_unlock();      /* Protect stack dequeue */
        }
+
+       /*
+        * Wait for grace period before freeing or reusing
+        * "worker" because used by RCU linked list.
+        * Also prevents ABA for waitqueue stack dequeue: matches RCU
+        * read-side critical sections around dequeue and move all
+        * operations on waitqueue).
+        */
+       synchronize_rcu();
 }
 
 /*
@@ -181,12 +206,10 @@ bool ___urcu_steal_work(struct urcu_worker *worker,
         */
        if (cds_wfcq_empty(&sibling->head, &sibling->tail))
                return false;
-       cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
-       splice_ret = __cds_wfcq_splice_blocking(&worker->head,
+       splice_ret = cds_wfcq_splice_blocking(&worker->head,
                        &worker->tail,
                        &sibling->head,
                        &sibling->tail);
-       cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
        /* Ensure that we preserve FIFO work order. */
        assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
        return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
@@ -275,9 +298,8 @@ end:
 }
 
 static inline
-void urcu_accept_work(struct urcu_workqueue *queue,
-               struct urcu_worker *worker,
-               int blocking)
+enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
 {
        enum cds_wfcq_ret wfcq_ret;
 
@@ -293,8 +315,9 @@ void urcu_accept_work(struct urcu_workqueue *queue,
        /* Try to steal work from sibling instead of blocking */
        if (__urcu_steal_work(queue, worker))
                goto do_work;
-       if (!blocking)
-               return;
+       /* No more work to do, check shutdown state */
+       if (CMM_LOAD_SHARED(queue->shutdown))
+               return URCU_ACCEPT_SHUTDOWN;
        urcu_wait_set_state(&worker->wait_node,
                        URCU_WAIT_WAITING);
        if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
@@ -304,7 +327,9 @@ void urcu_accept_work(struct urcu_workqueue *queue,
                 * NULL next pointer. We are therefore not in
                 * the queue.
                 */
-               cds_wfs_node_init(&worker->wait_node.node);
+               cds_lfs_node_init(&worker->wait_node.node);
+               /* Protect stack dequeue against ABA */
+               synchronize_rcu();
                was_empty = !urcu_wait_add(&queue->waitqueue,
                                &worker->wait_node);
                /*
@@ -317,8 +342,11 @@ void urcu_accept_work(struct urcu_workqueue *queue,
                 * a wake up.
                 */
                if (was_empty && !cds_wfcq_empty(&queue->head,
-                                               &queue->tail))
+                                               &queue->tail)) {
+                       rcu_read_lock();        /* Protect stack dequeue */
                        (void) urcu_dequeue_wake_single(&queue->waitqueue);
+                       rcu_read_unlock();      /* Protect stack dequeue */
+               }
        } else {
                /*
                 * Non-NULL next pointer. We are therefore in
@@ -339,6 +367,7 @@ do_work:
         * they can steal from us.
         */
        (void) __urcu_wakeup_siblings(queue, worker);
+       return URCU_ACCEPT_WORK;
 }
 
 static inline
@@ -368,4 +397,13 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
        return caa_container_of(node, struct urcu_work, node);
 }
 
+static inline
+void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
+{
+       /* Set shutdown */
+       CMM_STORE_SHARED(queue->shutdown, true);
+       /* Wakeup all workers */
+       __urcu_workqueue_wakeup_all(queue);
+}
+
 #endif /* _URCU_WORKQUEUE_FIFO_H */
This page took 0.024576 seconds and 4 git commands to generate.