From 6e5f88cf94a225b155719046d87bfd32ba47e06a Mon Sep 17 00:00:00 2001 From: Mathieu Desnoyers Date: Sat, 3 Sep 2011 09:15:14 -0400 Subject: [PATCH] Revert "CDS API: removal of rcu_read lock/unlock dep, removal of call_rcu argument from init" This reverts commit 7618919ae496bda84a2efa4f2ad0abe569892a9e. Rationale: I thought about it some more, and had discussions with various people, and there are a few reasons to go for a scheme where rcu read lock should be taken by the caller, and to pass call_rcu as a parameter to the data structure init function: A) The advantage, as Paul E. McKenney pointed out, is that one single .so is enough to support all RCU flavors. Very convenient for external data structure containers. B) It clearly documents where rcu read-side locks are needed, so the user keep control and in-depth understanding of their read-side locks. C) When multiple API functions that require RCU read-side lock to be held (sometimes even the same lock) throughout a sequence of API calls, we have no choice but to let the caller hold the read-side lock. D) Due to support of multiple nesting of rcu read-side lock, any "improvement" we could get by releasing the read-side lock in retry loops would vanish in the cases where we are called within nested C.S.. E) If a library uses synchronize_rcu, this should be clearly documented, and even frowned upon, because this involves important limitations on the design of the caller, and important performance hit. There are usually ways to reach the same result through use of call_rcu, which should really be used thoroughout these libraries. F) It clearly documents when a data structure needs to use call_rcu internally. G) Some very early benchmark results show that there is indeed not much performance gain to achieve by inlining call_rcu, even if it is a version with a cache for the "call_rcu structure" lookup (per-cpu/per-thread/global). So passing it as a parameter to the data structure init function should be fine, even in cases where it is called very often. H) For use-cases where applications would like to use more than one RCU flavor concurrently (which is now supported), leaving management of RCU read-side C.S. to the reader allows the application to take more than one RCU read-side lock across API calls. It also lets the application specify its own call_rcu function that could handle more than one RCU flavor. So for all these reasons, I reverting back to the API we have in our last release (0.6.4). Signed-off-by: Mathieu Desnoyers --- rculfqueue.c | 6 ++++-- tests/test_urcu_lfq.c | 9 ++++++++- tests/test_urcu_lfs.c | 2 ++ urcu/rculfqueue.h | 10 +++++++--- urcu/rculfstack.h | 2 +- urcu/static/rculfqueue.h | 25 +++++++++---------------- urcu/static/rculfstack.h | 6 +----- 7 files changed, 32 insertions(+), 28 deletions(-) diff --git a/rculfqueue.c b/rculfqueue.c index c579e75..09b8587 100644 --- a/rculfqueue.c +++ b/rculfqueue.c @@ -44,9 +44,11 @@ void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) _cds_lfq_node_init_rcu(node); } -void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) +void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) { - _cds_lfq_init_rcu(q); + _cds_lfq_init_rcu(q, queue_call_rcu); } int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) diff --git a/tests/test_urcu_lfq.c b/tests/test_urcu_lfq.c index 82a90b0..b61a7d4 100644 --- a/tests/test_urcu_lfq.c +++ b/tests/test_urcu_lfq.c @@ -180,7 +180,9 @@ void *thr_enqueuer(void *_count) if (!node) goto fail; cds_lfq_node_init_rcu(node); + rcu_read_lock(); cds_lfq_enqueue_rcu(&q, node); + rcu_read_unlock(); nr_successful_enqueues++; if (unlikely(wdelay)) @@ -228,7 +230,10 @@ void *thr_dequeuer(void *_count) for (;;) { struct cds_lfq_node_rcu *node; + rcu_read_lock(); node = cds_lfq_dequeue_rcu(&q); + rcu_read_unlock(); + if (node) { defer_rcu(free, node); nr_successful_dequeues++; @@ -257,7 +262,9 @@ void test_end(struct cds_lfq_queue_rcu *q, unsigned long long *nr_dequeues) struct cds_lfq_node_rcu *node; do { + rcu_read_lock(); node = cds_lfq_dequeue_rcu(q); + rcu_read_unlock(); if (node) { free(node); /* no more concurrent access */ (*nr_dequeues)++; @@ -356,7 +363,7 @@ int main(int argc, char **argv) tid_dequeuer = malloc(sizeof(*tid_dequeuer) * nr_dequeuers); count_enqueuer = malloc(2 * sizeof(*count_enqueuer) * nr_enqueuers); count_dequeuer = malloc(2 * sizeof(*count_dequeuer) * nr_dequeuers); - cds_lfq_init_rcu(&q); + cds_lfq_init_rcu(&q, call_rcu); next_aff = 0; diff --git a/tests/test_urcu_lfs.c b/tests/test_urcu_lfs.c index a7f9af3..252454d 100644 --- a/tests/test_urcu_lfs.c +++ b/tests/test_urcu_lfs.c @@ -229,7 +229,9 @@ void *thr_dequeuer(void *_count) for (;;) { struct cds_lfs_node_rcu *node; + rcu_read_lock(); node = cds_lfs_pop_rcu(&s); + rcu_read_unlock(); if (node) { defer_rcu(free, node); nr_successful_dequeues++; diff --git a/urcu/rculfqueue.h b/urcu/rculfqueue.h index 1582694..e1d64f1 100644 --- a/urcu/rculfqueue.h +++ b/urcu/rculfqueue.h @@ -39,6 +39,8 @@ struct cds_lfq_node_rcu { struct cds_lfq_queue_rcu { struct cds_lfq_node_rcu *head, *tail; + void (*queue_call_rcu)(struct rcu_head *head, + void (*func)(struct rcu_head *head)); }; #ifdef _LGPL_SOURCE @@ -78,7 +80,9 @@ struct cds_lfq_queue_rcu { #else /* !_LGPL_SOURCE */ extern void cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node); -extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q); +extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))); /* * The queue should be emptied before calling destroy. * @@ -87,13 +91,13 @@ extern void cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q); extern int cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q); /* - * Acts as a RCU reader. + * Should be called under rcu read lock critical section. */ extern void cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, struct cds_lfq_node_rcu *node); /* - * Acts as a RCU reader. + * Should be called under rcu read lock critical section. * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfq_node_rcu structure. diff --git a/urcu/rculfstack.h b/urcu/rculfstack.h index cf00daf..7d359d4 100644 --- a/urcu/rculfstack.h +++ b/urcu/rculfstack.h @@ -72,7 +72,7 @@ extern int cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, struct cds_lfs_node_rcu *node); /* - * Acts as a RCU reader. + * Should be called under rcu read lock critical section. * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfs_node_rcu structure. diff --git a/urcu/static/rculfqueue.h b/urcu/static/rculfqueue.h index 99335c4..fea6110 100644 --- a/urcu/static/rculfqueue.h +++ b/urcu/static/rculfqueue.h @@ -88,7 +88,7 @@ void rcu_free_dummy(struct cds_lfq_node_rcu *node) assert(node->dummy); dummy = caa_container_of(node, struct cds_lfq_node_rcu_dummy, parent); - call_rcu(&dummy->head, free_dummy_cb); + dummy->q->queue_call_rcu(&dummy->head, free_dummy_cb); } static inline @@ -109,10 +109,13 @@ void _cds_lfq_node_init_rcu(struct cds_lfq_node_rcu *node) } static inline -void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q) +void _cds_lfq_init_rcu(struct cds_lfq_queue_rcu *q, + void queue_call_rcu(struct rcu_head *head, + void (*func)(struct rcu_head *head))) { q->tail = make_dummy(q, NULL); q->head = q->tail; + q->queue_call_rcu = queue_call_rcu; } /* @@ -133,7 +136,7 @@ int _cds_lfq_destroy_rcu(struct cds_lfq_queue_rcu *q) } /* - * Acts as a RCU reader. + * Should be called under rcu read lock critical section. */ static inline void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, @@ -147,7 +150,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, for (;;) { struct cds_lfq_node_rcu *tail, *next; - rcu_read_lock(); tail = rcu_dereference(q->tail); next = uatomic_cmpxchg(&tail->next, NULL, node); if (next == NULL) { @@ -157,7 +159,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, * enqueue might beat us to it, that's fine). */ (void) uatomic_cmpxchg(&q->tail, tail, node); - rcu_read_unlock(); return; } else { /* @@ -165,7 +166,6 @@ void _cds_lfq_enqueue_rcu(struct cds_lfq_queue_rcu *q, * Help moving tail further and retry. */ (void) uatomic_cmpxchg(&q->tail, tail, next); - rcu_read_unlock(); continue; } } @@ -182,7 +182,7 @@ void enqueue_dummy(struct cds_lfq_queue_rcu *q) } /* - * Acts as a RCU reader. + * Should be called under rcu read lock critical section. * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfq_node_rcu structure. @@ -194,13 +194,10 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) for (;;) { struct cds_lfq_node_rcu *head, *next; - rcu_read_lock(); head = rcu_dereference(q->head); next = rcu_dereference(head->next); - if (head->dummy && next == NULL) { - rcu_read_unlock(); + if (head->dummy && next == NULL) return NULL; /* empty */ - } /* * We never, ever allow dequeue to get to a state where * the queue is empty (we need at least one node in the @@ -212,17 +209,13 @@ struct cds_lfq_node_rcu *_cds_lfq_dequeue_rcu(struct cds_lfq_queue_rcu *q) enqueue_dummy(q); next = rcu_dereference(head->next); } - if (uatomic_cmpxchg(&q->head, head, next) != head) { - rcu_read_unlock(); + if (uatomic_cmpxchg(&q->head, head, next) != head) continue; /* Concurrently pushed. */ - } if (head->dummy) { /* Free dummy after grace period. */ rcu_free_dummy(head); - rcu_read_unlock(); continue; /* try again */ } - rcu_read_unlock(); return head; } } diff --git a/urcu/static/rculfstack.h b/urcu/static/rculfstack.h index 1df121b..4259e0c 100644 --- a/urcu/static/rculfstack.h +++ b/urcu/static/rculfstack.h @@ -90,7 +90,7 @@ int _cds_lfs_push_rcu(struct cds_lfs_stack_rcu *s, } /* - * Acts as a RCU reader. + * Should be called under rcu read-side lock. * * The caller must wait for a grace period to pass before freeing the returned * node or modifying the cds_lfs_node_rcu structure. @@ -103,22 +103,18 @@ _cds_lfs_pop_rcu(struct cds_lfs_stack_rcu *s) for (;;) { struct cds_lfs_node_rcu *head; - rcu_read_lock(); head = rcu_dereference(s->head); if (head) { struct cds_lfs_node_rcu *next = rcu_dereference(head->next); if (uatomic_cmpxchg(&s->head, head, next) == head) { - rcu_read_unlock(); return head; } else { /* Concurrent modification. Retry. */ - rcu_read_unlock(); continue; } } else { /* Empty stack */ - rcu_read_unlock(); return NULL; } } -- 2.34.1