rculfhash: parallelize resize
authorMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 20 Sep 2011 20:46:22 +0000 (16:46 -0400)
committerMathieu Desnoyers <mathieu.desnoyers@efficios.com>
Tue, 20 Sep 2011 20:46:22 +0000 (16:46 -0400)
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
rculfhash.c
tests/test_urcu_hash.c
urcu/rculfhash.h

index ce1cbb79a55f4cc69ed729dddb170773a46d17d8..42501a1f030c6ee29fb0c40e5fdc02394da06992 100644 (file)
 #define MAX_TABLE_ORDER                        64
 #endif
 
+/*
+ * Minimum number of dummy nodes to touch per thread to parallelize grow/shrink.
+ */
+#define MIN_PARTITION_PER_THREAD       4096
+
 #ifndef min
 #define min(a, b)      ((a) < (b) ? (a) : (b))
 #endif
@@ -235,6 +240,9 @@ struct cds_lfht {
        void (*cds_lfht_rcu_read_unlock)(void);
        void (*cds_lfht_rcu_thread_offline)(void);
        void (*cds_lfht_rcu_thread_online)(void);
+       void (*cds_lfht_rcu_register_thread)(void);
+       void (*cds_lfht_rcu_unregister_thread)(void);
+       pthread_attr_t *resize_attr;    /* Resize threads attributes */
        unsigned long count;            /* global approximate item count */
        struct ht_items_count *percpu_count;    /* per-cpu item count */
 };
@@ -244,6 +252,14 @@ struct rcu_resize_work {
        struct cds_lfht *ht;
 };
 
+struct partition_resize_work {
+       struct rcu_head head;
+       struct cds_lfht *ht;
+       unsigned long i, start, len;
+       void (*fct)(struct cds_lfht *ht, unsigned long i,
+                   unsigned long start, unsigned long len);
+};
+
 static
 struct cds_lfht_node *_cds_lfht_add(struct cds_lfht *ht,
                                unsigned long size,
@@ -879,25 +895,70 @@ end:
                return -ENOENT;
 }
 
+static
+void *partition_resize_thread(void *arg)
+{
+       struct partition_resize_work *work = arg;
+
+       work->ht->cds_lfht_rcu_register_thread();
+       work->fct(work->ht, work->i, work->start, work->len);
+       work->ht->cds_lfht_rcu_unregister_thread();
+       return NULL;
+}
+
+static
+void partition_resize_helper(struct cds_lfht *ht, unsigned long i,
+               unsigned long len,
+               void (*fct)(struct cds_lfht *ht, unsigned long i,
+                       unsigned long start, unsigned long len))
+{
+       unsigned long partition_len;
+       struct partition_resize_work *work;
+       int cpu, ret;
+       pthread_t *thread_id;
+
+       /* Note: nr_cpus_mask + 1 is always power of 2 */
+       partition_len = len >> get_count_order_ulong(nr_cpus_mask + 1);
+       work = calloc(nr_cpus_mask + 1, sizeof(*work));
+       thread_id = calloc(nr_cpus_mask + 1, sizeof(*thread_id));
+       assert(work);
+       for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) {
+               work[cpu].ht = ht;
+               work[cpu].i = i;
+               work[cpu].len = partition_len;
+               work[cpu].start = cpu * partition_len;
+               work[cpu].fct = fct;
+               ret = pthread_create(&thread_id[cpu], ht->resize_attr,
+                       partition_resize_thread, &work[cpu]);
+               assert(!ret);
+       }
+       for (cpu = 0; cpu < nr_cpus_mask + 1; cpu++) {
+               ret = pthread_join(thread_id[cpu], NULL);
+               assert(!ret);
+       }
+       free(work);
+       free(thread_id);
+}
+
 /*
  * Holding RCU read lock to protect _cds_lfht_add against memory
  * reclaim that could be performed by other call_rcu worker threads (ABA
  * problem).
  *
- * TODO: when we reach a certain length, we can split this population phase over
+ * When we reach a certain length, we can split this population phase over
  * many worker threads, based on the number of CPUs available in the system.
  * This should therefore take care of not having the expand lagging behind too
  * many concurrent insertion threads by using the scheduler's ability to
  * schedule dummy node population fairly with insertions.
  */
 static
