From 0e2125fb7b2c7a92d7030b41df5fb372a7706c32 Mon Sep 17 00:00:00 2001 From: Olivier Dion Date: Mon, 29 May 2023 11:21:11 -0400 Subject: [PATCH] Add cmm_emit_legacy_smp_mb() Some public APIs stipulate implicit memory barriers on operations. These were coherent with the memory model used at that time. However, with the migration to a memory model closer to the C11 memory model, these memory barriers are not strictly emitted by the atomic operations in the new memory model. Therefore, introducing the `--disable-legacy-mb' configuration option. By default, liburcu is configured to emit these legacy memory barriers, thus keeping backward compatibility at the expense of slower performances. However, users can opt-out by disabling the legacy memory barriers. This options is publicly exported in the system configuration header file and can be overrode manually on a compilation unit basis by defining `CONFIG_RCU_EMIT_LEGACY_MB' before including any liburcu files. The usage of this macro requires to re-write atomic operations in term of the CMM memory model. This is done for the queue and stack APIs. Change-Id: Ia5ce3b3d8cd1955556ce96fa4408a63aa098a1a6 Signed-off-by: Olivier Dion Signed-off-by: Mathieu Desnoyers --- configure.ac | 13 ++++++ include/urcu/arch.h | 6 +++ include/urcu/config.h.in | 3 ++ include/urcu/static/lfstack.h | 25 ++++++++---- include/urcu/static/rculfqueue.h | 14 ++++--- include/urcu/static/rculfstack.h | 8 +++- include/urcu/static/wfcqueue.h | 68 +++++++++++++++++--------------- include/urcu/static/wfqueue.h | 9 +++-- include/urcu/static/wfstack.h | 24 +++++++---- 9 files changed, 114 insertions(+), 56 deletions(-) diff --git a/configure.ac b/configure.ac index 7045cdc..15055d6 100644 --- a/configure.ac +++ b/configure.ac @@ -239,6 +239,11 @@ AE_FEATURE([cds-lfht-iter-debug], [Enable extra debugging checks for lock-free h AE_FEATURE_DEFAULT_DISABLE AE_FEATURE([compiler-atomic-builtins], [Enable the use of compiler atomic builtins.]) +# emit legacy memory barriers +# Enable by default +AE_FEATURE_DEFAULT_ENABLE +AE_FEATURE([legacy-mb], [Disable legacy memory barriers.]) + # When given, add -Werror to WARN_CFLAGS and WARN_CXXFLAGS. # Disabled by default AE_FEATURE_DEFAULT_DISABLE @@ -272,6 +277,10 @@ AE_IF_FEATURE_ENABLED([compiler-atomic-builtins], [ AC_DEFINE([CONFIG_RCU_USE_ATOMIC_BUILTINS], [1], [Use compiler atomic builtins.]) ]) +AE_IF_FEATURE_ENABLED([legacy-mb], [ + AC_DEFINE([CONFIG_RCU_EMIT_LEGACY_MB], [1], [Emit legacy memory barriers that were documented in the APIs.]) +]) + ## ## ## Set automake variables for optional feature conditionnals in Makefile.am ## ## ## @@ -390,6 +399,10 @@ AE_PPRINT_PROP_BOOL([Multi-flavor support], 1) AE_IS_FEATURE_ENABLED([compiler-atomic-builtins]) && value=1 || value=0 AE_PPRINT_PROP_BOOL([Use compiler atomic builtins], $value) +# legacy memory barriers +AE_IS_FEATURE_ENABLED([legacy-mb]) && value=1 || value=0 +AE_PPRINT_PROP_BOOL([Emit legacy memory barriers], $value) + report_bindir="`eval eval echo $bindir`" report_libdir="`eval eval echo $libdir`" diff --git a/include/urcu/arch.h b/include/urcu/arch.h index 45ba6a2..717d79c 100644 --- a/include/urcu/arch.h +++ b/include/urcu/arch.h @@ -155,5 +155,11 @@ #error "Cannot build: unrecognized architecture, see ." #endif +#ifdef CONFIG_RCU_EMIT_LEGACY_MB +# define cmm_emit_legacy_smp_mb() cmm_smp_mb() +#else +# define cmm_emit_legacy_smp_mb() do { } while (0) +#endif + #endif /* _URCU_ARCH_H */ diff --git a/include/urcu/config.h.in b/include/urcu/config.h.in index aa1d6c9..473d7a2 100644 --- a/include/urcu/config.h.in +++ b/include/urcu/config.h.in @@ -26,6 +26,9 @@ /* Uatomic API uses atomic builtins. */ #undef CONFIG_RCU_USE_ATOMIC_BUILTINS +/* Emit legacy memory barriers? */ +#undef CONFIG_RCU_EMIT_LEGACY_MB + /* Expose multi-flavor support */ #define CONFIG_RCU_HAVE_MULTIFLAVOR 1 diff --git a/include/urcu/static/lfstack.h b/include/urcu/static/lfstack.h index 75db75e..d7e70d4 100644 --- a/include/urcu/static/lfstack.h +++ b/include/urcu/static/lfstack.h @@ -100,7 +100,7 @@ bool ___cds_lfs_empty_head(struct cds_lfs_head *head) static inline bool _cds_lfs_empty(cds_lfs_stack_ptr_t s) { - return ___cds_lfs_empty_head(CMM_LOAD_SHARED(s._s->head)); + return ___cds_lfs_empty_head(uatomic_load(&s._s->head, CMM_RELAXED)); } /* @@ -108,6 +108,8 @@ bool _cds_lfs_empty(cds_lfs_stack_ptr_t s) * * Does not require any synchronization with other push nor pop. * + * Operations before push are consistent when observed after associated pop. + * * Lock-free stack push is not subject to ABA problem, so no need to * take the RCU read-side lock. Even if "head" changes between two * uatomic_cmpxchg() invocations here (being popped, and then pushed @@ -153,7 +155,9 @@ bool _cds_lfs_push(cds_lfs_stack_ptr_t u_s, * uatomic_cmpxchg() implicit memory barrier orders earlier * stores to node before publication. */ - head = uatomic_cmpxchg(&s->head, old_head, new_head); + cmm_emit_legacy_smp_mb(); + head = uatomic_cmpxchg_mo(&s->head, old_head, new_head, + CMM_SEQ_CST, CMM_SEQ_CST); if (old_head == head) break; } @@ -165,6 +169,8 @@ bool _cds_lfs_push(cds_lfs_stack_ptr_t u_s, * * Returns NULL if stack is empty. * + * Operations after pop are consistent when observed before associated push. + * * __cds_lfs_pop needs to be synchronized using one of the following * techniques: * @@ -189,7 +195,7 @@ struct cds_lfs_node *___cds_lfs_pop(cds_lfs_stack_ptr_t u_s) struct cds_lfs_head *head, *next_head; struct cds_lfs_node *next; - head = _CMM_LOAD_SHARED(s->head); + head = uatomic_load(&s->head, CMM_CONSUME); if (___cds_lfs_empty_head(head)) return NULL; /* Empty stack */ @@ -198,12 +204,14 @@ struct cds_lfs_node *___cds_lfs_pop(cds_lfs_stack_ptr_t u_s) * memory barrier before uatomic_cmpxchg() in * cds_lfs_push. */ - cmm_smp_read_barrier_depends(); - next = _CMM_LOAD_SHARED(head->node.next); + next = uatomic_load(&head->node.next, CMM_RELAXED); next_head = caa_container_of(next, struct cds_lfs_head, node); - if (uatomic_cmpxchg(&s->head, head, next_head) == head) + if (uatomic_cmpxchg_mo(&s->head, head, next_head, + CMM_SEQ_CST, CMM_SEQ_CST) == head){ + cmm_emit_legacy_smp_mb(); return &head->node; + } /* busy-loop if head changed under us */ } } @@ -231,6 +239,7 @@ static inline struct cds_lfs_head *___cds_lfs_pop_all(cds_lfs_stack_ptr_t u_s) { struct __cds_lfs_stack *s = u_s._s; + struct cds_lfs_head *head; /* * Implicit memory barrier after uatomic_xchg() matches implicit @@ -242,7 +251,9 @@ struct cds_lfs_head *___cds_lfs_pop_all(cds_lfs_stack_ptr_t u_s) * taking care to order writes to each node prior to the full * memory barrier after this uatomic_xchg(). */ - return uatomic_xchg(&s->head, NULL); + head = uatomic_xchg_mo(&s->head, NULL, CMM_SEQ_CST); + cmm_emit_legacy_smp_mb(); + return head; } /* diff --git a/include/urcu/static/rculfqueue.h b/include/urcu/static/rculfqueue.h index c0daffe..03c4ecd 100644 --- a/include/urcu/static/rculfqueue.h +++ b/include/urcu/static/rculfqueue.h @@ -134,26 +134,29 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, * uatomic_cmpxchg() implicit memory barrier orders earlier stores to * node before publication. */ - for (;;) { struct cds_lfq_node_rcu *tail, *next; tail = rcu_dereference(q->tail); - next = uatomic_cmpxchg(&tail->next, NULL, node); + cmm_emit_legacy_smp_mb(); + next = uatomic_cmpxchg_mo(&tail->next, NULL, node, + CMM_SEQ_CST, CMM_SEQ_CST); if (next == NULL) { /* * Tail was at the end of queue, we successfully * appended to it. Now move tail (another * enqueue might beat us to it, that's fine). */ - (void) uatomic_cmpxchg(&q->tail, tail, node); + (void) uatomic_cmpxchg_mo(&q->tail, tail, node, + CMM_SEQ_CST, CMM_SEQ_CST); return; } else { /* * Failure to append to current tail. * Help moving tail further and retry. */ - (void) uatomic_cmpxchg(&q->tail, tail, next); + (void) uatomic_cmpxchg_mo(&q->tail, tail, next, + CMM_SEQ_CST, CMM_SEQ_CST); continue; } } @@ -197,7 +200,8 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) enqueue_dummy(q); next = rcu_dereference(head->next); } - if (uatomic_cmpxchg(&q->head, head, next) != head) + if (uatomic_cmpxchg_mo(&q->head, head, next, + CMM_SEQ_CST, CMM_SEQ_CST) != head) continue; /* Concurrently pushed. */ if (head->dummy) { /* Free dummy after grace period. */ diff --git a/include/urcu/static/rculfstack.h b/include/urcu/static/rculfstack.h index 5bb06b1..b44b9e2 100644 --- a/include/urcu/static/rculfstack.h +++ b/include/urcu/static/rculfstack.h @@ -69,7 +69,9 @@ int _cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, * uatomic_cmpxchg() implicit memory barrier orders earlier * stores to node before publication. */ - head = uatomic_cmpxchg(&s->head, old_head, node); + cmm_emit_legacy_smp_mb(); + head = uatomic_cmpxchg_mo(&s->head, old_head, node, + CMM_SEQ_CST, CMM_SEQ_CST); if (old_head == head) break; } @@ -94,7 +96,9 @@ _cds_lfs_pop_rcu(struct cds_lfs_stack_rcu *s) if (head) { struct cds_lfs_node_rcu *next = rcu_dereference(head->next); - if (uatomic_cmpxchg(&s->head, head, next) == head) { + if (uatomic_cmpxchg_mo(&s->head, head, next, + CMM_SEQ_CST, CMM_SEQ_CST) == head) { + cmm_emit_legacy_smp_mb(); return head; } else { /* Concurrent modification. Retry. */ diff --git a/include/urcu/static/wfcqueue.h b/include/urcu/static/wfcqueue.h index 8c4729c..26741ae 100644 --- a/include/urcu/static/wfcqueue.h +++ b/include/urcu/static/wfcqueue.h @@ -77,6 +77,11 @@ static inline void _cds_wfcq_node_init(struct cds_wfcq_node *node) node->next = NULL; } +static inline void _cds_wfcq_node_init_atomic(struct cds_wfcq_node *node) +{ + uatomic_store(&node->next, NULL, CMM_RELAXED); +} + /* * cds_wfcq_init: initialize wait-free queue (with lock). Pair with * cds_wfcq_destroy(). @@ -139,8 +144,8 @@ static inline bool _cds_wfcq_empty(cds_wfcq_head_ptr_t u_head, * common case to ensure that dequeuers do not frequently access * enqueuer's tail->p cache line. */ - return CMM_LOAD_SHARED(head->node.next) == NULL - && CMM_LOAD_SHARED(tail->p) == &head->node; + return uatomic_load(&head->node.next, CMM_CONSUME) == NULL + && uatomic_load(&tail->p, CMM_CONSUME) == &head->node; } static inline void _cds_wfcq_dequeue_lock(struct cds_wfcq_head *head, @@ -174,7 +179,7 @@ static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, * stores to data structure containing node and setting * node->next to NULL before publication. */ - old_tail = uatomic_xchg(&tail->p, new_tail); + old_tail = uatomic_xchg_mo(&tail->p, new_tail, CMM_SEQ_CST); /* * Implicit memory barrier after uatomic_xchg() orders store to @@ -185,7 +190,8 @@ static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, * store will append "node" to the queue from a dequeuer * perspective. */ - CMM_STORE_SHARED(old_tail->next, new_head); + uatomic_store(&old_tail->next, new_head, CMM_RELEASE); + /* * Return false if queue was empty prior to adding the node, * else return true. @@ -196,8 +202,8 @@ static inline bool ___cds_wfcq_append(cds_wfcq_head_ptr_t u_head, /* * cds_wfcq_enqueue: enqueue a node into a wait-free queue. * - * Issues a full memory barrier before enqueue. No mutual exclusion is - * required. + * Operations prior to enqueue are consistant with respect to dequeuing or + * splicing and iterating. * * Returns false if the queue was empty prior to adding the node. * Returns true otherwise. @@ -206,6 +212,8 @@ static inline bool _cds_wfcq_enqueue(cds_wfcq_head_ptr_t head, struct cds_wfcq_tail *tail, struct cds_wfcq_node *new_tail) { + cmm_emit_legacy_smp_mb(); + return ___cds_wfcq_append(head, tail, new_tail, new_tail); } @@ -256,8 +264,10 @@ ___cds_wfcq_node_sync_next(struct cds_wfcq_node *node, int blocking) /* * Adaptative busy-looping waiting for enqueuer to complete enqueue. + * + * Load node.next before loading node's content */ - while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + while ((next = uatomic_load(&node->next, CMM_CONSUME)) == NULL) { if (___cds_wfcq_busy_wait(&attempt, blocking)) return CDS_WFCQ_WOULDBLOCK; } @@ -276,8 +286,7 @@ ___cds_wfcq_first(cds_wfcq_head_ptr_t u_head, if (_cds_wfcq_empty(__cds_wfcq_head_cast(head), tail)) return NULL; node = ___cds_wfcq_node_sync_next(&head->node, blocking); - /* Load head->node.next before loading node's content */ - cmm_smp_read_barrier_depends(); + return node; } @@ -329,16 +338,15 @@ ___cds_wfcq_next(cds_wfcq_head_ptr_t head __attribute__((unused)), * out if we reached the end of the queue, we first check * node->next as a common case to ensure that iteration on nodes * do not frequently access enqueuer's tail->p cache line. + * + * Load node->next before loading next's content */ - if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { - /* Load node->next before tail->p */ - cmm_smp_rmb(); - if (CMM_LOAD_SHARED(tail->p) == node) + if ((next = uatomic_load(&node->next, CMM_CONSUME)) == NULL) { + if (uatomic_load(&tail->p, CMM_RELAXED) == node) return NULL; next = ___cds_wfcq_node_sync_next(node, blocking); } - /* Load node->next before loading next's content */ - cmm_smp_read_barrier_depends(); + return next; } @@ -400,7 +408,7 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, return CDS_WFCQ_WOULDBLOCK; } - if ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + if ((next = uatomic_load(&node->next, CMM_CONSUME)) == NULL) { /* * @node is probably the only node in the queue. * Try to move the tail to &q->head. @@ -408,17 +416,13 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, * NULL if the cmpxchg succeeds. Should the * cmpxchg fail due to a concurrent enqueue, the * q->head.next will be set to the next node. - * The implicit memory barrier before - * uatomic_cmpxchg() orders load node->next - * before loading q->tail. - * The implicit memory barrier before uatomic_cmpxchg - * orders load q->head.next before loading node's - * content. */ - _cds_wfcq_node_init(&head->node); - if (uatomic_cmpxchg(&tail->p, node, &head->node) == node) { + _cds_wfcq_node_init_atomic(&head->node); + if (uatomic_cmpxchg_mo(&tail->p, node, &head->node, + CMM_SEQ_CST, CMM_SEQ_CST) == node) { if (state) *state |= CDS_WFCQ_STATE_LAST; + cmm_emit_legacy_smp_mb(); return node; } next = ___cds_wfcq_node_sync_next(node, blocking); @@ -428,7 +432,7 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, * (currently NULL) back to its original value. */ if (!blocking && next == CDS_WFCQ_WOULDBLOCK) { - head->node.next = node; + uatomic_store(&head->node.next, node, CMM_RELAXED); return CDS_WFCQ_WOULDBLOCK; } } @@ -436,10 +440,9 @@ ___cds_wfcq_dequeue_with_state(cds_wfcq_head_ptr_t u_head, /* * Move queue head forward. */ - head->node.next = next; + uatomic_store(&head->node.next, next, CMM_RELAXED); + cmm_emit_legacy_smp_mb(); - /* Load q->head.next before loading node's content */ - cmm_smp_read_barrier_depends(); return node; } @@ -501,6 +504,8 @@ ___cds_wfcq_dequeue_nonblocking(cds_wfcq_head_ptr_t head, /* * __cds_wfcq_splice: enqueue all src_q nodes at the end of dest_q. * + * Operations after splice are consistant with respect to enqueue. + * * Dequeue all nodes from src_q. * dest_q must be already initialized. * Mutual exclusion for src_q should be ensured by the caller as @@ -534,10 +539,10 @@ ___cds_wfcq_splice( * uatomic_xchg, as well as tail pointer vs head node * address. */ - head = uatomic_xchg(&src_q_head->node.next, NULL); + head = uatomic_xchg_mo(&src_q_head->node.next, NULL, CMM_SEQ_CST); if (head) break; /* non-empty */ - if (CMM_LOAD_SHARED(src_q_tail->p) == &src_q_head->node) + if (uatomic_load(&src_q_tail->p, CMM_CONSUME) == &src_q_head->node) return CDS_WFCQ_RET_SRC_EMPTY; if (___cds_wfcq_busy_wait(&attempt, blocking)) return CDS_WFCQ_RET_WOULDBLOCK; @@ -549,7 +554,8 @@ ___cds_wfcq_splice( * concurrent enqueue on src_q, which exchanges the tail before * updating the previous tail's next pointer. */ - tail = uatomic_xchg(&src_q_tail->p, &src_q_head->node); + cmm_emit_legacy_smp_mb(); + tail = uatomic_xchg_mo(&src_q_tail->p, &src_q_head->node, CMM_SEQ_CST); /* * Append the spliced content of src_q into dest_q. Does not diff --git a/include/urcu/static/wfqueue.h b/include/urcu/static/wfqueue.h index 731b43f..0cb7b1b 100644 --- a/include/urcu/static/wfqueue.h +++ b/include/urcu/static/wfqueue.h @@ -67,13 +67,14 @@ static inline void _cds_wfq_enqueue(struct cds_wfq_queue *q, * structure containing node and setting node->next to NULL before * publication. */ - old_tail = uatomic_xchg(&q->tail, &node->next); + cmm_emit_legacy_smp_mb(); + old_tail = uatomic_xchg_mo(&q->tail, &node->next, CMM_SEQ_CST); /* * At this point, dequeuers see a NULL old_tail->next, which indicates * that the queue is being appended to. The following store will append * "node" to the queue from a dequeuer perspective. */ - CMM_STORE_SHARED(*old_tail, node); + uatomic_store(old_tail, node, CMM_RELEASE); } /* @@ -88,7 +89,7 @@ ___cds_wfq_node_sync_next(struct cds_wfq_node *node) /* * Adaptative busy-looping waiting for enqueuer to complete enqueue. */ - while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + while ((next = uatomic_load(&node->next, CMM_CONSUME)) == NULL) { if (++attempt >= WFQ_ADAPT_ATTEMPTS) { (void) poll(NULL, 0, WFQ_WAIT); /* Wait for 10ms */ attempt = 0; @@ -115,7 +116,7 @@ ___cds_wfq_dequeue_blocking(struct cds_wfq_queue *q) /* * Queue is empty if it only contains the dummy node. */ - if (q->head == &q->dummy && CMM_LOAD_SHARED(q->tail) == &q->dummy.next) + if (q->head == &q->dummy && uatomic_load(&q->tail, CMM_CONSUME) == &q->dummy.next) return NULL; node = q->head; diff --git a/include/urcu/static/wfstack.h b/include/urcu/static/wfstack.h index 8c5648e..c46e97d 100644 --- a/include/urcu/static/wfstack.h +++ b/include/urcu/static/wfstack.h @@ -110,7 +110,7 @@ static inline bool _cds_wfs_empty(cds_wfs_stack_ptr_t u_stack) { struct __cds_wfs_stack *s = u_stack._s; - return ___cds_wfs_end(CMM_LOAD_SHARED(s->head)); + return ___cds_wfs_end(uatomic_load(&s->head, CMM_RELAXED)); } /* @@ -119,6 +119,8 @@ static inline bool _cds_wfs_empty(cds_wfs_stack_ptr_t u_stack) * Issues a full memory barrier before push. No mutual exclusion is * required. * + * Operations before push are consistent when observed after associated pop. + * * Returns 0 if the stack was empty prior to adding the node. * Returns non-zero otherwise. */ @@ -134,12 +136,13 @@ int _cds_wfs_push(cds_wfs_stack_ptr_t u_stack, struct cds_wfs_node *node) * uatomic_xchg() implicit memory barrier orders earlier stores * to node (setting it to NULL) before publication. */ - old_head = uatomic_xchg(&s->head, new_head); + cmm_emit_legacy_smp_mb(); + old_head = uatomic_xchg_mo(&s->head, new_head, CMM_SEQ_CST); /* * At this point, dequeuers see a NULL node->next, they should * busy-wait until node->next is set to old_head. */ - CMM_STORE_SHARED(node->next, &old_head->node); + uatomic_store(&node->next, &old_head->node, CMM_RELEASE); return !___cds_wfs_end(old_head); } @@ -155,7 +158,7 @@ ___cds_wfs_node_sync_next(struct cds_wfs_node *node, int blocking) /* * Adaptative busy-looping waiting for push to complete. */ - while ((next = CMM_LOAD_SHARED(node->next)) == NULL) { + while ((next = uatomic_load(&node->next, CMM_CONSUME)) == NULL) { if (!blocking) return CDS_WFS_WOULDBLOCK; if (++attempt >= CDS_WFS_ADAPT_ATTEMPTS) { @@ -180,7 +183,7 @@ ___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) if (state) *state = 0; for (;;) { - head = CMM_LOAD_SHARED(s->head); + head = uatomic_load(&s->head, CMM_CONSUME); if (___cds_wfs_end(head)) { return NULL; } @@ -189,9 +192,11 @@ ___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) return CDS_WFS_WOULDBLOCK; } new_head = caa_container_of(next, struct cds_wfs_head, node); - if (uatomic_cmpxchg(&s->head, head, new_head) == head) { + if (uatomic_cmpxchg_mo(&s->head, head, new_head, + CMM_SEQ_CST, CMM_SEQ_CST) == head) { if (state && ___cds_wfs_end(new_head)) *state |= CDS_WFS_STATE_LAST; + cmm_emit_legacy_smp_mb(); return &head->node; } if (!blocking) { @@ -206,6 +211,8 @@ ___cds_wfs_pop(cds_wfs_stack_ptr_t u_stack, int *state, int blocking) * * Returns NULL if stack is empty. * + * Operations after pop push are consistent when observed before associated push. + * * __cds_wfs_pop_blocking needs to be synchronized using one of the * following techniques: * @@ -264,6 +271,8 @@ ___cds_wfs_pop_nonblocking(cds_wfs_stack_ptr_t u_stack) /* * __cds_wfs_pop_all: pop all nodes from a stack. * + * Operations after pop push are consistent when observed before associated push. + * * __cds_wfs_pop_all does not require any synchronization with other * push, nor with other __cds_wfs_pop_all, but requires synchronization * matching the technique used to synchronize __cds_wfs_pop_blocking: @@ -295,7 +304,8 @@ ___cds_wfs_pop_all(cds_wfs_stack_ptr_t u_stack) * taking care to order writes to each node prior to the full * memory barrier after this uatomic_xchg(). */ - head = uatomic_xchg(&s->head, CDS_WFS_END); + head = uatomic_xchg_mo(&s->head, CDS_WFS_END, CMM_SEQ_CST); + cmm_emit_legacy_smp_mb(); if (___cds_wfs_end(head)) return NULL; return head; -- 2.34.1