summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
7d980de)
As a second step of refactoring, upon receiving a data stream, we send
it to the data thread that is now in charge of handling it.
The ustctl_* calls are moved into the on_recv_stream function call
executed once the stream is allocated and fds have been received. This
also fits with the kernel consumer which mmap the buffers in the
on_recv_stream call.
This commit should speed up the add stream process for the session
daemon. There is still some actions to move out of the session daemon
poll thread to significantly gain speed, especially for network
streaming.
Signed-off-by: David Goulet <dgoulet@efficios.com>
-static void consumer_steal_stream_key(int key, struct lttng_ht *ht)
+void consumer_steal_stream_key(int key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
{
struct lttng_consumer_stream *stream;
lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
lttng_ht_node_init_ulong(&stream->node, stream->key);
lttng_ht_node_init_ulong(&stream->waitfd_node, stream->wait_fd);
lttng_ht_node_init_ulong(&stream->node, stream->key);
+ /*
+ * The cpu number is needed before using any ustctl_* actions. Ignored for
+ * the kernel so the value does not matter.
+ */
+ pthread_mutex_lock(&consumer_data.lock);
+ stream->cpu = stream->chan->cpucount++;
+ pthread_mutex_unlock(&consumer_data.lock);
+
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
" out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
stream->shm_fd, stream->wait_fd,
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
" out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
stream->shm_fd, stream->wait_fd,
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
pthread_mutex_lock(&consumer_data.lock);
rcu_read_lock();
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- stream->cpu = stream->chan->cpucount++;
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
-
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->key, consumer_data.stream_ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
-
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
/* Check and cleanup relayd */
lttng_ht_add_unique_ulong(consumer_data.stream_ht, &stream->node);
/* Check and cleanup relayd */
consumer_data.stream_count++;
consumer_data.need_update = 1;
consumer_data.stream_count++;
consumer_data.need_update = 1;
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
- rcu_read_lock();
- iter.iter.node = &stream->waitfd_node.node;
- ret = lttng_ht_del(ht, &iter);
- assert(!ret);
- rcu_read_unlock();
-
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
pthread_mutex_lock(&consumer_data.lock);
switch (consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
+ rcu_read_lock();
+ iter.iter.node = &stream->waitfd_node.node;
+ ret = lttng_ht_del(ht, &iter);
+ assert(!ret);
+ rcu_read_unlock();
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
pthread_mutex_lock(&consumer_data.lock);
pthread_mutex_lock(&consumer_data.lock);
- switch (consumer_data.type) {
- case LTTNG_CONSUMER_KERNEL:
- break;
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- ret = lttng_ustconsumer_add_stream(stream);
- if (ret) {
- ret = -EINVAL;
- goto error;
- }
-
- /* Steal stream identifier only for UST */
- consumer_steal_stream_key(stream->wait_fd, ht);
- break;
- default:
- ERR("Unknown consumer_data type");
- assert(0);
- ret = -ENOSYS;
- goto error;
- }
-
/*
* From here, refcounts are updated so be _careful_ when returning an error
* after this point.
/*
* From here, refcounts are updated so be _careful_ when returning an error
* after this point.
lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
rcu_read_unlock();
lttng_ht_add_unique_ulong(ht, &stream->waitfd_node);
rcu_read_unlock();
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
pthread_mutex_unlock(&consumer_data.lock);
return ret;
}
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the streams */
int num_rdy, num_hup, high_prio, ret, i;
struct pollfd *pollfd = NULL;
/* local view of the streams */
- struct lttng_consumer_stream **local_stream = NULL;
+ struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
struct lttng_consumer_local_data *ctx = data;
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
DBG("consumer_poll_pipe wake up");
/* Consume 1 byte of pipe data */
do {
DBG("consumer_poll_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &tmp, 1);
+ pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
} while (pipe_readlen == -1 && errno == EINTR);
+
+ /*
+ * If the stream is NULL, just ignore it. It's also possible that
+ * the sessiond poll thread changed the consumer_quit state and is
+ * waking us up to test it.
+ */
+ if (new_stream == NULL) {
+ continue;
+ }
+
+ ret = consumer_add_stream(new_stream);
+ if (ret) {
+ ERR("Consumer add stream %d failed. Continuing",
+ new_stream->key);
+ /*
+ * At this point, if the add_stream fails, it is not in the
+ * hash table thus passing the NULL value here.
+ */
+ consumer_del_stream(new_stream, NULL);
+ }
+
+ /* Continue to update the local streams and handle prio ones */
consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
/*
consumer_poll_timeout = LTTNG_CONSUMER_POLL_TIMEOUT;
/*
- * Wake-up the other end by writing a null byte in the pipe
- * (non-blocking). Important note: Because writing into the
- * pipe is non-blocking (and therefore we allow dropping wakeup
- * data, as long as there is wakeup data present in the pipe
- * buffer to wake up the other end), the other end should
- * perform the following sequence for waiting:
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
+ * Notify the data poll thread to poll back again and test the
+ * consumer_quit state to quit gracefully.
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
+ struct lttng_consumer_stream *null_stream = NULL;
+
+ ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ sizeof(null_stream));
} while (ret < 0 && errno == EINTR);
} while (ret < 0 && errno == EINTR);
rcu_unregister_thread();
return NULL;
}
rcu_unregister_thread();
return NULL;
}
struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
+void consumer_steal_stream_key(int key, struct lttng_ht *ht);
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
if (ret < 0) {
PERROR("write metadata pipe");
consumer_del_stream(new_stream, NULL);
if (ret < 0) {
PERROR("write metadata pipe");
consumer_del_stream(new_stream, NULL);
- ret = consumer_add_stream(new_stream);
- if (ret) {
- ERR("Consumer add stream %d failed. Continuing",
- new_stream->key);
+ do {
+ ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+ sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write data pipe");
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
- /*
- * Wake-up the other end by writing a null byte in the pipe (non-blocking).
- * Important note: Because writing into the pipe is non-blocking (and
- * therefore we allow dropping wakeup data, as long as there is wakeup data
- * present in the pipe buffer to wake up the other end), the other end
- * should perform the following sequence for waiting:
- *
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
- */
- do {
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret < 0 && errno == EINTR);
end_nosignal:
rcu_read_unlock();
end_nosignal:
rcu_read_unlock();
goto end_nosignal;
}
} else {
goto end_nosignal;
}
} else {
- ret = consumer_add_stream(new_stream);
- if (ret) {
- ERR("Consumer add stream %d failed. Continuing",
- new_stream->key);
- /*
- * At this point, if the add_stream fails, it is not in the
- * hash table thus passing the NULL value here.
- */
+ do {
+ ret = write(ctx->consumer_poll_pipe[1], &new_stream,
+ sizeof(new_stream));
+ } while (ret < 0 && errno == EINTR);
+ if (ret < 0) {
+ PERROR("write data pipe");
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
consumer_del_stream(new_stream, NULL);
goto end_nosignal;
}
- /*
- * Wake-up the other end by writing a null byte in the pipe (non-blocking).
- * Important note: Because writing into the pipe is non-blocking (and
- * therefore we allow dropping wakeup data, as long as there is wakeup data
- * present in the pipe buffer to wake up the other end), the other end
- * should perform the following sequence for waiting:
- *
- * 1) empty the pipe (reads).
- * 2) perform update operation.
- * 3) wait on the pipe (poll).
- */
- do {
- ret = write(ctx->consumer_poll_pipe[1], "", 1);
- } while (ret < 0 && errno == EINTR);
end_nosignal:
rcu_read_unlock();
end_nosignal:
rcu_read_unlock();
+ ret = lttng_ustconsumer_add_stream(stream);
+ if (ret) {
+ consumer_del_stream(stream, NULL);
+ ret = -1;
+ goto error;
+ }
+
/* we return 0 to let the library handle the FD internally */
return 0;
/* we return 0 to let the library handle the FD internally */
return 0;