Implement waitqueue and workqueue APIs
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sat, 18 Oct 2014 14:38:32 +0000 (16:38 +0200)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Sun, 19 Oct 2014 15:31:40 +0000 (17:31 +0200)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Makefile.am
urcu-qsbr.c
urcu-wait.h [deleted file]
urcu.c
urcu/static/wfcqueue.h
urcu/waitqueue-lifo.h [new file with mode: 0644]
urcu/workqueue-fifo.h [new file with mode: 0644]

index 752510d717783c9d0868c6701767613b617ba357..ab62bbf01106daae58f3033f3cb643f73913a74a 100644 (file)
@@ -21,13 +21,14 @@ nobase_dist_include_HEADERS = urcu/compiler.h urcu/hlist.h urcu/list.h \
                urcu/ref.h urcu/cds.h urcu/urcu_ref.h urcu/urcu-futex.h \
                urcu/uatomic_arch.h urcu/rculfhash.h urcu/wfcqueue.h \
                urcu/lfstack.h urcu/syscall-compat.h \
+               urcu/waitqueue-lifo.h urcu/workqueue-fifo.h \
                $(top_srcdir)/urcu/map/*.h \
                $(top_srcdir)/urcu/static/*.h \
                urcu/rand-compat.h \
                urcu/tls-compat.h
 nobase_nodist_include_HEADERS = urcu/arch.h urcu/uatomic.h urcu/config.h
 
-dist_noinst_HEADERS = urcu-die.h urcu-wait.h
+dist_noinst_HEADERS = urcu-die.h
 
 EXTRA_DIST = $(top_srcdir)/urcu/arch/*.h $(top_srcdir)/urcu/uatomic/*.h \
                gpl-2.0.txt lgpl-2.1.txt lgpl-relicensing.txt \
index 71e7a39e6f73795026e054bf8f0620772ec9a70f..d7178f43215dd0e70b45ccac3f43317aaf44035e 100644 (file)
@@ -41,9 +41,9 @@
 #include "urcu/static/urcu-qsbr.h"
 #include "urcu-pointer.h"
 #include "urcu/tls-compat.h"
+#include "urcu/waitqueue-lifo.h"
 
 #include "urcu-die.h"
-#include "urcu-wait.h"
 
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #undef _LGPL_SOURCE
diff --git a/urcu-wait.h b/urcu-wait.h
deleted file mode 100644 (file)
index d00842a..0000000
+++ /dev/null
@@ -1,185 +0,0 @@
-#ifndef _URCU_WAIT_H
-#define _URCU_WAIT_H
-
-/*
- * urcu-wait.h
- *
- * Userspace RCU library wait/wakeup management
- *
- * Copyright (c) 2012 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
-
-#include <urcu/uatomic.h>
-#include <urcu/wfstack.h>
-
-/*
- * Number of busy-loop attempts before waiting on futex for grace period
- * batching.
- */
-#define URCU_WAIT_ATTEMPTS 1000
-
-enum urcu_wait_state {
-       /* URCU_WAIT_WAITING is compared directly (futex compares it). */
-       URCU_WAIT_WAITING =     0,
-       /* non-zero are used as masks. */
-       URCU_WAIT_WAKEUP =      (1 << 0),
-       URCU_WAIT_RUNNING =     (1 << 1),
-       URCU_WAIT_TEARDOWN =    (1 << 2),
-};
-
-struct urcu_wait_node {
-       struct cds_wfs_node node;
-       int32_t state;  /* enum urcu_wait_state */
-};
-
-#define URCU_WAIT_NODE_INIT(name, _state)              \
-       { .state = _state }
-
-#define DEFINE_URCU_WAIT_NODE(name, state)             \
-       struct urcu_wait_node name = URCU_WAIT_NODE_INIT(name, state)
-
-#define DECLARE_URCU_WAIT_NODE(name)                   \
-       struct urcu_wait_node name
-
-struct urcu_wait_queue {
-       struct cds_wfs_stack stack;
-};
-
-#define URCU_WAIT_QUEUE_HEAD_INIT(name)                        \
-       { .stack.head = CDS_WFS_END, .stack.lock = PTHREAD_MUTEX_INITIALIZER }
-
-#define DECLARE_URCU_WAIT_QUEUE(name)                  \
-       struct urcu_wait_queue name
-
-#define DEFINE_URCU_WAIT_QUEUE(name)                   \
-       struct urcu_wait_queue name = URCU_WAIT_QUEUE_HEAD_INIT(name)
-
-struct urcu_waiters {
-       struct cds_wfs_head *head;
-};
-
-/*
- * Add ourself atomically to a wait queue. Return 0 if queue was
- * previously empty, else return 1.
- * A full memory barrier is issued before being added to the wait queue.
- */
-static inline
-bool urcu_wait_add(struct urcu_wait_queue *queue,
-               struct urcu_wait_node *node)
-{
-       return cds_wfs_push(&queue->stack, &node->node);
-}
-
-/*
- * Atomically move all waiters from wait queue into our local struct
- * urcu_waiters.
- */
-static inline
-void urcu_move_waiters(struct urcu_waiters *waiters,
-               struct urcu_wait_queue *queue)
-{
-       waiters->head = __cds_wfs_pop_all(&queue->stack);
-}
-
-static inline
-void urcu_wait_set_state(struct urcu_wait_node *node,
-               enum urcu_wait_state state)
-{
-       node->state = state;
-}
-
-static inline
-void urcu_wait_node_init(struct urcu_wait_node *node,
-               enum urcu_wait_state state)
-{
-       urcu_wait_set_state(node, state);
-       cds_wfs_node_init(&node->node);
-}
-
-/*
- * Note: urcu_adaptative_wake_up needs "value" to stay allocated
- * throughout its execution. In this scheme, the waiter owns the node
- * memory, and we only allow it to free this memory when it receives the
- * URCU_WAIT_TEARDOWN flag.
- */
-static inline
-void urcu_adaptative_wake_up(struct urcu_wait_node *wait)
-{
-       cmm_smp_mb();
-       assert(uatomic_read(&wait->state) == URCU_WAIT_WAITING);
-       uatomic_set(&wait->state, URCU_WAIT_WAKEUP);
-       if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING))
-               futex_noasync(&wait->state, FUTEX_WAKE, 1, NULL, NULL, 0);
-       /* Allow teardown of struct urcu_wait memory. */
-       uatomic_or(&wait->state, URCU_WAIT_TEARDOWN);
-}
-
-/*
- * Caller must initialize "value" to URCU_WAIT_WAITING before passing its
- * memory to waker thread.
- */
-static inline
-void urcu_adaptative_busy_wait(struct urcu_wait_node *wait)
-{
-       unsigned int i;
-
-       /* Load and test condition before read state */
-       cmm_smp_rmb();
-       for (i = 0; i < URCU_WAIT_ATTEMPTS; i++) {
-               if (uatomic_read(&wait->state) != URCU_WAIT_WAITING)
-                       goto skip_futex_wait;
-               caa_cpu_relax();
-       }
-       futex_noasync(&wait->state, FUTEX_WAIT,
-               URCU_WAIT_WAITING, NULL, NULL, 0);
-skip_futex_wait:
-
-       /* Tell waker thread than we are running. */
-       uatomic_or(&wait->state, URCU_WAIT_RUNNING);
-
-       /*
-        * Wait until waker thread lets us know it's ok to tear down
-        * memory allocated for struct urcu_wait.
-        */
-       for (i = 0; i < URCU_WAIT_ATTEMPTS; i++) {
-               if (uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN)
-                       break;
-               caa_cpu_relax();
-       }
-       while (!(uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN))
-               poll(NULL, 0, 10);
-       assert(uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN);
-}
-
-static inline
-void urcu_wake_all_waiters(struct urcu_waiters *waiters)
-{
-       struct cds_wfs_node *iter, *iter_n;
-
-       /* Wake all waiters in our stack head */
-       cds_wfs_for_each_blocking_safe(waiters->head, iter, iter_n) {
-               struct urcu_wait_node *wait_node =
-                       caa_container_of(iter, struct urcu_wait_node, node);
-
-               /* Don't wake already running threads */
-               if (wait_node->state & URCU_WAIT_RUNNING)
-                       continue;
-               urcu_adaptative_wake_up(wait_node);
-       }
-}
-
-#endif /* _URCU_WAIT_H */
diff --git a/urcu.c b/urcu.c
index ae3490f7cb2cdc1f7cdb1d26f77c0ee22620e9eb..0241d3b652d14526c7217260daa596735689154b 100644 (file)
--- a/urcu.c
+++ b/urcu.c
@@ -41,9 +41,9 @@
 #include "urcu/static/urcu.h"
 #include "urcu-pointer.h"
 #include "urcu/tls-compat.h"
