From: Mathieu Desnoyers Date: Tue, 21 Oct 2014 12:18:50 +0000 (-0400) Subject: workqueue: only awaken a single sibling X-Git-Url: http://git.liburcu.org/?p=userspace-rcu.git;a=commitdiff_plain;h=5d30bf32449dc69a9961dbf44147c930ebe756f5 workqueue: only awaken a single sibling There is no point in awakening two siblings that will fight to be the first one to get a worker's workqueue. Just wake up one sibling at most. Signed-off-by: Mathieu Desnoyers --- diff --git a/urcu/waitqueue-lifo.h b/urcu/waitqueue-lifo.h index 4d800f0..2ccf630 100644 --- a/urcu/waitqueue-lifo.h +++ b/urcu/waitqueue-lifo.h @@ -129,10 +129,14 @@ void urcu_wait_node_init(struct urcu_wait_node *node, * throughout its execution. In this scheme, the waiter owns the node * memory, and we only allow it to free this memory when it receives the * URCU_WAIT_TEARDOWN flag. + * Return true if wakeup is performed, false if thread was already + * running. */ static inline -void urcu_adaptative_wake_up(struct urcu_wait_node *wait) +bool urcu_adaptative_wake_up(struct urcu_wait_node *wait) { + bool wakeup_performed = false; + cmm_smp_mb(); /* * "or" of WAKEUP flag rather than "set" is useful for multiple @@ -141,10 +145,13 @@ void urcu_adaptative_wake_up(struct urcu_wait_node *wait) * "value" should then be handled by the caller. */ uatomic_or(&wait->state, URCU_WAIT_WAKEUP); - if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING)) + if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING)) { futex_noasync(&wait->state, FUTEX_WAKE, 1, NULL, NULL, 0); + wakeup_performed = true; + } /* Allow teardown of struct urcu_wait memory. */ uatomic_or(&wait->state, URCU_WAIT_TEARDOWN); + return wakeup_performed; } /* @@ -193,7 +200,7 @@ int urcu_dequeue_wake_single(struct urcu_wait_queue *queue) { struct cds_wfs_node *node; struct urcu_wait_node *wait_node; - int wakeup_done = 0; + int ret = 0; node = __cds_wfs_pop_blocking(&queue->stack); if (!node) @@ -201,11 +208,9 @@ int urcu_dequeue_wake_single(struct urcu_wait_queue *queue) wait_node = caa_container_of(node, struct urcu_wait_node, node); CMM_STORE_SHARED(wait_node->node.next, NULL); /* Don't wake already running threads */ - if (!(wait_node->state & URCU_WAIT_RUNNING)) { - urcu_adaptative_wake_up(wait_node); - wakeup_done = 1; - } - return wakeup_done; + if (!(wait_node->state & URCU_WAIT_RUNNING)) + ret = urcu_adaptative_wake_up(wait_node); + return ret; } /* @@ -246,8 +251,8 @@ int urcu_wake_all_waiters(struct urcu_waiters *waiters) /* Don't wake already running threads */ if (wait_node->state & URCU_WAIT_RUNNING) continue; - urcu_adaptative_wake_up(wait_node); - nr_wakeup++; + if (urcu_adaptative_wake_up(wait_node)) + nr_wakeup++; } return nr_wakeup; } diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index ffdb974..1d7528d 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -220,17 +220,18 @@ int __urcu_steal_work(struct urcu_workqueue *queue, } static inline -void ___urcu_wakeup_sibling(struct urcu_worker *sibling) +bool ___urcu_wakeup_sibling(struct urcu_worker *sibling) { - urcu_adaptative_wake_up(&sibling->wait_node); + return urcu_adaptative_wake_up(&sibling->wait_node); } static inline -void __urcu_wakeup_siblings(struct urcu_workqueue *queue, +bool __urcu_wakeup_siblings(struct urcu_workqueue *queue, struct urcu_worker *worker) { struct urcu_worker *sibling_prev, *sibling_next; struct cds_list_head *sibling_node; + bool wakeup_performed = 0; if (!(worker->flags & URCU_WORKER_STEAL)) return; @@ -247,7 +248,9 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue, sibling_next = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_next != worker) - ___urcu_wakeup_sibling(sibling_next); + wakeup_performed = ___urcu_wakeup_sibling(sibling_next); + if (wakeup_performed) + goto end; sibling_node = rcu_dereference(worker->sibling_node.prev); if (sibling_node == &queue->sibling_head) @@ -255,9 +258,11 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue, sibling_prev = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_prev != worker && sibling_prev != sibling_next) - ___urcu_wakeup_sibling(sibling_prev); - + wakeup_performed = ___urcu_wakeup_sibling(sibling_prev); +end: rcu_read_unlock(); + + return wakeup_performed; } static inline @@ -324,7 +329,7 @@ do_work: * We will be busy handling the work batch, awaken siblings so * they can steal from us. */ - __urcu_wakeup_siblings(queue, worker); + (void) __urcu_wakeup_siblings(queue, worker); } static inline