-void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len)
+void init_table_populate_partition(struct cds_lfht *ht, unsigned long i,
+                                  unsigned long start, unsigned long len)
 {
        unsigned long j;
 
-       ht->cds_lfht_rcu_thread_online();
        ht->cds_lfht_rcu_read_lock();
-       for (j = 0; j < len; j++) {
+       for (j = start; j < start + len; j++) {
                struct cds_lfht_node *new_node =
                        (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
 
@@ -911,7 +972,20 @@ void init_table_populate(struct cds_lfht *ht, unsigned long i, unsigned long len
                        break;
        }
        ht->cds_lfht_rcu_read_unlock();
-       ht->cds_lfht_rcu_thread_offline();
+}
+
+static
+void init_table_populate(struct cds_lfht *ht, unsigned long i,
+                        unsigned long len)
+{
+       assert(nr_cpus_mask != -1);
+       if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) {
+               ht->cds_lfht_rcu_thread_online();
+               init_table_populate_partition(ht, i, 0, len);
+               ht->cds_lfht_rcu_thread_offline();
+               return;
+       }
+       partition_resize_helper(ht, i, len, init_table_populate_partition);
 }
 
 static
@@ -935,6 +1009,7 @@ void init_table(struct cds_lfht *ht,
 
                ht->t.tbl[i] = calloc(1, sizeof(struct rcu_level)
                                + (len * sizeof(struct _cds_lfht_node)));
+               assert(ht->t.tbl[i]);
 
                /*
                 * Set all dummy nodes reverse hash values for a level and
@@ -974,19 +1049,19 @@ void init_table(struct cds_lfht *ht,
  * Logical removal and garbage collection can therefore be done in batch or on a
  * node-per-node basis, as long as the guarantee above holds.
  *
- * TODO: when we reach a certain length, we can split this removal over many
- * worker threads, based on the number of CPUs available in the system. This
- * should take care of not letting resize process lag behind too many concurrent
+ * When we reach a certain length, we can split this removal over many worker
+ * threads, based on the number of CPUs available in the system. This should
+ * take care of not letting resize process lag behind too many concurrent
  * updater threads actively inserting into the hash table.
  */
 static
-void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
+void remove_table_partition(struct cds_lfht *ht, unsigned long i,
+                           unsigned long start, unsigned long len)
 {
        unsigned long j;
 
-       ht->cds_lfht_rcu_thread_online();
        ht->cds_lfht_rcu_read_lock();
-       for (j = 0; j < len; j++) {
+       for (j = start; j < start + len; j++) {
                struct cds_lfht_node *fini_node =
                        (struct cds_lfht_node *) &ht->t.tbl[i]->nodes[j];
 
@@ -1000,7 +1075,20 @@ void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
                        break;
        }
        ht->cds_lfht_rcu_read_unlock();
-       ht->cds_lfht_rcu_thread_offline();
+}
+
+static
+void remove_table(struct cds_lfht *ht, unsigned long i, unsigned long len)
+{
+
+       assert(nr_cpus_mask != -1);
+       if (nr_cpus_mask < 0 || len < (nr_cpus_mask + 1) * MIN_PARTITION_PER_THREAD) {
+               ht->cds_lfht_rcu_thread_online();
+               remove_table_partition(ht, i, 0, len);
+               ht->cds_lfht_rcu_thread_offline();
+               return;
+       }
+       partition_resize_helper(ht, i, len, remove_table_partition);
 }
 
 static
@@ -1061,7 +1149,10 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
                        void (*cds_lfht_rcu_read_lock)(void),
                        void (*cds_lfht_rcu_read_unlock)(void),
                        void (*cds_lfht_rcu_thread_offline)(void),
-                       void (*cds_lfht_rcu_thread_online)(void))
+                       void (*cds_lfht_rcu_thread_online)(void),
+                       void (*cds_lfht_rcu_register_thread)(void),
+                       void (*cds_lfht_rcu_unregister_thread)(void),
+                       pthread_attr_t *attr)
 {
        struct cds_lfht *ht;
        unsigned long order;
@@ -1070,6 +1161,7 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
        if (init_size && (init_size & (init_size - 1)))
                return NULL;
        ht = calloc(1, sizeof(struct cds_lfht));
+       assert(ht);
        ht->hash_fct = hash_fct;
        ht->compare_fct = compare_fct;
        ht->hash_seed = hash_seed;
@@ -1079,6 +1171,9 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
        ht->cds_lfht_rcu_read_unlock = cds_lfht_rcu_read_unlock;
        ht->cds_lfht_rcu_thread_offline = cds_lfht_rcu_thread_offline;
        ht->cds_lfht_rcu_thread_online = cds_lfht_rcu_thread_online;
+       ht->cds_lfht_rcu_register_thread = cds_lfht_rcu_register_thread;
+       ht->cds_lfht_rcu_unregister_thread = cds_lfht_rcu_unregister_thread;
+       ht->resize_attr = attr;
        ht->percpu_count = alloc_per_cpu_items_count();
        /* this mutex should not nest in read-side C.S. */
        pthread_mutex_init(&ht->resize_mutex, NULL);
@@ -1249,7 +1344,7 @@ int cds_lfht_delete_dummy(struct cds_lfht *ht)
  * Should only be called when no more concurrent readers nor writers can
  * possibly access the table.
  */
-int cds_lfht_destroy(struct cds_lfht *ht)
+int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr)
 {
        int ret;
 
@@ -1261,6 +1356,8 @@ int cds_lfht_destroy(struct cds_lfht *ht)
        if (ret)
                return ret;
        free_per_cpu_items_count(ht->percpu_count);
+       if (attr)
+               *attr = ht->resize_attr;
        poison_free(ht);
        return ret;
 }
index 273283ed5b61d0535fb7281d17d4cf6cb6253aea..238b8e560e0cc12caeb88d307723e3b90650813d 100644 (file)
@@ -750,7 +750,7 @@ int main(int argc, char **argv)
        count_writer = malloc(sizeof(*count_writer) * nr_writers);
        test_ht = cds_lfht_new(test_hash, test_compare, 0x42UL,
                        init_hash_size,
-                       opt_auto_resize ? CDS_LFHT_AUTO_RESIZE : 0);
+                       opt_auto_resize ? CDS_LFHT_AUTO_RESIZE : 0, NULL);
        ret = populate_hash();
        assert(!ret);
         err = create_all_cpu_call_rcu_data(0);
@@ -804,7 +804,7 @@ int main(int argc, char **argv)
        if (count || removed)
                printf("WARNING: nodes left in the hash table upon destroy: "
                        "%lu nodes + %lu logically removed.\n", count, removed);
-       ret = cds_lfht_destroy(test_ht);
+       ret = cds_lfht_destroy(test_ht, NULL);
 
        if (ret)
                printf_verbose("final delete aborted\n");
index bc9f232d65deb6632f9c19a84dceb2a6e901a377..4f7cc526f9ddc93528276c3e5740dab9c6f54f9f 100644 (file)
@@ -95,7 +95,10 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
                        void (*cds_lfht_rcu_read_lock)(void),
                        void (*cds_lfht_rcu_read_unlock)(void),
                        void (*cds_lfht_rcu_thread_offline)(void),
-                       void (*cds_lfht_rcu_thread_online)(void));
+                       void (*cds_lfht_rcu_thread_online)(void),
+                       void (*cds_lfht_rcu_register_thread)(void),
+                       void (*cds_lfht_rcu_unregister_thread)(void),
+                       pthread_attr_t *attr);
 
 /*
  * cds_lfht_new - allocate a hash table.
@@ -106,30 +109,45 @@ struct cds_lfht *_cds_lfht_new(cds_lfht_hash_fct hash_fct,
  * @flags: hash table creation flags (can be combined with bitwise or: '|').
  *           0: no flags.
  *           CDS_LFHT_AUTO_RESIZE: automatically resize hash table.
+ * @attr: optional resize worker thread attributes. NULL for default.
  *
  * Return NULL on error.
  * Note: the RCU flavor must be already included before the hash table header.
+ *
+ * The programmer is responsible for ensuring that resize operation has a
+ * priority equal to hash table updater threads. It should be performed by
+ * specifying the appropriate priority in the pthread "attr" argument, and,
+ * for CDS_LFHT_AUTO_RESIZE, by ensuring that call_rcu worker threads also have
+ * this priority level. Having lower priority for call_rcu and resize threads
+ * does not pose any correctness issue, but the resize operations could be
+ * starved by updates, thus leading to long hash table bucket chains.
  */
 static inline
 struct cds_lfht *cds_lfht_new(cds_lfht_hash_fct hash_fct,
                        cds_lfht_compare_fct compare_fct,
                        unsigned long hash_seed,
                        unsigned long init_size,
-                       int flags)
+                       int flags,
+                       pthread_attr_t *attr)
 {
        return _cds_lfht_new(hash_fct, compare_fct, hash_seed,
                        init_size, flags,
                        call_rcu, synchronize_rcu, rcu_read_lock,
                        rcu_read_unlock, rcu_thread_offline,
-                       rcu_thread_online);
+                       rcu_thread_online, rcu_register_thread,
+                       rcu_unregister_thread, attr);
 }
 
 /*
  * cds_lfht_destroy - destroy a hash table.
+ * @ht: the hash table to destroy.
+ * @attr: (output) resize worker thread attributes, as received by cds_lfht_new.
+ *        The caller will typically want to free this pointer if dynamically
+ *        allocated.
  *
  * Return 0 on success, negative error value on error.
  */
-int cds_lfht_destroy(struct cds_lfht *ht);
+int cds_lfht_destroy(struct cds_lfht *ht, pthread_attr_t **attr);
 
 /*
  * cds_lfht_count_nodes - count the number of nodes in the hash table.
This page took 0.03022 seconds and 4 git commands to generate.