*/
#include <urcu/uatomic.h>
-#include <urcu/wfstack.h>
+#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
#include <pthread.h>
+#include <assert.h>
+#include <stdbool.h>
+
+enum urcu_accept_ret {
+ URCU_ACCEPT_WORK = 0,
+ URCU_ACCEPT_SHUTDOWN = 1,
+};
+
+enum urcu_enqueue_ret {
+ URCU_ENQUEUE_OK = 0,
+ URCU_ENQUEUE_FULL = 1,
+};
/*
 * We use RCU to steal work from siblings. Therefore, one of the RCU flavors
/* RCU linked list head of siblings for work stealing. */
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
+
+ /* Maximum number of work entries (approximate). 0 means infinite. */
+ unsigned long nr_work_max;
+ unsigned long nr_work; /* Current number of work items */
+ bool shutdown; /* Shutdown performed */
};
struct urcu_worker {
+	/* Workqueue which can either be used by the worker, or stolen by siblings. */
struct cds_wfcq_head head;
struct cds_wfcq_tail tail;
+ /* Work belonging to worker. Cannot be stolen. */
+ struct urcu_work *own;
+
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
+ struct urcu_workqueue *queue;
int flags; /* enum urcu_worker_flags */
};
};
static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+ unsigned long max_queue_len)
{
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
+ pthread_mutex_init(&queue->sibling_lock, NULL);
+ queue->nr_work_max = max_queue_len;
+ queue->nr_work = 0;
+ queue->shutdown = false;
}
static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+ struct urcu_work *work)
{
bool was_empty;
-
+ unsigned long nr_work_max;
+
+ nr_work_max = queue->nr_work_max;
+ if (nr_work_max) {
+		/*
+		 * Approximate bound: the check below and the increment
+		 * are not done atomically, so concurrent enqueuers may
+		 * slightly overshoot nr_work_max.
+		 */
+ if (uatomic_read(&queue->nr_work) >= nr_work_max)
+ return URCU_ENQUEUE_FULL;
+ uatomic_inc(&queue->nr_work);
+ }
cds_wfcq_node_init(&work->node);
/* Enqueue work. */
* worker threads when threads are busy enough to still be
* running when work is enqueued.
*/
- if (was_empty)
+ if (was_empty) {
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+ }
+ return URCU_ENQUEUE_OK;
}
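+
+/*
+ * Producer-side sketch (illustrative only: struct my_work and the
+ * drop-on-full policy are hypothetical caller code, not part of this
+ * API). With a bounded queue, enqueue is refused once roughly
+ * max_queue_len entries are pending:
+ *
+ *   struct my_work {
+ *           struct urcu_work work;
+ *           int payload;
+ *   };
+ *
+ *   urcu_workqueue_init(&queue, 128);
+ *   struct my_work *w = malloc(sizeof(*w));
+ *   w->payload = 42;
+ *   if (urcu_queue_work(&queue, &w->work) == URCU_ENQUEUE_FULL)
+ *           free(w);        // caller policy: drop when full
+ */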
static inline
-void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
+void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue)
{
struct urcu_waiters waiters;
+ rcu_read_lock(); /* Protect stack dequeue */
urcu_move_waiters(&waiters, &queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+
(void) urcu_wake_all_waiters(&waiters);
}
static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+ struct urcu_worker *worker, int flags)
{
cds_wfcq_init(&worker->head, &worker->tail);
worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
+ worker->own = NULL;
+ worker->wait_node.node.next = NULL;
+ worker->queue = queue;
}
static inline
pthread_mutex_lock(&queue->sibling_lock);
cds_list_del_rcu(&worker->sibling_node);
pthread_mutex_unlock(&queue->sibling_lock);
-
- /*
- * Wait for grace period before freeing or reusing
- * "worker" because used by RCU linked list.
- */
- synchronize_rcu();
}
+ /*
+ * Make sure we are removed from waitqueue.
+ */
+ if (CMM_LOAD_SHARED(worker->wait_node.node.next))
+ __urcu_workqueue_wakeup_all(queue);
+
/*
* Put any local work we still have back into the workqueue.
*/
* Wakeup worker thread if we have put work back into
* workqueue that was previously empty.
*/
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
}
+
+ /*
+	 * Wait for a grace period before freeing or reusing
+	 * "worker", because it is used by an RCU linked list.
+	 * This also prevents ABA for the waitqueue stack dequeue:
+	 * it matches the RCU read-side critical sections around
+	 * dequeue and move-all operations on the waitqueue.
+ */
+ synchronize_rcu();
+}
+
+static inline
+bool ___urcu_grab_work(struct urcu_worker *worker,
+ cds_wfcq_head_ptr_t src_head,
+ struct cds_wfcq_tail *src_tail,
+ bool steal)
+{
+ enum cds_wfcq_ret splice_ret;
+ struct __cds_wfcq_head tmp_head;
+ struct cds_wfcq_tail tmp_tail;
+ struct cds_wfcq_node *node;
+
+ /*
+ * Don't bother grabbing the src queue lock if it is empty.
+ */
+ if (cds_wfcq_empty(src_head, src_tail))
+ return false;
+ __cds_wfcq_init(&tmp_head, &tmp_tail);
+
+ /* Ensure that we preserve FIFO work order. */
+ assert(!steal || worker->own == NULL);
+
+ /* Splice to temporary queue. */
+ if (steal)
+ cds_wfcq_dequeue_lock(src_head.h, src_tail);
+ splice_ret = __cds_wfcq_splice_blocking(&tmp_head,
+ &tmp_tail,
+ src_head,
+ src_tail);
+ if (steal)
+ cds_wfcq_dequeue_unlock(src_head.h, src_tail);
+ if (splice_ret == CDS_WFCQ_RET_SRC_EMPTY)
+ return false;
+
+ /*
+	 * Keep one work entry for ourselves. This ensures forward
+	 * progress amongst stealing co-workers. This also ensures that
+	 * when a worker grabs some work from the global workqueue, it
+	 * will have at least one work item to deal with.
+ */
+ if (worker->own == NULL) {
+ if (!steal) {
+ /*
+ * Try to grab own work from worker workqueue to
+ * preserve FIFO order.
+ */
+ node = cds_wfcq_dequeue_blocking(&worker->head,
+ &worker->tail);
+ if (node)
+ goto got_node;
+ }
+ node = __cds_wfcq_dequeue_blocking(&tmp_head, &tmp_tail);
+ assert(node != NULL);
+got_node:
+ worker->own = caa_container_of(node, struct urcu_work, node);
+ }
+
+ /* Splice into worker workqueue. */
+ splice_ret = __cds_wfcq_splice_blocking(&worker->head,
+ &worker->tail,
+ &tmp_head,
+ &tmp_tail);
+ /* Ensure that we preserve FIFO work order. */
+ assert(!steal || splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
+ return true;
}
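+
+/*
+ * Call-pattern sketch (illustrative; mirrors the two call sites
+ * below in this file):
+ *
+ *   // Grab a batch from the global workqueue for ourselves:
+ *   ___urcu_grab_work(worker, &queue->head, &queue->tail, false);
+ *
+ *   // Steal a batch from a sibling, taking its dequeue lock:
+ *   ___urcu_grab_work(worker, &sibling->head, &sibling->tail, true);
+ */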
/*
* Try stealing work from siblings when we have nothing to do.
*/
static inline
-void ___urcu_steal_work(struct urcu_worker *worker,
+bool ___urcu_steal_work(struct urcu_worker *worker,
struct urcu_worker *sibling)
{
- cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
- (void) __cds_wfcq_splice_blocking(&worker->head,
- &worker->tail,
- &sibling->head,
- &sibling->tail);
- cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
+	return ___urcu_grab_work(worker, &sibling->head, &sibling->tail, true);
}
static inline
-int __urcu_steal_work(struct urcu_workqueue *queue,
+bool __urcu_steal_work(struct urcu_workqueue *queue,
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
+	bool steal_performed = false;
if (!(worker->flags & URCU_WORKER_STEAL))
- return 0;
+ return false;
rcu_read_lock();
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
- ___urcu_steal_work(worker, sibling_next);
+ steal_performed = ___urcu_steal_work(worker, sibling_next);
+ if (steal_performed)
+ goto end;
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
- ___urcu_steal_work(worker, sibling_prev);
-
+ steal_performed = ___urcu_steal_work(worker, sibling_prev);
+end:
rcu_read_unlock();
- return !cds_wfcq_empty(&worker->head, &worker->tail);
+ return steal_performed;
}
static inline
-void ___urcu_wakeup_sibling(struct urcu_worker *sibling)
+bool ___urcu_wakeup_sibling(struct urcu_worker *sibling)
{
- urcu_adaptative_wake_up(&sibling->wait_node);
+ return urcu_adaptative_wake_up(&sibling->wait_node);
}
static inline
-void __urcu_wakeup_siblings(struct urcu_workqueue *queue,
+bool __urcu_wakeup_siblings(struct urcu_workqueue *queue,
struct urcu_worker *worker)
{
struct urcu_worker *sibling_prev, *sibling_next;
struct cds_list_head *sibling_node;
+	bool wakeup_performed = false;
if (!(worker->flags & URCU_WORKER_STEAL))
-		return;
+		return false;
sibling_next = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_next != worker)
- ___urcu_wakeup_sibling(sibling_next);
+ wakeup_performed = ___urcu_wakeup_sibling(sibling_next);
+ if (wakeup_performed)
+ goto end;
sibling_node = rcu_dereference(worker->sibling_node.prev);
if (sibling_node == &queue->sibling_head)
sibling_prev = caa_container_of(sibling_node, struct urcu_worker,
sibling_node);
if (sibling_prev != worker && sibling_prev != sibling_next)
- ___urcu_wakeup_sibling(sibling_prev);
-
+ wakeup_performed = ___urcu_wakeup_sibling(sibling_prev);
+end:
rcu_read_unlock();
+
+ return wakeup_performed;
}
static inline
-void urcu_accept_work(struct urcu_workqueue *queue,
- struct urcu_worker *worker,
- int blocking)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
-	enum cds_wfcq_ret wfcq_ret;
+ bool has_work;
- wfcq_ret = __cds_wfcq_splice_blocking(&worker->head,
- &worker->tail,
- &queue->head,
- &queue->tail);
+	has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, false);
/* Don't wait if we have work to do. */
- if (wfcq_ret != CDS_WFCQ_RET_SRC_EMPTY
- || !cds_wfcq_empty(&worker->head,
- &worker->tail))
+ if (has_work || worker->own
+ || !cds_wfcq_empty(&worker->head, &worker->tail))
goto do_work;
/* Try to steal work from sibling instead of blocking */
if (__urcu_steal_work(queue, worker))
goto do_work;
- if (!blocking)
- return;
+ /* No more work to do, check shutdown state */
+ if (CMM_LOAD_SHARED(queue->shutdown))
+ return URCU_ACCEPT_SHUTDOWN;
urcu_wait_set_state(&worker->wait_node,
URCU_WAIT_WAITING);
if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
* NULL next pointer. We are therefore not in
* the queue.
*/
- cds_wfs_node_init(&worker->wait_node.node);
+ cds_lfs_node_init(&worker->wait_node.node);
+ /* Protect stack dequeue against ABA */
+ synchronize_rcu();
was_empty = !urcu_wait_add(&queue->waitqueue,
&worker->wait_node);
/*
* a wake up.
*/
if (was_empty && !cds_wfcq_empty(&queue->head,
- &queue->tail))
+ &queue->tail)) {
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+ }
} else {
/*
* Non-NULL next pointer. We are therefore in
-	 * We will be busy handling the work batch, awaken siblings so
+	 * We will be busy handling the work batch; awaken siblings so
* they can steal from us.
*/
- __urcu_wakeup_siblings(queue, worker);
+ (void) __urcu_wakeup_siblings(queue, worker);
+ return URCU_ACCEPT_WORK;
}
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
struct cds_wfcq_node *node;
+ struct urcu_work *work;
+ if (worker->own) {
+ /* Process our own work entry. */
+ work = worker->own;
+ worker->own = NULL;
+ goto end;
+ }
/*
* If we are registered for work stealing, we need to dequeue
* safely against siblings.
*/
- if (worker->flags & URCU_WORKER_STEAL)
+ if (worker->flags & URCU_WORKER_STEAL) {
+ /*
+ * Don't bother grabbing the worker queue lock if it is
+ * empty.
+ */
+ if (cds_wfcq_empty(&worker->head, &worker->tail))
+ return NULL;
node = cds_wfcq_dequeue_blocking(&worker->head,
&worker->tail);
- else
+ } else {
node = ___cds_wfcq_dequeue_with_state(&worker->head,
&worker->tail, NULL, 1, 0);
+ }
if (!node)
return NULL;
- return caa_container_of(node, struct urcu_work, node);
+ work = caa_container_of(node, struct urcu_work, node);
+end:
+ if (queue->nr_work_max)
+ uatomic_dec(&queue->nr_work);
+ return work;
+}
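+
+/*
+ * Worker-thread loop sketch (illustrative; handle_work() and struct
+ * my_work are hypothetical caller code, and urcu_worker_register()/
+ * urcu_worker_unregister() are the register/unregister pair assumed
+ * from the elided parts of this header). Stealing workers must be
+ * registered RCU reader threads:
+ *
+ *   struct urcu_worker worker;
+ *   struct urcu_work *work;
+ *
+ *   rcu_register_thread();
+ *   urcu_worker_init(&queue, &worker, URCU_WORKER_STEAL);
+ *   urcu_worker_register(&queue, &worker);
+ *   while (urcu_accept_work(&worker) != URCU_ACCEPT_SHUTDOWN) {
+ *           while ((work = urcu_dequeue_work(&worker)) != NULL)
+ *                   handle_work(caa_container_of(work,
+ *                           struct my_work, work));
+ *   }
+ *   urcu_worker_unregister(&queue, &worker);
+ *   rcu_unregister_thread();
+ */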
+
+static inline
+void urcu_workqueue_shutdown(struct urcu_workqueue *queue)
+{
+ /* Set shutdown */
+ CMM_STORE_SHARED(queue->shutdown, true);
+ /* Wakeup all workers */
+ __urcu_workqueue_wakeup_all(queue);
}
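+
+/*
+ * Shutdown sketch (illustrative; worker_thread[] and nr_workers are
+ * hypothetical caller state): once shutdown is set and all waiters
+ * are woken, urcu_accept_work() returns URCU_ACCEPT_SHUTDOWN in each
+ * worker after the remaining work is drained, so the caller can then
+ * join its worker threads:
+ *
+ *   urcu_workqueue_shutdown(&queue);
+ *   for (i = 0; i < nr_workers; i++)
+ *           pthread_join(worker_thread[i], NULL);
+ */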
#endif /* _URCU_WORKQUEUE_FIFO_H */