X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=urcu%2Fworkqueue-fifo.h;h=1292e04d6a173f6a276d6fab70dfaca004f7f26d;hb=6e17009c7e4276b3a90aada07e7c9835e9a788be;hp=68e050afdce43314b67cea2ad570fc3e836d8a37;hpb=7a618cf73bc9da1a6d258f361a41ed9e6f1b9f88;p=userspace-rcu.git diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 68e050a..1292e04 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -31,6 +31,11 @@ #include #include +enum urcu_accept_ret { + URCU_ACCEPT_WORK = 0, + URCU_ACCEPT_SHUTDOWN = 1, +}; + /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors * need to be included before this header. All worker that participate @@ -53,6 +58,8 @@ struct urcu_workqueue { /* RCU linked list head of siblings for work stealing. */ struct cds_list_head sibling_head; pthread_mutex_t sibling_lock; /* Protect sibling list updates */ + + bool shutdown; /* Shutdown performed */ }; struct urcu_worker { @@ -75,6 +82,7 @@ void urcu_workqueue_init(struct urcu_workqueue *queue) __cds_wfcq_init(&queue->head, &queue->tail); urcu_wait_queue_init(&queue->waitqueue); CDS_INIT_LIST_HEAD(&queue->sibling_head); + queue->shutdown = false; } static inline @@ -108,7 +116,7 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) } static inline -void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) +void __urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) { struct urcu_waiters waiters; @@ -151,13 +159,10 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, } /* - * Wait for grace period before freeing or reusing - * "worker" because used by RCU linked list. - * Also prevents ABA for waitqueue stack dequeue: matches RCU - * read-side critical sections around dequeue and move all - * operations on waitqueue). + * Make sure we are removed from waitqueue. */ - synchronize_rcu(); + if (CMM_LOAD_SHARED(worker->wait_node.node.next)) + __urcu_workqueue_wakeup_all(queue); /* * Put any local work we still have back into the workqueue. @@ -176,6 +181,15 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, (void) urcu_dequeue_wake_single(&queue->waitqueue); rcu_read_unlock(); /* Protect stack dequeue */ } + + /* + * Wait for grace period before freeing or reusing + * "worker" because used by RCU linked list. + * Also prevents ABA for waitqueue stack dequeue: matches RCU + * read-side critical sections around dequeue and move all + * operations on waitqueue). + */ + synchronize_rcu(); } /* @@ -192,12 +206,10 @@ bool ___urcu_steal_work(struct urcu_worker *worker, */ if (cds_wfcq_empty(&sibling->head, &sibling->tail)) return false; - cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail); - splice_ret = __cds_wfcq_splice_blocking(&worker->head, + splice_ret = cds_wfcq_splice_blocking(&worker->head, &worker->tail, &sibling->head, &sibling->tail); - cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail); /* Ensure that we preserve FIFO work order. */ assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY); return splice_ret != CDS_WFCQ_RET_SRC_EMPTY; @@ -286,9 +298,8 @@ end: } static inline -void urcu_accept_work(struct urcu_workqueue *queue, - struct urcu_worker *worker, - int blocking) +enum urcu_accept_ret urcu_accept_work(struct urcu_workqueue *queue, + struct urcu_worker *worker) { enum cds_wfcq_ret wfcq_ret; @@ -304,8 +315,9 @@ void urcu_accept_work(struct urcu_workqueue *queue, /* Try to steal work from sibling instead of blocking */ if (__urcu_steal_work(queue, worker)) goto do_work; - if (!blocking) - return; + /* No more work to do, check shutdown state */ + if (CMM_LOAD_SHARED(queue->shutdown)) + return URCU_ACCEPT_SHUTDOWN; urcu_wait_set_state(&worker->wait_node, URCU_WAIT_WAITING); if (!CMM_LOAD_SHARED(worker->wait_node.node.next)) { @@ -355,6 +367,7 @@ do_work: * they can steal from us. */ (void) __urcu_wakeup_siblings(queue, worker); + return URCU_ACCEPT_WORK; } static inline @@ -384,4 +397,13 @@ struct urcu_work *urcu_dequeue_work(struct urcu_worker *worker) return caa_container_of(node, struct urcu_work, node); } +static inline +void urcu_workqueue_shutdown(struct urcu_workqueue *queue) +{ + /* Set shutdown */ + CMM_STORE_SHARED(queue->shutdown, true); + /* Wakeup all workers */ + __urcu_workqueue_wakeup_all(queue); +} + #endif /* _URCU_WORKQUEUE_FIFO_H */