X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=urcu%2Fworkqueue-fifo.h;h=bc19967a6a062eebc5d9c2d33598caab3f2a8f64;hb=e10c65b3279d1868a7ef83588bb91dd46cfe428a;hp=ffdb974bb5a720f317ff9e539e15fcfd31eef468;hpb=30926570a4ccf3050a4493316a7f23620495bcc8;p=userspace-rcu.git diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index ffdb974..bc19967 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -29,6 +29,7 @@ #include #include #include +#include /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors @@ -170,31 +171,37 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, * Try stealing work from siblings when we have nothing to do. */ static inline -void ___urcu_steal_work(struct urcu_worker *worker, +bool ___urcu_steal_work(struct urcu_worker *worker, struct urcu_worker *sibling) { + enum cds_wfcq_ret splice_ret; + /* * Don't bother grabbing the sibling queue lock if it is empty. */ if (cds_wfcq_empty(&sibling->head, &sibling->tail)) - return; + return false; cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail); - (void) __cds_wfcq_splice_blocking(&worker->head, + splice_ret = __cds_wfcq_splice_blocking(&worker->head, &worker->tail, &sibling->head, &sibling->tail); cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail); + /* Ensure that we preserve FIFO work order. */ + assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY); + return splice_ret != CDS_WFCQ_RET_SRC_EMPTY; } static inline -int __urcu_steal_work(struct urcu_workqueue *queue, +bool __urcu_steal_work(struct urcu_workqueue *queue, struct urcu_worker *worker) { struct urcu_worker *sibling_prev, *sibling_next; struct cds_list_head *sibling_node; + bool steal_performed = 0; if (!(worker->flags & URCU_WORKER_STEAL)) - return 0; + return false; rcu_read_lock(); @@ -204,7 +211,9 @@ int __urcu_steal_work(struct urcu_workqueue *queue, sibling_next = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_next != worker) - ___urcu_steal_work(worker, sibling_next); + steal_performed = ___urcu_steal_work(worker, sibling_next); + if (steal_performed) + goto end; sibling_node = rcu_dereference(worker->sibling_node.prev); if (sibling_node == &queue->sibling_head) @@ -212,25 +221,26 @@ int __urcu_steal_work(struct urcu_workqueue *queue, sibling_prev = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_prev != worker && sibling_prev != sibling_next) - ___urcu_steal_work(worker, sibling_prev); - + steal_performed = ___urcu_steal_work(worker, sibling_prev); +end: rcu_read_unlock(); - return !cds_wfcq_empty(&worker->head, &worker->tail); + return steal_performed; } static inline -void ___urcu_wakeup_sibling(struct urcu_worker *sibling) +bool ___urcu_wakeup_sibling(struct urcu_worker *sibling) { - urcu_adaptative_wake_up(&sibling->wait_node); + return urcu_adaptative_wake_up(&sibling->wait_node); } static inline -void __urcu_wakeup_siblings(struct urcu_workqueue *queue, +bool __urcu_wakeup_siblings(struct urcu_workqueue *queue, struct urcu_worker *worker) { struct urcu_worker *sibling_prev, *sibling_next; struct cds_list_head *sibling_node; + bool wakeup_performed = 0; if (!(worker->flags & URCU_WORKER_STEAL)) return; @@ -247,7 +257,9 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue, sibling_next = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_next != worker) - ___urcu_wakeup_sibling(sibling_next); + wakeup_performed = ___urcu_wakeup_sibling(sibling_next); + if (wakeup_performed) + goto end; sibling_node = rcu_dereference(worker->sibling_node.prev); if (sibling_node == &queue->sibling_head) @@ -255,9 +267,11 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue, sibling_prev = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_prev != worker && sibling_prev != sibling_next) - ___urcu_wakeup_sibling(sibling_prev); - + wakeup_performed = ___urcu_wakeup_sibling(sibling_prev); +end: rcu_read_unlock(); + + return wakeup_performed; } static inline @@ -324,7 +338,7 @@ do_work: * We will be busy handling the work batch, awaken siblings so * they can steal from us. */ - __urcu_wakeup_siblings(queue, worker); + (void) __urcu_wakeup_siblings(queue, worker); } static inline