waitqueue: add in_waitqueue field
[urcu.git] / urcu / workqueue-fifo.h
index 13d9278a5beca9baa46ea9ef3cc7f3b1981091e0..6918a8217bdb3e289404ca3983540073d9234fd8 100644 (file)
@@ -67,7 +67,9 @@ struct urcu_workqueue {
        /* Maximum number of work entries (approximate). 0 means infinite. */
        unsigned long nr_work_max;
        unsigned long nr_work;          /* Current number of work items */
-       bool shutdown;                  /* Shutdown performed */
+
+       int worker_flags;               /* Worker flags */
+       int shutdown;                   /* Shutdown performed */
 };
 
 struct urcu_worker {
@@ -91,7 +93,8 @@ enum urcu_worker_flags {
 
 static inline
 void urcu_workqueue_init(struct urcu_workqueue *queue,
-               unsigned long max_queue_len)
+               unsigned long max_queue_len,
+               int worker_flags)
 {
        __cds_wfcq_init(&queue->head, &queue->tail);
        urcu_wait_queue_init(&queue->waitqueue);
@@ -156,14 +159,14 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
 
 static inline
 void urcu_worker_init(struct urcu_workqueue *queue,
-               struct urcu_worker *worker, int flags)
+               struct urcu_worker *worker)
 {
        cds_wfcq_init(&worker->head, &worker->tail);
-       worker->flags = flags;
        urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
        worker->own = NULL;
        worker->wait_node.node.next = NULL;
        worker->queue = queue;
+       worker->flags = queue->worker_flags;
 }
 
 static inline
@@ -192,7 +195,7 @@ void urcu_worker_unregister(struct urcu_workqueue *queue,
        /*
         * Make sure we are removed from waitqueue.
         */
-       if (CMM_LOAD_SHARED(worker->wait_node.node.next))
+       if (urcu_in_waitqueue(&worker->wait_node))
                __urcu_workqueue_wakeup_all(queue);
 
        /*
@@ -401,14 +404,9 @@ enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
                return URCU_ACCEPT_SHUTDOWN;
        urcu_wait_set_state(&worker->wait_node,
                        URCU_WAIT_WAITING);
-       if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
+       if (!urcu_in_waitqueue(&worker->wait_node)) {
                int was_empty;
 
-               /*
-                * NULL next pointer. We are therefore not in
-                * the queue.
-                */
-               cds_lfs_node_init(&worker->wait_node.node);
                /* Protect stack dequeue against ABA */
                synchronize_rcu();
                was_empty = !urcu_wait_add(&queue->waitqueue,
@@ -430,13 +428,11 @@ enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
                }
        } else {
                /*
-                * Non-NULL next pointer. We are therefore in
-                * the queue, or the dispatcher just removed us
-                * from it (after we read the next pointer), and
-                * is therefore awakening us. The state will
-                * therefore have been changed from WAITING to
-                * some other state, which will let the busy
-                * wait pass through.
+                * We are in the queue, or the dispatcher just removed
+                * us from it (after we read the next pointer), and is
+                * therefore awakening us. The state will therefore have
+                * been changed from WAITING to some other state, which
+                * will let the busy wait pass through.
                 */
        }
        urcu_adaptative_busy_wait(&worker->wait_node);
@@ -499,4 +495,31 @@ void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
        __urcu_workqueue_wakeup_all(queue);
 }
 
+/*
+ * Use to let dispatcher steal work from the entire queue in case of
+ * stall. The "worker" parameter need to be intialized, but is usually
+ * not registered.
+ */
+static inline
+bool urcu_workqueue_steal_all(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       struct urcu_worker *sibling;
+       bool has_work = false;
+
+       if (worker->flags & URCU_WORKER_STEAL) {
+               rcu_read_lock();
+               /* Steal from each worker */
+               cds_list_for_each_entry_rcu(sibling, &queue->sibling_head,
+                               sibling_node)
+                       has_work |= ___urcu_grab_work(worker, &sibling->head,
+                                               &sibling->tail, 1);
+               rcu_read_unlock();
+       }
+
+       /* Steal from global workqueue */
+       has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
+       return has_work;
+}
+
 #endif /* _URCU_WORKQUEUE_FIFO_H */
This page took 0.046449 seconds and 4 git commands to generate.