Comment cleanup: fix ascii art
[urcu.git] / rculfhash.c
... / ...
CommitLineData
1/*
2 * rculfhash.c
3 *
4 * Userspace RCU library - Lock-Free Resizable RCU Hash Table
5 *
6 * Copyright 2010-2011 - Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
7 *
8 * This library is free software; you can redistribute it and/or
9 * modify it under the terms of the GNU Lesser General Public
10 * License as published by the Free Software Foundation; either
11 * version 2.1 of the License, or (at your option) any later version.
12 *
13 * This library is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
16 * Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public
19 * License along with this library; if not, write to the Free Software
20 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 */
22
23/*
24 * Based on the following articles:
25 * - Ori Shalev and Nir Shavit. Split-ordered lists: Lock-free
26 * extensible hash tables. J. ACM 53, 3 (May 2006), 379-405.
27 * - Michael, M. M. High performance dynamic lock-free hash tables
28 * and list-based sets. In Proceedings of the fourteenth annual ACM
29 * symposium on Parallel algorithms and architectures, ACM Press,
30 * (2002), 73-82.
31 *
32 * Some specificities of this Lock-Free Resizable RCU Hash Table
33 * implementation:
34 *
35 * - RCU read-side critical section allows readers to perform hash
36 * table lookups and use the returned objects safely by delaying
37 * memory reclaim of a grace period.
38 * - Add and remove operations are lock-free, and do not need to
39 * allocate memory. They need to be executed within RCU read-side
40 * critical section to ensure the objects they read are valid and to
41 * deal with the cmpxchg ABA problem.
42 * - add and add_unique operations are supported. add_unique checks if
43 * the node key already exists in the hash table. It ensures no key
44 * duplicata exists.
45 * - The resize operation executes concurrently with add/remove/lookup.
46 * - Hash table nodes are contained within a split-ordered list. This
47 * list is ordered by incrementing reversed-bits-hash value.
48 * - An index of dummy nodes is kept. These dummy nodes are the hash
49 * table "buckets", and they are also chained together in the
50 * split-ordered list, which allows recursive expansion.
51 * - The resize operation for small tables only allows expanding the hash table.
52 * It is triggered automatically by detecting long chains in the add
53 * operation.
54 * - The resize operation for larger tables (and available through an
55 * API) allows both expanding and shrinking the hash table.
56 * - Per-CPU Split-counters are used to keep track of the number of
57 * nodes within the hash table for automatic resize triggering.
58 * - Resize operation initiated by long chain detection is executed by a
59 * call_rcu thread, which keeps lock-freedom of add and remove.
60 * - Resize operations are protected by a mutex.
61 * - The removal operation is split in two parts: first, a "removed"
62 * flag is set in the next pointer within the node to remove. Then,
63 * a "garbage collection" is performed in the bucket containing the
64 * removed node (from the start of the bucket up to the removed node).
65 * All encountered nodes with "removed" flag set in their next
66 * pointers are removed from the linked-list. If the cmpxchg used for
67 * removal fails (due to concurrent garbage-collection or concurrent
68 * add), we retry from the beginning of the bucket. This ensures that
69 * the node with "removed" flag set is removed from the hash table
70 * (not visible to lookups anymore) before the RCU read-side critical
71 * section held across removal ends. Furthermore, this ensures that
72 * the node with "removed" flag set is removed from the linked-list
73 * before its memory is reclaimed. Only the thread which removal
74 * successfully set the "removed" flag (with a cmpxchg) into a node's
75 * next pointer is considered to have succeeded its removal (and thus
76 * owns the node to reclaim). Because we garbage-collect starting from
77 * an invariant node (the start-of-bucket dummy node) up to the
78 * "removed" node (or find a reverse-hash that is higher), we are sure
79 * that a successful traversal of the chain leads to a chain that is
80 * present in the linked-list (the start node is never removed) and
81 * that is does not contain the "removed" node anymore, even if
82 * concurrent delete/add operations are changing the structure of the
83 * list concurrently.
84 * - The add operation performs gargage collection of buckets if it
85 * encounters nodes with removed flag set in the bucket where it wants
86 * to add its new node. This ensures lock-freedom of add operation by
87 * helping the remover unlink nodes from the list rather than to wait
88 * for it do to so.
89 * - A RCU "order table" indexed by log2(hash index) is copied and
90 * expanded by the resize operation. This order table allows finding
91 * the "dummy node" tables.
92 * - There is one dummy node table per hash index order. The size of
93 * each dummy node table is half the number of hashes contained in
94 * this order.
95 * - call_rcu is used to garbage-collect the old order table.
96 * - The per-order dummy node tables contain a compact version of the
97 * hash table nodes. These tables are invariant after they are
98 * populated into the hash table.
99 *
100 * A bit of ascii art explanation:
101 *
102 * Order index is the off-by-one compare to the actual power of 2 because
103 * we use index 0 to deal with the 0 special-case.
104 *
105 * This shows the nodes for a small table ordered by reversed bits:
106 *
107 * bits reverse
108 * 0 000 000
109 * 4 100 001
110 * 2 010 010
111 * 6 110 011
112 * 1 001 100
113 * 5 101 101
114 * 3 011 110
115 * 7 111 111
116 *
117 * This shows the nodes in order of non-reversed bits, linked by
118 * reversed-bit order.
119 *
120 * order bits reverse
121 * 0 0 000 000
122 * 1 | 1 001 100 <-
123 * 2 | | 2 010 010 <- |
124 * | | | 3 011 110 | <- |
125 * 3 -> | | | 4 100 001 | |
126 * -> | | 5 101 101 |
127 * -> | 6 110 011
128 * -> 7 111 111
129 */
130
131#define _LGPL_SOURCE
132#include <stdlib.h>
133#include <errno.h>
134#include <assert.h>
135#include <stdio.h>
136#include <stdint.h>
137#include <string.h>
138
139#include "config.h"
140#include <urcu.h>
141#include <urcu-call-rcu.h>
142#include <urcu/arch.h>
143#include <urcu/uatomic.h>
144#include <urcu/compiler.h>
145#include <urcu/rculfhash.h>
146#include <stdio.h>
147#include <pthread.h>
148
149#ifdef DEBUG
150#define dbg_printf(fmt, args...) printf("[debug rculfhash] " fmt, ## args)
151#else
152#define dbg_printf(fmt, args...)
153#endif
154
155/*
156 * Per-CPU split-counters lazily update the global counter each 1024
157 * addition/removal. It automatically keeps track of resize required.
158 * We use the bucket length as indicator for need to expand for small
159 * tables and machines lacking per-cpu data suppport.
160 */
161#define COUNT_COMMIT_ORDER 10
162#define CHAIN_LEN_TARGET 1
163#define CHAIN_LEN_RESIZE_THRESHOLD 3
164
165/*
166 * Define the minimum table size.
167 */
168#define MIN_TABLE_SIZE 1
169
170#if (CAA_BITS_PER_LONG == 32)
171#define MAX_TABLE_ORDER 32
172#else
173#define MAX_TABLE_ORDER 64
174#endif
175
176/*
177 * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink.
178 */
179#define MIN_PARTITION_PER_THREAD_ORDER 12
180#define MIN_PARTITION_PER_THREAD (1UL << MIN_PARTITION_PER_THREAD_ORDER)
181
182#ifndef min
183#define min(a, b) ((a) < (b) ? (a) : (b))
184#endif
185
186#ifndef max
187#define max(a, b) ((a) > (b) ? (a) : (b))
188#endif
189
190/*
191 * The removed flag needs to be updated atomically with the pointer.
192 * It indicates that no node must attach to the node scheduled for
193 * removal, and that node garbage collection must be performed.
194 * The dummy flag does not require to be updated atomically with the
195 * pointer, but it is added as a pointer low bit flag to save space.
196 */
197#define REMOVED_FLAG (1UL << 0)
198#define DUMMY_FLAG (1UL << 1)
199#define FLAGS_MASK ((1UL << 2) - 1)
200
201/* Value of the end pointer. Should not interact with flags. */
202#define END_VALUE NULL
203
204struct ht_items_count {
205 unsigned long add, del;
206} __attribute__((aligned(CAA_CACHE_LINE_SIZE)));
207
208struct rcu_level {
209 /* Note: manually update allocation length when adding a field */
210 struct _cds_lfht_node nodes[0];
211};
212
213struct rcu_table {
214 unsigned long size; /* always a power of 2, shared (RCU) */
215 unsigned long resize_target;
216 int resize_initiated;
217 struct rcu_level *tbl[MAX_TABLE_ORDER];
218};
219
220struct cds_lfht {
221 struct rcu_table t;
222 cds_lfht_hash_fct hash_fct;
223 cds_lfht_compare_fct compare_fct;
224 unsigned long hash_seed;
225 int flags;
226 /*
227 * We need to put the work threads offline (QSBR) when taking this
228 * mutex, because we use synchronize_rcu within this mutex critical
229 * section, which waits on read-side critical sections, and could
230 * therefore cause grace-period deadlock if we hold off RCU G.P.
231 * completion.
232 */
233 pthread_mutex_t resize_mutex; /* resize mutex: add/del mutex */
234 unsigned int in_progress_resize, in_progress_destroy;
235 void (*cds_lfht_call_rcu)(struct rcu_head *head,
236 void (*func)(struct rcu_head *head));
237 void (*cds_lfht_synchronize_rcu)(void);
238 void (*cds_lfht_rcu_read_lock)(void);
239 void (*cds_lfht_rcu_read_unlock)(void);
240 void (*cds_lfht_rcu_thread_offline)(void);
241 void (*cds_lfht_rcu_thread_online)(void);
242 void (*cds_lfht_rcu_register_thread)(void);
243 void (*cds_lfht_rcu_unregister_thread)(void);
244 pthread_attr_t *resize_attr; /* Resize threads attributes */
245 long count; /* global approximate item count */
246 struct ht_items_count *percpu_count; /* per-cpu item count */
247};
248
249struct rcu_resize_work {
250 struct rcu_head head;
251 struct cds_lfht *ht;
252};
253
254struct partition_resize_work {
255 pthread_t thread_id;
256 struct cds_lfht *ht;
257 unsigned long i, start, len;
258 void (*fct)(struct cds_lfht *ht, unsigned long i,
259 unsigned long start, unsigned long len);
260};
261
262static
263void _cds_lfht_add(struct cds_lfht *ht,
264 unsigned long size,
265 struct cds_lfht_node *node,
266 struct cds_lfht_iter *unique_ret,
267 int dummy);
268
269/*
270 * Algorithm to reverse bits in a word by lookup table, extended to
271 * 64-bit words.
272 * Source:
273 * http://graphics.stanford.edu/~seander/bithacks.html#BitReverseTable
274 * Originally from Public Domain.
275 */
276
277static const uint8_t BitReverseTable256[256] =
278{
279#define R2(n) (n), (n) + 2*64, (n) + 1*64, (n) + 3*64
280#define R4(n) R2(n), R2((n) + 2*16), R2((n) + 1*16), R2((n) + 3*16)
281#define R6(n) R4(n), R4((n) + 2*4 ), R4((n) + 1*4 ), R4((n) + 3*4 )
282 R6(0), R6(2), R6(1), R6(3)
283};
284#undef R2
285#undef R4
286#undef R6
287
288static
289uint8_t bit_reverse_u8(uint8_t v)
290{
291 return BitReverseTable256[v];
292}
293
294static __attribute__((unused))
295uint32_t bit_reverse_u32(uint32_t v)
296{
297 return ((uint32_t) bit_reverse_u8(v) << 24) |
298 ((uint32_t) bit_reverse_u8(v >> 8) << 16) |
299 ((uint32_t) bit_reverse_u8(v >> 16) << 8) |
300 ((uint32_t) bit_reverse_u8(v >> 24));
301}
302
303static __attribute__((unused))
304uint64_t bit_reverse_u64(uint64_t v)
305{
306 return ((uint64_t) bit_reverse_u8(v) << 56) |
307 ((uint64_t) bit_reverse_u8(v >> 8) << 48) |
308 ((uint64_t) bit_reverse_u8(v >> 16) << 40) |
309 ((uint64_t) bit_reverse_u8(v >> 24) << 32) |
310 ((uint64_t) bit_reverse_u8(v >> 32) << 24) |
311 ((uint64_t) bit_reverse_u8(v >> 40) << 16) |
312 ((uint64_t) bit_reverse_u8(v >> 48) << 8) |
313 ((uint64_t) bit_reverse_u8(v >> 56));
314}
315
316static
317unsigned long bit_reverse_ulong(unsigned long v)
318{
319#if (CAA_BITS_PER_LONG == 32)
320 return bit_reverse_u32(v);
321#else
322 return bit_reverse_u64(v);
323#endif
324}
325
326/*
327 * fls: returns the position of the most significant bit.
328 * Returns 0 if no bit is set, else returns the position of the most
329 * significant bit (from 1 to 32 on 32-bit, from 1 to 64 on 64-bit).
330 */
331#if defined(__i386) || defined(__x86_64)
332static inline
333unsigned int fls_u32(uint32_t x)
334{
335 int r;
336
337 asm("bsrl %1,%0\n\t"
338 "jnz 1f\n\t"
339 "movl $-1,%0\n\t"
340 "1:\n\t"
341 : "=r" (r) : "rm" (x));
342 return r + 1;
343}
344#define HAS_FLS_U32
345#endif
346
347#if defined(__x86_64)
348static inline
349unsigned int fls_u64(uint64_t x)
350{
351 long r;
352
353 asm("bsrq %1,%0\n\t"
354 "jnz 1f\n\t"
355 "movq $-1,%0\n\t"
356 "1:\n\t"
357 : "=r" (r) : "rm" (x));
358 return r + 1;
359}
360#define HAS_FLS_U64
361#endif
362
363#ifndef HAS_FLS_U64
364static __attribute__((unused))
365unsigned int fls_u64(uint64_t x)
366{
367 unsigned int r = 64;
368
369 if (!x)
370 return 0;
371
372 if (!(x & 0xFFFFFFFF00000000ULL)) {
373 x <<= 32;
374 r -= 32;
375 }
376 if (!(x & 0xFFFF000000000000ULL)) {
377 x <<= 16;
378 r -= 16;
379 }
380 if (!(x & 0xFF00000000000000ULL)) {
381 x <<= 8;
382 r -= 8;
383 }
384 if (!(x & 0xF000000000000000ULL)) {
385 x <<= 4;
386 r -= 4;
387 }
388 if (!(x & 0xC000000000000000ULL)) {
389 x <<= 2;
390 r -= 2;
391 }
392 if (!(x & 0x8000000000000000ULL)) {
393 x <<= 1;
394 r -= 1;
395 }
396 return r;
397}
398#endif
399
400#ifndef HAS_FLS_U32
401static __attribute__((unused))
402unsigned int fls_u32(uint32_t x)
403{
404 unsigned int r = 32;
405
406 if (!x)
407 return 0;
408 if (!(x & 0xFFFF0000U)) {
409 x <<= 16;
410 r -= 16;
411 }
412 if (!(x & 0xFF000000U)) {
413 x <<= 8;
414 r -= 8;
415 }
416 if (!(x & 0xF0000000U)) {
417 x <<= 4;
418 r -= 4;
419 }
420 if (!(x & 0xC0000000U)) {
421 x <<= 2;
422 r -= 2;
423 }
424 if (!(x & 0x80000000U)) {
425 x <<= 1;
426 r -= 1;
427 }
428 return r;
429}
430#endif
431
432unsigned int fls_ulong(unsigned long x)
433{
434#if (CAA_BITS_PER_lONG == 32)
435 return fls_u32(x);
436#else
437 return fls_u64(x);
438#endif
439}
440
441/*
442 * Return the minimum order for which x <= (1UL << order).
443 * Return -1 if x is 0.
444 */
445int get_count_order_u32(uint32_t x)
446{
447 if (!x)
448 return -1;
449
450 return fls_u32(x - 1);
451}
452
453/*
454 * Return the minimum order for which x <= (1UL << order).
455 * Return -1 if x is 0.
456 */
457int get_count_order_ulong(unsigned long x)
458{
459 if (!x)
460 return -1;
461
462 return fls_ulong(x - 1);
463}
464
465#ifdef POISON_FREE
466#define poison_free(ptr) \
467 do { \
468 memset(ptr, 0x42, sizeof(*(ptr))); \
469 free(ptr); \
470 } while (0)
471#else
472#define poison_free(ptr) free(ptr)
473#endif
474
475static
476void cds_lfht_resize_lazy(struct cds_lfht *ht, unsigned long size, int growth);
477
478/*
479 * If the sched_getcpu() and sysconf(_SC_NPROCESSORS_CONF) calls are
480 * available, then we support hash table item accounting.
481 * In the unfortunate event the number of CPUs reported would be
482 * inaccurate, we use modulo arithmetic on the number of CPUs we got.
483 */
484#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
485
486static
487void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
488 unsigned long count);
489
490static long nr_cpus_mask = -1;
491
492static
493struct ht_items_count *alloc_per_cpu_items_count(void)
494{
495 struct ht_items_count *count;
496
497 switch (nr_cpus_mask) {
498 case -2:
499 return NULL;
500 case -1:
501 {
502 long maxcpus;
503
504 maxcpus = sysconf(_SC_NPROCESSORS_CONF);
505 if (maxcpus <= 0) {
506 nr_cpus_mask = -2;
507 return NULL;
508 }
509 /*
510 * round up number of CPUs to next power of two, so we
511 * can use & for modulo.
512 */
513 maxcpus = 1UL << get_count_order_ulong(maxcpus);
514 nr_cpus_mask = maxcpus - 1;
515 }
516 /* Fall-through */
517 default:
518 return calloc(nr_cpus_mask + 1, sizeof(*count));
519 }
520}
521
522static
523void free_per_cpu_items_count(struct ht_items_count *count)
524{
525 poison_free(count);
526}
527
528static
529int ht_get_cpu(void)
530{
531 int cpu;
532
533 assert(nr_cpus_mask >= 0);
534 cpu = sched_getcpu();
535 if (unlikely(cpu < 0))
536 return cpu;
537 else
538 return cpu & nr_cpus_mask;
539}
540
541static
542void ht_count_add(struct cds_lfht *ht, unsigned long size)
543{
544 unsigned long percpu_count;
545 int cpu;
546
547 if (unlikely(!ht->percpu_count))
548 return;
549 cpu = ht_get_cpu();
550 if (unlikely(cpu < 0))
551 return;
552 percpu_count = uatomic_add_return(&ht->percpu_count[cpu].add, 1);
553 if (unlikely(!(percpu_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) {
554 long count;
555
556 dbg_printf("add percpu %lu\n", percpu_count);
557 count = uatomic_add_return(&ht->count,
558 1UL << COUNT_COMMIT_ORDER);
559 /* If power of 2 */
560 if (!(count & (count - 1))) {
561 if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) < size)
562 return;
563 dbg_printf("add set global %ld\n", count);
564 cds_lfht_resize_lazy_count(ht, size,
565 count >> (CHAIN_LEN_TARGET - 1));
566 }
567 }
568}
569
570static
571void ht_count_del(struct cds_lfht *ht, unsigned long size)
572{
573 unsigned long percpu_count;
574 int cpu;
575
576 if (unlikely(!ht->percpu_count))
577 return;
578 cpu = ht_get_cpu();
579 if (unlikely(cpu < 0))
580 return;
581 percpu_count = uatomic_add_return(&ht->percpu_count[cpu].del, 1);
582 if (unlikely(!(percpu_count & ((1UL << COUNT_COMMIT_ORDER) - 1)))) {
583 long count;
584
585 dbg_printf("del percpu %lu\n", percpu_count);
586 count = uatomic_add_return(&ht->count,
587 -(1UL << COUNT_COMMIT_ORDER));
588 /* If power of 2 */
589 if (!(count & (count - 1))) {
590 if ((count >> CHAIN_LEN_RESIZE_THRESHOLD) >= size)
591 return;
592 dbg_printf("del set global %ld\n", count);
593 /*
594 * Don't shrink table if the number of nodes is below a
595 * certain threshold.
596 */
597 if (count < (1UL << COUNT_COMMIT_ORDER) * (nr_cpus_mask + 1))
598 return;
599 cds_lfht_resize_lazy_count(ht, size,
600 count >> (CHAIN_LEN_TARGET - 1));
601 }
602 }
603}
604
605#else /* #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
606
607static const long nr_cpus_mask = -2;
608
609static
610struct ht_items_count *alloc_per_cpu_items_count(void)
611{
612 return NULL;
613}
614
615static
616void free_per_cpu_items_count(struct ht_items_count *count)
617{
618}
619
620static
621void ht_count_add(struct cds_lfht *ht, unsigned long size)
622{
623}
624
625static
626void ht_count_del(struct cds_lfht *ht, unsigned long size)
627{
628}
629
630#endif /* #else #if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF) */
631
632
633static
634void check_resize(struct cds_lfht *ht, unsigned long size, uint32_t chain_len)
635{
636 unsigned long count;
637
638 if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
639 return;
640 count = uatomic_read(&ht->count);
641 /*
642 * Use bucket-local length for small table expand and for
643 * environments lacking per-cpu data support.
644 */
645 if (count >= (1UL << COUNT_COMMIT_ORDER))
646 return;
647 if (chain_len > 100)
648 dbg_printf("WARNING: large chain length: %u.\n",
649 chain_len);
650 if (chain_len >= CHAIN_LEN_RESIZE_THRESHOLD)
651 cds_lfht_resize_lazy(ht, size,
652 get_count_order_u32(chain_len - (CHAIN_LEN_TARGET - 1)));
653}
654
655static
656struct cds_lfht_node *clear_flag(struct cds_lfht_node *node)
657{
658 return (struct cds_lfht_node *) (((unsigned long) node) & ~FLAGS_MASK);
659}
660
661static
662int is_removed(struct cds_lfht_node *node)
663{
664 return ((unsigned long) node) & REMOVED_FLAG;
665}
666
667static
668struct cds_lfht_node *flag_removed(struct cds_lfht_node *node)
669{
670 return (struct cds_lfht_node *) (((unsigned long) node) | REMOVED_FLAG);
671}
672
673static
674int is_dummy(struct cds_lfht_node *node)
675{
676 return ((unsigned long) node) & DUMMY_FLAG;
677}
678
679static
680struct cds_lfht_node *flag_dummy(struct cds_lfht_node *node)
681{
682 return (struct cds_lfht_node *) (((unsigned long) node) | DUMMY_FLAG);
683}
684
685static
686struct cds_lfht_node *get_end(void)
687{
688 return (struct cds_lfht_node *) END_VALUE;
689}
690
691static
692int is_end(struct cds_lfht_node *node)
693{
694 return clear_flag(node) == (struct cds_lfht_node *) END_VALUE;
695}
696
697static
698unsigned long _uatomic_max(unsigned long *ptr, unsigned long v)
699{
700 unsigned long old1, old2;
701
702 old1 = uatomic_read(ptr);
703 do {
704 old2 = old1;
705 if (old2 >= v)
706 return old2;
707 } while ((old1 = uatomic_cmpxchg(ptr, old2, v)) != old2);
708 return v;
709}
710
711static
712struct _cds_lfht_node *lookup_bucket(struct cds_lfht *ht, unsigned long size,
713 unsigned long hash)
714{
715 unsigned long index, order;
716
717 assert(size > 0);
718 index = hash & (size - 1);
719 /*
720 * equivalent to get_count_order_ulong(index + 1), but optimizes
721 * away the non-existing 0 special-case for
722 * get_count_order_ulong.
723 */
724 order = fls_ulong(index);
725
726 dbg_printf("lookup hash %lu index %lu order %lu aridx %lu\n",
727 hash, index, order, index & (!order ? 0 : ((1UL << (order - 1)) - 1)));
728
729 return &ht->t.tbl[order]->nodes[index & (!order ? 0 : ((1UL << (order - 1)) - 1))];
730}
731
732/*
733 * Remove all logically deleted nodes from a bucket up to a certain node key.
734 */
735static
736void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node)
737{
738 struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
739
740 assert(!is_dummy(dummy));
741 assert(!is_removed(dummy));
742 assert(!is_dummy(node));
743 assert(!is_removed(node));
744 for (;;) {
745 iter_prev = dummy;
746 /* We can always skip the dummy node initially */
747 iter = rcu_dereference(iter_prev->p.next);
748 assert(!is_removed(iter));
749 assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
750 /*
751 * We should never be called with dummy (start of chain)
752 * and logically removed node (end of path compression
753 * marker) being the actual same node. This would be a
754 * bug in the algorithm implementation.
755 */
756 assert(dummy != node);
757 for (;;) {
758 if (unlikely(is_end(iter)))
759 return;
760 if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
761 return;
762 next = rcu_dereference(clear_flag(iter)->p.next);
763 if (likely(is_removed(next)))
764 break;
765 iter_prev = clear_flag(iter);
766 iter = next;
767 }
768 assert(!is_removed(iter));
769 if (is_dummy(iter))
770 new_next = flag_dummy(clear_flag(next));
771 else
772 new_next = clear_flag(next);
773 (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next);
774 }
775 return;
776}
777
778static
779int _cds_lfht_replace(struct cds_lfht *ht, unsigned long size,
780 struct cds_lfht_node *old_node,
781 struct cds_lfht_node *old_next,
782 struct cds_lfht_node *new_node)
783{
784 struct cds_lfht_node *dummy, *ret_next;
785 struct _cds_lfht_node *lookup;
786
787 if (!old_node) /* Return -ENOENT if asked to replace NULL node */
788 return -ENOENT;
789
790 assert(!is_removed(old_node));
791 assert(!is_dummy(old_node));
792 assert(!is_removed(new_node));
793 assert(!is_dummy(new_node));
794 assert(new_node != old_node);
795 for (;;) {
796 /* Insert after node to be replaced */
797 if (is_removed(old_next)) {
798 /*
799 * Too late, the old node has been removed under us
800 * between lookup and replace. Fail.
801 */
802 return -ENOENT;
803 }
804 assert(!is_dummy(old_next));
805 assert(new_node != clear_flag(old_next));
806 new_node->p.next = clear_flag(old_next);
807 /*
808 * Here is the whole trick for lock-free replace: we add
809 * the replacement node _after_ the node we want to
810 * replace by atomically setting its next pointer at the
811 * same time we set its removal flag. Given that
812 * the lookups/get next use an iterator aware of the
813 * next pointer, they will either skip the old node due
814 * to the removal flag and see the new node, or use
815 * the old node, but will not see the new one.
816 */
817 ret_next = uatomic_cmpxchg(&old_node->p.next,
818 old_next, flag_removed(new_node));
819 if (ret_next == old_next)
820 break; /* We performed the replacement. */
821 old_next = ret_next;
822 }
823
824 /*
825 * Ensure that the old node is not visible to readers anymore:
826 * lookup for the node, and remove it (along with any other
827 * logically removed node) if found.
828 */
829 lookup = lookup_bucket(ht, size, bit_reverse_ulong(old_node->p.reverse_hash));
830 dummy = (struct cds_lfht_node *) lookup;
831 _cds_lfht_gc_bucket(dummy, new_node);
832
833 assert(is_removed(rcu_dereference(old_node->p.next)));
834 return 0;
835}
836
837/*
838 * A non-NULL unique_ret pointer uses the "add unique" (or uniquify) add
839 * mode. A NULL unique_ret allows creation of duplicate keys.
840 */
841static
842void _cds_lfht_add(struct cds_lfht *ht,
843 unsigned long size,
844 struct cds_lfht_node *node,
845 struct cds_lfht_iter *unique_ret,
846 int dummy)
847{
848 struct cds_lfht_node *iter_prev, *iter, *next, *new_node, *new_next,
849 *return_node;
850 struct _cds_lfht_node *lookup;
851
852 assert(!is_dummy(node));
853 assert(!is_removed(node));
854 if (!size) {
855 assert(dummy);
856 assert(!unique_ret);
857 node->p.next = flag_dummy(get_end());
858 return; /* Initial first add (head) */
859 }
860 lookup = lookup_bucket(ht, size, bit_reverse_ulong(node->p.reverse_hash));
861 for (;;) {
862 uint32_t chain_len = 0;
863
864 /*
865 * iter_prev points to the non-removed node prior to the
866 * insert location.
867 */
868 iter_prev = (struct cds_lfht_node *) lookup;
869 /* We can always skip the dummy node initially */
870 iter = rcu_dereference(iter_prev->p.next);
871 assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
872 for (;;) {
873 if (unlikely(is_end(iter)))
874 goto insert;
875 if (likely(clear_flag(iter)->p.reverse_hash > node->p.reverse_hash))
876 goto insert;
877 /* dummy node is the first node of the identical-hash-value chain */
878 if (dummy && clear_flag(iter)->p.reverse_hash == node->p.reverse_hash)
879 goto insert;
880 next = rcu_dereference(clear_flag(iter)->p.next);
881 if (unlikely(is_removed(next)))
882 goto gc_node;
883 if (unique_ret
884 && !is_dummy(next)
885 && clear_flag(iter)->p.reverse_hash == node->p.reverse_hash
886 && !ht->compare_fct(node->key, node->key_len,
887 clear_flag(iter)->key,
888 clear_flag(iter)->key_len)) {
889 unique_ret->node = clear_flag(iter);
890 unique_ret->next = next;
891 return;
892 }
893 /* Only account for identical reverse hash once */
894 if (iter_prev->p.reverse_hash != clear_flag(iter)->p.reverse_hash
895 && !is_dummy(next))
896 check_resize(ht, size, ++chain_len);
897 iter_prev = clear_flag(iter);
898 iter = next;
899 }
900
901 insert:
902 assert(node != clear_flag(iter));
903 assert(!is_removed(iter_prev));
904 assert(!is_removed(iter));
905 assert(iter_prev != node);
906 if (!dummy)
907 node->p.next = clear_flag(iter);
908 else
909 node->p.next = flag_dummy(clear_flag(iter));
910 if (is_dummy(iter))
911 new_node = flag_dummy(node);
912 else
913 new_node = node;
914 if (uatomic_cmpxchg(&iter_prev->p.next, iter,
915 new_node) != iter) {
916 continue; /* retry */
917 } else {
918 return_node = node;
919 goto end;
920 }
921
922 gc_node:
923 assert(!is_removed(iter));
924 if (is_dummy(iter))
925 new_next = flag_dummy(clear_flag(next));
926 else
927 new_next = clear_flag(next);
928 (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next);
929 /* retry */
930 }
931end:
932 if (unique_ret) {
933 unique_ret->node = return_node;
934 /* unique_ret->next left unset, never used. */
935 }
936}
937
938static
939int _cds_lfht_del(struct cds_lfht *ht, unsigned long size,
940 struct cds_lfht_node *node,
941 int dummy_removal)
942{
943 struct cds_lfht_node *dummy, *next, *old;
944 struct _cds_lfht_node *lookup;
945
946 if (!node) /* Return -ENOENT if asked to delete NULL node */
947 return -ENOENT;
948
949 /* logically delete the node */
950 assert(!is_dummy(node));
951 assert(!is_removed(node));
952 old = rcu_dereference(node->p.next);
953 do {
954 struct cds_lfht_node *new_next;
955
956 next = old;
957 if (unlikely(is_removed(next)))
958 return -ENOENT;
959 if (dummy_removal)
960 assert(is_dummy(next));
961 else
962 assert(!is_dummy(next));
963 new_next = flag_removed(next);
964 old = uatomic_cmpxchg(&node->p.next, next, new_next);
965 } while (old != next);
966 /* We performed the (logical) deletion. */
967
968 /*
969 * Ensure that the node is not visible to readers anymore: lookup for
970 * the node, and remove it (along with any other logically removed node)
971 * if found.
972 */
973 lookup = lookup_bucket(ht, size, bit_reverse_ulong(node->p.reverse_hash));
974 dummy = (struct cds_lfht_node *) lookup;
975 _cds_lfht_gc_bucket(dummy, node);
976
977 assert(is_removed(rcu_dereference(node->p.next)));
978 return 0;
979}
980
981static
982void *partition_resize_thread(void *arg)
983{
984 struct partition_resize_work *work = arg;
985
986 work->ht->cds_lfht_rcu_register_thread();
987 work->fct(work->ht, work->i, work->start, work->len);
988 work->ht->cds_lfht_rcu_unregister_thread();
989 return NULL;
990}
991
992static
993void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
994 unsigned long len,
995 void (*fct)(struct cds_lfht *ht, unsigned long i,
996 unsigned long start, unsigned long len))
997{
998 unsigned long partition_len;
999 struct partition_resize_work *work;
1000 int thread, ret;
1001 unsigned long nr_threads;
1002
1003 /*
1004 * Note: nr_cpus_mask + 1 is always power of 2.
1005 * We spawn just the number of threads we need to satisfy the minimum
1006 * partition size, up to the number of CPUs in the system.
1007 */
1008 if (nr_cpus_mask > 0) {
1009 nr_threads = min(nr_cpus_mask + 1,
1010 len >> MIN_PARTITION_PER_THREAD_ORDER);
1011 } else {
1012 nr_threads = 1;
1013 }
1014 partition_len = len >> get_count_order_ulong(nr_threads);
1015 work = calloc(nr_threads, sizeof(*work));
1016 assert(work);
1017 for (thread = 0; thread < nr_threads; thread++) {
1018 work[thread].ht = ht;
1019 work[thread].i = i;
1020 work[thread].len = partition_len;
1021 work[thread].start = thread * partition_len;
1022 work[thread].fct = fct;
1023 ret = pthread_create(&(work[thread].thread_id), ht->resize_attr,
1024 partition_resize_thread, &work[thread]);
1025 assert(!ret);
1026 }
1027 for (thread = 0; thread < nr_threads; thread++) {
1028 ret = pthread_join(work[thread].thread_id, NULL);
1029 assert(!ret);
1030 }
1031 free(work);
1032}
1033
1034/*
1035 * Holding RCU read lock to protect _cds_lfht_add against memory
1036 * reclaim that could be performed by other call_rcu worker threads (ABA
1037 * problem).
1038 *
1039 * When we reach a certain length, we can split this population phase over
1040 * many worker threads, based on the number of CPUs available in the system.
1041 * This should therefore take care of not having the expand lagging behind too
1042 * many concurrent insertion threads by using the scheduler's ability to
1043 * schedule dummy node population fairly with insertions.
1044 */
1045static
1046void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
1047 unsigned long start, unsigned long len)
1048{
1049 unsigned long j;
1050
1051 ht->cds_lfht_rcu_read_lock();
1052 for (j = start; j < start + len; j++) {
1053 struct cds_lfht_node *new_node =
1054 (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
1055
1056 dbg_printf("init populate: i %lu j %lu hash %lu\n",
1057 i, j, !i ? 0 : (1UL << (i - 1)) + j);
1058 new_node->p.reverse_hash =
1059 bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j);
1060 _cds_lfht_add(ht, !i ? 0 : (1UL << (i - 1)),
1061 new_node, NULL, 1);
1062 }
1063 ht->cds_lfht_rcu_read_unlock();
1064}
1065
1066static
1067void init_table_populate(struct cds_lfht *ht, unsigned long i,
1068 unsigned long len)
1069{
1070 assert(nr_cpus_mask != -1);
1071 if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
1072 ht->cds_lfht_rcu_thread_online();
1073 init_table_populate_partition(ht, i, 0, len);
1074 ht->cds_lfht_rcu_thread_offline();
1075 return;
1076 }
1077 partition_resize_helper(ht, i, len, init_table_populate_partition);
1078}
1079
1080static
1081void init_table(struct cds_lfht *ht,
1082 unsigned long first_order, unsigned long len_order)
1083{
1084 unsigned long i, end_order;
1085
1086 dbg_printf("init table: first_order %lu end_order %lu\n",
1087 first_order, first_order + len_order);
1088 end_order = first_order + len_order;
1089 for (i = first_order; i < end_order; i++) {
1090 unsigned long len;
1091
1092 len = !i ? 1 : 1UL << (i - 1);
1093 dbg_printf("init order %lu len: %lu\n", i, len);
1094
1095 /* Stop expand if the resize target changes under us */
1096 if (CMM_LOAD_SHARED(ht->t.resize_target) < (!i ? 1 : (1UL << i)))
1097 break;
1098
1099 ht->t.tbl[i] = calloc(1, len * sizeof(struct _cds_lfht_node));
1100 assert(ht->t.tbl[i]);
1101
1102 /*
1103 * Set all dummy nodes reverse hash values for a level and
1104 * link all dummy nodes into the table.
1105 */
1106 init_table_populate(ht, i, len);
1107
1108 /*
1109 * Update table size.
1110 */
1111 cmm_smp_wmb(); /* populate data before RCU size */
1112 CMM_STORE_SHARED(ht->t.size, !i ? 1 : (1UL << i));
1113
1114 dbg_printf("init new size: %lu\n", !i ? 1 : (1UL << i));
1115 if (CMM_LOAD_SHARED(ht->in_progress_destroy))
1116 break;
1117 }
1118}
1119
1120/*
1121 * Holding RCU read lock to protect _cds_lfht_remove against memory
1122 * reclaim that could be performed by other call_rcu worker threads (ABA
1123 * problem).
1124 * For a single level, we logically remove and garbage collect each node.
1125 *
1126 * As a design choice, we perform logical removal and garbage collection on a
1127 * node-per-node basis to simplify this algorithm. We also assume keeping good
1128 * cache locality of the operation would overweight possible performance gain
1129 * that could be achieved by batching garbage collection for multiple levels.
1130 * However, this would have to be justified by benchmarks.
1131 *
1132 * Concurrent removal and add operations are helping us perform garbage
1133 * collection of logically removed nodes. We guarantee that all logically
1134 * removed nodes have been garbage-collected (unlinked) before call_rcu is
1135 * invoked to free a hole level of dummy nodes (after a grace period).
1136 *
1137 * Logical removal and garbage collection can therefore be done in batch or on a
1138 * node-per-node basis, as long as the guarantee above holds.
1139 *
1140 * When we reach a certain length, we can split this removal over many worker
1141 * threads, based on the number of CPUs available in the system. This should
1142 * take care of not letting resize process lag behind too many concurrent
1143 * updater threads actively inserting into the hash table.
1144 */
1145static
1146void remove_table_partition(struct cds_lfht *ht, unsigned long i,
1147 unsigned long start, unsigned long len)
1148{
1149 unsigned long j;
1150
1151 ht->cds_lfht_rcu_read_lock();
1152 for (j = start; j < start + len; j++) {
1153 struct cds_lfht_node *fini_node =
1154 (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
1155
1156 dbg_printf("remove entry: i %lu j %lu hash %lu\n",
1157 i, j, !i ? 0 : (1UL << (i - 1)) + j);
1158 fini_node->p.reverse_hash =
1159 bit_reverse_ulong(!i ? 0 : (1UL << (i - 1)) + j);
1160 (void) _cds_lfht_del(ht, !i ? 0 : (1UL << (i - 1)),
1161 fini_node, 1);
1162 }
1163 ht->cds_lfht_rcu_read_unlock();
1164}
1165
1166static
1167void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
1168{
1169
1170 assert(nr_cpus_mask != -1);
1171 if (nr_cpus_mask < 0 || len < 2 * MIN_PARTITION_PER_THREAD) {
1172 ht->cds_lfht_rcu_thread_online();
1173 remove_table_partition(ht, i, 0, len);
1174 ht->cds_lfht_rcu_thread_offline();
1175 return;
1176 }
1177 partition_resize_helper(ht, i, len, remove_table_partition);
1178}
1179
1180static
1181void fini_table(struct cds_lfht *ht,
1182 unsigned long first_order, unsigned long len_order)
1183{
1184 long i, end_order;
1185 void *free_by_rcu = NULL;
1186
1187 dbg_printf("fini table: first_order %lu end_order %lu\n",
1188 first_order, first_order + len_order);
1189 end_order = first_order + len_order;
1190 assert(first_order > 0);
1191 for (i = end_order - 1; i >= first_order; i--) {
1192 unsigned long len;
1193
1194 len = !i ? 1 : 1UL << (i - 1);
1195 dbg_printf("fini order %lu len: %lu\n", i, len);
1196
1197 /* Stop shrink if the resize target changes under us */
1198 if (CMM_LOAD_SHARED(ht->t.resize_target) > (1UL << (i - 1)))
1199 break;
1200
1201 cmm_smp_wmb(); /* populate data before RCU size */
1202 CMM_STORE_SHARED(ht->t.size, 1UL << (i - 1));
1203
1204 /*
1205 * We need to wait for all add operations to reach Q.S. (and
1206 * thus use the new table for lookups) before we can start
1207 * releasing the old dummy nodes. Otherwise their lookup will
1208 * return a logically removed node as insert position.
1209 */
1210 ht->cds_lfht_synchronize_rcu();
1211 if (free_by_rcu)
1212 free(free_by_rcu);
1213
1214 /*
1215 * Set "removed" flag in dummy nodes about to be removed.
1216 * Unlink all now-logically-removed dummy node pointers.
1217 * Concurrent add/remove operation are helping us doing
1218 * the gc.
1219 */
1220 remove_table(ht, i, len);
1221
1222 free_by_rcu = ht->t.tbl[i];
1223
1224 dbg_printf("fini new size: %lu\n", 1UL << i);
1225 if (CMM_LOAD_SHARED(ht->in_progress_destroy))
1226 break;
1227 }
1228
1229 if (free_by_rcu) {
1230 ht->cds_lfht_synchronize_rcu();
1231 free(free_by_rcu);
1232 }
1233}
1234
1235struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
1236 cds_lfht_compare_fct compare_fct,
1237 unsigned long hash_seed,
1238 unsigned long init_size,
1239 int flags,
1240 void (*cds_lfht_call_rcu)(struct rcu_head *head,
1241 void (*func)(struct rcu_head *head)),
1242 void (*cds_lfht_synchronize_rcu)(void),
1243 void (*cds_lfht_rcu_read_lock)(void),
1244 void (*cds_lfht_rcu_read_unlock)(void),
1245 void (*cds_lfht_rcu_thread_offline)(void),
1246 void (*cds_lfht_rcu_thread_online)(void),
1247 void (*cds_lfht_rcu_register_thread)(void),
1248 void (*cds_lfht_rcu_unregister_thread)(void),
1249 pthread_attr_t *attr)
1250{
1251 struct cds_lfht *ht;
1252 unsigned long order;
1253
1254 /* init_size must be power of two */
1255 if (init_size && (init_size & (init_size - 1)))
1256 return NULL;
1257 ht = calloc(1, sizeof(struct cds_lfht));
1258 assert(ht);
1259 ht->hash_fct = hash_fct;
1260 ht->compare_fct = compare_fct;
1261 ht->hash_seed = hash_seed;
1262 ht->cds_lfht_call_rcu = cds_lfht_call_rcu;
1263 ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
1264 ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
1265 ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
1266 ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
1267 ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
1268 ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread;
1269 ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread;
1270 ht->resize_attr = attr;
1271 ht->percpu_count = alloc_per_cpu_items_count();
1272 /* this mutex should not nest in read-side C.S. */
1273 pthread_mutex_init(&ht->resize_mutex, NULL);
1274 order = get_count_order_ulong(max(init_size, MIN_TABLE_SIZE)) + 1;
1275 ht->flags = flags;
1276 ht->cds_lfht_rcu_thread_offline();
1277 pthread_mutex_lock(&ht->resize_mutex);
1278 ht->t.resize_target = 1UL << (order - 1);
1279 init_table(ht, 0, order);
1280 pthread_mutex_unlock(&ht->resize_mutex);
1281 ht->cds_lfht_rcu_thread_online();
1282 return ht;
1283}
1284
1285void cds_lfht_lookup(struct cds_lfht *ht, void *key, size_t key_len,
1286 struct cds_lfht_iter *iter)
1287{
1288 struct cds_lfht_node *node, *next, *dummy_node;
1289 struct _cds_lfht_node *lookup;
1290 unsigned long hash, reverse_hash, size;
1291
1292 hash = ht->hash_fct(key, key_len, ht->hash_seed);
1293 reverse_hash = bit_reverse_ulong(hash);
1294
1295 size = rcu_dereference(ht->t.size);
1296 lookup = lookup_bucket(ht, size, hash);
1297 dummy_node = (struct cds_lfht_node *) lookup;
1298 /* We can always skip the dummy node initially */
1299 node = rcu_dereference(dummy_node->p.next);
1300 node = clear_flag(node);
1301 for (;;) {
1302 if (unlikely(is_end(node))) {
1303 node = next = NULL;
1304 break;
1305 }
1306 if (unlikely(node->p.reverse_hash > reverse_hash)) {
1307 node = next = NULL;
1308 break;
1309 }
1310 next = rcu_dereference(node->p.next);
1311 if (likely(!is_removed(next))
1312 && !is_dummy(next)
1313 && clear_flag(node)->p.reverse_hash == reverse_hash
1314 && likely(!ht->compare_fct(node->key, node->key_len, key, key_len))) {
1315 break;
1316 }
1317 node = clear_flag(next);
1318 }
1319 assert(!node || !is_dummy(rcu_dereference(node->p.next)));
1320 iter->node = node;
1321 iter->next = next;
1322}
1323
1324void cds_lfht_next_duplicate(struct cds_lfht *ht, struct cds_lfht_iter *iter)
1325{
1326 struct cds_lfht_node *node, *next;
1327 unsigned long reverse_hash;
1328 void *key;
1329 size_t key_len;
1330
1331 node = iter->node;
1332 reverse_hash = node->p.reverse_hash;
1333 key = node->key;
1334 key_len = node->key_len;
1335 next = iter->next;
1336 node = clear_flag(next);
1337
1338 for (;;) {
1339 if (unlikely(is_end(node))) {
1340 node = next = NULL;
1341 break;
1342 }
1343 if (unlikely(node->p.reverse_hash > reverse_hash)) {
1344 node = next = NULL;
1345 break;
1346 }
1347 next = rcu_dereference(node->p.next);
1348 if (likely(!is_removed(next))
1349 && !is_dummy(next)
1350 && likely(!ht->compare_fct(node->key, node->key_len, key, key_len))) {
1351 break;
1352 }
1353 node = clear_flag(next);
1354 }
1355 assert(!node || !is_dummy(rcu_dereference(node->p.next)));
1356 iter->node = node;
1357 iter->next = next;
1358}
1359
1360void cds_lfht_next(struct cds_lfht *ht, struct cds_lfht_iter *iter)
1361{
1362 struct cds_lfht_node *node, *next;
1363
1364 node = clear_flag(iter->next);
1365 for (;;) {
1366 if (unlikely(is_end(node))) {
1367 node = next = NULL;
1368 break;
1369 }
1370 next = rcu_dereference(node->p.next);
1371 if (likely(!is_removed(next))
1372 && !is_dummy(next)) {
1373 break;
1374 }
1375 node = clear_flag(next);
1376 }
1377 assert(!node || !is_dummy(rcu_dereference(node->p.next)));
1378 iter->node = node;
1379 iter->next = next;
1380}
1381
1382void cds_lfht_first(struct cds_lfht *ht, struct cds_lfht_iter *iter)
1383{
1384 struct _cds_lfht_node *lookup;
1385
1386 /*
1387 * Get next after first dummy node. The first dummy node is the
1388 * first node of the linked list.
1389 */
1390 lookup = &ht->t.tbl[0]->nodes[0];
1391 iter->next = lookup->next;
1392 cds_lfht_next(ht, iter);
1393}
1394
1395void cds_lfht_add(struct cds_lfht *ht, struct cds_lfht_node *node)
1396{
1397 unsigned long hash, size;
1398
1399 hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed);
1400 node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
1401
1402 size = rcu_dereference(ht->t.size);
1403 _cds_lfht_add(ht, size, node, NULL, 0);
1404 ht_count_add(ht, size);
1405}
1406
1407struct cds_lfht_node *cds_lfht_add_unique(struct cds_lfht *ht,
1408 struct cds_lfht_node *node)
1409{
1410 unsigned long hash, size;
1411 struct cds_lfht_iter iter;
1412
1413 hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed);
1414 node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
1415
1416 size = rcu_dereference(ht->t.size);
1417 _cds_lfht_add(ht, size, node, &iter, 0);
1418 if (iter.node == node)
1419 ht_count_add(ht, size);
1420 return iter.node;
1421}
1422
1423struct cds_lfht_node *cds_lfht_add_replace(struct cds_lfht *ht,
1424 struct cds_lfht_node *node)
1425{
1426 unsigned long hash, size;
1427 struct cds_lfht_iter iter;
1428
1429 hash = ht->hash_fct(node->key, node->key_len, ht->hash_seed);
1430 node->p.reverse_hash = bit_reverse_ulong((unsigned long) hash);
1431
1432 size = rcu_dereference(ht->t.size);
1433 for (;;) {
1434 _cds_lfht_add(ht, size, node, &iter, 0);
1435 if (iter.node == node) {
1436 ht_count_add(ht, size);
1437 return NULL;
1438 }
1439
1440 if (!_cds_lfht_replace(ht, size, iter.node, iter.next, node))
1441 return iter.node;
1442 }
1443}
1444
1445int cds_lfht_replace(struct cds_lfht *ht, struct cds_lfht_iter *old_iter,
1446 struct cds_lfht_node *new_node)
1447{
1448 unsigned long size;
1449
1450 size = rcu_dereference(ht->t.size);
1451 return _cds_lfht_replace(ht, size, old_iter->node, old_iter->next,
1452 new_node);
1453}
1454
1455int cds_lfht_del(struct cds_lfht *ht, struct cds_lfht_iter *iter)
1456{
1457 unsigned long size;
1458 int ret;
1459
1460 size = rcu_dereference(ht->t.size);
1461 ret = _cds_lfht_del(ht, size, iter->node, 0);
1462 if (!ret)
1463 ht_count_del(ht, size);
1464 return ret;
1465}
1466
1467static
1468int cds_lfht_delete_dummy(struct cds_lfht *ht)
1469{
1470 struct cds_lfht_node *node;
1471 struct _cds_lfht_node *lookup;
1472 unsigned long order, i, size;
1473
1474 /* Check that the table is empty */
1475 lookup = &ht->t.tbl[0]->nodes[0];
1476 node = (struct cds_lfht_node *) lookup;
1477 do {
1478 node = clear_flag(node)->p.next;
1479 if (!is_dummy(node))
1480 return -EPERM;
1481 assert(!is_removed(node));
1482 } while (!is_end(node));
1483 /*
1484 * size accessed without rcu_dereference because hash table is
1485 * being destroyed.
1486 */
1487 size = ht->t.size;
1488 /* Internal sanity check: all nodes left should be dummy */
1489 for (order = 0; order < get_count_order_ulong(size) + 1; order++) {
1490 unsigned long len;
1491
1492 len = !order ? 1 : 1UL << (order - 1);
1493 for (i = 0; i < len; i++) {
1494 dbg_printf("delete order %lu i %lu hash %lu\n",
1495 order, i,
1496 bit_reverse_ulong(ht->t.tbl[order]->nodes[i].reverse_hash));
1497 assert(is_dummy(ht->t.tbl[order]->nodes[i].next));
1498 }
1499 poison_free(ht->t.tbl[order]);
1500 }
1501 return 0;
1502}
1503
1504/*
1505 * Should only be called when no more concurrent readers nor writers can
1506 * possibly access the table.
1507 */
1508int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
1509{
1510 int ret;
1511
1512 /* Wait for in-flight resize operations to complete */
1513 _CMM_STORE_SHARED(ht->in_progress_destroy, 1);
1514 cmm_smp_mb(); /* Store destroy before load resize */
1515 while (uatomic_read(&ht->in_progress_resize))
1516 poll(NULL, 0, 100); /* wait for 100ms */
1517 ret = cds_lfht_delete_dummy(ht);
1518 if (ret)
1519 return ret;
1520 free_per_cpu_items_count(ht->percpu_count);
1521 if (attr)
1522 *attr = ht->resize_attr;
1523 poison_free(ht);
1524 return ret;
1525}
1526
1527void cds_lfht_count_nodes(struct cds_lfht *ht,
1528 long *approx_before,
1529 unsigned long *count,
1530 unsigned long *removed,
1531 long *approx_after)
1532{
1533 struct cds_lfht_node *node, *next;
1534 struct _cds_lfht_node *lookup;
1535 unsigned long nr_dummy = 0;
1536
1537 *approx_before = 0;
1538 if (nr_cpus_mask >= 0) {
1539 int i;
1540
1541 for (i = 0; i < nr_cpus_mask + 1; i++) {
1542 *approx_before += uatomic_read(&ht->percpu_count[i].add);
1543 *approx_before -= uatomic_read(&ht->percpu_count[i].del);
1544 }
1545 }
1546
1547 *count = 0;
1548 *removed = 0;
1549
1550 /* Count non-dummy nodes in the table */
1551 lookup = &ht->t.tbl[0]->nodes[0];
1552 node = (struct cds_lfht_node *) lookup;
1553 do {
1554 next = rcu_dereference(node->p.next);
1555 if (is_removed(next)) {
1556 if (!is_dummy(next))
1557 (*removed)++;
1558 else
1559 (nr_dummy)++;
1560 } else if (!is_dummy(next))
1561 (*count)++;
1562 else
1563 (nr_dummy)++;
1564 node = clear_flag(next);
1565 } while (!is_end(node));
1566 dbg_printf("number of dummy nodes: %lu\n", nr_dummy);
1567 *approx_after = 0;
1568 if (nr_cpus_mask >= 0) {
1569 int i;
1570
1571 for (i = 0; i < nr_cpus_mask + 1; i++) {
1572 *approx_after += uatomic_read(&ht->percpu_count[i].add);
1573 *approx_after -= uatomic_read(&ht->percpu_count[i].del);
1574 }
1575 }
1576}
1577
1578/* called with resize mutex held */
1579static
1580void _do_cds_lfht_grow(struct cds_lfht *ht,
1581 unsigned long old_size, unsigned long new_size)
1582{
1583 unsigned long old_order, new_order;
1584
1585 old_order = get_count_order_ulong(old_size) + 1;
1586 new_order = get_count_order_ulong(new_size) + 1;
1587 dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
1588 old_size, old_order, new_size, new_order);
1589 assert(new_size > old_size);
1590 init_table(ht, old_order, new_order - old_order);
1591}
1592
1593/* called with resize mutex held */
1594static
1595void _do_cds_lfht_shrink(struct cds_lfht *ht,
1596 unsigned long old_size, unsigned long new_size)
1597{
1598 unsigned long old_order, new_order;
1599
1600 new_size = max(new_size, MIN_TABLE_SIZE);
1601 old_order = get_count_order_ulong(old_size) + 1;
1602 new_order = get_count_order_ulong(new_size) + 1;
1603 dbg_printf("resize from %lu (order %lu) to %lu (order %lu) buckets\n",
1604 old_size, old_order, new_size, new_order);
1605 assert(new_size < old_size);
1606
1607 /* Remove and unlink all dummy nodes to remove. */
1608 fini_table(ht, new_order, old_order - new_order);
1609}
1610
1611
1612/* called with resize mutex held */
1613static
1614void _do_cds_lfht_resize(struct cds_lfht *ht)
1615{
1616 unsigned long new_size, old_size;
1617
1618 /*
1619 * Resize table, re-do if the target size has changed under us.
1620 */
1621 do {
1622 assert(uatomic_read(&ht->in_progress_resize));
1623 if (CMM_LOAD_SHARED(ht->in_progress_destroy))
1624 break;
1625 ht->t.resize_initiated = 1;
1626 old_size = ht->t.size;
1627 new_size = CMM_LOAD_SHARED(ht->t.resize_target);
1628 if (old_size < new_size)
1629 _do_cds_lfht_grow(ht, old_size, new_size);
1630 else if (old_size > new_size)
1631 _do_cds_lfht_shrink(ht, old_size, new_size);
1632 ht->t.resize_initiated = 0;
1633 /* write resize_initiated before read resize_target */
1634 cmm_smp_mb();
1635 } while (ht->t.size != CMM_LOAD_SHARED(ht->t.resize_target));
1636}
1637
1638static
1639unsigned long resize_target_update(struct cds_lfht *ht, unsigned long size,
1640 int growth_order)
1641{
1642 return _uatomic_max(&ht->t.resize_target,
1643 size << growth_order);
1644}
1645
1646static
1647void resize_target_update_count(struct cds_lfht *ht,
1648 unsigned long count)
1649{
1650 count = max(count, MIN_TABLE_SIZE);
1651 uatomic_set(&ht->t.resize_target, count);
1652}
1653
1654void cds_lfht_resize(struct cds_lfht *ht, unsigned long new_size)
1655{
1656 resize_target_update_count(ht, new_size);
1657 CMM_STORE_SHARED(ht->t.resize_initiated, 1);
1658 ht->cds_lfht_rcu_thread_offline();
1659 pthread_mutex_lock(&ht->resize_mutex);
1660 _do_cds_lfht_resize(ht);
1661 pthread_mutex_unlock(&ht->resize_mutex);
1662 ht->cds_lfht_rcu_thread_online();
1663}
1664
1665static
1666void do_resize_cb(struct rcu_head *head)
1667{
1668 struct rcu_resize_work *work =
1669 caa_container_of(head, struct rcu_resize_work, head);
1670 struct cds_lfht *ht = work->ht;
1671
1672 ht->cds_lfht_rcu_thread_offline();
1673 pthread_mutex_lock(&ht->resize_mutex);
1674 _do_cds_lfht_resize(ht);
1675 pthread_mutex_unlock(&ht->resize_mutex);
1676 ht->cds_lfht_rcu_thread_online();
1677 poison_free(work);
1678 cmm_smp_mb(); /* finish resize before decrement */
1679 uatomic_dec(&ht->in_progress_resize);
1680}
1681
1682static
1683void cds_lfht_resize_lazy(struct cds_lfht *ht, unsigned long size, int growth)
1684{
1685 struct rcu_resize_work *work;
1686 unsigned long target_size;
1687
1688 target_size = resize_target_update(ht, size, growth);
1689 /* Store resize_target before read resize_initiated */
1690 cmm_smp_mb();
1691 if (!CMM_LOAD_SHARED(ht->t.resize_initiated) && size < target_size) {
1692 uatomic_inc(&ht->in_progress_resize);
1693 cmm_smp_mb(); /* increment resize count before load destroy */
1694 if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
1695 uatomic_dec(&ht->in_progress_resize);
1696 return;
1697 }
1698 work = malloc(sizeof(*work));
1699 work->ht = ht;
1700 ht->cds_lfht_call_rcu(&work->head, do_resize_cb);
1701 CMM_STORE_SHARED(ht->t.resize_initiated, 1);
1702 }
1703}
1704
1705#if defined(HAVE_SCHED_GETCPU) && defined(HAVE_SYSCONF)
1706
1707static
1708void cds_lfht_resize_lazy_count(struct cds_lfht *ht, unsigned long size,
1709 unsigned long count)
1710{
1711 struct rcu_resize_work *work;
1712
1713 if (!(ht->flags & CDS_LFHT_AUTO_RESIZE))
1714 return;
1715 resize_target_update_count(ht, count);
1716 /* Store resize_target before read resize_initiated */
1717 cmm_smp_mb();
1718 if (!CMM_LOAD_SHARED(ht->t.resize_initiated)) {
1719 uatomic_inc(&ht->in_progress_resize);
1720 cmm_smp_mb(); /* increment resize count before load destroy */
1721 if (CMM_LOAD_SHARED(ht->in_progress_destroy)) {
1722 uatomic_dec(&ht->in_progress_resize);
1723 return;
1724 }
1725 work = malloc(sizeof(*work));
1726 work->ht = ht;
1727 ht->cds_lfht_call_rcu(&work->head, do_resize_cb);
1728 CMM_STORE_SHARED(ht->t.resize_initiated, 1);
1729 }
1730}
1731
1732#endif
This page took 0.070063 seconds and 4 git commands to generate.