- memcpy(stream->channel_read_only_attributes.path, channel->pathname,
- sizeof(stream->channel_read_only_attributes.path));
-}
-
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
- uint64_t stream_key,
- enum lttng_consumer_stream_state state,
- const char *channel_name,
- uid_t uid,
- gid_t gid,
- uint64_t relayd_id,
- uint64_t session_id,
- int cpu,
- int *alloc_ret,
- enum consumer_channel_type type,
- unsigned int monitor,
- uint64_t trace_archive_id)
-{
- int ret;
- struct lttng_consumer_stream *stream;
-
- stream = zmalloc(sizeof(*stream));
- if (stream == NULL) {
- PERROR("malloc struct lttng_consumer_stream");
- ret = -ENOMEM;
- goto end;
- }
-
- rcu_read_lock();
-
- stream->key = stream_key;
- stream->out_fd = -1;
- stream->out_fd_offset = 0;
- stream->output_written = 0;
- stream->state = state;
- stream->uid = uid;
- stream->gid = gid;
- stream->net_seq_idx = relayd_id;
- stream->session_id = session_id;
- stream->monitor = monitor;
- stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE;
- stream->index_file = NULL;
- stream->last_sequence_number = -1ULL;
- stream->trace_archive_id = trace_archive_id;
- pthread_mutex_init(&stream->lock, NULL);
- pthread_mutex_init(&stream->metadata_timer_lock, NULL);
-
- /* If channel is the metadata, flag this stream as metadata. */
- if (type == CONSUMER_CHANNEL_TYPE_METADATA) {
- stream->metadata_flag = 1;
- /* Metadata is flat out. */
- strncpy(stream->name, DEFAULT_METADATA_NAME, sizeof(stream->name));
- /* Live rendez-vous point. */
- pthread_cond_init(&stream->metadata_rdv, NULL);
- pthread_mutex_init(&stream->metadata_rdv_lock, NULL);
- } else {
- /* Format stream name to <channel_name>_<cpu_number> */
- ret = snprintf(stream->name, sizeof(stream->name), "%s_%d",
- channel_name, cpu);
- if (ret < 0) {
- PERROR("snprintf stream name");
- goto error;
- }
- }
-
- /* Key is always the wait_fd for streams. */
- lttng_ht_node_init_u64(&stream->node, stream->key);
-
- /* Init node per channel id key */
- lttng_ht_node_init_u64(&stream->node_channel_id, channel_key);
-
- /* Init session id node with the stream session id */
- lttng_ht_node_init_u64(&stream->node_session_id, stream->session_id);
-
- DBG3("Allocated stream %s (key %" PRIu64 ", chan_key %" PRIu64
- " relayd_id %" PRIu64 ", session_id %" PRIu64,
- stream->name, stream->key, channel_key,
- stream->net_seq_idx, stream->session_id);
-
- rcu_read_unlock();
- return stream;
-
-error:
- rcu_read_unlock();
- free(stream);
-end:
- if (alloc_ret) {
- *alloc_ret = ret;
- }
- return NULL;