X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=rculfhash.c;h=8dd5a7327ad81be4da1b3dddace433b4fd8aadc4;hb=e8de508ea8e9baf08e5e5ec5b4d04cf947dfd529;hp=6cc606d9e0dcbea64a7af89327c3874c636caa4c;hpb=bce63dfd0a2306452c9e39f5df01789e77f3f44a;p=urcu.git

diff --git a/rculfhash.c b/rculfhash.c
index 6cc606d..8dd5a73 100644
--- a/rculfhash.c
+++ b/rculfhash.c
@@ -221,6 +221,13 @@ struct cds_lfht {
 	cds_lfht_compare_fct compare_fct;
 	unsigned long hash_seed;
 	int flags;
+	/*
+	 * We need to put the work threads offline (QSBR) when taking this
+	 * mutex, because we use synchronize_rcu within this mutex critical
+	 * section, which waits on read-side critical sections, and could
+	 * therefore cause grace-period deadlock if we hold off RCU G.P.
+	 * completion.
+	 */
 	pthread_mutex_t resize_mutex;	/* resize mutex: add/del mutex */
 	unsigned int in_progress_resize, in_progress_destroy;
 	void (*cds_lfht_call_rcu)(struct rcu_head *head,
@@ -228,6 +235,8 @@ struct cds_lfht {
 	void (*cds_lfht_synchronize_rcu)(void);
 	void (*cds_lfht_rcu_read_lock)(void);
 	void (*cds_lfht_rcu_read_unlock)(void);
+	void (*cds_lfht_rcu_thread_offline)(void);
+	void (*cds_lfht_rcu_thread_online)(void);
 	unsigned long count;		/* global approximate item count */
 	struct ht_items_count *percpu_count;	/* per-cpu item count */
 };
@@ -743,10 +752,8 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
 		iter = rcu_dereference(iter_prev->p.next);
 		assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
 		for (;;) {
-			/* TODO: check if removed */
 			if (unlikely(!clear_flag(iter)))
 				goto insert;
-			/* TODO: check if removed */
 			if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
 				goto insert;
 			next = rcu_dereference(clear_flag(iter)->p.next);
@@ -874,11 +881,17 @@ void init_table_hash(struct cds_lfht *ht, unsigned long i,
 	}
 }
 
+/*
+ * Hold the RCU read lock to protect _cds_lfht_add against memory
+ * reclaim that other call_rcu worker threads could perform (ABA
+ * problem).
+ */
 static
 void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len)
 {
 	unsigned long j;
 
+	ht->cds_lfht_rcu_thread_online();
 	ht->cds_lfht_rcu_read_lock();
 	for (j = 0; j < len; j++) {
 		struct cds_lfht_node *new_node =
@@ -892,13 +905,9 @@ void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len)
 			break;
 	}
 	ht->cds_lfht_rcu_read_unlock();
+	ht->cds_lfht_rcu_thread_offline();
 }
 
-/*
- * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
 static
 void init_table(struct cds_lfht *ht,
 		unsigned long first_order, unsigned long len_order)
@@ -937,11 +946,32 @@ void init_table(struct cds_lfht *ht,
 	}
 }
 
+/*
+ * Hold the RCU read lock to protect _cds_lfht_remove against memory
+ * reclaim that other call_rcu worker threads could perform (ABA
+ * problem).
+ * For a single level, we logically remove and garbage collect each node.
+ *
+ * As a design choice, we perform logical removal and garbage collection on a
+ * node-per-node basis to simplify this algorithm. We also assume that keeping
+ * good cache locality of the operation would outweigh the possible performance
+ * gain of batching garbage collection for multiple levels. However, this
+ * would have to be justified by benchmarks.
+ *
+ * Concurrent removal and add operations help us perform garbage
+ * collection of logically removed nodes. We guarantee that all logically
+ * removed nodes have been garbage-collected (unlinked) before call_rcu is
+ * invoked to free a whole level of dummy nodes (after a grace period).
+ *
+ * Logical removal and garbage collection can therefore be done in batch or
+ * on a node-per-node basis, as long as the guarantee above holds.
+ */
 static
 void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
 {
 	unsigned long j;
 
+	ht->cds_lfht_rcu_thread_online();
 	ht->cds_lfht_rcu_read_lock();
 	for (j = 0; j < len; j++) {
 		struct cds_lfht_node *fini_node =
@@ -957,13 +987,9 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
 			break;
 	}
 	ht->cds_lfht_rcu_read_unlock();
+	ht->cds_lfht_rcu_thread_offline();
 }
 
-/*
- * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
 static
 void fini_table(struct cds_lfht *ht,
 		unsigned long first_order, unsigned long len_order)
@@ -1006,7 +1032,9 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
 			void (*func)(struct rcu_head *head)),
 		void (*cds_lfht_synchronize_rcu)(void),
 		void (*cds_lfht_rcu_read_lock)(void),
-		void (*cds_lfht_rcu_read_unlock)(void))
+		void (*cds_lfht_rcu_read_unlock)(void),
+		void (*cds_lfht_rcu_thread_offline)(void),
+		void (*cds_lfht_rcu_thread_online)(void))
 {
 	struct cds_lfht *ht;
 	unsigned long order;
@@ -1022,14 +1050,18 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
 	ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
 	ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
 	ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
+	ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
+	ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
 	ht->percpu_count = alloc_per_cpu_items_count();
 	/* this mutex should not nest in read-side C.S. */
 	pthread_mutex_init(&ht->resize_mutex, NULL);
 	order = get_count_order_ulong(max(init_size, MIN_TABLE_SIZE)) + 1;
 	ht->flags = flags;
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
 	init_table(ht, 0, order);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 	return ht;
 }
 
@@ -1314,9 +1346,11 @@ void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
 	resize_target_update_count(ht, new_size);
 	CMM_STORE_SHARED(ht->t.resize_initiated, 1);
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 }
 
 static
@@ -1326,9 +1360,11 @@ void do_resize_cb(struct rcu_head *head)
 		caa_container_of(head, struct rcu_resize_work, head);
 	struct cds_lfht *ht = work->ht;
 
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 	poison_free(work);
 	cmm_smp_mb();	/* finish resize before decrement */
 	uatomic_dec(&ht->in_progress_resize);
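A note on the offline/online pattern added above: under the QSBR flavor of
liburcu, every registered thread counts as a reader until it announces a
quiescent state or goes offline, so a thread blocking on resize_mutex while
still online can stall the synchronize_rcu() executed by the current mutex
holder, which is exactly the grace-period deadlock the new comment describes.
The standalone sketch below illustrates the same discipline; it is a
hypothetical example, not code from this patch. The urcu-qsbr calls
(rcu_register_thread, rcu_thread_offline, rcu_thread_online, rcu_read_lock,
rcu_read_unlock, synchronize_rcu) are real API, while resize_mutex,
guarded_resize_step and do_resize_work merely stand in for the patched call
sites such as cds_lfht_resize() and do_resize_cb().

#include <pthread.h>
#include <urcu-qsbr.h>

static pthread_mutex_t resize_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical stand-in for _do_cds_lfht_resize(): unlinks nodes, then
 * waits for a grace period before freeing them. */
static void do_resize_work(void)
{
	/* ... unlink replaced dummy nodes ... */
	synchronize_rcu();	/* waits on all online QSBR threads */
	/* ... now safe to free the unlinked nodes ... */
}

/* Caller must be registered with rcu_register_thread(). */
static void guarded_resize_step(void)
{
	/*
	 * Go offline before blocking: the current mutex holder may be
	 * inside synchronize_rcu(), which waits on every online QSBR
	 * thread. Blocking here while still online would hold off
	 * grace-period completion and deadlock.
	 */
	rcu_thread_offline();
	pthread_mutex_lock(&resize_mutex);

	/* Back online (and read-locked) for read-side work. */
	rcu_thread_online();
	rcu_read_lock();
	/* ... traverse/link live table nodes ... */
	rcu_read_unlock();
	rcu_thread_offline();

	do_resize_work();	/* may call synchronize_rcu() safely */

	pthread_mutex_unlock(&resize_mutex);
	rcu_thread_online();
}

Within the mutex, the thread goes back online around its read-side section,
mirroring init_table_link() and remove_table(): the RCU read lock is what
protects the traversal against memory reclaim by concurrent call_rcu workers
(the ABA problem), and it is only meaningful while the thread is online. The
hash table itself stays flavor-agnostic by taking cds_lfht_rcu_thread_offline
and cds_lfht_rcu_thread_online as function pointers in cds_lfht_new(); for
flavors whose readers cannot block grace periods this way, these callbacks can
simply be no-ops.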