X-Git-Url: https://git.liburcu.org/?p=urcu.git;a=blobdiff_plain;f=rculfhash.c;h=42501a1f030c6ee29fb0c40e5fdc02394da06992;hp=f95ef5dc083aa4e22618512708beb37cee22e1e3;hb=b7d619b0a4d1613664285e3986b930a05c131c70;hpb=5f51139190536f948f9571fdf3c97cf0198356d0 diff --git a/rculfhash.c b/rculfhash.c index f95ef5d..42501a1 100644 --- a/rculfhash.c +++ b/rculfhash.c @@ -167,14 +167,9 @@ #define CHAIN_LEN_RESIZE_THRESHOLD 3 /* - * Define the minimum table size. Protects against hash table resize overload - * when too many entries are added quickly before the resize can complete. - * This is especially the case if the table could be shrinked to a size of 1. - * TODO: we might want to make the add/remove operations help the resize to - * add or remove dummy nodes when a resize is ongoing to ensure upper-bound on - * chain length. + * Define the minimum table size. */ -#define MIN_TABLE_SIZE 128 +#define MIN_TABLE_SIZE 1 #if (CAA_BITS_PER_LONG == 32) #define MAX_TABLE_ORDER 32 @@ -182,6 +177,11 @@ #define MAX_TABLE_ORDER 64 #endif +/* + * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink. + */ +#define MIN_PARTITION_PER_THREAD 4096 + #ifndef min #define min(a, b) ((a) < (b) ? (a) : (b)) #endif @@ -199,6 +199,9 @@ #define DUMMY_FLAG (1UL << 1) #define FLAGS_MASK ((1UL << 2) - 1) +/* Value of the end pointer. Should not interact with flags. */ +#define END_VALUE NULL + struct ht_items_count { unsigned long add, remove; } __attribute__((aligned(CAA_CACHE_LINE_SIZE))); @@ -237,6 +240,9 @@ struct cds_lfht { void (*cds_lfht_rcu_read_unlock)(void); void (*cds_lfht_rcu_thread_offline)(void); void (*cds_lfht_rcu_thread_online)(void); + void (*cds_lfht_rcu_register_thread)(void); + void (*cds_lfht_rcu_unregister_thread)(void); + pthread_attr_t *resize_attr; /* Resize threads attributes */ unsigned long count; /* global approximate item count */ struct ht_items_count *percpu_count; /* per-cpu item count */ }; @@ -246,6 +252,20 @@ struct rcu_resize_work { struct cds_lfht *ht; }; +struct partition_resize_work { + struct rcu_head head; + struct cds_lfht *ht; + unsigned long i, start, len; + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len); +}; + +static +struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, + unsigned long size, + struct cds_lfht_node *node, + int unique, int dummy); + /* * Algorithm to reverse bits in a word by lookup table, extended to * 64-bit words. @@ -651,7 +671,19 @@ struct cds_lfht_node *flag_dummy(struct cds_lfht_node *node) { return (struct cds_lfht_node *) (((unsigned long) node) | DUMMY_FLAG); } - + +static +struct cds_lfht_node *get_end(void) +{ + return (struct cds_lfht_node *) END_VALUE; +} + +static +int is_end(struct cds_lfht_node *node) +{ + return clear_flag(node) == (struct cds_lfht_node *) END_VALUE; +} + static unsigned long _uatomic_max(unsigned long *ptr, unsigned long v) { @@ -699,7 +731,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node */ assert(dummy != node); for (;;) { - if (unlikely(!clear_flag(iter))) + if (unlikely(is_end(iter))) return; if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash)) return; @@ -716,6 +748,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node new_next = clear_flag(next); (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next); } + return; } static @@ -733,7 +766,7 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, assert(!is_removed(node)); if (!size) { assert(dummy); - node->p.next = flag_dummy(NULL); + node->p.next = flag_dummy(get_end()); return node; /* Initial first add (head) */ } hash = bit_reverse_ulong(node->p.reverse_hash); @@ -752,10 +785,8 @@ struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht, iter = rcu_dereference(iter_prev->p.next); assert(iter_prev->p.reverse_hash <= node->p.reverse_hash); for (;;) { - /* TODO: check if removed */ - if (unlikely(!clear_flag(iter))) + if (unlikely(is_end(iter))) goto insert; - /* TODO: check if removed */ if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash)) goto insert; next = rcu_dereference(clear_flag(iter)->p.next); @@ -865,51 +896,98 @@ end: } static -void init_table_hash(struct cds_lfht *ht, unsigned long i, - unsigned long len) +void *partition_resize_thread(void *arg) { - unsigned long j; + struct partition_resize_work *work = arg; - for (j = 0; j < len; j++) { - struct cds_lfht_node *new_node = - (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; + work->ht->cds_lfht_rcu_register_thread(); + work->fct(work->ht, work->i, work->start, work->len); + work->ht->cds_lfht_rcu_unregister_thread(); + return NULL; +} - dbg_printf("init hash entry: i %lu j %lu hash %lu\n", - i, j, !i ? 0 : (1UL << (i - 1)) + j); - new_node->p.reverse_hash = - bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); - if (CMM_LOAD_SHARED(ht->in_progress_destroy)) - break; +static +void partition_resize_helper(struct cds_lfht *ht, unsigned long i, + unsigned long len, + void (*fct)(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len)) +{ + unsigned long partition_len; + struct partition_resize_work *work; + int cpu, ret; + pthread_t *thread_id; + + /* Note: nr_cpus_mask + 1 is always power of 2 */ + partition_len = len >> get_count_order_ulong(nr_cpus_mask + 1); + work = calloc(nr_cpus_mask + 1, sizeof(*work)); + thread_id = calloc(nr_cpus_mask + 1, sizeof(*thread_id)); + assert(work); + for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) { + work[cpu].ht = ht; + work[cpu].i = i; + work[cpu].len = partition_len; + work[cpu].start = cpu * partition_len; + work[cpu].fct = fct; + ret = pthread_create(&thread_id[cpu], ht->resize_attr, + partition_resize_thread, &work[cpu]); + assert(!ret); + } + for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) { + ret = pthread_join(thread_id[cpu], NULL); + assert(!ret); } + free(work); + free(thread_id); } +/* + * Holding RCU read lock to protect _cds_lfht_add against memory + * reclaim that could be performed by other call_rcu worker threads (ABA + * problem). + * + * When we reach a certain length, we can split this population phase over + * many worker threads, based on the number of CPUs available in the system. + * This should therefore take care of not having the expand lagging behind too + * many concurrent insertion threads by using the scheduler's ability to + * schedule dummy node population fairly with insertions. + */ static -void init_table_link(struct cds_lfht *ht, unsigned long i, unsigned long len) +void init_table_populate_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *new_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; - dbg_printf("init link: i %lu j %lu hash %lu\n", + dbg_printf("init populate: i %lu j %lu hash %lu\n", i, j, !i ? 0 : (1UL << (i - 1)) + j); + new_node->p.reverse_hash = + bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j); (void) _cds_lfht_add(ht, !i ? 0 : (1UL << (i - 1)), new_node, 0, 1); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); } -/* - * Holding RCU read lock to protect _cds_lfht_add against memory - * reclaim that could be performed by other call_rcu worker threads (ABA - * problem). - */ +static +void init_table_populate(struct cds_lfht *ht, unsigned long i, + unsigned long len) +{ + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + init_table_populate_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, init_table_populate_partition); +} + static void init_table(struct cds_lfht *ht, unsigned long first_order, unsigned long len_order) @@ -924,38 +1002,66 @@ void init_table(struct cds_lfht *ht, len = !i ? 1 : 1UL << (i - 1); dbg_printf("init order %lu len: %lu\n", i, len); + + /* Stop expand if the resize target changes under us */ + if (CMM_LOAD_SHARED(ht->t.resize_target) < (!i ? 1 : (1UL << i))) + break; + ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level) + (len * sizeof(struct _cds_lfht_node))); - - /* Set all dummy nodes reverse hash values for a level */ - init_table_hash(ht, i, len); + assert(ht->t.tbl[i]); /* - * Link all dummy nodes into the table. Concurrent - * add/remove are helping us. + * Set all dummy nodes reverse hash values for a level and + * link all dummy nodes into the table. */ - init_table_link(ht, i, len); + init_table_populate(ht, i, len); /* - * Update table size (after init for now, because no - * concurrent updater help (TODO)). + * Update table size. */ cmm_smp_wmb(); /* populate data before RCU size */ CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i)); + dbg_printf("init new size: %lu\n", !i ? 1 : (1UL << i)); if (CMM_LOAD_SHARED(ht->in_progress_destroy)) break; } } +/* + * Holding RCU read lock to protect _cds_lfht_remove against memory + * reclaim that could be performed by other call_rcu worker threads (ABA + * problem). + * For a single level, we logically remove and garbage collect each node. + * + * As a design choice, we perform logical removal and garbage collection on a + * node-per-node basis to simplify this algorithm. We also assume keeping good + * cache locality of the operation would overweight possible performance gain + * that could be achieved by batching garbage collection for multiple levels. + * However, this would have to be justified by benchmarks. + * + * Concurrent removal and add operations are helping us perform garbage + * collection of logically removed nodes. We guarantee that all logically + * removed nodes have been garbage-collected (unlinked) before call_rcu is + * invoked to free a hole level of dummy nodes (after a grace period). + * + * Logical removal and garbage collection can therefore be done in batch or on a + * node-per-node basis, as long as the guarantee above holds. + * + * When we reach a certain length, we can split this removal over many worker + * threads, based on the number of CPUs available in the system. This should + * take care of not letting resize process lag behind too many concurrent + * updater threads actively inserting into the hash table. + */ static -void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +void remove_table_partition(struct cds_lfht *ht, unsigned long i, + unsigned long start, unsigned long len) { unsigned long j; - ht->cds_lfht_rcu_thread_online(); ht->cds_lfht_rcu_read_lock(); - for (j = 0; j < len; j++) { + for (j = start; j < start + len; j++) { struct cds_lfht_node *fini_node = (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j]; @@ -969,14 +1075,22 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) break; } ht->cds_lfht_rcu_read_unlock(); - ht->cds_lfht_rcu_thread_offline(); } -/* - * Holding RCU read lock to protect _cds_lfht_remove against memory - * reclaim that could be performed by other call_rcu worker threads (ABA - * problem). - */ +static +void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len) +{ + + assert(nr_cpus_mask != -1); + if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) { + ht->cds_lfht_rcu_thread_online(); + remove_table_partition(ht, i, 0, len); + ht->cds_lfht_rcu_thread_offline(); + return; + } + partition_resize_helper(ht, i, len, remove_table_partition); +} + static void fini_table(struct cds_lfht *ht, unsigned long first_order, unsigned long len_order) @@ -987,13 +1101,27 @@ void fini_table(struct cds_lfht *ht, first_order, first_order + len_order); end_order = first_order + len_order; assert(first_order > 0); - assert(ht->t.size == (1UL << (first_order - 1))); for (i = end_order - 1; i >= first_order; i--) { unsigned long len; len = !i ? 1 : 1UL << (i - 1); dbg_printf("fini order %lu len: %lu\n", i, len); + /* Stop shrink if the resize target changes under us */ + if (CMM_LOAD_SHARED(ht->t.resize_target) > (1UL << (i - 1))) + break; + + cmm_smp_wmb(); /* populate data before RCU size */ + CMM_STORE_SHARED(ht->t.size, 1UL << (i - 1)); + + /* + * We need to wait for all add operations to reach Q.S. (and + * thus use the new table for lookups) before we can start + * releasing the old dummy nodes. Otherwise their lookup will + * return a logically removed node as insert position. + */ + ht->cds_lfht_synchronize_rcu(); + /* * Set "removed" flag in dummy nodes about to be removed. * Unlink all now-logically-removed dummy node pointers. @@ -1010,7 +1138,7 @@ void fini_table(struct cds_lfht *ht, } } -struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, +struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct, cds_lfht_compare_fct compare_fct, unsigned long hash_seed, unsigned long init_size, @@ -1021,7 +1149,10 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, void (*cds_lfht_rcu_read_lock)(void), void (*cds_lfht_rcu_read_unlock)(void), void (*cds_lfht_rcu_thread_offline)(void), - void (*cds_lfht_rcu_thread_online)(void)) + void (*cds_lfht_rcu_thread_online)(void), + void (*cds_lfht_rcu_register_thread)(void), + void (*cds_lfht_rcu_unregister_thread)(void), + pthread_attr_t *attr) { struct cds_lfht *ht; unsigned long order; @@ -1030,6 +1161,7 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, if (init_size && (init_size & (init_size - 1))) return NULL; ht = calloc(1, sizeof(struct cds_lfht)); + assert(ht); ht->hash_fct = hash_fct; ht->compare_fct = compare_fct; ht->hash_seed = hash_seed; @@ -1039,6 +1171,9 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock; ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline; ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online; + ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread; + ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread; + ht->resize_attr = attr; ht->percpu_count = alloc_per_cpu_items_count(); /* this mutex should not nest in read-side C.S. */ pthread_mutex_init(&ht->resize_mutex, NULL); @@ -1046,6 +1181,7 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, ht->flags = flags; ht->cds_lfht_rcu_thread_offline(); pthread_mutex_lock(&ht->resize_mutex); + ht->t.resize_target = 1UL << (order - 1); init_table(ht, 0, order); pthread_mutex_unlock(&ht->resize_mutex); ht->cds_lfht_rcu_thread_online(); @@ -1054,7 +1190,7 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct, struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len) { - struct cds_lfht_node *node, *next; + struct cds_lfht_node *node, *next, *dummy_node; struct _cds_lfht_node *lookup; unsigned long hash, reverse_hash, index, order, size; @@ -1067,10 +1203,15 @@ struct cds_lfht_node *cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key lookup = &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1))) - 1)]; dbg_printf("lookup hash %lu index %lu order %lu aridx %lu\n", hash, index, order, index & (!order ? 0 : ((1UL << (order - 1)) - 1))); - node = (struct cds_lfht_node *) lookup; + dummy_node = (struct cds_lfht_node *) lookup; + /* We can always skip the dummy node initially */ + node = rcu_dereference(dummy_node->p.next); + node = clear_flag(node); for (;;) { - if (unlikely(!node)) + if (unlikely(is_end(node))) { + node = NULL; break; + } if (unlikely(node->p.reverse_hash > reverse_hash)) { node = NULL; break; @@ -1102,8 +1243,10 @@ struct cds_lfht_node *cds_lfht_next(struct cds_lfht *ht, node = clear_flag(next); for (;;) { - if (unlikely(!node)) + if (unlikely(is_end(node))) { + node = NULL; break; + } if (unlikely(node->p.reverse_hash > reverse_hash)) { node = NULL; break; @@ -1143,7 +1286,7 @@ struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht, size = rcu_dereference(ht->t.size); ret = _cds_lfht_add(ht, size, node, 1, 0); - if (ret != node) + if (ret == node) ht_count_add(ht, size); return ret; } @@ -1175,7 +1318,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht) if (!is_dummy(node)) return -EPERM; assert(!is_removed(node)); - } while (clear_flag(node)); + } while (!is_end(node)); /* * size accessed without rcu_dereference because hash table is * being destroyed. @@ -1201,7 +1344,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht) * Should only be called when no more concurrent readers nor writers can * possibly access the table. */ -int cds_lfht_destroy(struct cds_lfht *ht) +int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr) { int ret; @@ -1213,6 +1356,8 @@ int cds_lfht_destroy(struct cds_lfht *ht) if (ret) return ret; free_per_cpu_items_count(ht->percpu_count); + if (attr) + *attr = ht->resize_attr; poison_free(ht); return ret; } @@ -1241,7 +1386,7 @@ void cds_lfht_count_nodes(struct cds_lfht *ht, else (nr_dummy)++; node = clear_flag(next); - } while (node); + } while (!is_end(node)); dbg_printf("number of dummy nodes: %lu\n", nr_dummy); } @@ -1274,17 +1419,6 @@ void _do_cds_lfht_shrink(struct cds_lfht *ht, old_size, old_order, new_size, new_order); assert(new_size < old_size); - cmm_smp_wmb(); /* populate data before RCU size */ - CMM_STORE_SHARED(ht->t.size, new_size); - - /* - * We need to wait for all add operations to reach Q.S. (and - * thus use the new table for lookups) before we can start - * releasing the old dummy nodes. Otherwise their lookup will - * return a logically removed node as insert position. - */ - ht->cds_lfht_synchronize_rcu(); - /* Remove and unlink all dummy nodes to remove. */ fini_table(ht, new_order, old_order - new_order); } @@ -1310,7 +1444,7 @@ void _do_cds_lfht_resize(struct cds_lfht *ht) ht->t.resize_initiated = 0; /* write resize_initiated before read resize_target */ cmm_smp_mb(); - } while (new_size != CMM_LOAD_SHARED(ht->t.resize_target)); + } while (ht->t.size != CMM_LOAD_SHARED(ht->t.resize_target)); } static