rculfhash: hold rcu read-side lock in resize
[urcu.git] / rculfhash.c
index 2565099d2c12b633dce919e927363b10ff63912c..c3a032e20d26a35c09d1f1b3c466432206ecbd32 100644
 #define dbg_printf(fmt, args...)
 #endif
 
+/* For testing: poison freed memory to catch use-after-free. */
+#define POISON_FREE
+
 /*
  * Per-CPU split-counters lazily update the global counter each 1024
  * addition/removal. It automatically keeps track of resize required.
@@ -217,6 +220,8 @@ struct cds_lfht {
        void (*cds_lfht_call_rcu)(struct rcu_head *head,
                      void (*func)(struct rcu_head *head));
        void (*cds_lfht_synchronize_rcu)(void);
+       void (*cds_lfht_rcu_read_lock)(void);
+       void (*cds_lfht_rcu_read_unlock)(void);
        unsigned long count;            /* global approximate item count */
        struct ht_items_count *percpu_count;    /* per-cpu item count */
 };
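
The two new callbacks keep rculfhash flavor-agnostic: the caller passes in
whichever read-side primitives its RCU flavor provides (urcu, urcu-qsbr or
urcu-bp), and the resize code calls through the stored pointers. The
init_table()/fini_table() hunks below use them in this pattern:

	ht->cds_lfht_rcu_read_lock();
	/* ... walk or splice bucket chains ... */
	ht->cds_lfht_rcu_read_unlock();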
@@ -418,6 +423,16 @@ int get_count_order_ulong(unsigned long x)
        return order;
 }
 
+#ifdef POISON_FREE
+#define poison_free(ptr)                               \
+       do {                                            \
+               memset(ptr, 0x42, sizeof(*(ptr)));      \
+               free(ptr);                              \
+       } while (0)
+#else
+#define poison_free(ptr)       free(ptr)
+#endif
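
Poisoning freed memory with a 0x42 pattern turns silent use-after-free
reads into obviously bogus values that assertions and debuggers catch
quickly. One caveat: the macro poisons only sizeof(*(ptr)) bytes, so for a
flexible-array allocation such as struct rcu_level only the header is
overwritten, as this sketch illustrates (len assumed in scope):

	struct rcu_level *l = calloc(1, sizeof(*l)
			+ (len * sizeof(struct _cds_lfht_node)));
	/* ... use the level, then retire it ... */
	poison_free(l);	/* poisons the rcu_level header, not nodes[] */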
+
 static
 void cds_lfht_resize_lazy(struct cds_lfht *ht, struct rcu_table *t, int growth);
 
@@ -468,7 +483,7 @@ struct ht_items_count *alloc_per_cpu_items_count(void)
 static
 void free_per_cpu_items_count(struct ht_items_count *count)
 {
-       free(count);
+       poison_free(count);
 }
 
 static
@@ -644,7 +659,7 @@ void cds_lfht_free_table_cb(struct rcu_head *head)
 {
        struct rcu_table *t =
                caa_container_of(head, struct rcu_table, head);
-       free(t);
+       poison_free(t);
 }
 
 static
@@ -652,7 +667,7 @@ void cds_lfht_free_level(struct rcu_head *head)
 {
        struct rcu_level *l =
                caa_container_of(head, struct rcu_level, head);
-       free(l);
+       poison_free(l);
 }
 
 /*
@@ -662,15 +677,20 @@ static
 void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node)
 {
        struct cds_lfht_node *iter_prev, *iter, *next, *new_next;
+       struct cds_lfht_node *iter_trace[64];
+       unsigned long trace_idx = 0;
 
+       memset(iter_trace, 0, sizeof(iter_trace));
        assert(!is_dummy(dummy));
        assert(!is_removed(dummy));
        assert(!is_dummy(node));
        assert(!is_removed(node));
        for (;;) {
+               iter_trace[trace_idx++ & (64 - 1)] = (void *) 0x1;
                iter_prev = dummy;
                /* We can always skip the dummy node initially */
                iter = rcu_dereference(iter_prev->p.next);
+               iter_trace[trace_idx++ & (64 - 1)] = iter;
                assert(iter_prev->p.reverse_hash <= node->p.reverse_hash);
                /*
                 * We should never be called with dummy (start of chain)
@@ -689,6 +709,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
                                break;
                        iter_prev = clear_flag(iter);
                        iter = next;
+                       iter_trace[trace_idx++ & (64 - 1)] = iter;
                }
                assert(!is_removed(iter));
                if (is_dummy(iter))
@@ -696,6 +717,7 @@ void _cds_lfht_gc_bucket(struct cds_lfht_node *dummy, struct cds_lfht_node *node
                else
                        new_next = clear_flag(next);
                (void) uatomic_cmpxchg(&iter_prev->p.next, iter, new_next);
+               iter_trace[trace_idx++ & (64 - 1)] = (void *) 0x2;
        }
 }
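
The iter_trace instrumentation keeps a wrap-around record of the last 64
pointers the gc loop visited: the depth is a power of two, so
trace_idx++ & (64 - 1) is a cheap modulo, and the 0x1/0x2 sentinels mark a
retry-loop restart and a completed cmpxchg respectively. A crashed process
thus carries its recent traversal history in the core dump. The same idiom
in isolation, as a standalone sketch (not part of this file):

	#define TRACE_DEPTH	64	/* must remain a power of two */

	struct ring_trace {
		void *slot[TRACE_DEPTH];
		unsigned long idx;
	};

	static inline void ring_trace_add(struct ring_trace *rt, void *p)
	{
		/* the mask acts as modulo; old entries are overwritten */
		rt->slot[rt->idx++ & (TRACE_DEPTH - 1)] = p;
	}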
 
@@ -842,6 +864,11 @@ end:
                return -ENOENT;
 }
 
+/*
+ * Hold the RCU read-side lock across _cds_lfht_add to protect against
+ * memory reclaim performed concurrently by call_rcu worker threads
+ * (ABA problem).
+ */
 static
 void init_table(struct cds_lfht *ht, struct rcu_table *t,
                unsigned long first_order, unsigned long len_order)
@@ -859,6 +886,7 @@ void init_table(struct cds_lfht *ht, struct rcu_table *t,
                dbg_printf("init order %lu len: %lu\n", i, len);
                t->tbl[i] = calloc(1, sizeof(struct rcu_level)
                                + (len * sizeof(struct _cds_lfht_node)));
+               ht->cds_lfht_rcu_read_lock();
                for (j = 0; j < len; j++) {
                        struct cds_lfht_node *new_node =
                                (struct cds_lfht_node *) &t->tbl[i]->nodes[j];
@@ -871,6 +899,7 @@ void init_table(struct cds_lfht *ht, struct rcu_table *t,
                        if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                                break;
                }
+               ht->cds_lfht_rcu_read_unlock();
                /* Update table size */
                t->size = !i ? 1 : (1UL << i);
                dbg_printf("init new size: %lu\n", t->size);
@@ -881,6 +910,11 @@ void init_table(struct cds_lfht *ht, struct rcu_table *t,
        t->resize_initiated = 0;
 }
 
+/*
+ * Hold the RCU read-side lock across _cds_lfht_remove to protect against
+ * memory reclaim performed concurrently by call_rcu worker threads
+ * (ABA problem).
+ */
 static
 void fini_table(struct cds_lfht *ht, struct rcu_table *t,
                unsigned long first_order, unsigned long len_order)
@@ -902,8 +936,9 @@ void fini_table(struct cds_lfht *ht, struct rcu_table *t,
                 * removal so gc lookups use non-logically-removed dummy
                 * nodes.
                 */
-               t->size = (i == 1) ? 0 : 1UL << (i - 2);
+               t->size = 1UL << (i - 1);
                /* Unlink */
+               ht->cds_lfht_rcu_read_lock();
                for (j = 0; j < len; j++) {
                        struct cds_lfht_node *fini_node =
                                (struct cds_lfht_node *) &t->tbl[i]->nodes[j];
@@ -916,6 +951,7 @@ void fini_table(struct cds_lfht *ht, struct rcu_table *t,
                        if (CMM_LOAD_SHARED(ht->in_progress_destroy))
                                break;
                }
+               ht->cds_lfht_rcu_read_unlock();
                ht->cds_lfht_call_rcu(&t->tbl[i]->head, cds_lfht_free_level);
                dbg_printf("fini new size: %lu\n", t->size);
                if (CMM_LOAD_SHARED(ht->in_progress_destroy))
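
Both bracketed loops guard against the same race: a call_rcu worker may
concurrently run cds_lfht_free_level() on a level the loop still
dereferences, after which the allocator can recycle the address and let a
compare-and-swap succeed against reused memory. A sketch of the
interleaving the read-side lock closes, built from this file's own
functions:

	/*
	 *   resize thread                    call_rcu worker
	 *   iter = rcu_dereference(next);
	 *                                    cds_lfht_free_level() frees the
	 *                                    level holding *iter; the
	 *                                    allocator recycles the address
	 *   uatomic_cmpxchg(&iter_prev->p.next,
	 *                   iter, new_next); succeeds on recycled memory (ABA)
	 */

Holding the read-side lock across the loop keeps the grace period from
completing, so frees deferred through call_rcu() cannot land until the
matching unlock.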
@@ -932,7 +968,9 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
                        int flags,
                        void (*cds_lfht_call_rcu)(struct rcu_head *head,
                                        void (*func)(struct rcu_head *head)),
-                       void (*cds_lfht_synchronize_rcu)(void))
+                       void (*cds_lfht_synchronize_rcu)(void),
+                       void (*cds_lfht_rcu_read_lock)(void),
+                       void (*cds_lfht_rcu_read_unlock)(void))
 {
        struct cds_lfht *ht;
        unsigned long order;
@@ -946,6 +984,8 @@ struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
        ht->hash_seed = hash_seed;
        ht->cds_lfht_call_rcu = cds_lfht_call_rcu;
        ht->cds_lfht_synchronize_rcu = cds_lfht_synchronize_rcu;
+       ht->cds_lfht_rcu_read_lock = cds_lfht_rcu_read_lock;
+       ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
        ht->in_progress_resize = 0;
        ht->percpu_count = alloc_per_cpu_items_count();
        /* this mutex should not nest in read-side C.S. */
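
On the caller side, the two extra arguments are simply the flavor's
read-side primitives. A sketch of the wiring, assuming the default urcu
flavor; my_hash_fct is a placeholder, and the middle parameters elided from
this hunk are left elided:

	#include <urcu.h>	/* call_rcu, synchronize_rcu,
				   rcu_read_lock, rcu_read_unlock */

	ht = cds_lfht_new(my_hash_fct, /* ... middle parameters ... */ flags,
			call_rcu, synchronize_rcu,
			rcu_read_lock, rcu_read_unlock);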
@@ -1101,7 +1141,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht)
                                bit_reverse_ulong(t->tbl[order]->nodes[i].reverse_hash));
                        assert(is_dummy(t->tbl[order]->nodes[i].next));
                }
-               free(t->tbl[order]);
+               poison_free(t->tbl[order]);
        }
        return 0;
 }
@@ -1121,9 +1161,9 @@ int cds_lfht_destroy(struct cds_lfht *ht)
        ret = cds_lfht_delete_dummy(ht);
        if (ret)
                return ret;
-       free(ht->t);
+       poison_free(ht->t);
        free_per_cpu_items_count(ht->percpu_count);
-       free(ht);
+       poison_free(ht);
        return ret;
 }
 
@@ -1275,7 +1315,7 @@ void do_resize_cb(struct rcu_head *head)
        pthread_mutex_lock(&ht->resize_mutex);
        _do_cds_lfht_resize(ht);
        pthread_mutex_unlock(&ht->resize_mutex);
-       free(work);
+       poison_free(work);
        cmm_smp_mb();   /* finish resize before decrement */
        uatomic_dec(&ht->in_progress_resize);
 }
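
Because the resize's stores are ordered before the counter decrement, a
teardown path can simply spin until all deferred work has drained. A sketch
of the waiting side, assuming the enqueue side increments
in_progress_resize before scheduling do_resize_cb() through call_rcu:

	#include <poll.h>

	while (uatomic_read(&ht->in_progress_resize))
		poll(NULL, 0, 1);	/* wait for in-flight resize work */
	cmm_smp_mb();	/* pairs with the barrier in do_resize_cb() */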