Move add data stream to the data thread
authorDavid Goulet <dgoulet@efficios.com>
Thu, 11 Oct 2012 19:22:59 +0000 (15:22 -0400)
committerDavid Goulet <dgoulet@efficios.com>
Fri, 19 Oct 2012 16:49:01 +0000 (12:49 -0400)
As a second step of refactoring, upon receiving a data stream, we send
it to the data thread that is now in charge of handling it.

The ustctl_* calls are moved into the on_recv_stream function call
executed once the stream is allocated and fds have been received. This
also fits with the kernel consumer which mmap the buffers in the
on_recv_stream call.

This commit should speed up the add stream process for the session
daemon. There is still some actions to move out of the session daemon
poll thread to significantly gain speed, especially for network
streaming.

Signed-off-by: David Goulet <dgoulet@efficios.com>
src/common/consumer.c
src/common/consumer.h
src/common/kernel-consumer/kernel-consumer.c
src/common/ust-consumer/ust-consumer.c

index cf1b7b96f95a6c09897d44b9f6d735013513d035..be78e256f489661eda948fc8b143a25b69c23a0c 100644 (file)
@@ -89,7 +89,7 @@ static struct lttng_consumer_stream *consumer_find_stream(int key,
        return stream;
 }
 
        return stream;
 }
 
-static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+void consumer_steal_stream_key(int key, struct lttng_ht *ht)
 {
        struct lttng_consumer_stream *stream;
 
 {
        struct lttng_consumer_stream *stream;
 
@@ -409,6 +409,14 @@ struct lttng_consumer_stream *consumer_allocate_stream(
        lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
        lttng_ht_node_init_ulong(&stream->node, stream->key);
 
        lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
        lttng_ht_node_init_ulong(&stream->node, stream->key);
 
+       /*
+        * The cpu number is needed before using any ustctl_* actions. Ignored for
+        * the kernel so the value does not matter.
+        */
+       pthread_mutex_lock(&consumer_data.lock);
+       stream->cpu = stream->chan->cpucount++;
+       pthread_mutex_unlock(&consumer_data.lock);
+
        DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
                        " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
                        stream->shm_fd, stream->wait_fd,
        DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
                        " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
                        stream->shm_fd, stream->wait_fd,
@@ -437,28 +445,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
        pthread_mutex_lock(&consumer_data.lock);
        rcu_read_lock();
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               stream->cpu = stream->chan->cpucount++;
-               ret = lttng_ustconsumer_add_stream(stream);
-               if (ret) {
-                       ret = -EINVAL;
-                       goto error;
-               }
-
-               /* Steal stream identifier only for UST */
-               consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               ret = -ENOSYS;
-               goto error;
-       }
-
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
 
        /* Check and cleanup relayd */
        lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
 
        /* Check and cleanup relayd */
@@ -485,7 +471,6 @@ int consumer_add_stream(struct lttng_consumer_stream *stream)
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
        consumer_data.stream_count++;
        consumer_data.need_update = 1;
 
-error:
        rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
 
        rcu_read_unlock();
        pthread_mutex_unlock(&consumer_data.lock);
 
@@ -1585,12 +1570,6 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                goto free_stream;
        }
 
                goto free_stream;
        }
 
-       rcu_read_lock();
-       iter.iter.node = &stream->waitfd_node.node;
-       ret = lttng_ht_del(ht, &iter);
-       assert(!ret);
-       rcu_read_unlock();
-
        pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
        pthread_mutex_lock(&consumer_data.lock);
        switch (consumer_data.type) {
        case LTTNG_CONSUMER_KERNEL:
@@ -1611,6 +1590,12 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
                goto end;
        }
 
                goto end;
        }
 
