iter.iter.node = &stream->node.node;
ret = lttng_ht_del(ht, &iter);
assert(!ret);
-
rcu_read_unlock();
- if (consumer_data.stream_count <= 0) {
- goto end;
- }
+ assert(consumer_data.stream_count > 0);
consumer_data.stream_count--;
- if (!stream) {
- goto end;
- }
+
if (stream->out_fd >= 0) {
ret = close(stream->out_fd);
if (ret) {
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret)
{
struct lttng_consumer_stream *stream;
stream->gid = gid;
stream->net_seq_idx = net_index;
stream->metadata_flag = metadata_flag;
+ stream->session_id = session_id;
strncpy(stream->path_name, path_name, sizeof(stream->path_name));
stream->path_name[sizeof(stream->path_name) - 1] = '\0';
+ pthread_mutex_init(&stream->lock, NULL);
/*
* Index differently the metadata node because the thread is using an
lttng_ht_node_init_ulong(&stream->node, stream->key);
}
+ /* Init session id node with the stream session id */
+ lttng_ht_node_init_ulong(&stream->node_session_id, stream->session_id);
+
/*
* The cpu number is needed before using any ustctl_* actions. Ignored for
* the kernel so the value does not matter.
pthread_mutex_unlock(&consumer_data.lock);
DBG3("Allocated stream %s (key %d, shm_fd %d, wait_fd %d, mmap_len %llu,"
- " out_fd %d, net_seq_idx %d)", stream->path_name, stream->key,
- stream->shm_fd, stream->wait_fd,
+ " out_fd %d, net_seq_idx %d, session_id %" PRIu64,
+ stream->path_name, stream->key, stream->shm_fd, stream->wait_fd,
(unsigned long long) stream->mmap_len, stream->out_fd,
- stream->net_seq_idx);
+ stream->net_seq_idx, stream->session_id);
return stream;
error:
return outfd;
}
-/*
- * Update a stream according to what we just received.
- */
-void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state)
-{
- struct lttng_consumer_stream *stream;
-
- pthread_mutex_lock(&consumer_data.lock);
- stream = consumer_find_stream(stream_key, consumer_data.stream_ht);
- if (stream) {
- stream->state = state;
- }
- consumer_data.need_update = 1;
- pthread_mutex_unlock(&consumer_data.lock);
-}
-
static
void consumer_free_channel(struct rcu_head *head)
{
rcu_read_unlock();
/*
- * Insert the consumer_poll_pipe at the end of the array and don't
+ * Insert the consumer_data_pipe at the end of the array and don't
* increment i so nb_fd is the number of real FD.
*/
- (*pollfd)[i].fd = ctx->consumer_poll_pipe[0];
+ (*pollfd)[i].fd = ctx->consumer_data_pipe[0];
(*pollfd)[i].events = POLLIN | POLLPRI;
return i;
}
rcu_read_lock();
- /*
- * close all outfd. Called when there are no more threads running (after
- * joining on the threads), no need to protect list iteration with mutex.
- */
- cds_lfht_for_each_entry(consumer_data.stream_ht->ht, &iter.iter, node,
- node) {
- struct lttng_consumer_stream *stream =
- caa_container_of(node, struct lttng_consumer_stream, node);
- consumer_del_stream(stream, consumer_data.stream_ht);
- }
-
cds_lfht_for_each_entry(consumer_data.channel_ht->ht, &iter.iter, node,
node) {
struct lttng_consumer_channel *channel =
rcu_read_unlock();
- lttng_ht_destroy(consumer_data.stream_ht);
lttng_ht_destroy(consumer_data.channel_ht);
}
ctx->on_recv_stream = recv_stream;
ctx->on_update_stream = update_stream;
- ret = pipe(ctx->consumer_poll_pipe);
+ ret = pipe(ctx->consumer_data_pipe);
if (ret < 0) {
PERROR("Error creating poll pipe");
goto error_poll_pipe;
}
/* set read end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[0], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[0], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
}
/* set write end of the pipe to non-blocking */
- ret = fcntl(ctx->consumer_poll_pipe[1], F_SETFL, O_NONBLOCK);
+ ret = fcntl(ctx->consumer_data_pipe[1], F_SETFL, O_NONBLOCK);
if (ret < 0) {
PERROR("fcntl O_NONBLOCK");
goto error_poll_fcntl;
for (i = 0; i < 2; i++) {
int err;
- err = close(ctx->consumer_poll_pipe[i]);
+ err = close(ctx->consumer_data_pipe[i]);
if (err) {
PERROR("close");
}
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[0]);
+ ret = close(ctx->consumer_data_pipe[0]);
if (ret) {
PERROR("close");
}
- ret = close(ctx->consumer_poll_pipe[1]);
+ ret = close(ctx->consumer_data_pipe[1]);
if (ret) {
PERROR("close");
}
local_stream = NULL;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
pollfd = zmalloc((consumer_data.stream_count + 1) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
goto end;
}
- /* allocate for all fds + 1 for the consumer_poll_pipe */
+ /* allocate for all fds + 1 for the consumer_data_pipe */
local_stream = zmalloc((consumer_data.stream_count + 1) *
sizeof(struct lttng_consumer_stream));
if (local_stream == NULL) {
}
/*
- * If the consumer_poll_pipe triggered poll go directly to the
+ * If the consumer_data_pipe triggered poll go directly to the
* beginning of the loop to update the array. We want to prioritize
* array update over low-priority reads.
*/
if (pollfd[nb_fd].revents & (POLLIN | POLLPRI)) {
size_t pipe_readlen;
- DBG("consumer_poll_pipe wake up");
+ DBG("consumer_data_pipe wake up");
/* Consume 1 byte of pipe data */
do {
- pipe_readlen = read(ctx->consumer_poll_pipe[0], &new_stream,
+ pipe_readlen = read(ctx->consumer_data_pipe[0], &new_stream,
sizeof(new_stream));
} while (pipe_readlen == -1 && errno == EINTR);
do {
struct lttng_consumer_stream *null_stream = NULL;
- ret = write(ctx->consumer_poll_pipe[1], &null_stream,
+ ret = write(ctx->consumer_data_pipe[1], &null_stream,
sizeof(null_stream));
} while (ret < 0 && errno == EINTR);
*/
void lttng_consumer_init(void)
{
- consumer_data.stream_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
consumer_data.relayd_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
+ consumer_data.stream_list_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
metadata_ht = lttng_ht_new(0, LTTNG_HT_TYPE_ULONG);
assert(metadata_ht);