assert(!ret);
}
+/*
+ * Destroy the queue's internal mutex. Call only after all users of the
+ * queue are done with it. pthread_mutex_destroy() failure (e.g. mutex
+ * still locked/in use) trips the assert (a no-op if NDEBUG is defined).
+ */
+static inline void _cds_wfq_destroy(struct cds_wfq_queue *q)
+{
+ int ret = pthread_mutex_destroy(&q->lock);
+ assert(!ret);
+}
+
static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q,
struct cds_wfq_node *node)
{
* structure containing node and setting node->next to NULL before
* publication.
*/
- old_tail = uatomic_xchg(&q->tail, node);
+ old_tail = uatomic_xchg(&q->tail, &node->next);
/*
* At this point, dequeuers see a NULL old_tail->next, which indicates
* that the queue is being appended to. The following store will append
CMM_STORE_SHARED(*old_tail, node);
}
+/*
+ * Wait for a concurrent enqueuer to complete its enqueue, then return
+ * the next node.
+ *
+ * An enqueuer exchanges the tail pointer before storing the link into
+ * the predecessor's ->next, so a dequeuer can transiently observe a
+ * NULL ->next on a node that is in the middle of being appended to.
+ * This helper spins until the link is published.
+ */
+static inline struct cds_wfq_node *
+___cds_wfq_node_sync_next(struct cds_wfq_node *node)
+{
+ struct cds_wfq_node *next;
+ int attempt = 0;
+
+ /*
+ * Adaptive busy-looping waiting for the enqueuer to complete the
+ * enqueue: spin with caa_cpu_relax(), and after WFQ_ADAPT_ATTEMPTS
+ * consecutive failed attempts, back off by sleeping WFQ_WAIT ms
+ * before spinning again.
+ */
+ while ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
+ if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
+ (void) poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */
+ attempt = 0;
+ } else
+ caa_cpu_relax();
+ }
+
+ return next;
+}
+
/*
* It is valid to reuse and free a dequeued node immediately.
*
___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q)
{
struct cds_wfq_node *node, *next;
- int attempt = 0;
/*
* Queue is empty if it only contains the dummy node.
return NULL;
node = q->head;
- /*
- * Adaptative busy-looping waiting for enqueuer to complete enqueue.
- */
- while ((next = CMM_LOAD_SHARED(node->next)) == NULL) {
- if (++attempt >= WFQ_ADAPT_ATTEMPTS) {
- poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */
- attempt = 0;
- } else
- caa_cpu_relax();
- }
+ next = ___cds_wfq_node_sync_next(node);
+
/*
* Move queue head forward.
*/