+       rcu_read_lock();
+       iter.iter.node = &stream->waitfd_node.node;
+       ret = lttng_ht_del(ht, &iter);
+       assert(!ret);
+       rcu_read_unlock();
+
        if (stream->out_fd >= 0) {
                ret = close(stream->out_fd);
                if (ret) {
        if (stream->out_fd >= 0) {
                ret = close(stream->out_fd);
                if (ret) {
@@ -1697,27 +1682,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
 
        pthread_mutex_lock(&consumer_data.lock);
 
 
        pthread_mutex_lock(&consumer_data.lock);
 
-       switch (consumer_data.type) {
-       case LTTNG_CONSUMER_KERNEL:
-               break;
-       case LTTNG_CONSUMER32_UST:
-       case LTTNG_CONSUMER64_UST:
-               ret = lttng_ustconsumer_add_stream(stream);
-               if (ret) {
-                       ret = -EINVAL;
-                       goto error;
-               }
-
-               /* Steal stream identifier only for UST */
-               consumer_steal_stream_key(stream->wait_fd, ht);
-               break;
-       default:
-               ERR("Unknown consumer_data type");
-               assert(0);
-               ret = -ENOSYS;
-               goto error;
-       }
-
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
         * after this point.
        /*
         * From here, refcounts are updated so be _careful_ when returning an error
         * after this point.
@@ -1747,7 +1711,6 @@ static int consumer_add_metadata_stream(struct lttng_consumer_stream *stream,
        lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
        rcu_read_unlock();
 
        lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
        rcu_read_unlock();
 
-error:
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
        pthread_mutex_unlock(&consumer_data.lock);
        return ret;
 }
@@ -1947,7 +1910,7 @@ void *consumer_thread_data_poll(void *data)
        int num_rdy, num_hup, high_prio, ret, i;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
        int num_rdy, num_hup, high_prio, ret, i;
        struct pollfd *pollfd = NULL;
        /* local view of the streams */
-       struct lttng_consumer_stream **local_stream = NULL;
+       struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
        /* local view of consumer_data.fds_count */
        int nb_fd = 0;
        struct lttng_consumer_local_data *ctx = data;
@@ -2035,13 +1998,35 @@ void *consumer_thread_data_poll(void *data)
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
                        size_t pipe_readlen;
                 */
                if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
                        size_t pipe_readlen;
-                       char tmp;
 
                        DBG("consumer_poll_pipe wake up");
                        /* Consume 1 byte of pipe data */
                        do {
 
                        DBG("consumer_poll_pipe wake up");
                        /* Consume 1 byte of pipe data */
                        do {
-                               pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+                               pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+                                               sizeof(new_stream));
                        } while (pipe_readlen == -1 && errno == EINTR);
                        } while (pipe_readlen == -1 && errno == EINTR);
+
+                       /*
+                        * If the stream is NULL, just ignore it. It's also possible that
+                        * the sessiond poll thread changed the consumer_quit state and is
+                        * waking us up to test it.
+                        */
+                       if (new_stream == NULL) {
+                               continue;
+                       }
+
+                       ret = consumer_add_stream(new_stream);
+                       if (ret) {
+                               ERR("Consumer add stream %d failed. Continuing",
+                                               new_stream->key);
+                               /*
+                                * At this point, if the add_stream fails, it is not in the
+                                * hash table thus passing the NULL value here.
+                                */
+                               consumer_del_stream(new_stream, NULL);
+                       }
+
+                       /* Continue to update the local streams and handle prio ones */
                        continue;
                }
 
                        continue;
                }
 
@@ -2261,19 +2246,16 @@ end:
        consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
 
        /*
        consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
 
        /*
-        * Wake-up the other end by writing a null byte in the pipe
-        * (non-blocking). Important note: Because writing into the
-        * pipe is non-blocking (and therefore we allow dropping wakeup
-        * data, as long as there is wakeup data present in the pipe
-        * buffer to wake up the other end), the other end should
-        * perform the following sequence for waiting:
-        * 1) empty the pipe (reads).
-        * 2) perform update operation.
-        * 3) wait on the pipe (poll).
+        * Notify the data poll thread to poll back again and test the
+        * consumer_quit state to quit gracefully.
         */
        do {
         */
        do {
-               ret = write(ctx->consumer_poll_pipe[1], "", 1);
+               struct lttng_consumer_stream *null_stream = NULL;
+
+               ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+                               sizeof(null_stream));
        } while (ret < 0 && errno == EINTR);
        } while (ret < 0 && errno == EINTR);
+
        rcu_unregister_thread();
        return NULL;
 }
        rcu_unregister_thread();
        return NULL;
 }
