workqueue: preserve FIFO work order
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 21 Oct 2014 12:32:43 +0000 (08:32 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 21 Oct 2014 12:32:43 +0000 (08:32 -0400)
Never merge work from a sibling into a non-empty worker queue, because
doing so does not keep FIFO ordering of the work.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu/workqueue-fifo.h

index 1d7528db7e1327b6507141f8f6fbfd992fca7571..bc19967a6a062eebc5d9c2d33598caab3f2a8f64 100644 (file)
@@ -29,6 +29,7 @@
 #include <urcu/wfcqueue.h>
 #include <urcu/rculist.h>
 #include <pthread.h>
+#include <assert.h>
 
 /*
  * We use RCU to steal work from siblings. Therefore, one of RCU flavors
@@ -170,31 +171,37 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
  * Try stealing work from siblings when we have nothing to do.
  */
 static inline
-void ___urcu_steal_work(struct urcu_worker *worker,
+bool ___urcu_steal_work(struct urcu_worker *worker,
                struct urcu_worker *sibling)
 {
+       enum cds_wfcq_ret splice_ret;
+
        /*
         * Don't bother grabbing the sibling queue lock if it is empty.
         */
        if (cds_wfcq_empty(&sibling->head, &sibling->tail))
-               return;
+               return false;
        cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
-       (void) __cds_wfcq_splice_blocking(&worker->head,
+       splice_ret = __cds_wfcq_splice_blocking(&worker->head,
                        &worker->tail,
                        &sibling->head,
                        &sibling->tail);
        cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
+       /* Ensure that we preserve FIFO work order. */
+       assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+       return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
 }
 
 static inline
-int __urcu_steal_work(struct urcu_workqueue *queue,
+bool __urcu_steal_work(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
 {
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
+       bool steal_performed = 0;
 
        if (!(worker->flags & URCU_WORKER_STEAL))
-               return 0;
+               return false;
 
        rcu_read_lock();
 
@@ -204,7 +211,9 @@ int __urcu_steal_work(struct urcu_workqueue *queue,
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
-               ___urcu_steal_work(worker, sibling_next);
+               steal_performed = ___urcu_steal_work(worker, sibling_next);
+       if (steal_performed)
+               goto end;
 
        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
@@ -212,11 +221,11 @@ int __urcu_steal_work(struct urcu_workqueue *queue,
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
-               ___urcu_steal_work(worker, sibling_prev);
-
+               steal_performed =  ___urcu_steal_work(worker, sibling_prev);
+end:
        rcu_read_unlock();
 
-       return !cds_wfcq_empty(&worker->head, &worker->tail);
+       return steal_performed;
 }
 
 static inline
This page took 0.029361 seconds and 4 git commands to generate.