X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=urcu%2Fworkqueue-fifo.h;h=0769acced6667b6d56ffd20b65fd0d160602cdc6;hb=d3afe03932758e40b6548480fc5cd04d0811e620;hp=1d7528db7e1327b6507141f8f6fbfd992fca7571;hpb=5d30bf32449dc69a9961dbf44147c930ebe756f5;p=userspace-rcu.git diff --git a/urcu/workqueue-fifo.h b/urcu/workqueue-fifo.h index 1d7528d..0769acc 100644 --- a/urcu/workqueue-fifo.h +++ b/urcu/workqueue-fifo.h @@ -29,6 +29,7 @@ #include #include #include +#include /* * We use RCU to steal work from siblings. Therefore, one of RCU flavors @@ -99,8 +100,11 @@ void urcu_queue_work(struct urcu_workqueue *queue, struct urcu_work *work) * worker threads when threads are busy enough to still be * running when work is enqueued. */ - if (was_empty) + if (was_empty) { + rcu_read_lock(); /* Protect stack dequeue */ (void) urcu_dequeue_wake_single(&queue->waitqueue); + rcu_read_unlock(); /* Protect stack dequeue */ + } } static inline @@ -108,7 +112,10 @@ void urcu_workqueue_wakeup_all(struct urcu_workqueue *queue) { struct urcu_waiters waiters; + rcu_read_lock(); /* Protect stack dequeue */ urcu_move_waiters(&waiters, &queue->waitqueue); + rcu_read_unlock(); /* Protect stack dequeue */ + (void) urcu_wake_all_waiters(&waiters); } @@ -141,14 +148,17 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, pthread_mutex_lock(&queue->sibling_lock); cds_list_del_rcu(&worker->sibling_node); pthread_mutex_unlock(&queue->sibling_lock); - - /* - * Wait for grace period before freeing or reusing - * "worker" because used by RCU linked list. - */ - synchronize_rcu(); } + /* + * Wait for grace period before freeing or reusing + * "worker" because used by RCU linked list. + * Also prevents ABA for waitqueue stack dequeue: matches RCU + * read-side critical sections around dequeue and move all + * operations on waitqueue). + */ + synchronize_rcu(); + /* * Put any local work we still have back into the workqueue. */ @@ -162,7 +172,9 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, * Wakeup worker thread if we have put work back into * workqueue that was previously empty. */ + rcu_read_lock(); /* Protect stack dequeue */ (void) urcu_dequeue_wake_single(&queue->waitqueue); + rcu_read_unlock(); /* Protect stack dequeue */ } } @@ -170,31 +182,37 @@ void urcu_worker_unregister(struct urcu_workqueue *queue, * Try stealing work from siblings when we have nothing to do. */ static inline -void ___urcu_steal_work(struct urcu_worker *worker, +bool ___urcu_steal_work(struct urcu_worker *worker, struct urcu_worker *sibling) { + enum cds_wfcq_ret splice_ret; + /* * Don't bother grabbing the sibling queue lock if it is empty. */ if (cds_wfcq_empty(&sibling->head, &sibling->tail)) - return; + return false; cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail); - (void) __cds_wfcq_splice_blocking(&worker->head, + splice_ret = __cds_wfcq_splice_blocking(&worker->head, &worker->tail, &sibling->head, &sibling->tail); cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail); + /* Ensure that we preserve FIFO work order. */ + assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY); + return splice_ret != CDS_WFCQ_RET_SRC_EMPTY; } static inline -int __urcu_steal_work(struct urcu_workqueue *queue, +bool __urcu_steal_work(struct urcu_workqueue *queue, struct urcu_worker *worker) { struct urcu_worker *sibling_prev, *sibling_next; struct cds_list_head *sibling_node; + bool steal_performed = 0; if (!(worker->flags & URCU_WORKER_STEAL)) - return 0; + return false; rcu_read_lock(); @@ -204,7 +222,9 @@ int __urcu_steal_work(struct urcu_workqueue *queue, sibling_next = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_next != worker) - ___urcu_steal_work(worker, sibling_next); + steal_performed = ___urcu_steal_work(worker, sibling_next); + if (steal_performed) + goto end; sibling_node = rcu_dereference(worker->sibling_node.prev); if (sibling_node == &queue->sibling_head) @@ -212,11 +232,11 @@ int __urcu_steal_work(struct urcu_workqueue *queue, sibling_prev = caa_container_of(sibling_node, struct urcu_worker, sibling_node); if (sibling_prev != worker && sibling_prev != sibling_next) - ___urcu_steal_work(worker, sibling_prev); - + steal_performed = ___urcu_steal_work(worker, sibling_prev); +end: rcu_read_unlock(); - return !cds_wfcq_empty(&worker->head, &worker->tail); + return steal_performed; } static inline @@ -296,6 +316,8 @@ void urcu_accept_work(struct urcu_workqueue *queue, * the queue. */ cds_wfs_node_init(&worker->wait_node.node); + /* Protect stack dequeue against ABA */ + synchronize_rcu(); was_empty = !urcu_wait_add(&queue->waitqueue, &worker->wait_node); /* @@ -308,8 +330,11 @@ void urcu_accept_work(struct urcu_workqueue *queue, * a wake up. */ if (was_empty && !cds_wfcq_empty(&queue->head, - &queue->tail)) + &queue->tail)) { + rcu_read_lock(); /* Protect stack dequeue */ (void) urcu_dequeue_wake_single(&queue->waitqueue); + rcu_read_unlock(); /* Protect stack dequeue */ + } } else { /* * Non-NULL next pointer. We are therefore in