X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=urcu%2Fworkqueue-fifo.h;h=6918a8217bdb3e289404ca3983540073d9234fd8;hb=refs%2Fheads%2Furcu%2Fworkqueue-wakeup;hp=13d9278a5beca9baa46ea9ef3cc7f3b1981091e0;hpb=8a2c74fefc366e219bc09c4a4ebd7bacd5cb83e5;p=urcu.git diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 13d9278..6918a82 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -67,7 +67,9 @@ struct urcu_workqueue { /* Maximum number of work entries (approximate). 0 means infinite. */ unsigned long nr_work_max; unsigned long nr_work; /* Current number of work items */ - bool shutdown; /* Shutdown performed */ + + int worker_flags; /* Worker flags */ + int shutdown; /* Shutdown performed */ }; struct urcu_worker { @@ -91,7 +93,8 @@ enum urcu_worker_flags { static inline void urcu_workqueue_init(struct urcu_workqueue *queue, - unsigned long max_queue_len) + unsigned long max_queue_len, + int worker_flags) { __cds_wfcq_init(&queue->head, &queue->tail); urcu_wait_queue_init(&queue->waitqueue); @@ -156,14 +159,14 @@ void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) static inline void urcu_worker_init(struct urcu_workqueue *queue, - struct urcu_worker *worker, int flags) + struct urcu_worker *worker) { cds_wfcq_init(&worker->head, &worker->tail); - worker->flags = flags; urcu_wait_node_init(&worker->wait_node, URCU_WAIT_RUNNING); worker->own = NULL; worker->wait_node.node.next = NULL; worker->queue = queue; + worker->flags = queue->worker_flags; } static inline @@ -192,7 +195,7 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, /* * Make sure we are removed from waitqueue. */ - if (CMM_LOAD_SHARED(worker->wait_node.node.next)) + if (urcu_in_waitqueue(&worker->wait_node)) __urcu_workqueue_wakeup_all(queue); /* @@ -401,14 +404,9 @@ enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker) return URCU_ACCEPT_SHUTDOWN; urcu_wait_set_state(&worker->wait_node, URCU_WAIT_WAITING); - if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) { + if (!urcu_in_waitqueue(&worker->wait_node)) { int was_empty; - /* - * NULL next pointer. We are therefore not in - * the queue. - */ - cds_lfs_node_init(&worker->wait_node.node); /* Protect stack dequeue against ABA */ synchronize_rcu(); was_empty = !urcu_wait_add(&queue->waitqueue, @@ -430,13 +428,11 @@ enum urcu_accept_ret urcu_accept_work(struct urcu_worker *worker) } } else { /* - * Non-NULL next pointer. We are therefore in - * the queue, or the dispatcher just removed us - * from it (after we read the next pointer), and - * is therefore awakening us. The state will - * therefore have been changed from WAITING to - * some other state, which will let the busy - * wait pass through. + * We are in the queue, or the dispatcher just removed + * us from it (after we read the next pointer), and is + * therefore awakening us. The state will therefore have + * been changed from WAITING to some other state, which + * will let the busy wait pass through. */ } urcu_adaptative_busy_wait(&worker->wait_node); @@ -499,4 +495,31 @@ void urcu_workqueue_shutdown(struct urcu_workqueue *queue) __urcu_workqueue_wakeup_all(queue); } +/* + * Use to let dispatcher steal work from the entire queue in case of + * stall. The "worker" parameter need to be intialized, but is usually + * not registered. + */ +static inline +bool urcu_workqueue_steal_all(struct urcu_workqueue *queue, + struct urcu_worker *worker) +{ + struct urcu_worker *sibling; + bool has_work = false; + + if (worker->flags & URCU_WORKER_STEAL) { + rcu_read_lock(); + /* Steal from each worker */ + cds_list_for_each_entry_rcu(sibling, &queue->sibling_head, + sibling_node) + has_work |= ___urcu_grab_work(worker, &sibling->head, + &sibling->tail, 1); + rcu_read_unlock(); + } + + /* Steal from global workqueue */ + has_work |= ___urcu_grab_work(worker, &queue->head, &queue->tail, 0); + return has_work; +} + #endif /* _URCU_WORKQUEUE_FIFO_H */