X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=1288c78d585f983764da0c96eefb6df4f5712cc3;hp=6cc606d9e0dcbea64a7af89327c3874c636caa4c;hb=4d676753007f54e45b7e0307244ffc876a953874;hpb=bce63dfd0a2306452c9e39f5df01789e77f3f44a

diff --git a/rculfhash.c b/rculfhash.c
index 6cc606d..1288c78 100644
--- a/rculfhash.c
+++ b/rculfhash.c
@@ -167,12 +167,7 @@
 #define CHAIN_LEN_RESIZE_THRESHOLD	3
 
 /*
- * Define the minimum table size. Protects against hash table resize overload
- * when too many entries are added quickly before the resize can complete.
- * This is especially the case if the table could be shrinked to a size of 1.
- * TODO: we might want to make the add/remove operations help the resize to
- * add or remove dummy nodes when a resize is ongoing to ensure upper-bound on
- * chain length.
+ * Define the minimum table size.
  */
 #define MIN_TABLE_SIZE			128
 
@@ -221,6 +216,13 @@ struct cds_lfht {
 	cds_lfht_compare_fct compare_fct;
 	unsigned long hash_seed;
 	int flags;
+	/*
+	 * We need to put the work threads offline (QSBR) when taking this
+	 * mutex, because we use synchronize_rcu within this mutex critical
+	 * section, which waits on read-side critical sections, and could
+	 * therefore cause grace-period deadlock if we hold off RCU G.P.
+	 * completion.
+	 */
 	pthread_mutex_t resize_mutex;	/* resize mutex: add/del mutex */
 	unsigned int in_progress_resize, in_progress_destroy;
 	void (*cds_lfht_call_rcu)(struct rcu_head *head,
@@ -228,6 +230,8 @@ struct cds_lfht {
 	void (*cds_lfht_synchronize_rcu)(void);
 	void (*cds_lfht_rcu_read_lock)(void);
 	void (*cds_lfht_rcu_read_unlock)(void);
+	void (*cds_lfht_rcu_thread_offline)(void);
+	void (*cds_lfht_rcu_thread_online)(void);
 	unsigned long count;		/* global approximate item count */
 	struct ht_items_count *percpu_count;	/* per-cpu item count */
 };
@@ -237,6 +241,12 @@ struct rcu_resize_work {
 	struct cds_lfht *ht;
 };
 
+static
+struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
+				unsigned long size,
+				struct cds_lfht_node *node,
+				int unique, int dummy);
+
 /*
  * Algorithm to reverse bits in a word by lookup table, extended to
  * 64-bit words.
@@ -669,7 +679,7 @@ void cds_lfht_free_level(struct rcu_head *head)
  * Remove all logically deleted nodes from a bucket up to a certain node key.
  */
 static
-void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node)
+int _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node)
 {
 	struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
 
@@ -681,6 +691,15 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
 		iter_prev = dummy;
 		/* We can always skip the dummy node initially */
 		iter = rcu_dereference(iter_prev->p.next);
+		if (unlikely(iter == NULL)) {
+			/*
+			 * We are executing concurrently with a hash table
+			 * expand, so we see a dummy node with NULL next value.
+			 * Tell the caller to help expand by linking this
+			 * dummy node into the list, then retry.
+			 */
+			return 1;
+		}
 		assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
 		/*
 		 * We should never be called with dummy (start of chain)
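
NOTE: _cds_lfht_gc_bucket() now signals, through its return value, that it
raced with a concurrent expand (it found a dummy node whose next pointer is
still NULL). Every caller is expected to help link that dummy node into the
previous-order chain and retry, as the gc_end/gc_retry hunks below do. A
minimal sketch of that caller-side protocol, assuming the internals visible
in this patch (the wrapper name is hypothetical):

	static
	void gc_bucket_helping(struct cds_lfht *ht, unsigned long size,
			struct cds_lfht_node *dummy, struct cds_lfht_node *node)
	{
		while (_cds_lfht_gc_bucket(dummy, node)) {
			/*
			 * Concurrent expand in progress: link the dummy node
			 * into the half-sized (previous order) table, then
			 * retry the garbage collection.
			 */
			(void) _cds_lfht_add(ht, size >> 1, dummy, 0, 1);
		}
	}
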
@@ -691,9 +710,9 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
 	assert(dummy != node);
 	for (;;) {
 		if (unlikely(!clear_flag(iter)))
-			return;
+			return 0;
 		if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
-			return;
+			return 0;
 		next = rcu_dereference(clear_flag(iter)->p.next);
 		if (likely(is_removed(next)))
 			break;
@@ -707,6 +726,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
 		new_next = clear_flag(next);
 		(void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next);
 	}
+	return 0;
 }
 
 static
@@ -741,12 +761,27 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
 		iter_prev = (struct cds_lfht_node *) lookup;
 		/* We can always skip the dummy node initially */
 		iter = rcu_dereference(iter_prev->p.next);
+		if (unlikely(iter == NULL)) {
+			/*
+			 * We are executing concurrently with a hash table
+			 * expand, so we see a dummy node with NULL next value.
+			 * Help expand by linking this node into the list and
+			 * retry.
+			 */
+			(void) _cds_lfht_add(ht, size >> 1, iter_prev, 0, 1);
+			continue;	/* retry */
+		}
 		assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
 		for (;;) {
-			/* TODO: check if removed */
+			/*
+			 * When adding a dummy node, we allow concurrent
+			 * add/removal to help. If we find the dummy node in
+			 * place, skip its insertion.
+			 */
+			if (unlikely(dummy && clear_flag(iter) == node))
+				return node;
 			if (unlikely(!clear_flag(iter)))
 				goto insert;
-			/* TODO: check if removed */
 			if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
 				goto insert;
 			next = rcu_dereference(clear_flag(iter)->p.next);
@@ -798,7 +833,11 @@ gc_end:
 	order = get_count_order_ulong(index + 1);
 	lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))];
 	dummy_node = (struct cds_lfht_node *) lookup;
-	_cds_lfht_gc_bucket(dummy_node, node);
+	if (_cds_lfht_gc_bucket(dummy_node, node)) {
+		/* Help expand */
+		(void) _cds_lfht_add(ht, size >> 1, dummy_node, 0, 1);
+		goto gc_end;	/* retry */
+	}
 	return node;
 }
 
@@ -836,13 +875,18 @@ int _cds_lfht_remove(struct cds_lfht *ht, unsigned long size,
 	 * the node, and remove it (along with any other logically removed node)
 	 * if found.
 	 */
+gc_retry:
 	hash = bit_reverse_ulong(node->p.reverse_hash);
 	assert(size > 0);
 	index = hash & (size - 1);
 	order = get_count_order_ulong(index + 1);
 	lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))];
 	dummy = (struct cds_lfht_node *) lookup;
-	_cds_lfht_gc_bucket(dummy, node);
+	if (_cds_lfht_gc_bucket(dummy, node)) {
+		/* Help expand */
+		(void) _cds_lfht_add(ht, size >> 1, dummy, 0, 1);
+		goto gc_retry;	/* retry */
+	}
 end:
 	/*
 	 * Only the flagging action indicated that we (and no other)
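
NOTE: the lookup idiom repeated in the two hunks above maps a flat dummy-node
index onto the per-order bucket arrays ht->t.tbl[order]: order 0 holds index
0, and each higher order holds the 2^(order - 1) indexes added by one expand
step. A self-contained demo of that mapping (get_count_order_demo is a
plain-C stand-in for get_count_order_ulong, shown for illustration only):

	#include <stdio.h>

	/* ceil(log2(x)) for x >= 1, mimicking get_count_order_ulong(). */
	static unsigned long get_count_order_demo(unsigned long x)
	{
		unsigned long order = 0;

		while ((1UL << order) < x)
			order++;
		return order;
	}

	int main(void)
	{
		unsigned long size = 8, index;

		for (index = 0; index < size; index++) {
			unsigned long order = get_count_order_demo(index + 1);
			unsigned long offset =
				index & (!order ? 0 : ((1UL << (order - 1)) - 1));

			printf("index %lu -> t.tbl[%lu]->nodes[%lu]\n",
				index, order, offset);
		}
		return 0;
	}
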
@@ -874,11 +918,17 @@ void init_table_hash(struct cds_lfht *ht, unsigned long i,
 	}
 }
 
+/*
+ * Holding RCU read lock to protect _cds_lfht_add against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem).
+ */
 static
 void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len)
 {
 	unsigned long j;
 
+	ht->cds_lfht_rcu_thread_online();
 	ht->cds_lfht_rcu_read_lock();
 	for (j = 0; j < len; j++) {
 		struct cds_lfht_node *new_node =
@@ -892,13 +942,9 @@ void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len)
 			break;
 	}
 	ht->cds_lfht_rcu_read_unlock();
+	ht->cds_lfht_rcu_thread_offline();
 }
 
-/*
- * Holding RCU read lock to protect _cds_lfht_add against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
 static
 void init_table(struct cds_lfht *ht,
 		unsigned long first_order, unsigned long len_order)
@@ -913,35 +959,64 @@ void init_table(struct cds_lfht *ht,
 		len = !i ? 1 : 1UL << (i - 1);
 		dbg_printf("init order %lu len: %lu\n", i, len);
+
+		/* Stop expand if the resize target changes under us */
+		if (CMM_LOAD_SHARED(ht->t.resize_target) < (!i ? 1 : (1UL << i)))
+			break;
+
 		ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level)
 				+ (len * sizeof(struct _cds_lfht_node)));
 
 		/* Set all dummy nodes reverse hash values for a level */
 		init_table_hash(ht, i, len);
 
+		/*
+		 * Update table size. At this point, concurrent add/remove see
+		 * dummy nodes with correctly initialized reverse hash value,
+		 * but with NULL next pointers. If they see one, they can help
+		 * us link the dummy nodes into the list and retry.
+		 */
+		cmm_smp_wmb();	/* populate data before RCU size */
+		CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i));
+
 		/*
 		 * Link all dummy nodes into the table. Concurrent
 		 * add/remove are helping us.
		 */
 		init_table_link(ht, i, len);
 
-		/*
-		 * Update table size (after init for now, because no
-		 * concurrent updater help (TODO)).
-		 */
-		cmm_smp_wmb();	/* populate data before RCU size */
-		CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i));
 		dbg_printf("init new size: %lu\n", !i ? 1 : (1UL << i));
 		if (CMM_LOAD_SHARED(ht->in_progress_destroy))
 			break;
 	}
 }
 
+/*
+ * Holding RCU read lock to protect _cds_lfht_remove against memory
+ * reclaim that could be performed by other call_rcu worker threads (ABA
+ * problem).
+ * For a single level, we logically remove and garbage collect each node.
+ *
+ * As a design choice, we perform logical removal and garbage collection on a
+ * node-per-node basis to simplify this algorithm. We also assume keeping good
+ * cache locality of the operation would outweigh the possible performance gain
+ * that could be achieved by batching garbage collection for multiple levels.
+ * However, this would have to be justified by benchmarks.
+ *
+ * Concurrent removal and add operations are helping us perform garbage
+ * collection of logically removed nodes. We guarantee that all logically
+ * removed nodes have been garbage-collected (unlinked) before call_rcu is
+ * invoked to free a whole level of dummy nodes (after a grace period).
+ *
+ * Logical removal and garbage collection can therefore be done in batch or on
+ * a node-per-node basis, as long as the guarantee above holds.
+ */
 static
 void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
 {
 	unsigned long j;
 
+	ht->cds_lfht_rcu_thread_online();
 	ht->cds_lfht_rcu_read_lock();
 	for (j = 0; j < len; j++) {
 		struct cds_lfht_node *fini_node =
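
NOTE: init_table_link() and remove_table() run on the resize path, which now
stays offline (QSBR extended quiescent state) so that the resize mutex holder
may call synchronize_rcu() without deadlocking; the worker comes online only
for the duration of its read-side critical section. A minimal sketch of this
bracketing, assuming the urcu-qsbr flavor is linked in directly (the wrapper
and visit callback are hypothetical):

	#include <urcu-qsbr.h>

	static void walk_level_online(void (*visit)(void))
	{
		rcu_thread_online();	/* leave extended quiescent state */
		rcu_read_lock();	/* guard against ABA on reclaimed nodes */
		visit();
		rcu_read_unlock();
		rcu_thread_offline();	/* quiescent again: the resize path
					 * may call synchronize_rcu() */
	}
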
@@ -957,13 +1032,9 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
 			break;
 	}
 	ht->cds_lfht_rcu_read_unlock();
+	ht->cds_lfht_rcu_thread_offline();
 }
 
-/*
- * Holding RCU read lock to protect _cds_lfht_remove against memory
- * reclaim that could be performed by other call_rcu worker threads (ABA
- * problem).
- */
 static
 void fini_table(struct cds_lfht *ht,
 		unsigned long first_order, unsigned long len_order)
@@ -974,13 +1045,27 @@ void fini_table(struct cds_lfht *ht,
 		first_order, first_order + len_order);
 	end_order = first_order + len_order;
 	assert(first_order > 0);
-	assert(ht->t.size == (1UL << (first_order - 1)));
 	for (i = end_order - 1; i >= first_order; i--) {
 		unsigned long len;
 
 		len = !i ? 1 : 1UL << (i - 1);
 		dbg_printf("fini order %lu len: %lu\n", i, len);
 
+		/* Stop shrink if the resize target changes under us */
+		if (CMM_LOAD_SHARED(ht->t.resize_target) > (1UL << (i - 1)))
+			break;
+
+		cmm_smp_wmb();	/* populate data before RCU size */
+		CMM_STORE_SHARED(ht->t.size, 1UL << (i - 1));
+
+		/*
+		 * We need to wait for all add operations to reach Q.S. (and
+		 * thus use the new table for lookups) before we can start
+		 * releasing the old dummy nodes. Otherwise their lookup will
+		 * return a logically removed node as insert position.
+		 */
+		ht->cds_lfht_synchronize_rcu();
+
 		/*
 		 * Set "removed" flag in dummy nodes about to be removed.
 		 * Unlink all now-logically-removed dummy node pointers.
@@ -1006,7 +1091,9 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
 					void (*func)(struct rcu_head *head)),
 		void (*cds_lfht_synchronize_rcu)(void),
 		void (*cds_lfht_rcu_read_lock)(void),
-		void (*cds_lfht_rcu_read_unlock)(void))
+		void (*cds_lfht_rcu_read_unlock)(void),
+		void (*cds_lfht_rcu_thread_offline)(void),
+		void (*cds_lfht_rcu_thread_online)(void))
 {
 	struct cds_lfht *ht;
 	unsigned long order;
@@ -1022,14 +1109,19 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
 	ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
 	ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
 	ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
+	ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
+	ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
 	ht->percpu_count = alloc_per_cpu_items_count();
 	/* this mutex should not nest in read-side C.S. */
 	pthread_mutex_init(&ht->resize_mutex, NULL);
 	order = get_count_order_ulong(max(init_size, MIN_TABLE_SIZE)) + 1;
 	ht->flags = flags;
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
+	ht->t.resize_target = 1UL << (order - 1);
 	init_table(ht, 0, order);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 	return ht;
 }
 
@@ -1255,17 +1347,6 @@ void _do_cds_lfht_shrink(struct cds_lfht *ht,
 		   old_size, old_order, new_size, new_order);
 	assert(new_size < old_size);
 
-	cmm_smp_wmb();	/* populate data before RCU size */
-	CMM_STORE_SHARED(ht->t.size, new_size);
-
-	/*
-	 * We need to wait for all add operations to reach Q.S. (and
-	 * thus use the new table for lookups) before we can start
-	 * releasing the old dummy nodes. Otherwise their lookup will
-	 * return a logically removed node as insert position.
-	 */
-	ht->cds_lfht_synchronize_rcu();
-
 	/* Remove and unlink all dummy nodes to remove. */
 	fini_table(ht, new_order, old_order - new_order);
 }
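
NOTE: the publish/wait/unlink sequence deleted from _do_cds_lfht_shrink() in
the hunk above moves into fini_table() (earlier in this patch), so it now
runs once per removed level. Its required ordering, condensed as a sketch
against the internals shown in this patch (the function name is
hypothetical):

	static
	void shrink_one_level(struct cds_lfht *ht, unsigned long i,
			unsigned long len)
	{
		/* 1. Publish the smaller size; new lookups use low orders. */
		cmm_smp_wmb();	/* populate data before RCU size */
		CMM_STORE_SHARED(ht->t.size, 1UL << (i - 1));

		/*
		 * 2. Wait until no pre-existing add can still look up the
		 *    dummy nodes of the level being removed.
		 */
		ht->cds_lfht_synchronize_rcu();

		/* 3. Only then logically remove and unlink its dummies. */
		remove_table(ht, i, len);
	}
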
@@ -1291,7 +1372,7 @@ void _do_cds_lfht_resize(struct cds_lfht *ht)
 		ht->t.resize_initiated = 0;
 		/* write resize_initiated before read resize_target */
 		cmm_smp_mb();
-	} while (new_size != CMM_LOAD_SHARED(ht->t.resize_target));
+	} while (ht->t.size != CMM_LOAD_SHARED(ht->t.resize_target));
 }
 
 static
@@ -1314,9 +1395,11 @@ void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
 {
 	resize_target_update_count(ht, new_size);
 	CMM_STORE_SHARED(ht->t.resize_initiated, 1);
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 }
 
 static
@@ -1326,9 +1409,11 @@ void do_resize_cb(struct rcu_head *head)
 		caa_container_of(head, struct rcu_resize_work, head);
 	struct cds_lfht *ht = work->ht;
 
+	ht->cds_lfht_rcu_thread_offline();
 	pthread_mutex_lock(&ht->resize_mutex);
 	_do_cds_lfht_resize(ht);
 	pthread_mutex_unlock(&ht->resize_mutex);
+	ht->cds_lfht_rcu_thread_online();
 	poison_free(work);
 	cmm_smp_mb();	/* finish resize before decrement */
 	uatomic_dec(&ht->in_progress_resize);
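
NOTE: callers of cds_lfht_new() must now also pass the flavor's thread
offline/online operations, which the resize and call_rcu worker paths use
around the resize mutex as shown above. A hedged usage sketch with the
urcu-qsbr flavor; the leading arguments (hash function, compare function,
seed, initial size, flags) are elided here -- see the declaration in
rculfhash.h at this revision for their exact order:

	#include <urcu-qsbr.h>

	static struct cds_lfht *make_table(void)
	{
		return cds_lfht_new(/* ... hash, compare, seed, size, flags ... */
				call_rcu, synchronize_rcu,
				rcu_read_lock, rcu_read_unlock,
				rcu_thread_offline, rcu_thread_online);
	}
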