[urcu.git] / urcu-ht.c

/*
 * TODO: keys are currently assumed <= sizeof(void *). Key target never freed.
 */

#define _LGPL_SOURCE
#include <stdlib.h>
#include <stdint.h>
#include <errno.h>
#include <assert.h>
#include <stdio.h>

#include <urcu.h>
#include <urcu-defer.h>
#include <arch.h>
#include <arch_atomic.h>
#include <compiler.h>
#include <urcu/jhash.h>
#include <pthread.h>
#include <urcu-ht.h>

/* node flags */
#define NODE_STOLEN	(1 << 0)

struct rcu_ht_node;

struct rcu_ht_node {
	struct rcu_ht_node *next;
	void *key;
	void *data;
	unsigned int flags;
};

struct rcu_ht {
	struct rcu_ht_node **tbl;
	ht_hash_fct hash_fct;
	void (*free_fct)(void *data);	/* fct to free data */
	unsigned long size;
	uint32_t keylen;
	uint32_t hashseed;
	pthread_mutex_t resize_mutex;	/* resize mutex: add/del mutex */
	int resize_ongoing;		/* fast-path resize check */
};

struct rcu_ht *ht_new(ht_hash_fct hash_fct, void (*free_fct)(void *data),
		      unsigned long init_size, uint32_t keylen,
		      uint32_t hashseed)
{
	struct rcu_ht *ht;

	ht = calloc(1, sizeof(struct rcu_ht));
	ht->hash_fct = hash_fct;
	ht->free_fct = free_fct;
	ht->size = init_size;
	ht->keylen = keylen;
	ht->hashseed = hashseed;
	/* this mutex should not nest in read-side C.S. */
	pthread_mutex_init(&ht->resize_mutex, NULL);
	ht->resize_ongoing = 0;
	ht->tbl = calloc(init_size, sizeof(struct rcu_ht_node *));
	return ht;
}
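
/*
 * Usage sketch (sizes and seed below are arbitrary): create a table whose
 * keys are encoded in the key pointer itself and whose data is freed with
 * free(), using the ht_jhash helper defined at the end of this file:
 *
 *	struct rcu_ht *ht;
 *
 *	ht = ht_new(ht_jhash, free, 256, sizeof(void *), 0x12345678);
 *	... add/lookup/delete ...
 *	ht_destroy(ht);
 */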

void *ht_lookup(struct rcu_ht *ht, void *key)
{
	unsigned long hash;
	struct rcu_ht_node *node;
	void *ret;

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;
	smp_read_barrier_depends();	/* read size before links */

	rcu_read_lock();
	node = rcu_dereference(ht->tbl[hash]);
	for (;;) {
		if (likely(!node)) {
			ret = NULL;
			break;
		}
		if (node->key == key) {
			ret = node->data;
			break;
		}
		node = rcu_dereference(node->next);
	}
	rcu_read_unlock();

	return ret;
}
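
/*
 * Lookup sketch (assumes the calling thread is registered as an RCU reader,
 * e.g. via rcu_register_thread(), since ht_lookup() takes the read lock
 * internally; the key value is arbitrary):
 *
 *	void *data;
 *
 *	data = ht_lookup(ht, (void *)(unsigned long)42);
 *	if (data)
 *		printf("found %p\n", data);
 */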

/*
 * Will re-try until either:
 * - The key is already there (-EEXIST)
 * - We successfully add the key at the head of a table bucket.
 */
int ht_add(struct rcu_ht *ht, void *key, void *data)
{
	struct rcu_ht_node *node, *old_head, *new_head;
	unsigned long hash;
	int ret = 0;

	new_head = calloc(1, sizeof(struct rcu_ht_node));
	new_head->key = key;
	new_head->data = data;
	new_head->flags = 0;
	/*
	 * Here comes the fun and tricky part.
	 * Add at the beginning with a cmpxchg.
	 * Hold a read lock between the moment the first element is read
	 * and the node traversal (to find duplicates). This ensures
	 * the head pointer has not been reclaimed when the cmpxchg is done.
	 * Always adding at the head ensures that we will have to re-try if a
	 * new item has been added concurrently, so we never add duplicates.
	 */
retry:
	rcu_read_lock();

	if (unlikely(ht->resize_ongoing)) {
		rcu_read_unlock();
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		assert(!ret);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
		assert(!ret);
		goto retry;
	}

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;

	old_head = node = rcu_dereference(ht->tbl[hash]);
	for (;;) {
		if (likely(!node)) {
			break;
		}
		if (node->key == key) {
			/* new_head was never published; it is ours to free. */
			free(new_head);
			ret = -EEXIST;
			goto end;
		}
		node = rcu_dereference(node->next);
	}
	new_head->next = old_head;
	if (rcu_cmpxchg_pointer(&ht->tbl[hash], old_head, new_head) != old_head)
		goto restart;
end:
	rcu_read_unlock();
	return ret;

	/* restart loop, release and re-take the read lock to be kind to GP */
restart:
	rcu_read_unlock();
	goto retry;
}
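
/*
 * Insertion sketch (the payload allocation is arbitrary). On -EEXIST the
 * table did not take ownership of the data, so the caller frees it itself:
 *
 *	int *payload = malloc(sizeof(*payload));
 *	int err;
 *
 *	*payload = 1;
 *	err = ht_add(ht, (void *)(unsigned long)42, payload);
 *	if (err == -EEXIST)
 *		free(payload);
 */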

/*
 * Restart until we successfully remove the entry, or until no matching entry
 * is left, in which case (void *)(unsigned long)-ENOENT is returned.
 * Deal with concurrent stealers by doing an extra verification pass to check
 * that no element in the list still points to the stolen element. This could
 * happen if two concurrent steals of consecutive objects are executed: a
 * pointer to the object being stolen could have been saved by the concurrent
 * stealer of the previous object.
 * Also, since in this precise scenario another stealer may also want to
 * delete the doubly-referenced object, use a "stolen" flag to let only one
 * stealer delete the object.
 */
void *ht_steal(struct rcu_ht *ht, void *key)
{
	struct rcu_ht_node **prev, *node, *del_node = NULL;
	unsigned long hash;
	void *data;
	int ret;

retry:
	rcu_read_lock();

	if (unlikely(ht->resize_ongoing)) {
		rcu_read_unlock();
		/*
		 * Wait for resize to complete before continuing.
		 */
		ret = pthread_mutex_lock(&ht->resize_mutex);
		assert(!ret);
		ret = pthread_mutex_unlock(&ht->resize_mutex);
		assert(!ret);
		goto retry;
	}

	hash = ht->hash_fct(key, ht->keylen, ht->hashseed) % ht->size;

	prev = &ht->tbl[hash];
	node = rcu_dereference(*prev);
	for (;;) {
		if (likely(!node)) {
			if (del_node) {
				goto end;
			} else {
				goto error;
			}
		}
		if (node->key == key) {
			break;
		}
		prev = &node->next;
		node = rcu_dereference(*prev);
	}

	if (!del_node) {
		/*
		 * Did another concurrent thread steal it? If so, let it deal
		 * with this. Assume NODE_STOLEN is the only flag. If this
		 * changes, read flags before cmpxchg.
		 */
		if (cmpxchg(&node->flags, 0, NODE_STOLEN) != 0)
			goto error;
	}

	/* Found it! The pointer to the object is in "prev". */
	if (rcu_cmpxchg_pointer(prev, node, node->next) == node)
		del_node = node;
	goto restart;

end:
	/*
	 * From this point on, we own the node. Note that there can still be
	 * concurrent RCU readers using it. We can free it outside of the read
	 * lock after a GP.
	 */
	rcu_read_unlock();

	data = del_node->data;
	call_rcu(free, del_node);
	return data;

error:
	data = (void *)(unsigned long)-ENOENT;
	rcu_read_unlock();
	return data;

	/* restart loop, release and re-take the read lock to be kind to GP */
restart:
	rcu_read_unlock();
	goto retry;
}

int ht_delete(struct rcu_ht *ht, void *key)
{
	void *data;

	data = ht_steal(ht, key);
	if (data && data != (void *)(unsigned long)-ENOENT) {
		if (ht->free_fct)
			call_rcu(ht->free_fct, data);
		return 0;
	} else {
		return -ENOENT;
	}
}
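
/*
 * Removal sketch: ht_steal() hands ownership of the data back to the caller,
 * whereas ht_delete() passes it to free_fct after a grace period. Since
 * concurrent readers may still hold a reference to stolen data, defer its
 * reclamation ("not found" is encoded as -ENOENT, as above):
 *
 *	void *data;
 *
 *	data = ht_steal(ht, (void *)(unsigned long)42);
 *	if (data != (void *)(unsigned long)-ENOENT)
 *		call_rcu(free, data);
 */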

/* Delete all old elements. Allow concurrent writer accesses. */
int ht_delete_all(struct rcu_ht *ht)
{
	unsigned long i;
	struct rcu_ht_node **prev, *node, *inext;
	int cnt = 0;
	int ret;

	/*
	 * Mutual exclusion with resize operations, but let add/steal execute
	 * concurrently. This is OK because we operate only on the heads.
	 */
	ret = pthread_mutex_lock(&ht->resize_mutex);
	assert(!ret);

	for (i = 0; i < ht->size; i++) {
		rcu_read_lock();
		prev = &ht->tbl[i];
		/*
		 * Cut the head. After that, we own the first element.
		 */
		node = rcu_xchg_pointer(prev, NULL);
		if (!node) {
			rcu_read_unlock();
			continue;
		}
		/*
		 * We manage a list shared with concurrent writers and readers.
		 * Note that a concurrent add may or may not be deleted by us,
		 * depending on whether it arrives before or after the head is
		 * cut. "node" points to our first node. Remove first elements
		 * iteratively.
		 */
		for (;;) {
			inext = rcu_xchg_pointer(&node->next, NULL);
			/*
			 * "node" is the first element of the list we have cut.
			 * We therefore own it, no concurrent writer may delete
			 * it. There can only be concurrent lookups. Concurrent
			 * add can only be done on a bucket head, but we've cut
			 * it already. inext is also owned by us, because we
			 * have exchanged it for "NULL". It will therefore be
			 * safe to use it after a G.P.
			 */
			rcu_read_unlock();
			if (node->data && ht->free_fct)
				call_rcu(ht->free_fct, node->data);
			call_rcu(free, node);
			cnt++;
			if (likely(!inext))
				break;
			rcu_read_lock();
			node = inext;
		}
	}

	ret = pthread_mutex_unlock(&ht->resize_mutex);
	assert(!ret);
	return cnt;
}

/*
 * Should only be called when no concurrent readers or writers can possibly
 * access the table anymore.
 */
int ht_destroy(struct rcu_ht *ht)
{
	int ret;

	ret = ht_delete_all(ht);
	free(ht->tbl);
	free(ht);
	return ret;
}

static void ht_resize_grow(struct rcu_ht *ht)
{
	unsigned long i, new_size, old_size;
	struct rcu_ht_node **new_tbl, **old_tbl;
	struct rcu_ht_node *node, *new_node, *tmp;
	unsigned long hash;

	old_size = ht->size;

	if (old_size == 1)
		return;

	new_size = old_size << 1;
	new_tbl = calloc(new_size, sizeof(struct rcu_ht_node *));

	for (i = 0; i < old_size; i++) {
		/*
		 * Re-hash each entry, insert in new table.
		 * It's important that a reader looking for a key _will_ find
		 * it if it's in the table.
		 * Copy each node. (just the node, not ->data)
		 */
		node = ht->tbl[i];
		while (node) {
			hash = ht->hash_fct(node->key, ht->keylen,
					    ht->hashseed) % new_size;
			new_node = malloc(sizeof(struct rcu_ht_node));
			new_node->key = node->key;
			new_node->data = node->data;
			new_node->flags = 0;	/* the copy is not stolen */
			new_node->next = new_tbl[hash];	/* add to head */
			new_tbl[hash] = new_node;
			node = node->next;
		}
	}

	old_tbl = ht->tbl;
	ht->tbl = new_tbl;
	smp_wmb();	/* write links and table before changing size */
	ht->size = new_size;

	/* Ensure all concurrent lookups use new size and table */
	synchronize_rcu();

	for (i = 0; i < old_size; i++) {
		node = old_tbl[i];
		while (node) {
			tmp = node->next;
			free(node);
			node = tmp;
		}
	}
	free(old_tbl);
}

static void ht_resize_shrink(struct rcu_ht *ht)
{
	unsigned long i, new_size;
	struct rcu_ht_node **new_tbl;
	struct rcu_ht_node **prev, *node;

	if (ht->size == 1)
		return;

	new_size = ht->size >> 1;

	for (i = 0; i < new_size; i++) {
		/*
		 * Link the end of bucket i with the head of bucket
		 * i + new_size, whose entries hash to i once the size is
		 * halved.
		 */
		prev = &ht->tbl[i];
		node = *prev;
		while (node) {
			prev = &node->next;
			node = *prev;
		}
		*prev = ht->tbl[i + new_size];
	}
	smp_wmb();	/* write links before changing size */
	ht->size = new_size;

	/* Ensure all concurrent lookups use new size */
	synchronize_rcu();

	new_tbl = realloc(ht->tbl, new_size * sizeof(struct rcu_ht_node *));
	/* shrinking, pointers should not move */
	assert(new_tbl == ht->tbl);
}

/*
 * growth > 0: size *= 2, growth < 0: size /= 2.
 */
void ht_resize(struct rcu_ht *ht, int growth)
{
	int ret;

	ret = pthread_mutex_lock(&ht->resize_mutex);
	assert(!ret);
	ht->resize_ongoing = 1;
	synchronize_rcu();
	/* All add/remove are waiting on the mutex. */
	if (growth > 0)
		ht_resize_grow(ht);
	else if (growth < 0)
		ht_resize_shrink(ht);
	smp_mb();
	ht->resize_ongoing = 0;
	ret = pthread_mutex_unlock(&ht->resize_mutex);
	assert(!ret);
}
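
/*
 * Resize sketch: each call doubles or halves the table once; growth only
 * selects the direction. The resize_ongoing flag published above makes
 * concurrent ht_add()/ht_steal() callers wait on resize_mutex until the
 * resize completes:
 *
 *	ht_resize(ht, 1);	(double ht->size)
 *	ht_resize(ht, -1);	(halve ht->size)
 */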

/*
 * Expects keys smaller than or equal to the pointer size to be encoded in the
 * pointer itself.
 */
uint32_t ht_jhash(void *key, uint32_t length, uint32_t initval)
{
	uint32_t ret;
	void *vkey;

	if (length <= sizeof(void *))
		vkey = &key;
	else
		vkey = key;
	ret = jhash(vkey, length, initval);
	return ret;
}
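
/*
 * Key encoding sketch: with keylen <= sizeof(void *), ht_jhash() hashes the
 * bytes of the key pointer value itself, so small scalar keys are simply
 * cast into the pointer (and compared by pointer equality during lookup):
 *
 *	unsigned long id = 42;
 *	void *data;
 *
 *	data = ht_lookup(ht, (void *)id);
 */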