X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=bff26a8d4fe668e2d4330927dcc812384aea039e;hp=074a6f4ea8c4c40f7b2f7374d3ecdc8387e34bff;hb=15cfbec77d2c573110cc936d5b33745d44207b50;hpb=860d07e812c3001e9d6ce261c8b861dabd3203b3 diff --git a/rculfhash.c b/rculfhash.c index 074a6f4..bff26a8 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -144,7 +144,6 @@ #include #include #include -#include #include #include #include @@ -193,6 +192,8 @@ /* * The removed flag needs to be updated atomically with the pointer. + * It indicates that no node must attach to the node scheduled for + * removal, and that node garbage collection must be performed. * The dummy flag does not require to be updated atomically with the * pointer, but it is added as a pointer low bit flag to save space. */ @@ -244,7 +245,7 @@ struct cds_lfht { void (*cds_lfht_rcu_register_thread)(void); void (*cds_lfht_rcu_unregister_thread)(void); pthread_attr_t *resize_attr; /* Resize threads attributes */ - unsigned long count; /* global approximate item count */ + long count; /* global approximate item count */ struct ht_items_count *percpu_count; /* per-cpu item count */ }; @@ -261,11 +262,17 @@ struct partition_resize_work { unsigned long start, unsigned long len); }; +enum add_mode { + ADD_DEFAULT = 0, + ADD_UNIQUE = 1, + ADD_REPLACE = 2, +}; + static struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, unsigned long size, struct cds_lfht_node *node, - int unique, int dummy); + enum add_mode mode, int dummy); /* * Algorithm to reverse bits in a word by lookup table, extended to @@ -548,7 +555,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size) return; percpu_count = uatomic_add_return(&ht->percpu_count[cpu].add, 1); if (unlikely(!(percpu_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) { - unsigned long count; + long count; dbg_printf("add percpu %lu\n", percpu_count); count = uatomic_add_return(&ht->count, @@ -557,7 +564,7 @@ void ht_count_add(struct cds_lfht *ht, unsigned long size) if (!(count & (count - 1))) { if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size) return; - dbg_printf("add set global %lu\n", count); + dbg_printf("add set global %ld\n", count); cds_lfht_resize_lazy_count(ht, size, count >> (CHAIN_LEN_TARGET - 1)); } @@ -575,9 +582,9 @@ void ht_count_del(struct cds_lfht *ht, unsigned long size) cpu = ht_get_cpu(); if (unlikely(cpu < 0)) return; - percpu_count = uatomic_add_return(&ht->percpu_count[cpu].del, -1); + percpu_count = uatomic_add_return(&ht->percpu_count[cpu].del, 1); if (unlikely(!(percpu_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) { - unsigned long count; + long count; dbg_printf("del percpu %lu\n", percpu_count); count = uatomic_add_return(&ht->count, @@ -586,7 +593,13 @@ void ht_count_del(struct cds_lfht *ht, unsigned long size) if (!(count & (count - 1))) { if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size) return; - dbg_printf("del set global %lu\n", count); + dbg_printf("del set global %ld\n", count); + /* + * Don't shrink table if the number of nodes is below a + * certain threshold. + */ + if (count < (1UL << COUNT_COMMIT_ORDER) * (nr_cpus_mask + 1)) + return; cds_lfht_resize_lazy_count(ht, size, count >> (CHAIN_LEN_TARGET - 1)); } @@ -747,19 +760,95 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node new_next = flag_dummy(clear_flag(next)); else new_next = clear_flag(next); + if (is_removed(iter)) + new_next = flag_removed(new_next); (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next); } return; } +static +int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size, + struct cds_lfht_node *old_node, + struct cds_lfht_node *ret_next, + struct cds_lfht_node *new_node) +{ + struct cds_lfht_node *dummy, *old_next; + struct _cds_lfht_node *lookup; + int flagged = 0; + unsigned long hash, index, order; + + if (!old_node) /* Return -ENOENT if asked to replace NULL node */ + goto end; + + assert(!is_removed(old_node)); + assert(!is_dummy(old_node)); + assert(!is_removed(new_node)); + assert(!is_dummy(new_node)); + assert(new_node != old_node); + do { + /* Insert after node to be replaced */ + old_next = ret_next; + if (is_removed(old_next)) { + /* + * Too late, the old node has been removed under us + * between lookup and replace. Fail. + */ + goto end; + } + assert(!is_dummy(old_next)); + assert(new_node != clear_flag(old_next)); + new_node->p.next = clear_flag(old_next); + /* + * Here is the whole trick for lock-free replace: we add + * the replacement node _after_ the node we want to + * replace by atomically setting its next pointer at the + * same time we set its removal flag. Given that + * the lookups/get next use an iterator aware of the + * next pointer, they will either skip the old node due + * to the removal flag and see the new node, or use + * the old node, but will not see the new one. + */ + ret_next = uatomic_cmpxchg(&old_node->p.next, + old_next, flag_removed(new_node)); + } while (ret_next != old_next); + + /* We performed the replacement. */ + flagged = 1; + + /* + * Ensure that the old node is not visible to readers anymore: + * lookup for the node, and remove it (along with any other + * logically removed node) if found. + */ + hash = bit_reverse_ulong(old_node->p.reverse_hash); + assert(size > 0); + index = hash & (size - 1); + order = get_count_order_ulong(index + 1); + lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))]; + dummy = (struct cds_lfht_node *) lookup; + _cds_lfht_gc_bucket(dummy, new_node); +end: + /* + * Only the flagging action indicated that we (and no other) + * replaced the node from the hash table. + */ + if (flagged) { + assert(is_removed(rcu_dereference(old_node->p.next))); + return 0; + } else { + return -ENOENT; + } +} + static struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, unsigned long size, struct cds_lfht_node *node, - int unique, int dummy) + enum add_mode mode, int dummy) { struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next, - *dummy_node; + *dummy_node, *return_node; struct _cds_lfht_node *lookup; unsigned long hash, index, order; @@ -793,12 +882,16 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, next = rcu_dereference(clear_flag(iter)->p.next); if (unlikely(is_removed(next))) goto gc_node; - if (unique + if ((mode == ADD_UNIQUE || mode == ADD_REPLACE) && !is_dummy(next) && !ht->compare_fct(node->key, node->key_len, clear_flag(iter)->key, - clear_flag(iter)->key_len)) - return clear_flag(iter); + clear_flag(iter)->key_len)) { + if (mode == ADD_UNIQUE) + return clear_flag(iter); + else /* mode == ADD_REPLACE */ + goto replace; + } /* Only account for identical reverse hash once */ if (iter_prev->p.reverse_hash != clear_flag(iter)->p.reverse_hash && !is_dummy(next)) @@ -806,6 +899,7 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, iter_prev = clear_flag(iter); iter = next; } + insert: assert(node != clear_flag(iter)); assert(!is_removed(iter_prev)); @@ -820,10 +914,26 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, else new_node = node; if (uatomic_cmpxchg(&iter_prev->p.next, iter, - new_node) != iter) + new_node) != iter) { continue; /* retry */ - else + } else { + if (mode == ADD_REPLACE) + return_node = NULL; + else /* ADD_DEFAULT and ADD_UNIQUE */ + return_node = node; goto gc_end; + } + + replace: + + if (!_cds_lfht_replace(ht, size, clear_flag(iter), next, + node)) { + return_node = clear_flag(iter); + goto end; /* gc already done */ + } else { + continue; /* retry */ + } + gc_node: assert(!is_removed(iter)); if (is_dummy(iter)) @@ -840,7 +950,8 @@ gc_end: lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))]; dummy_node = (struct cds_lfht_node *) lookup; _cds_lfht_gc_bucket(dummy_node, node); - return node; +end: + return return_node; } static @@ -853,11 +964,16 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, int flagged = 0; unsigned long hash, index, order; + if (!node) /* Return -ENOENT if asked to delete NULL node */ + goto end; + /* logically delete the node */ assert(!is_dummy(node)); assert(!is_removed(node)); old = rcu_dereference(node->p.next); do { + struct cds_lfht_node *new_next; + next = old; if (unlikely(is_removed(next))) goto end; @@ -865,8 +981,8 @@ int _cds_lfht_del(struct cds_lfht *ht, unsigned long size, assert(is_dummy(next)); else assert(!is_dummy(next)); - old = uatomic_cmpxchg(&node->p.next, next, - flag_removed(next)); + new_next = flag_removed(next); + old = uatomic_cmpxchg(&node->p.next, next, new_next); } while (old != next); /* We performed the (logical) deletion. */ @@ -892,8 +1008,9 @@ end: if (flagged) { assert(is_removed(rcu_dereference(node->p.next))); return 0; - } else + } else { return -ENOENT; + } } static @@ -975,7 +1092,7 @@ void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, new_node->p.reverse_hash = bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); (void) _cds_lfht_add(ht, !i ? 0 : (1UL << (i - 1)), - new_node, 0, 1); + new_node, ADD_DEFAULT, 1); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; } @@ -1196,7 +1313,8 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, return ht; } -struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len) +void cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len, + struct cds_lfht_iter *iter) { struct cds_lfht_node *node, *next, *dummy_node; struct _cds_lfht_node *lookup; @@ -1217,11 +1335,11 @@ struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key node = clear_flag(node); for (;;) { if (unlikely(is_end(node))) { - node = NULL; + node = next = NULL; break; } if (unlikely(node->p.reverse_hash > reverse_hash)) { - node = NULL; + node = next = NULL; break; } next = rcu_dereference(node->p.next); @@ -1233,30 +1351,31 @@ struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key node = clear_flag(next); } assert(!node || !is_dummy(rcu_dereference(node->p.next))); - return node; + iter->node = node; + iter->next = next; } -struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, - struct cds_lfht_node *node) +void cds_lfht_next_duplicate(struct cds_lfht *ht, struct cds_lfht_iter *iter) { - struct cds_lfht_node *next; + struct cds_lfht_node *node, *next; unsigned long reverse_hash; void *key; size_t key_len; + node = iter->node; reverse_hash = node->p.reverse_hash; key = node->key; key_len = node->key_len; - next = rcu_dereference(node->p.next); + next = iter->next; node = clear_flag(next); for (;;) { if (unlikely(is_end(node))) { - node = NULL; + node = next = NULL; break; } if (unlikely(node->p.reverse_hash > reverse_hash)) { - node = NULL; + node = next = NULL; break; } next = rcu_dereference(node->p.next); @@ -1268,7 +1387,46 @@ struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, node = clear_flag(next); } assert(!node || !is_dummy(rcu_dereference(node->p.next))); - return node; + iter->node = node; + iter->next = next; +} + +void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter) +{ + struct cds_lfht_node *node, *next; + + node = iter->node; + next = iter->next; + node = clear_flag(next); + + for (;;) { + if (unlikely(is_end(node))) { + node = next = NULL; + break; + } + next = rcu_dereference(node->p.next); + if (likely(!is_removed(next)) + && !is_dummy(next)) { + break; + } + node = clear_flag(next); + } + assert(!node || !is_dummy(rcu_dereference(node->p.next))); + iter->node = node; + iter->next = next; +} + +void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter) +{ + struct _cds_lfht_node *lookup; + + /* + * Get next after first dummy node. The first dummy node is the + * first node of the linked list. + */ + lookup = &ht->t.tbl[0]->nodes[0]; + iter->node = (struct cds_lfht_node *) lookup; + cds_lfht_next(ht, iter); } void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node) @@ -1279,12 +1437,12 @@ void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node) node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); size = rcu_dereference(ht->t.size); - (void) _cds_lfht_add(ht, size, node, 0, 0); + (void) _cds_lfht_add(ht, size, node, ADD_DEFAULT, 0); ht_count_add(ht, size); } struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, - struct cds_lfht_node *node) + struct cds_lfht_node *node) { unsigned long hash, size; struct cds_lfht_node *ret; @@ -1293,19 +1451,45 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); size = rcu_dereference(ht->t.size); - ret = _cds_lfht_add(ht, size, node, 1, 0); + ret = _cds_lfht_add(ht, size, node, ADD_UNIQUE, 0); if (ret == node) ht_count_add(ht, size); return ret; } -int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_node *node) +struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht, + struct cds_lfht_node *node) +{ + unsigned long hash, size; + struct cds_lfht_node *ret; + + hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed); + node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash); + + size = rcu_dereference(ht->t.size); + ret = _cds_lfht_add(ht, size, node, ADD_REPLACE, 0); + if (ret == NULL) + ht_count_add(ht, size); + return ret; +} + +int cds_lfht_replace(struct cds_lfht *ht, struct cds_lfht_iter *old_iter, + struct cds_lfht_node *new_node) +{ + unsigned long size; + + size = rcu_dereference(ht->t.size); + return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next, + new_node); +} + +int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_iter *iter) { unsigned long size; int ret; size = rcu_dereference(ht->t.size); - ret = _cds_lfht_del(ht, size, node, 0); + ret = _cds_lfht_del(ht, size, iter->node, 0); if (!ret) ht_count_del(ht, size); return ret; @@ -1371,13 +1555,25 @@ int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) } void cds_lfht_count_nodes(struct cds_lfht *ht, + long *approx_before, unsigned long *count, - unsigned long *removed) + unsigned long *removed, + long *approx_after) { struct cds_lfht_node *node, *next; struct _cds_lfht_node *lookup; unsigned long nr_dummy = 0; + *approx_before = 0; + if (nr_cpus_mask >= 0) { + int i; + + for (i = 0; i < nr_cpus_mask + 1; i++) { + *approx_before += uatomic_read(&ht->percpu_count[i].add); + *approx_before -= uatomic_read(&ht->percpu_count[i].del); + } + } + *count = 0; *removed = 0; @@ -1387,8 +1583,10 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, do { next = rcu_dereference(node->p.next); if (is_removed(next)) { - assert(!is_dummy(next)); - (*removed)++; + if (!is_dummy(next)) + (*removed)++; + else + (nr_dummy)++; } else if (!is_dummy(next)) (*count)++; else @@ -1396,6 +1594,15 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, node = clear_flag(next); } while (!is_end(node)); dbg_printf("number of dummy nodes: %lu\n", nr_dummy); + *approx_after = 0; + if (nr_cpus_mask >= 0) { + int i; + + for (i = 0; i < nr_cpus_mask + 1; i++) { + *approx_after += uatomic_read(&ht->percpu_count[i].add); + *approx_after -= uatomic_read(&ht->percpu_count[i].del); + } + } } /* called with resize mutex held */