diff --git a/urcu-ht.c b/urcu-ht.c
index e3fd313..b8777ac 100644
--- a/urcu-ht.c
+++ b/urcu-ht.c
@@ -5,16 +5,28 @@
 #define _LGPL_SOURCE
 #include <stdlib.h>
+#include <pthread.h>
+#include <assert.h>
+#include <errno.h>
+
 #include <urcu.h>
+#include <system.h>
 #include <arch.h>
 #include <compiler.h>
-#include <errno.h>
 #include <stdint.h>
-#include <pthread.h>
-#include <assert.h>
-#include <urcu-defer.h>
 #include <urcu-ht.h>
 #include <urcu/jhash.h>
+#include <stdio.h>
+#include <arch_atomic.h>
+
+/*
+ * Maximum number of hash table buckets: 32M on 64-bit (256MB of bucket
+ * pointers). Should take about 512MB max if we assume 1 node per 4 buckets.
+ */
+#define MAX_HT_BUCKETS ((256 << 20) / sizeof(void *))
+
+/* node flags */
+#define NODE_STOLEN (1 << 0)
 
 struct rcu_ht_node;
 
@@ -22,18 +34,22 @@ struct rcu_ht_node {
 	struct rcu_ht_node *next;
 	void *key;
 	void *data;
+	unsigned int flags;
+};
+
+struct rcu_table {
+	unsigned long size;
+	struct rcu_ht_node *tbl[0];
 };
 
 struct rcu_ht {
-	struct rcu_ht_node **tbl;
+	struct rcu_table *t;		/* shared */
 	ht_hash_fct hash_fct;
 	void (*free_fct)(void *data);	/* fct to free data */
 	uint32_t keylen;
 	uint32_t hashseed;
-	struct ht_size {
-		unsigned long add;
-		unsigned long lookup;
-	} size;
+	pthread_mutex_t resize_mutex;	/* resize mutex: also excludes add/del */
+	int resize_ongoing;		/* fast-path resize check */
 };
 
 struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
@@ -45,24 +61,30 @@ struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
 	ht = calloc(1, sizeof(struct rcu_ht));
 	ht->hash_fct = hash_fct;
 	ht->free_fct = free_fct;
-	ht->size.add = init_size;
-	ht->size.lookup = init_size;
 	ht->keylen = keylen;
 	ht->hashseed = hashseed;
-	ht->tbl = calloc(init_size, sizeof(struct rcu_ht_node *));
+	/* this mutex must not nest inside a read-side C.S. */
+	pthread_mutex_init(&ht->resize_mutex, NULL);
+	ht->resize_ongoing = 0;	/* shared */
+	ht->t = calloc(1, sizeof(struct rcu_table) +
+		       (init_size * sizeof(struct rcu_ht_node *)));
+	ht->t->size = init_size;
 	return ht;
 }
 
 void *ht_lookup(struct rcu_ht *ht, void *key)
 {
+	struct rcu_table *t;
 	unsigned long hash;
 	struct rcu_ht_node *node;
 	void *ret;
 
-	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size.lookup;
-	rcu_read_lock();
-	node = rcu_dereference(ht->tbl[hash]);
+	t = rcu_dereference(ht->t);
+	smp_read_barrier_depends();	/* read t before size and table */
+	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
+	smp_read_barrier_depends();	/* read size before links */
+	node = rcu_dereference(t->tbl[hash]);
 	for (;;) {
 		if (likely(!node)) {
 			ret = NULL;
@@ -87,12 +109,14 @@ void *ht_lookup(struct rcu_ht *ht, void *key)
 int ht_add(struct rcu_ht *ht, void *key, void *data)
 {
 	struct rcu_ht_node *node, *old_head, *new_head;
+	struct rcu_table *t;
 	unsigned long hash;
 	int ret = 0;
 
 	new_head = calloc(1, sizeof(struct rcu_ht_node));
 	new_head->key = key;
 	new_head->data = data;
+	new_head->flags = 0;
 	/* here comes the fun and tricky part.
 	 * Add at the beginning with a cmpxchg.
 	 * Hold a read lock between the moment the first element is read
@@ -104,9 +128,23 @@ int ht_add(struct rcu_ht *ht, void *key, void *data)
 
 retry:
 	rcu_read_lock();
-	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size.add;
+	if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
+		rcu_read_unlock();
+		/*
+		 * Wait for resize to complete before continuing.
+		 */
+		ret = pthread_mutex_lock(&ht->resize_mutex);
+		assert(!ret);
+		ret = pthread_mutex_unlock(&ht->resize_mutex);
+		assert(!ret);
+		goto retry;
+	}
+
+	t = rcu_dereference(ht->t);
+	/* no read barrier needed, because no concurrency with resize */
+	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
 
-	old_head = node = rcu_dereference(ht->tbl[hash]);
+	old_head = node = rcu_dereference(t->tbl[hash]);
 	for (;;) {
 		if (likely(!node)) {
 			break;
@@ -118,11 +156,10 @@ retry:
 		node = rcu_dereference(node->next);
 	}
 	new_head->next = old_head;
-	if (rcu_cmpxchg_pointer(&ht->tbl[hash], old_head, new_head) != old_head)
+	if (rcu_cmpxchg_pointer(&t->tbl[hash], old_head, new_head) != old_head)
 		goto restart;
 end:
 	rcu_read_unlock();
-
 	return ret;
 
 	/* restart loop, release and re-take the read lock to be kind to GP */
@@ -134,28 +171,49 @@ restart:
 
 /*
  * Restart until we successfully remove the entry, or no entry is left
  * ((void *)(unsigned long)-ENOENT).
- * Deal with concurrent stealers by verifying that there are no element
- * in the list still pointing to the element stolen. (del_node)
+ * Deal with concurrent stealers by doing an extra verification pass to check
+ * that no element in the list is still pointing to the stolen element.
+ * This could happen if two concurrent steals of consecutive objects are
+ * executed: a pointer to the object being stolen could be saved by the
+ * concurrent stealer of the previous object.
+ * Also, because in this precise scenario another stealer may also want to
+ * delete the doubly-referenced object, use a "stolen" flag to let only one
+ * stealer delete the object.
 */
void *ht_steal(struct rcu_ht *ht, void *key)
 {
 	struct rcu_ht_node **prev, *node, *del_node = NULL;
+	struct rcu_table *t;
 	unsigned long hash;
 	void *data;
+	int ret;
 
retry:
 	rcu_read_lock();
-	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size.lookup;
+	if (unlikely(LOAD_SHARED(ht->resize_ongoing))) {
+		rcu_read_unlock();
+		/*
+		 * Wait for resize to complete before continuing.
+		 */
+		ret = pthread_mutex_lock(&ht->resize_mutex);
+		assert(!ret);
+		ret = pthread_mutex_unlock(&ht->resize_mutex);
+		assert(!ret);
+		goto retry;
+	}
+
+	t = rcu_dereference(ht->t);
+	/* no read barrier needed, because no concurrency with resize */
+	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % t->size;
 
-	prev = &ht->tbl[hash];
+	prev = &t->tbl[hash];
 	node = rcu_dereference(*prev);
 	for (;;) {
 		if (likely(!node)) {
 			if (del_node) {
 				goto end;
 			} else {
-				data = (void *)(unsigned long)-ENOENT;
 				goto error;
 			}
 		}
@@ -165,6 +223,17 @@ retry:
 		prev = &node->next;
 		node = rcu_dereference(*prev);
 	}
+
+	if (!del_node) {
+		/*
+		 * Another concurrent thread stole it? If so, let it deal
+		 * with this. Assume NODE_STOLEN is the only flag; if this
+		 * changes, read the flags before the cmpxchg.
+		 */
+		if (cmpxchg(&node->flags, 0, NODE_STOLEN) != 0)
+			goto error;
+	}
+
 	/* Found it ! pointer to object is in "prev" */
 	if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
 		del_node = node;
@@ -182,6 +251,7 @@ end:
 	return data;
 
error:
+	data = (void *)(unsigned long)-ENOENT;
 	rcu_read_unlock();
 	return data;
@@ -210,11 +280,22 @@ int ht_delete_all(struct rcu_ht *ht)
 {
 	unsigned long i;
 	struct rcu_ht_node **prev, *node, *inext;
+	struct rcu_table *t;
 	int cnt = 0;
+	int ret;
+
+	/*
+	 * Mutual exclusion with resize operations, but let add/steal execute
+	 * concurrently. This is OK because we operate only on the heads.
+	 */
+	ret = pthread_mutex_lock(&ht->resize_mutex);
+	assert(!ret);
 
-	for (i = 0; i < ht->size.lookup; i++) {
+	t = rcu_dereference(ht->t);
+	/* no read barrier needed, because no concurrency with resize */
+	for (i = 0; i < t->size; i++) {
 		rcu_read_lock();
-		prev = &ht->tbl[i];
+		prev = &t->tbl[i];
 		/*
 		 * Cut the head. After that, we own the first element.
 		 */
@@ -255,6 +336,9 @@ int ht_delete_all(struct rcu_ht *ht)
 			node = inext;
 		}
 	}
+
+	ret = pthread_mutex_unlock(&ht->resize_mutex);
+	assert(!ret);
 
 	return cnt;
 }
@@ -267,11 +351,123 @@ int ht_destroy(struct rcu_ht *ht)
 	int ret;
 
 	ret = ht_delete_all(ht);
-	free(ht->tbl);
+	free(ht->t);
 	free(ht);
 	return ret;
 }
 
+static void ht_resize_grow(struct rcu_ht *ht)
+{
+	unsigned long i, new_size, old_size;
+	struct rcu_table *new_t, *old_t;
+	struct rcu_ht_node *node, *new_node, *tmp;
+	unsigned long hash;
+
+	old_t = ht->t;
+	old_size = old_t->size;
+
+	if (old_size == MAX_HT_BUCKETS)
+		return;
+
+	new_size = old_size << 1;
+	new_t = calloc(1, sizeof(struct rcu_table) +
+		       (new_size * sizeof(struct rcu_ht_node *)));
+	new_t->size = new_size;
+
+	for (i = 0; i < old_size; i++) {
+		/*
+		 * Re-hash each entry, insert it in the new table.
+		 * It's important that a reader looking for a key _will_ find
+		 * it if it's in the table.
+		 * Copy each node. (just the node, not ->data)
+		 */
+		node = old_t->tbl[i];
+		while (node) {
+			hash = ht->hash_fct(node->key, ht->keylen, ht->hashseed)
+				% new_size;
+			new_node = malloc(sizeof(struct rcu_ht_node));
+			new_node->key = node->key;
+			new_node->data = node->data;
+			new_node->flags = node->flags;
+			new_node->next = new_t->tbl[hash]; /* link to first */
+			new_t->tbl[hash] = new_node;	/* add to head */
+			node = node->next;
+		}
+	}
+
+	/* Change the table and size atomically wrt lookups */
+	rcu_assign_pointer(ht->t, new_t);
+
+	/* Ensure all concurrent lookups use the new size and table */
+	synchronize_rcu();
+
+	for (i = 0; i < old_size; i++) {
+		node = old_t->tbl[i];
+		while (node) {
+			tmp = node->next;
+			free(node);
+			node = tmp;
+		}
+	}
+	free(old_t);
+}
+
+static void ht_resize_shrink(struct rcu_ht *ht)
+{
+	unsigned long i, new_size;
+	struct rcu_table *new_t, *old_t;
+	struct rcu_ht_node **prev, *node;
+
+	old_t = ht->t;
+	if (old_t->size == 1)
+		return;
+
+	new_size = old_t->size >> 1;
+
+	for (i = 0; i < new_size; i++) {
+		/* Link the end of bucket i to the head of bucket i + new_size */
+		prev = &old_t->tbl[i];
+		node = *prev;
+		while (node) {
+			prev = &node->next;
+			node = *prev;
+		}
+		*prev = old_t->tbl[i + new_size];
+	}
+	smp_wmb();	/* write links before changing size */
+	STORE_SHARED(old_t->size, new_size);
+
+	/* Ensure all concurrent lookups use the new size */
+	synchronize_rcu();
+
+	new_t = realloc(old_t, sizeof(struct rcu_table) +
+			(new_size * sizeof(struct rcu_ht_node *)));
+	/* shrinking: the reallocated block should not move */
+	assert(new_t == old_t);
+}
+
+/*
+ * growth > 0: double the number of buckets; growth < 0: halve it.
+ */
+void ht_resize(struct rcu_ht *ht, int growth)
+{
+	int ret;
+
+	ret = pthread_mutex_lock(&ht->resize_mutex);
+	assert(!ret);
+	STORE_SHARED(ht->resize_ongoing, 1);
+	synchronize_rcu();
+	/* All add/remove operations are now waiting on the mutex. */
+	if (growth > 0)
+		ht_resize_grow(ht);
+	else if (growth < 0)
+		ht_resize_shrink(ht);
+	smp_mb();	/* write table updates before clearing resize_ongoing */
+	STORE_SHARED(ht->resize_ongoing, 0);
+	ret = pthread_mutex_unlock(&ht->resize_mutex);
+	assert(!ret);
+}
+
 /*
  * Expects keys <= than pointer size to be encoded in the pointer itself.
  */
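For context, a minimal usage sketch of the ht_*() API as it stands after this
patch. This is not part of the patch: the ht_new() argument order, the
ht_hash_fct signature, and the thread-registration calls are assumptions
inferred from the calls visible in the diff and from the liburcu API;
example_hash() and the key values are purely illustrative. Note that the patch
removes rcu_read_lock() from ht_lookup() without adding it back in the visible
hunks, so the sketch conservatively assumes the caller must hold the read-side
lock around lookups, while ht_add()/ht_steal() take it internally.

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <errno.h>
#include <urcu.h>
#include <urcu-ht.h>

/*
 * Illustrative hash (assumed ht_hash_fct signature): keys no larger than a
 * pointer are encoded in the pointer itself, as the comment at the end of
 * the file expects.
 */
static unsigned long example_hash(void *key, uint32_t keylen, uint32_t seed)
{
	return (unsigned long)key ^ seed;
}

int main(void)
{
	struct rcu_ht *ht;
	void *data;

	rcu_register_thread();	/* every thread touching the table registers */

	/* 16 initial buckets, pointer-sized keys, arbitrary seed */
	ht = ht_new(example_hash, free, 16, sizeof(void *), 0x42);

	ht_add(ht, (void *)42UL, strdup("forty-two"));

	/* Assumed caller-held read lock (see note above). */
	rcu_read_lock();
	data = ht_lookup(ht, (void *)42UL);
	if (data)
		printf("found: %s\n", (char *)data);
	rcu_read_unlock();

	ht_resize(ht, 1);	/* growth > 0: double the bucket count */
	ht_resize(ht, -1);	/* growth < 0: halve it */

	/* ht_steal() unlinks the node and returns its data, or -ENOENT. */
	data = ht_steal(ht, (void *)42UL);
	if ((unsigned long)data != (unsigned long)-ENOENT)
		free(data);

	ht_destroy(ht);
	rcu_unregister_thread();
	return 0;
}

One design point the sketch relies on: ht_resize() calls synchronize_rcu()
while holding resize_mutex, so it must never be invoked from within a
read-side critical section, which is also why ht_new() warns that the mutex
must not nest inside one.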