RCU support for consumer's hash tables
authorDavid Goulet <dgoulet@efficios.com>
Wed, 11 Jan 2012 19:51:04 +0000 (14:51 -0500)
committerDavid Goulet <dgoulet@efficios.com>
Wed, 11 Jan 2012 19:53:14 +0000 (14:53 -0500)
Signed-off-by: David Goulet <dgoulet@efficios.com>
include/lttng/lttng-consumer.h
liblttng-consumer/lttng-consumer.c

index bba72ee69ec39f82a4b56cf69c65e84c7939b4ab..f5ad3e6f2ad416d5e3d6064444d1064ff1caa519 100644 (file)
@@ -184,9 +184,10 @@ struct lttng_consumer_global_data {
        /*
         * At this time, this lock is used to ensure coherence between the count
         * and number of element in the hash table. It's also a protection for
-        * concurrent read/write between threads. Although hash table used are
-        * lockless data structure, appropriate RCU lock mechanism are not yet
-        * implemented in the consumer.
+        * concurrent read/write between threads.
+        *
+        * XXX: We need to see if this lock is still needed with the lockless RCU
+        * hash tables.
         */
        pthread_mutex_t lock;
 
index f4af47404c6053af72f90107a5ea6acf9c3650c2..617282d9c38b3690f5969f883690c7922c478b2b 100644 (file)
@@ -67,6 +67,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
        if (key < 0)
                return NULL;
 
+       rcu_read_lock();
+
        lttng_ht_lookup(consumer_data.stream_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -74,6 +76,8 @@ static struct lttng_consumer_stream *consumer_find_stream(int key)
                stream = caa_container_of(node, struct lttng_consumer_stream, node);
        }
 
+       rcu_read_unlock();
+
        return stream;
 }
 
@@ -96,6 +100,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
        if (key < 0)
                return NULL;
 
+       rcu_read_lock();
+
        lttng_ht_lookup(consumer_data.channel_ht, (void *)((unsigned long) key),
                        &iter);
        node = lttng_ht_iter_get_node_ulong(&iter);
@@ -103,6 +109,8 @@ static struct lttng_consumer_channel *consumer_find_channel(int key)
                channel = caa_container_of(node, struct lttng_consumer_channel, node);
        }
 
+       rcu_read_unlock();
+
        return channel;
 }
 
@@ -146,6 +154,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
                goto end;
        }
 
+       rcu_read_lock();
+
        /* Get stream node from hash table */
        lttng_ht_lookup(consumer_data.stream_ht,
                        (void *)((unsigned long) stream->key), &iter);
@@ -153,6 +163,8 @@ void consumer_del_stream(struct lttng_consumer_stream *stream)
        ret = lttng_ht_del(consumer_data.stream_ht, &iter);
        assert(!ret);
 
+       rcu_read_unlock();
+
        if (consumer_data.stream_count <= 0) {
                goto end;
        }
@@ -181,6 +193,16 @@ end:
                consumer_del_channel(free_chan);
 }
 
+static void consumer_del_stream_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_stream *stream =
+               caa_container_of(node, struct lttng_consumer_stream, node);
+
+       consumer_del_stream(stream);
+}
+
 struct lttng_consumer_stream *consumer_allocate_stream(
                int channel_key, int stream_key,
                int shm_fd, int wait_fd,
@@ -257,7 +279,9 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal stream identifier, for UST */
        consumer_steal_stream_key(stream->key);
+       rcu_read_lock();
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
+       rcu_read_unlock();
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
@@ -321,11 +345,15 @@ void consumer_del_channel(struct lttng_consumer_channel *channel)
                goto end;
        }
 
+       rcu_read_lock();
+
        lttng_ht_lookup(consumer_data.channel_ht,
                        (void *)((unsigned long) channel->key), &iter);
        ret = lttng_ht_del(consumer_data.channel_ht, &iter);
        assert(!ret);
 
+       rcu_read_unlock();
+
        if (channel->mmap_base != NULL) {
                ret = munmap(channel->mmap_base, channel->mmap_len);
                if (ret != 0) {
@@ -344,6 +372,16 @@ end:
        pthread_mutex_unlock(&consumer_data.lock);
 }
 
+static void consumer_del_channel_rcu(struct rcu_head *head)
+{
+       struct lttng_ht_node_ulong *node =
+               caa_container_of(head, struct lttng_ht_node_ulong, head);
+       struct lttng_consumer_channel *channel=
+               caa_container_of(node, struct lttng_consumer_channel, node);
+
+       consumer_del_channel(channel);
+}
+
 struct lttng_consumer_channel *consumer_allocate_channel(
                int channel_key,
                int shm_fd, int wait_fd,
@@ -403,7 +441,9 @@ int consumer_add_channel(struct lttng_consumer_channel *channel)
        pthread_mutex_lock(&consumer_data.lock);
        /* Steal channel identifier, for UST */
        consumer_steal_channel_key(channel->key);
+       rcu_read_lock();
        lttng_ht_add_unique_ulong(consumer_data.channel_ht, &channel->node);
+       rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
        return 0;
 }
@@ -510,27 +550,29 @@ void lttng_consumer_cleanup(void)
 {
        int ret;
        struct lttng_ht_iter iter;
-       struct lttng_consumer_stream *stream;
-       struct lttng_consumer_channel *channel;
+       struct lttng_ht_node_ulong *node;
+
+       rcu_read_lock();
 
        /*
-        * close all outfd. Called when there are no more threads
-        * running (after joining on the threads), no need to protect
-        * list iteration with mutex.
+        * close all outfd. Called when there are no more threads running (after
+        * joining on the threads), no need to protect list iteration with mutex.
         */
-       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, stream,
-                       node.node) {
+       cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
+                       node) {
                ret = lttng_ht_del(consumer_data.stream_ht, &iter);
                assert(!ret);
-               consumer_del_stream(stream);
+               call_rcu(&node->head, consumer_del_stream_rcu);
        }
 
-       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, channel,
-                       node.node) {
+       cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
+                       node) {
                ret = lttng_ht_del(consumer_data.channel_ht, &iter);
                assert(!ret);
-               consumer_del_channel(channel);
+               call_rcu(&node->head, consumer_del_channel_rcu);
        }
+
+       rcu_read_unlock();
 }
 
 /*
@@ -897,11 +939,15 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                }
                        } else if (pollfd[i].revents & POLLERR) {
                                ERR("Error returned in polling fd %d.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        } else if (pollfd[i].revents & POLLNVAL) {
                                ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
-                               consumer_del_stream(local_stream[i]);
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        } else if ((pollfd[i].revents & POLLHUP) &&
                                        !(pollfd[i].revents & POLLIN)) {
@@ -919,7 +965,9 @@ void *lttng_consumer_thread_poll_fds(void *data)
                                } else {
                                        DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
                                }
-                               consumer_del_stream(local_stream[i]);
+                               rcu_read_lock();
+                               consumer_del_stream_rcu(&local_stream[i]->node.head);
+                               rcu_read_unlock();
                                num_hup++;
                        }
                }
This page took 0.029151 seconds and 4 git commands to generate.