URCU_ACCEPT_SHUTDOWN = 1,
};
+enum urcu_enqueue_ret {
+ URCU_ENQUEUE_OK = 0,
+ URCU_ENQUEUE_FULL = 1,
+};
+
/*
* We use RCU to steal work from siblings. Therefore, one of the RCU flavors
* needs to be included before this header. All workers that participate
struct cds_list_head sibling_head;
pthread_mutex_t sibling_lock; /* Protect sibling list updates */
- bool shutdown; /* Shutdown performed */
+ /* Maximum number of work entries (approximate). 0 means infinite. */
+ unsigned long nr_work_max;
+ unsigned long nr_work; /* Current number of work items */
+
+ int worker_flags; /* Worker flags */
+ int shutdown; /* Shutdown performed */
};
struct urcu_worker {
struct urcu_wait_node wait_node;
/* RCU linked list node of siblings for work stealing. */
struct cds_list_head sibling_node;
+ struct urcu_workqueue *queue;
int flags; /* enum urcu_worker_flags */
};
};
static inline
-void urcu_workqueue_init(struct urcu_workqueue *queue)
+void urcu_workqueue_init(struct urcu_workqueue *queue,
+ unsigned long max_queue_len,
+ int worker_flags)
{
__cds_wfcq_init(&queue->head, &queue->tail);
urcu_wait_queue_init(&queue->waitqueue);
CDS_INIT_LIST_HEAD(&queue->sibling_head);
+ pthread_mutex_init(&queue->sibling_lock, NULL);
+ queue->nr_work_max = max_queue_len;
+ queue->nr_work = 0;
queue->shutdown = false;
}
static inline
-void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work)
+enum urcu_enqueue_ret urcu_queue_work(struct urcu_workqueue *queue,
+ struct urcu_work *work)
{
bool was_empty;
-
+ unsigned long nr_work_max;
+
+ nr_work_max = queue->nr_work_max;
+ if (nr_work_max) {
+ /* Approximate max queue size. */
+ if (uatomic_read(&queue->nr_work) >= nr_work_max)
+ return URCU_ENQUEUE_FULL;
+ uatomic_inc(&queue->nr_work);
+ }
cds_wfcq_node_init(&work->node);
/* Enqueue work. */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
rcu_read_unlock(); /* Protect stack dequeue */
}
+ return URCU_ENQUEUE_OK;
}
static inline
}
static inline
-void urcu_worker_init(struct urcu_worker *worker, int flags)
+void urcu_worker_init(struct urcu_workqueue *queue,
+ struct urcu_worker *worker)
{
cds_wfcq_init(&worker->head, &worker->tail);
- worker->flags = flags;
urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING);
worker->own = NULL;
+ worker->wait_node.node.next = NULL;
+ worker->queue = queue;
+ worker->flags = queue->worker_flags;
}
static inline
/*
* Make sure we are removed from waitqueue.
*/
- if (CMM_LOAD_SHARED(worker->wait_node.node.next))
+ if (urcu_in_waitqueue(&worker->wait_node))
__urcu_workqueue_wakeup_all(queue);
/*
bool steal)
{
enum cds_wfcq_ret splice_ret;
- struct cds_wfcq_head tmp_head;
+ struct __cds_wfcq_head tmp_head;
struct cds_wfcq_tail tmp_tail;
struct cds_wfcq_node *node;
*/
if (cds_wfcq_empty(src_head, src_tail))
return false;
- cds_wfcq_init(&tmp_head, &tmp_tail);
+ __cds_wfcq_init(&tmp_head, &tmp_tail);
/* Ensure that we preserve FIFO work order. */
assert(!steal || worker->own == NULL);
}
/* Splice into worker workqueue. */
- splice_ret = cds_wfcq_splice_blocking(&worker->head,
+ splice_ret = __cds_wfcq_splice_blocking(&worker->head,
&worker->tail,
&tmp_head,
&tmp_tail);
}
static inline
-enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue,
- struct urcu_worker *worker)
+enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
enum cds_wfcq_ret wfcq_ret;
bool has_work;
has_work = ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
/* Don't wait if we have work to do. */
- if (has_work || !cds_wfcq_empty(&worker->head, &worker->tail))
+ if (has_work || worker->own
+ || !cds_wfcq_empty(&worker->head, &worker->tail))
goto do_work;
/* Try to steal work from sibling instead of blocking */
if (__urcu_steal_work(queue, worker))
return URCU_ACCEPT_SHUTDOWN;
urcu_wait_set_state(&worker->wait_node,
URCU_WAIT_WAITING);
- if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) {
+ if (!urcu_in_waitqueue(&worker->wait_node)) {
int was_empty;
- /*
- * NULL next pointer. We are therefore not in
- * the queue.
- */
- cds_lfs_node_init(&worker->wait_node.node);
/* Protect stack dequeue against ABA */
synchronize_rcu();
was_empty = !urcu_wait_add(&queue->waitqueue,
}
} else {
/*
- * Non-NULL next pointer. We are therefore in
- * the queue, or the dispatcher just removed us
- * from it (after we read the next pointer), and
- * is therefore awakening us. The state will
- * therefore have been changed from WAITING to
- * some other state, which will let the busy
- * wait pass through.
+ * We are in the queue, or the dispatcher just removed
+ * us from it (after we read the next pointer), and is
+ * therefore awakening us. The state will therefore have
+ * been changed from WAITING to some other state, which
+ * will let the busy wait pass through.
*/
}
urcu_adaptative_busy_wait(&worker->wait_node);
static inline
struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker)
{
+ struct urcu_workqueue *queue = worker->queue;
struct cds_wfcq_node *node;
+ struct urcu_work *work;
if (worker->own) {
- struct urcu_work *work;
-
/* Process our own work entry. */
work = worker->own;
worker->own = NULL;
- return work;
+ goto end;
}
/*
* If we are registered for work stealing, we need to dequeue
}
if (!node)
return NULL;
- return caa_container_of(node, struct urcu_work, node);
+ work = caa_container_of(node, struct urcu_work, node);
+end:
+ if (queue->nr_work_max)
+ uatomic_dec(&queue->nr_work);
+ return work;
}
static inline
__urcu_workqueue_wakeup_all(queue);
}
/*
 * Let a dispatcher steal work from the entire queue (every registered
 * sibling worker, then the global workqueue) in case of stall. The
 * "worker" parameter needs to be initialized, but is usually not
 * registered. Returns true if any work was grabbed into "worker".
 */
static inline
bool urcu_workqueue_steal_all(struct urcu_workqueue *queue,
		struct urcu_worker *worker)
{
	struct urcu_worker *sibling;
	bool has_work = false;

	if (worker->flags & URCU_WORKER_STEAL) {
		/* RCU read-side protects traversal of the sibling list. */
		rcu_read_lock();
		/* Grab (steal) pending work from each sibling worker. */
		cds_list_for_each_entry_rcu(sibling, &queue->sibling_head,
				sibling_node)
			has_work |= ___urcu_grab_work(worker, &sibling->head,
					&sibling->tail, 1);
		rcu_read_unlock();
	}

	/* Also drain the global workqueue. */
	has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0);
	return has_work;
}
+
#endif /* _URCU_WORKQUEUE_FIFO_H */