*/
#include <urcu/uatomic.h>
-#include <urcu/wfstack.h>
+#include <urcu/lfstack.h>
#include <urcu/waitqueue-lifo.h>
#include <urcu/wfcqueue.h>
#include <urcu/rculist.h>
* worker threads when threads are busy enough to still be
* running when work is enqueued.
*/
- if (was_empty)
+ if (was_empty) {
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+ }
}
static inline
{
struct urcu_waiters waiters;
+ rcu_read_lock(); /* Protect stack dequeue */
urcu_move_waiters(&waiters, &queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+
(void) urcu_wake_all_waiters(&waiters);
}
pthread_mutex_lock(&queue->sibling_lock);
cds_list_del_rcu(&worker->sibling_node);
pthread_mutex_unlock(&queue->sibling_lock);
-
- /*
- * Wait for grace period before freeing or reusing
- * "worker" because used by RCU linked list.
- */
- synchronize_rcu();
}
+ /*
+ * Wait for grace period before freeing or reusing
+ * "worker" because used by RCU linked list.
+ * Also prevents ABA for waitqueue stack dequeue: matches RCU
+ * read-side critical sections around dequeue and move all
+ * operations on waitqueue).
+ */
+ synchronize_rcu();
+
/*
* Put any local work we still have back into the workqueue.
*/
* Wakeup worker thread if we have put work back into
* workqueue that was previously empty.
*/
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
}
}
*/
if (cds_wfcq_empty(&sibling->head, &sibling->tail))
return false;
- cds_wfcq_dequeue_lock(&sibling->head, &sibling->tail);
- splice_ret = __cds_wfcq_splice_blocking(&worker->head,
+ splice_ret = cds_wfcq_splice_blocking(&worker->head,
&worker->tail,
&sibling->head,
&sibling->tail);
- cds_wfcq_dequeue_unlock(&sibling->head, &sibling->tail);
/* Ensure that we preserve FIFO work order. */
assert(splice_ret != CDS_WFCQ_RET_DEST_NON_EMPTY);
return splice_ret != CDS_WFCQ_RET_SRC_EMPTY;
* NULL next pointer. We are therefore not in
* the queue.
*/
- cds_wfs_node_init(&worker->wait_node.node);
+ cds_lfs_node_init(&worker->wait_node.node);
+ /* Protect stack dequeue against ABA */
+ synchronize_rcu();
was_empty = !urcu_wait_add(&queue->waitqueue,
&worker->wait_node);
/*
* a wake up.
*/
if (was_empty && !cds_wfcq_empty(&queue->head,
- &queue->tail))
+ &queue->tail)) {
+ rcu_read_lock(); /* Protect stack dequeue */
(void) urcu_dequeue_wake_single(&queue->waitqueue);
+ rcu_read_unlock(); /* Protect stack dequeue */
+ }
} else {
/*
* Non-NULL next pointer. We are therefore in