workqueue: preserve FIFO work order
[userspace-rcu.git] / urcu / workqueue-fifo.h
index 65d3a2e222cc0fbe991c986bce41f4e35722b309..bc19967a6a062eebc5d9c2d33598caab3f2a8f64 100644 (file)
@@ -29,6 +29,7 @@
 #include <urcu/wfcqueue.h>
 #include <urcu/rculist.h>
 #include <pthread.h>
+#include <assert.h>
 
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
@@ -170,26 +171,37 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
  * Try stealing work from siblings when we have nothing to do.
  */
 static inline
-void ___urcu_steal_work(struct urcu_worker *worker,
+bool ___urcu_steal_work(struct urcu_worker *worker,
                struct urcu_worker *sibling)
 {
+       enum cds_wfcq_ret splice_ret;
+
+       /*
+        * Don't bother grabbing the sibling queue lock if it is empty.
+        */
+       if (cds_wfcq_empty(&sibling->head, &sibling->tail))
+               return false;
        cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
-       (void) __cds_wfcq_splice_blocking(&worker->head,
+       splice_ret = __cds_wfcq_splice_blocking(&worker->head,
                        &worker->tail,
                        &sibling->head,
                        &sibling->tail);
        cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
+       /* Ensure that we preserve FIFO work order. */
+       assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+       return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
 }
 
 static inline
-int __urcu_steal_work(struct urcu_workqueue *queue,
+bool __urcu_steal_work(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
 {
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
+       bool steal_performed = 0;
 
        if (!(worker->flags & URCU_WORKER_STEAL))
-               return 0;
+               return false;
 
        rcu_read_lock();
 
@@ -199,7 +211,9 @@ int __urcu_steal_work(struct urcu_workqueue *queue,
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
-               ___urcu_steal_work(worker, sibling_next);
+               steal_performed = ___urcu_steal_work(worker, sibling_next);
+       if (steal_performed)
+               goto end;
 
        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
@@ -207,25 +221,26 @@ int __urcu_steal_work(struct urcu_workqueue *queue,
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
-               ___urcu_steal_work(worker, sibling_prev);
-
+               steal_performed =  ___urcu_steal_work(worker, sibling_prev);
+end:
        rcu_read_unlock();
 
-       return !cds_wfcq_empty(&worker->head, &worker->tail);
+       return steal_performed;
 }
 
 static inline
-void ___urcu_wakeup_sibling(struct urcu_worker *sibling)
+bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
 {
-       urcu_adaptative_wake_up(&sibling->wait_node);
+       return urcu_adaptative_wake_up(&sibling->wait_node);
 }
 
 static inline
-void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
+bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
 {
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
+       bool wakeup_performed = 0;
 
        if (!(worker->flags & URCU_WORKER_STEAL))
                return;
@@ -242,7 +257,9 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
-               ___urcu_wakeup_sibling(sibling_next);
+               wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
+       if (wakeup_performed)
+               goto end;
 
        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
@@ -250,9 +267,11 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
-               ___urcu_wakeup_sibling(sibling_prev);
-
+               wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
+end:
        rcu_read_unlock();
+
+       return wakeup_performed;
 }
 
 static inline
@@ -319,7 +338,7 @@ do_work:
         * We will be busy handling the work batch, awaken siblings so
         * they can steal from us.
         */
-       __urcu_wakeup_siblings(queue, worker);
+       (void) __urcu_wakeup_siblings(queue, worker);
 }
 
 static inline
@@ -331,12 +350,19 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
         * If we are registered for work stealing, we need to dequeue
         * safely against siblings.
         */
-       if (worker->flags & URCU_WORKER_STEAL)
+       if (worker->flags & URCU_WORKER_STEAL) {
+               /*
+                * Don't bother grabbing the worker queue lock if it is
+                * empty.
+                */
+               if (cds_wfcq_empty(&worker->head, &worker->tail))
+                       return NULL;
                node = cds_wfcq_dequeue_blocking(&worker->head,
                                &worker->tail);
-       else
+       } else {
                node = ___cds_wfcq_dequeue_with_state(&worker->head,
                                &worker->tail, NULL, 1, 0);
+       }
        if (!node)
                return NULL;
        return caa_container_of(node, struct urcu_work, node);
This page took 0.024664 seconds and 4 git commands to generate.