workqueue: only awaken a single sibling
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 21 Oct 2014 12:18:50 +0000 (08:18 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 21 Oct 2014 12:30:38 +0000 (08:30 -0400)
There is no point in awakening two siblings that will fight to be the
first one to get a worker's workqueue. Just wake up one sibling at most.

Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
urcu/waitqueue-lifo.h
urcu/workqueue-fifo.h

index 4d800f027c3a093dd8ed83434c85dbaf002b89ff..2ccf6300ca0ccd8576daecde56a43011d971c91e 100644 (file)
@@ -129,10 +129,14 @@ void urcu_wait_node_init(struct urcu_wait_node *node,
  * throughout its execution. In this scheme, the waiter owns the node
  * memory, and we only allow it to free this memory when it receives the
  * URCU_WAIT_TEARDOWN flag.
+ * Return true if wakeup is performed, false if thread was already
+ * running.
  */
 static inline
-void urcu_adaptative_wake_up(struct urcu_wait_node *wait)
+bool urcu_adaptative_wake_up(struct urcu_wait_node *wait)
 {
+       bool wakeup_performed = false;
+
        cmm_smp_mb();
        /*
         * "or" of WAKEUP flag rather than "set" is useful for multiple
@@ -141,10 +145,13 @@ void urcu_adaptative_wake_up(struct urcu_wait_node *wait)
         * "value" should then be handled by the caller.
         */
        uatomic_or(&wait->state, URCU_WAIT_WAKEUP);
-       if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING))
+       if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING)) {
                futex_noasync(&wait->state, FUTEX_WAKE, 1, NULL, NULL, 0);
+               wakeup_performed = true;
+       }
        /* Allow teardown of struct urcu_wait memory. */
        uatomic_or(&wait->state, URCU_WAIT_TEARDOWN);
+       return wakeup_performed;
 }
 
 /*
@@ -193,7 +200,7 @@ int urcu_dequeue_wake_single(struct urcu_wait_queue *queue)
 {
        struct cds_wfs_node *node;
        struct urcu_wait_node *wait_node;
-       int wakeup_done = 0;
+       int ret = 0;
 
        node = __cds_wfs_pop_blocking(&queue->stack);
        if (!node)
@@ -201,11 +208,9 @@ int urcu_dequeue_wake_single(struct urcu_wait_queue *queue)
        wait_node = caa_container_of(node, struct urcu_wait_node, node);
        CMM_STORE_SHARED(wait_node->node.next, NULL);
        /* Don't wake already running threads */
-       if (!(wait_node->state & URCU_WAIT_RUNNING)) {
-               urcu_adaptative_wake_up(wait_node);
-               wakeup_done = 1;
-       }
-       return wakeup_done;
+       if (!(wait_node->state & URCU_WAIT_RUNNING))
+               ret = urcu_adaptative_wake_up(wait_node);
+       return ret;
 }
 
 /*
@@ -246,8 +251,8 @@ int urcu_wake_all_waiters(struct urcu_waiters *waiters)
                /* Don't wake already running threads */
                if (wait_node->state & URCU_WAIT_RUNNING)
                        continue;
-               urcu_adaptative_wake_up(wait_node);
-               nr_wakeup++;
+               if (urcu_adaptative_wake_up(wait_node))
+                       nr_wakeup++;
        }
        return nr_wakeup;
 }
index ffdb974bb5a720f317ff9e539e15fcfd31eef468..1d7528db7e1327b6507141f8f6fbfd992fca7571 100644 (file)
@@ -220,17 +220,18 @@ int __urcu_steal_work(struct urcu_workqueue *queue,
 }
 
 static inline
-void ___urcu_wakeup_sibling(struct urcu_worker *sibling)
+bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
 {
-       urcu_adaptative_wake_up(&sibling->wait_node);
+       return urcu_adaptative_wake_up(&sibling->wait_node);
 }
 
 static inline
-void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
+bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
                struct urcu_worker *worker)
 {
        struct urcu_worker *sibling_prev, *sibling_next;
        struct cds_list_head *sibling_node;
+       bool wakeup_performed = 0;
 
        if (!(worker->flags & URCU_WORKER_STEAL))
                return;
@@ -247,7 +248,9 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
        sibling_next = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_next != worker)
-               ___urcu_wakeup_sibling(sibling_next);
+               wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
+       if (wakeup_performed)
+               goto end;
 
        sibling_node = rcu_dereference(worker->sibling_node.prev);
        if (sibling_node == &queue->sibling_head)
@@ -255,9 +258,11 @@ void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
        sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
                        sibling_node);
        if (sibling_prev != worker && sibling_prev != sibling_next)
-               ___urcu_wakeup_sibling(sibling_prev);
-
+               wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
+end:
        rcu_read_unlock();
+
+       return wakeup_performed;
 }
 
 static inline
@@ -324,7 +329,7 @@ do_work:
         * We will be busy handling the work batch, awaken siblings so
         * they can steal from us.
         */
-       __urcu_wakeup_siblings(queue, worker);
+       (void) __urcu_wakeup_siblings(queue, worker);
 }
 
 static inline
This page took 0.027216 seconds and 4 git commands to generate.