index 4b225e43c4ced7a0a38527402690a825e27ed9cc..8e5891aef60da7dfa3267ee18006605c6005f5c3 100644 (file)
@@ -362,6 +362,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
 struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
 struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
 int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
                size_t data_size);
+void consumer_steal_stream_key(int key, struct lttng_ht *ht);
 
 extern struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
 
 extern struct lttng_consumer_local_data *lttng_consumer_create(
                enum lttng_consumer_type type,
index 13cbe2149de6852ed08e6532f6025ff3e382a103..1d725c2318b74029feb878289cfdfde55fa082fd 100644 (file)
@@ -233,12 +233,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                        if (ret < 0) {
                                PERROR("write metadata pipe");
                                consumer_del_stream(new_stream, NULL);
                        if (ret < 0) {
                                PERROR("write metadata pipe");
                                consumer_del_stream(new_stream, NULL);
+                               goto end_nosignal;
                        }
                } else {
                        }
                } else {
-                       ret = consumer_add_stream(new_stream);
-                       if (ret) {
-                               ERR("Consumer add stream %d failed. Continuing",
-                                               new_stream->key);
+                       do {
+                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+                                               sizeof(new_stream));
+                       } while (ret < 0 && errno == EINTR);
+                       if (ret < 0) {
+                               PERROR("write data pipe");
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
@@ -284,20 +287,6 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                goto end_nosignal;
        }
 
                goto end_nosignal;
        }
 
-       /*
-        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
-        * Important note: Because writing into the pipe is non-blocking (and
-        * therefore we allow dropping wakeup data, as long as there is wakeup data
-        * present in the pipe buffer to wake up the other end), the other end
-        * should perform the following sequence for waiting:
-        *
-        * 1) empty the pipe (reads).
-        * 2) perform update operation.
-        * 3) wait on the pipe (poll).
-        */
-       do {
-               ret = write(ctx->consumer_poll_pipe[1], "", 1);
-       } while (ret < 0 && errno == EINTR);
 end_nosignal:
        rcu_read_unlock();
 
 end_nosignal:
        rcu_read_unlock();
 
index 11706877a7f5b3f147034abcf106a0bb5e77c5d2..718887971abb7b280565613b0b08643e8232304b 100644 (file)
@@ -265,14 +265,12 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                                goto end_nosignal;
                        }
                } else {
                                goto end_nosignal;
                        }
                } else {
-                       ret = consumer_add_stream(new_stream);
-                       if (ret) {
-                               ERR("Consumer add stream %d failed. Continuing",
-                                               new_stream->key);
-                               /*
-                                * At this point, if the add_stream fails, it is not in the
-                                * hash table thus passing the NULL value here.
-                                */
+                       do {
+                               ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+                                               sizeof(new_stream));
+                       } while (ret < 0 && errno == EINTR);
+                       if (ret < 0) {
+                               PERROR("write data pipe");
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
                                consumer_del_stream(new_stream, NULL);
                                goto end_nosignal;
                        }
@@ -334,20 +332,6 @@ int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
                break;
        }
 
                break;
        }
 
-       /*
-        * Wake-up the other end by writing a null byte in the pipe (non-blocking).
-        * Important note: Because writing into the pipe is non-blocking (and
-        * therefore we allow dropping wakeup data, as long as there is wakeup data
-        * present in the pipe buffer to wake up the other end), the other end
-        * should perform the following sequence for waiting:
-        *
-        * 1) empty the pipe (reads).
-        * 2) perform update operation.
-        * 3) wait on the pipe (poll).
-        */
-       do {
-               ret = write(ctx->consumer_poll_pipe[1], "", 1);
-       } while (ret < 0 && errno == EINTR);
 end_nosignal:
        rcu_read_unlock();
 
 end_nosignal:
        rcu_read_unlock();
 
@@ -528,6 +512,13 @@ int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
                stream->out_fd = ret;
        }
 
                stream->out_fd = ret;
        }
 
+       ret = lttng_ustconsumer_add_stream(stream);
+       if (ret) {
+               consumer_del_stream(stream, NULL);
+               ret = -1;
+               goto error;
+       }
+
        /* we return 0 to let the library handle the FD internally */
        return 0;
 
        /* we return 0 to let the library handle the FD internally */
        return 0;
 
This page took 0.033673 seconds and 4 git commands to generate.