urcu-ht: resize support (wip)
[urcu.git] / urcu-ht.c
/*
 * TODO: keys are currently assumed <= sizeof(void *). Key target never freed.
 */

#define _LGPL_SOURCE
#include <stdlib.h>
#include <urcu.h>
#include <arch.h>
#include <arch_atomic.h>
#include <assert.h>
#include <compiler.h>
#include <urcu-defer.h>
#include <errno.h>
#include <urcu-ht.h>
#include <urcu/jhash.h>
#include <stdio.h>
#include <pthread.h>

struct rcu_ht_node;

struct rcu_ht_node {
	struct rcu_ht_node *next;
	void *key;
	void *data;
};

struct rcu_ht {
	struct rcu_ht_node **tbl;
	ht_hash_fct hash_fct;
	void (*free_fct)(void *data);	/* fct to free data */
	unsigned long size;
	uint32_t keylen;
	uint32_t hashseed;
	pthread_mutex_t resize_mutex;	/* resize mutex: add/del mutex */
	int resize_ongoing;		/* fast-path resize check */
};

struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
		      unsigned long init_size, uint32_t keylen,
		      uint32_t hashseed)
{
	struct rcu_ht *ht;

	ht = calloc(1, sizeof(struct rcu_ht));
	ht->hash_fct = hash_fct;
	ht->free_fct = free_fct;
	ht->size = init_size;
	ht->keylen = keylen;
	ht->hashseed = hashseed;
	/* this mutex should not nest in read-side C.S. */
	pthread_mutex_init(&ht->resize_mutex, NULL);
	ht->resize_ongoing = 0;
	ht->tbl = calloc(init_size, sizeof(struct rcu_ht_node *));
	return ht;
}
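
/*
 * Example (illustration only, not part of this file): creating a table for
 * pointer-encoded keys. The initial size, key length and seed below are
 * arbitrary values chosen for the sketch; ht_jhash is the hash function
 * defined at the end of this file, and free() is passed so that data
 * pointers are released on deletion.
 *
 *	struct rcu_ht *ht;
 *
 *	ht = ht_new(ht_jhash, free, 4, sizeof(void *), 0x12345678);
 */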

void *ht_lookup(struct rcu_ht *ht, void *key)
{
	unsigned long hash;
	struct rcu_ht_node *node;
	void *ret;

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;
	smp_read_barrier_depends();	/* read size before links */

	rcu_read_lock();
	node = rcu_dereference(ht->tbl[hash]);
	for (;;) {
		if (likely(!node)) {
			ret = NULL;
			break;
		}
		if (node->key == key) {
			ret = node->data;
			break;
		}
		node = rcu_dereference(node->next);
	}
	rcu_read_unlock();

	return ret;
}

/*
 * Will re-try until either:
 * - The key is already there (-EEXIST)
 * - We successfully add the key at the head of a table bucket.
 */
int ht_add(struct rcu_ht *ht, void *key, void *data)
{
	struct rcu_ht_node *node, *old_head, *new_head;
	unsigned long hash;
	int ret = 0;

	new_head = calloc(1, sizeof(struct rcu_ht_node));
	new_head->key = key;
	new_head->data = data;
	/*
	 * Here comes the fun and tricky part.
	 * Add at the beginning with a cmpxchg.
	 * Hold a read lock between the moment the first element is read
	 * and the nodes traversal (to find duplicates). This ensures
	 * the head pointer has not been reclaimed when cmpxchg is done.
	 * Always adding at the head ensures that we would have to
	 * re-try if a new item has been added concurrently. So we ensure that
	 * we never add duplicates.
	 */
retry:
	rcu_read_lock();

	if (unlikely(ht->resize_ongoing)) {
		rcu_read_unlock();
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		assert(!ret);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
		assert(!ret);
		goto retry;
	}

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;

	old_head = node = rcu_dereference(ht->tbl[hash]);
	for (;;) {
		if (likely(!node)) {
			break;
		}
		if (node->key == key) {
			ret = -EEXIST;
			goto end;
		}
		node = rcu_dereference(node->next);
	}
	new_head->next = old_head;
	if (rcu_cmpxchg_pointer(&ht->tbl[hash], old_head, new_head) != old_head)
		goto restart;
end:
	rcu_read_unlock();
	return ret;

	/* restart loop, release and re-take the read lock to be kind to GP */
restart:
	rcu_read_unlock();
	goto retry;
}
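
/*
 * Example (sketch only): adding an entry whose key is a small integer encoded
 * directly in the key pointer, as assumed by the TODO at the top of this
 * file. "ht", "key_int" and "obj" are hypothetical caller variables; the
 * table is assumed to have been created with keylen = sizeof(void *).
 *
 *	int ret;
 *
 *	ret = ht_add(ht, (void *)(unsigned long)key_int, obj);
 *	if (ret == -EEXIST)
 *		printf("key already in table\n");
 */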

/*
 * Restart until we successfully remove the entry, or until no matching entry
 * is left, in which case (void *)(unsigned long)-ENOENT is returned.
 * Deal with concurrent stealers by verifying that there is no element
 * in the list still pointing to the element stolen. (del_node)
 */
void *ht_steal(struct rcu_ht *ht, void *key)
{
	struct rcu_ht_node **prev, *node, *del_node = NULL;
	unsigned long hash;
	void *data;
	int ret;

retry:
	rcu_read_lock();

	if (unlikely(ht->resize_ongoing)) {
		rcu_read_unlock();
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		assert(!ret);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
		assert(!ret);
		goto retry;
	}

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;

	prev = &ht->tbl[hash];
	node = rcu_dereference(*prev);
	for (;;) {
		if (likely(!node)) {
			if (del_node) {
				goto end;
			} else {
				data = (void *)(unsigned long)-ENOENT;
				goto error;
			}
		}
		if (node->key == key) {
			break;
		}
		prev = &node->next;
		node = rcu_dereference(*prev);
	}
	/*
	 * Found it! The pointer to the node is in "prev". Unlink it; on
	 * success, remember it in del_node and restart the traversal to make
	 * sure no concurrent stealer left another element pointing to it.
	 */
	if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
		del_node = node;
	goto restart;

end:
	/*
	 * From that point, we own del_node. Note that there can still be
	 * concurrent RCU readers using it. We can free it outside of the read
	 * lock, after a GP.
	 */
	rcu_read_unlock();

	data = del_node->data;
	call_rcu(free, del_node);
	return data;

error:
	rcu_read_unlock();
	return data;

	/* restart loop, release and re-take the read lock to be kind to GP */
restart:
	rcu_read_unlock();
	goto retry;
}
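
/*
 * Example (sketch only): ht_steal() returns the stolen data pointer, or
 * -ENOENT encoded as a pointer when the key is not found, so callers have to
 * compare against that encoded value before using the result. "ht", "key"
 * and use() are hypothetical here.
 *
 *	void *data;
 *
 *	data = ht_steal(ht, key);
 *	if (data == (void *)(unsigned long)-ENOENT)
 *		printf("key not found\n");
 *	else
 *		use(data);
 */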

int ht_delete(struct rcu_ht *ht, void *key)
{
	void *data;

	data = ht_steal(ht, key);
	if (data && data != (void *)(unsigned long)-ENOENT) {
		if (ht->free_fct)
			call_rcu(ht->free_fct, data);
		return 0;
	} else {
		return -ENOENT;
	}
}

/* Delete all old elements. Allow concurrent writer accesses. */
int ht_delete_all(struct rcu_ht *ht)
{
	unsigned long i;
	struct rcu_ht_node **prev, *node, *inext;
	int cnt = 0;
	int ret;

	/*
	 * Mutual exclusion with resize operations, but let add/steal execute
	 * concurrently. This is OK because we operate only on the heads.
	 */
	ret = pthread_mutex_lock(&ht->resize_mutex);
	assert(!ret);

	for (i = 0; i < ht->size; i++) {
		rcu_read_lock();
		prev = &ht->tbl[i];
		/*
		 * Cut the head. After that, we own the first element.
		 */
		node = rcu_xchg_pointer(prev, NULL);
		if (!node) {
			rcu_read_unlock();
			continue;
		}
		/*
		 * We manage a list shared with concurrent writers and readers.
		 * Note that a concurrent add may or may not be deleted by us,
		 * depending on whether it arrives before or after the head is
		 * cut. "node" points to our first node. Remove first elements
		 * iteratively.
		 */
		for (;;) {
			inext = NULL;
			prev = &node->next;
			if (prev)
				inext = rcu_xchg_pointer(prev, NULL);
			/*
			 * "node" is the first element of the list we have cut.
			 * We therefore own it, no concurrent writer may delete
			 * it. There can only be concurrent lookups. Concurrent
			 * add can only be done on a bucket head, but we've cut
			 * it already. inext is also owned by us, because we
			 * have exchanged it for "NULL". It will therefore be
			 * safe to use it after a G.P.
			 */
			rcu_read_unlock();
			if (node->data)
				call_rcu(ht->free_fct, node->data);
			call_rcu(free, node);
			cnt++;
			if (likely(!inext))
				break;
			rcu_read_lock();
			node = inext;
		}
	}

	ret = pthread_mutex_unlock(&ht->resize_mutex);
	assert(!ret);
	return cnt;
}

/*
 * Should only be called when no more concurrent readers nor writers can
 * possibly access the table.
 */
int ht_destroy(struct rcu_ht *ht)
{
	int ret;

	ret = ht_delete_all(ht);
	free(ht->tbl);
	free(ht);
	return ret;
}

static void ht_resize_grow(struct rcu_ht *ht)
{
	unsigned long i, new_size, old_size;
	struct rcu_ht_node **new_tbl, **old_tbl;
	struct rcu_ht_node *node, *new_node, *tmp;
	unsigned long hash;

	old_size = ht->size;

	if (old_size == 1)
		return;

	new_size = old_size << 1;
	new_tbl = calloc(new_size, sizeof(struct rcu_ht_node *));

	for (i = 0; i < old_size; i++) {
		/*
		 * Re-hash each entry, insert in new table.
		 * It's important that a reader looking for a key _will_ find
		 * it if it's in the table.
		 * Copy each node. (just the node, not ->data)
		 */
		node = ht->tbl[i];
		while (node) {
			hash = ht->hash_fct(node->key, ht->keylen,
					    ht->hashseed) % new_size;
			new_node = malloc(sizeof(struct rcu_ht_node));
			new_node->key = node->key;
			new_node->data = node->data;
			new_node->next = new_tbl[hash];	/* add to head */
			new_tbl[hash] = new_node;
			node = node->next;
		}
	}

	old_tbl = ht->tbl;
	ht->tbl = new_tbl;
	smp_wmb();	/* write links and table before changing size */
	ht->size = new_size;

	/* Ensure all concurrent lookups use new size and table */
	synchronize_rcu();

	for (i = 0; i < old_size; i++) {
		node = old_tbl[i];
		while (node) {
			tmp = node->next;
			free(node);
			node = tmp;
		}
	}
	free(old_tbl);
}

static void ht_resize_shrink(struct rcu_ht *ht)
{
	unsigned long i, new_size;
	struct rcu_ht_node **new_tbl;
	struct rcu_ht_node **prev, *node;

	if (ht->size == 1)
		return;

	new_size = ht->size >> 1;

	for (i = 0; i < new_size; i++) {
		/*
		 * Bucket i + new_size hashes to bucket i once the size is
		 * halved, so append its chain to the end of bucket i.
		 */
		prev = &ht->tbl[i];
		node = *prev;
		while (node) {
			prev = &node->next;
			node = *prev;
		}
		*prev = ht->tbl[i + new_size];
	}
	smp_wmb();	/* write links before changing size */
	ht->size = new_size;

	/* Ensure all concurrent lookups use new size */
	synchronize_rcu();

	new_tbl = realloc(ht->tbl, new_size * sizeof(struct rcu_ht_node *));
	/* shrinking, pointers should not move */
	assert(new_tbl == ht->tbl);
}

/*
 * growth: >0: *2, <0: /2
 */
void ht_resize(struct rcu_ht *ht, int growth)
{
	int ret;

	ret = pthread_mutex_lock(&ht->resize_mutex);
	assert(!ret);
	ht->resize_ongoing = 1;
	synchronize_rcu();
	/* All add/remove are waiting on the mutex. */
	if (growth > 0)
		ht_resize_grow(ht);
	else if (growth < 0)
		ht_resize_shrink(ht);
	smp_mb();	/* write new table and size before clearing resize_ongoing */
	ht->resize_ongoing = 0;
	ret = pthread_mutex_unlock(&ht->resize_mutex);
	assert(!ret);
}
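
/*
 * Example (sketch only): one possible caller-side growth policy. The table
 * does not track its element count, so "count" and "last_resize_count" are
 * hypothetical counters maintained by the caller; the number of buckets is
 * doubled whenever the element count has doubled since the last resize.
 *
 *	if (count >= 2 * last_resize_count) {
 *		ht_resize(ht, 1);
 *		last_resize_count = count;
 *	}
 */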

/*
 * Hashing function. Keys no larger than a pointer are expected to be encoded
 * in the key pointer itself and are hashed by value; larger keys are hashed
 * through the pointed-to buffer.
 */
uint32_t ht_jhash(void *key, uint32_t length, uint32_t initval)
{
	uint32_t ret;
	void *vkey;

	if (length <= sizeof(void *))
		vkey = &key;
	else
		vkey = key;
	ret = jhash(vkey, length, initval);
	return ret;
}
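
/*
 * Example (sketch only): why keys larger than a pointer are not fully
 * supported yet (see the TODO at the top of this file). ht_jhash() hashes the
 * pointed-to bytes, but ht_lookup() and ht_add() compare keys with
 * "node->key == key", i.e. by pointer value, so two distinct buffers holding
 * the same 16-byte key do not match. "ht" and "obj" are hypothetical, and the
 * table is assumed to have been created with keylen = 16.
 *
 *	char a[16] = "0123456789abcdef";
 *	char b[16] = "0123456789abcdef";
 *
 *	ht_add(ht, a, obj);
 *	ht_lookup(ht, b);
 *
 * Both buffers hash to the same bucket, yet the lookup returns NULL because
 * the pointer comparison fails.
 */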