+#include "urcu/waitqueue-lifo.h"
 
 #include "urcu-die.h"
-#include "urcu-wait.h"
 
 /* Do not #define _LGPL_SOURCE to ensure we can emit the wrapper symbols */
 #undef _LGPL_SOURCE
index 48b2625dd3a4d0b28450dac4d7d0f7df921028c7..b2ad7ab6957699d36ca3100737b8086683b4165b 100644 (file)
@@ -367,7 +367,8 @@ static inline struct cds_wfcq_node *
 ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head,
                struct cds_wfcq_tail *tail,
                int *state,
-               int blocking)
+               int blocking,
+               int safe)
 {
        struct __cds_wfcq_head *head = u_head._h;
        struct cds_wfcq_node *node, *next;
@@ -385,6 +386,8 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head,
        }
 
        if ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
+               struct cds_wfcq_node *ret_node;
+
                /*
                 * @node is probably the only node in the queue.
                 * Try to move the tail to &q->head.
@@ -400,7 +403,15 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head,
                 * content.
                 */
                _cds_wfcq_node_init(&head->node);
-               if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) {
+               if (safe) {
+                       ret_node = uatomic_cmpxchg(&tail->p, node,
+                                       &head->node);
+               } else {
+                       ret_node = tail->p;
+                       if (ret_node == node)
+                               tail->p = &head->node;
+               }
+               if (ret_node == node) {
                        if (state)
                                *state |= CDS_WFCQ_STATE_LAST;
                        return node;
@@ -440,7 +451,7 @@ static inline struct cds_wfcq_node *
 ___cds_wfcq_dequeue_with_state_blocking(cds_wfcq_head_ptr_t head,
                struct cds_wfcq_tail *tail, int *state)
 {
-       return ___cds_wfcq_dequeue_with_state(head, tail, state, 1);
+       return ___cds_wfcq_dequeue_with_state(head, tail, state, 1, 1);
 }
 
 /*
@@ -466,7 +477,7 @@ static inline struct cds_wfcq_node *
 ___cds_wfcq_dequeue_with_state_nonblocking(cds_wfcq_head_ptr_t head,
                struct cds_wfcq_tail *tail, int *state)
 {
-       return ___cds_wfcq_dequeue_with_state(head, tail, state, 0);
+       return ___cds_wfcq_dequeue_with_state(head, tail, state, 0, 1);
 }
 
 /*
diff --git a/urcu/waitqueue-lifo.h b/urcu/waitqueue-lifo.h
new file mode 100644 (file)
index 0000000..4d800f0
--- /dev/null
@@ -0,0 +1,255 @@
+#ifndef _URCU_WAITQUEUE_LIFO_H
+#define _URCU_WAITQUEUE_LIFO_H
+
+/*
+ * urcu/waitqueue-lifo.h
+ *
+ * Userspace RCU library - wait queue scheme with LIFO semantic
+ *
+ * Copyright (c) 2012-2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <urcu/uatomic.h>
+#include <urcu/wfstack.h>
+#include <urcu/futex.h>
+
+/*
+ * Number of busy-loop attempts before waiting on futex for grace period
+ * batching.
+ */
+#define URCU_WAIT_ATTEMPTS 1000
+
+enum urcu_wait_state {
+       /* URCU_WAIT_WAITING is compared directly (futex compares it). */
+       URCU_WAIT_WAITING =     0,
+       /* non-zero are used as masks. */
+       URCU_WAIT_WAKEUP =      (1 << 0),
+       URCU_WAIT_RUNNING =     (1 << 1),
+       URCU_WAIT_TEARDOWN =    (1 << 2),
+};
+
+struct urcu_wait_node {
+       struct cds_wfs_node node;
+       int32_t state;  /* enum urcu_wait_state */
+};
+
+#define URCU_WAIT_NODE_INIT(name, _state)              \
+       { .state = _state }
+
+#define DEFINE_URCU_WAIT_NODE(name, state)             \
+       struct urcu_wait_node name = URCU_WAIT_NODE_INIT(name, state)
+
+#define DECLARE_URCU_WAIT_NODE(name)                   \
+       struct urcu_wait_node name
+
+struct urcu_wait_queue {
+       struct __cds_wfs_stack stack;
+};
+
+#define URCU_WAIT_QUEUE_HEAD_INIT(name)                        \
+       { .stack.head = CDS_WFS_END, }
+
+#define DECLARE_URCU_WAIT_QUEUE(name)                  \
+       struct urcu_wait_queue name
+
+#define DEFINE_URCU_WAIT_QUEUE(name)                   \
+       struct urcu_wait_queue name = URCU_WAIT_QUEUE_HEAD_INIT(name)
+
+static inline
+void urcu_wait_queue_init(struct urcu_wait_queue *queue)
+{
+       __cds_wfs_init(&queue->stack);
+}
+
+struct urcu_waiters {
+       struct cds_wfs_head *head;
+};
+
+/*
+ * Add ourself atomically to a wait queue. Return 0 if queue was
+ * previously empty, else return 1.
+ * A full memory barrier is issued before being added to the wait queue.
+ */
+static inline
+bool urcu_wait_add(struct urcu_wait_queue *queue,
+               struct urcu_wait_node *node)
+{
+       return cds_wfs_push(&queue->stack, &node->node);
+}
+
+/*
+ * Atomically move all waiters from wait queue into our local struct
+ * urcu_waiters.
+ */
+static inline
+void urcu_move_waiters(struct urcu_waiters *waiters,
+               struct urcu_wait_queue *queue)
+{
+       waiters->head = __cds_wfs_pop_all(&queue->stack);
+}
+
+static inline
+void urcu_wait_set_state(struct urcu_wait_node *node,
+               enum urcu_wait_state state)
+{
+       node->state = state;
+}
+
+static inline
+void urcu_wait_or_state(struct urcu_wait_node *node,
+               enum urcu_wait_state state)
+{
+       uatomic_or(&node->state, state);
+}
+
+static inline
+void urcu_wait_node_init(struct urcu_wait_node *node,
+               enum urcu_wait_state state)
+{
+       urcu_wait_set_state(node, state);
+       cds_wfs_node_init(&node->node);
+}
+
+/*
+ * Note: urcu_adaptative_wake_up needs "value" to stay allocated
+ * throughout its execution. In this scheme, the waiter owns the node
+ * memory, and we only allow it to free this memory when it receives the
+ * URCU_WAIT_TEARDOWN flag.
+ */
+static inline
+void urcu_adaptative_wake_up(struct urcu_wait_node *wait)
+{
+       cmm_smp_mb();
+       /*
+        * "or" of WAKEUP flag rather than "set" is useful for multiple
+        * concurrent wakeup sources. Note that "WAIT_TEARDOWN" becomes
+        * useless when we use multiple wakeup sources: lifetime of the
+        * "value" should then be handled by the caller.
+        */
+       uatomic_or(&wait->state, URCU_WAIT_WAKEUP);
+       if (!(uatomic_read(&wait->state) & URCU_WAIT_RUNNING))
+               futex_noasync(&wait->state, FUTEX_WAKE, 1, NULL, NULL, 0);
+       /* Allow teardown of struct urcu_wait memory. */
+       uatomic_or(&wait->state, URCU_WAIT_TEARDOWN);
+}
+
+/*
+ * Caller must initialize "value" to URCU_WAIT_WAITING before passing its
+ * memory to waker thread.
+ */
+static inline
+void urcu_adaptative_busy_wait(struct urcu_wait_node *wait)
+{
+       unsigned int i;
+
+       /* Load and test condition before read state */
+       cmm_smp_rmb();
+       for (i = 0; i < URCU_WAIT_ATTEMPTS; i++) {
+               if (uatomic_read(&wait->state) != URCU_WAIT_WAITING)
+                       goto skip_futex_wait;
+               caa_cpu_relax();
+       }
+       futex_noasync(&wait->state, FUTEX_WAIT,
+               URCU_WAIT_WAITING, NULL, NULL, 0);
+skip_futex_wait:
+
+       /* Tell waker thread than we are running. */
+       uatomic_or(&wait->state, URCU_WAIT_RUNNING);
+
+       /*
+        * Wait until waker thread lets us know it's ok to tear down
+        * memory allocated for struct urcu_wait.
+        */
+       for (i = 0; i < URCU_WAIT_ATTEMPTS; i++) {
+               if (uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN)
+                       break;
+               caa_cpu_relax();
+       }
+       while (!(uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN))
+               poll(NULL, 0, 10);
+       assert(uatomic_read(&wait->state) & URCU_WAIT_TEARDOWN);
+}
+
+/*
+ * Need mutual exclusion against other wakeup and move waiters
+ * operations. It is provided by the caller.
+ */
+static inline
+int urcu_dequeue_wake_single(struct urcu_wait_queue *queue)
+{
+       struct cds_wfs_node *node;
+       struct urcu_wait_node *wait_node;
+       int wakeup_done = 0;
+
+       node = __cds_wfs_pop_blocking(&queue->stack);
+       if (!node)
+               return -ENOENT;
+       wait_node = caa_container_of(node, struct urcu_wait_node, node);
+       CMM_STORE_SHARED(wait_node->node.next, NULL);
+       /* Don't wake already running threads */
+       if (!(wait_node->state & URCU_WAIT_RUNNING)) {
+               urcu_adaptative_wake_up(wait_node);
+               wakeup_done = 1;
+       }
+       return wakeup_done;
+}
+
+/*
+ * Need mutual exclusion against other wakeup and move waiters
+ * operations. It is provided by the caller.
+ */
+static inline
+int urcu_dequeue_wake_n(struct urcu_wait_queue *queue, int n)
+{
+       int nr_wakeup = 0;
+
+       for (;;) {
+               int ret;
+
+               ret = urcu_dequeue_wake_single(queue);
+               if (ret < 0)
+                       return nr_wakeup;
+               else if (ret > 0)
+                       nr_wakeup++;
+               else
+                       break;
+       }
+       return nr_wakeup;
+}
+
+static inline
+int urcu_wake_all_waiters(struct urcu_waiters *waiters)
+{
+       struct cds_wfs_node *iter, *iter_n;
+       int nr_wakeup = 0;
+
+       /* Wake all waiters in our stack head */
+       cds_wfs_for_each_blocking_safe(waiters->head, iter, iter_n) {
+               struct urcu_wait_node *wait_node =
+                       caa_container_of(iter, struct urcu_wait_node, node);
+
+               CMM_STORE_SHARED(wait_node->node.next, NULL);
+               /* Don't wake already running threads */
+               if (wait_node->state & URCU_WAIT_RUNNING)
+                       continue;
+               urcu_adaptative_wake_up(wait_node);
+               nr_wakeup++;
+       }
+       return nr_wakeup;
+}
+
+#endif /* _URCU_WAITQUEUE_LIFO_H */
diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h
new file mode 100644 (file)
index 0000000..65d3a2e
--- /dev/null
@@ -0,0 +1,345 @@
+#ifndef _URCU_WORKQUEUE_FIFO_H
+#define _URCU_WORKQUEUE_FIFO_H
+
+/*
+ * urcu/workqueue-fifo.h
+ *
+ * Userspace RCU library - work queue scheme with FIFO semantic
+ *
+ * Copyright (c) 2014 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include <urcu/uatomic.h>
+#include <urcu/wfstack.h>
+#include <urcu/waitqueue-lifo.h>
+#include <urcu/wfcqueue.h>
+#include <urcu/rculist.h>
+#include <pthread.h>
+
+/*
+ * We use RCU to steal work from siblings. Therefore, one of RCU flavors
+ * need to be included before this header. All worker that participate
+ * in stealing (initialized with the URCU_WORKER_STEAL flag) need to be
+ * registered RCU readers threads.
+ */
+
+struct urcu_work {
+       struct cds_wfcq_node node;
+};
+
+struct urcu_workqueue {
+       /* FIFO work queue */
+       struct __cds_wfcq_head head;
+       struct cds_wfcq_tail tail;
+
+       /* Associated wait queue for LIFO wait/wakeup */
+       struct urcu_wait_queue waitqueue;
+
+       /* RCU linked list head of siblings for work stealing. */
+       struct cds_list_head sibling_head;
+       pthread_mutex_t sibling_lock;   /* Protect sibling list updates */
+};
+
+struct urcu_worker {
+       struct cds_wfcq_head head;
+       struct cds_wfcq_tail tail;
+
+       struct urcu_wait_node wait_node;
+       /* RCU linked list node of siblings for work stealing. */
+       struct cds_list_head sibling_node;
+       int flags;      /* enum urcu_worker_flags */
+};
+
+enum urcu_worker_flags {
+       URCU_WORKER_STEAL       = (1 << 0),
+};
+
+static inline
+void urcu_workqueue_init(struct urcu_workqueue *queue)
+{
+       __cds_wfcq_init(&queue->head, &queue->tail);
+       urcu_wait_queue_init(&queue->waitqueue);
+       CDS_INIT_LIST_HEAD(&queue->sibling_head);
+}
+
+static inline
+void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+{
+       bool was_empty;
+
+       cds_wfcq_node_init(&work->node);
+
+       /* Enqueue work. */
+       was_empty = !cds_wfcq_enqueue(&queue->head, &queue->tail,
+                       &work->node);
+       /*
+        * If workqueue was previously empty, wakeup one worker thread.
+        * It will eventually grab the entire content of the work-queue
+        * (therefore grabbing a "work batch"). After having grabbed the
+        * work batch, while that thread is running and taking care of
+        * that work batch, when we enqueue more work, we will wake
+        * another thread (if there is one waiting), which will
+        * eventually grab the new batch, and so on. This scheme ensures
+        * that contiguous batch of work are handled by the same thread
+        * (for locality), and also ensures that we scale work to many
+        * worker threads when threads are busy enough to still be
+        * running when work is enqueued.
+        */
+       if (was_empty)
+               (void) urcu_dequeue_wake_single(&queue->waitqueue);
+}
+
+static inline
+void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
+{
+       struct urcu_waiters waiters;
+
+       urcu_move_waiters(&waiters, &queue->waitqueue);
+       (void) urcu_wake_all_waiters(&waiters);
+}
+
+static inline
+void urcu_worker_init(struct urcu_worker *worker, int flags)
+{
+       cds_wfcq_init(&worker->head, &worker->tail);
+       worker->flags = flags;
+       urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
+}
+
+static inline
+void urcu_worker_register(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       if (worker->flags & URCU_WORKER_STEAL) {
+               pthread_mutex_lock(&queue->sibling_lock);
+               cds_list_add_rcu(&worker->sibling_node, &queue->sibling_head);
+               pthread_mutex_unlock(&queue->sibling_lock);
+       }
+}
+
+static inline
+void urcu_worker_unregister(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       enum cds_wfcq_ret wfcq_ret;
+
+       if (worker->flags & URCU_WORKER_STEAL) {
+               pthread_mutex_lock(&queue->sibling_lock);
+               cds_list_del_rcu(&worker->sibling_node);
+               pthread_mutex_unlock(&queue->sibling_lock);
+
+               /*
+                * Wait for grace period before freeing or reusing
+                * "worker" because used by RCU linked list.
+                */
+               synchronize_rcu();
+       }
+
+       /*
+        * Put any local work we still have back into the workqueue.
+        */
+       wfcq_ret = __cds_wfcq_splice_blocking(&queue->head,
+                       &queue->tail,
+                       &worker->head,
+                       &worker->tail);
+       if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
+                       && wfcq_ret == CDS_WFCQ_RET_DEST_EMPTY) {
+               /*
+                * Wakeup worker thread if we have put work back into
+                * workqueue that was previously empty.
+                */
+               (void) urcu_dequeue_wake_single(&queue->waitqueue);
+       }
+}
+
+/*
+ * Try stealing work from siblings when we have nothing to do.
+ */
+static inline
+void ___urcu_steal_work(struct urcu_worker *worker,
+               struct urcu_worker *sibling)
+{
+       cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
+       (void) __cds_wfcq_splice_blocking(&worker->head,
+                       &worker->tail,
+                       &sibling->head,
+                       &sibling->tail);
+       cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
+}
+
+static inline
+int __urcu_steal_work(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       struct urcu_worker *sibling_prev, *sibling_next;
+       struct cds_list_head *sibling_node;
+
+       if (!(worker->flags & URCU_WORKER_STEAL))
+               return 0;
+
+       rcu_read_lock();
+
+       sibling_node = rcu_dereference(worker->sibling_node.next);
+       if (sibling_node == &queue->sibling_head)
+               sibling_node = rcu_dereference(sibling_node->next);
+       sibling_next = caa_container_of(sibling_node, struct urcu_worker,
+                       sibling_node);
+       if (sibling_next != worker)
+               ___urcu_steal_work(worker, sibling_next);
+
+       sibling_node = rcu_dereference(worker->sibling_node.prev);
+       if (sibling_node == &queue->sibling_head)
+               sibling_node = rcu_dereference(sibling_node->prev);
+       sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
+                       sibling_node);
+       if (sibling_prev != worker && sibling_prev != sibling_next)
+               ___urcu_steal_work(worker, sibling_prev);
+
+       rcu_read_unlock();
+
+       return !cds_wfcq_empty(&worker->head, &worker->tail);
+}
+
+static inline
+void ___urcu_wakeup_sibling(struct urcu_worker *sibling)
+{
+       urcu_adaptative_wake_up(&sibling->wait_node);
+}
+
+static inline
+void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
+               struct urcu_worker *worker)
+{
+       struct urcu_worker *sibling_prev, *sibling_next;
+       struct cds_list_head *sibling_node;
+
+       if (!(worker->flags & URCU_WORKER_STEAL))
+               return;
+
+       /* Only wakeup siblings if we have work in our own queue. */
+       if (cds_wfcq_empty(&worker->head, &worker->tail))
+               return;
+
+       rcu_read_lock();
+
+       sibling_node = rcu_dereference(worker->sibling_node.next);
+       if (sibling_node == &queue->sibling_head)
+               sibling_node = rcu_dereference(sibling_node->next);
+       sibling_next = caa_container_of(sibling_node, struct urcu_worker,
+                       sibling_node);
+       if (sibling_next != worker)
+               ___urcu_wakeup_sibling(sibling_next);
+
+       sibling_node = rcu_dereference(worker->sibling_node.prev);
+       if (sibling_node == &queue->sibling_head)
+               sibling_node = rcu_dereference(sibling_node->prev);
+       sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
+                       sibling_node);
+       if (sibling_prev != worker && sibling_prev != sibling_next)
+               ___urcu_wakeup_sibling(sibling_prev);
+
+       rcu_read_unlock();
+}
+
+static inline
+void urcu_accept_work(struct urcu_workqueue *queue,
+               struct urcu_worker *worker,
+               int blocking)
+{
+       enum cds_wfcq_ret wfcq_ret;
+
+       wfcq_ret = __cds_wfcq_splice_blocking(&worker->head,
+                       &worker->tail,
+                       &queue->head,
+                       &queue->tail);
+       /* Don't wait if we have work to do. */
+       if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
+                       || !cds_wfcq_empty(&worker->head,
+                               &worker->tail))
+               goto do_work;
+       /* Try to steal work from sibling instead of blocking */
+       if (__urcu_steal_work(queue, worker))
+               goto do_work;
+       if (!blocking)
+               return;
+       urcu_wait_set_state(&worker->wait_node,
+                       URCU_WAIT_WAITING);
+       if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
+               int was_empty;
+
+               /*
+                * NULL next pointer. We are therefore not in
+                * the queue.
+                */
+               cds_wfs_node_init(&worker->wait_node.node);
+               was_empty = !urcu_wait_add(&queue->waitqueue,
+                               &worker->wait_node);
+               /*
+                * If the wait queue was empty, it means we are the
+                * first thread to be put back into an otherwise empty
+                * wait queue. Re-check if work queue is empty after
+                * adding ourself to wait queue, so we can wakeup the
+                * top of wait queue since new work have appeared, and
+                * work enqueuer may not have seen that it needed to do
+                * a wake up.
+                */
+               if (was_empty && !cds_wfcq_empty(&queue->head,
+                                               &queue->tail))
+                       (void) urcu_dequeue_wake_single(&queue->waitqueue);
+       } else {
+               /*
+                * Non-NULL next pointer. We are therefore in
+                * the queue, or the dispatcher just removed us
+                * from it (after we read the next pointer), and
+                * is therefore awakening us. The state will
+                * therefore have been changed from WAITING to
+                * some other state, which will let the busy
+                * wait pass through.
+                */
+       }
+       urcu_adaptative_busy_wait(&worker->wait_node);
+       return;
+
+do_work:
+       /*
+        * We will be busy handling the work batch, awaken siblings so
+        * they can steal from us.
+        */
+       __urcu_wakeup_siblings(queue, worker);
+}
+
+static inline
+struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
+{
+       struct cds_wfcq_node *node;
+
+       /*
+        * If we are registered for work stealing, we need to dequeue
+        * safely against siblings.
+        */
+       if (worker->flags & URCU_WORKER_STEAL)
+               node = cds_wfcq_dequeue_blocking(&worker->head,
+                               &worker->tail);
+       else
+               node = ___cds_wfcq_dequeue_with_state(&worker->head,
+                               &worker->tail, NULL, 1, 0);
+       if (!node)
+               return NULL;
+       return caa_container_of(node, struct urcu_work, node);
+}
+
+#endif /* _URCU_WORKQUEUE_FIFO_H */
This page took 0.034959 seconds and 4 git commands to generate.