+ if (relayd_id) {
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+
+ /* Local protocol error. */
+ assert(chunk_creation_timestamp);
+ ret = time_to_iso8601_str(chunk_creation_timestamp,
+ creation_timestamp_buffer,
+ sizeof(creation_timestamp_buffer));
+ creation_timestamp_str = !ret ? creation_timestamp_buffer :
+ "(formatting error)";
+
+ DBG("Consumer create trace chunk command: relay_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", chunk_override_name = %s"
+ ", chunk_creation_timestamp = %s",
+ relayd_id_str, session_id, chunk_id,
+ chunk_override_name ? : "(none)",
+ creation_timestamp_str);
+
+ /*
+ * The trace chunk registry, as used by the consumer daemon, implicitly
+ * owns the trace chunks. This is only needed in the consumer since
+ * the consumer has no notion of a session beyond session IDs being
+ * used to identify other objects.
+ *
+ * The lttng_trace_chunk_registry_publish() call below provides a
+ * reference which is not released; it implicitly becomes the session
+ * daemon's reference to the chunk in the consumer daemon.
+ *
+ * The lifetime of trace chunks in the consumer daemon is managed by
+ * the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
+ * and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
+ */
+ created_chunk = lttng_trace_chunk_create(chunk_id,
+ chunk_creation_timestamp);
+ if (!created_chunk) {
+ ERR("Failed to create trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ if (chunk_override_name) {
+ chunk_status = lttng_trace_chunk_override_name(created_chunk,
+ chunk_override_name);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ if (chunk_directory_handle) {
+ chunk_status = lttng_trace_chunk_set_credentials(created_chunk,
+ credentials);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk credentials");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ /*
+ * The consumer daemon has no ownership of the chunk output
+ * directory.
+ */
+ chunk_status = lttng_trace_chunk_set_as_user(created_chunk,
+ chunk_directory_handle);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ERR("Failed to set trace chunk's directory handle");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+ }
+
+ published_chunk = lttng_trace_chunk_registry_publish_chunk(
+ consumer_data.chunk_registry, session_id,
+ created_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ created_chunk = NULL;
+ if (!published_chunk) {
+ ERR("Failed to publish trace chunk");
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error;
+ }
+
+ rcu_read_lock();
+ cds_lfht_for_each_entry_duplicate(consumer_data.channels_by_session_id_ht->ht,
+ consumer_data.channels_by_session_id_ht->hash_fct(
+ &session_id, lttng_ht_seed),
+ consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id, &iter.iter, channel,
+ channels_by_session_id_ht_node.node) {
+ ret = lttng_consumer_channel_set_trace_chunk(channel,
+ published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id, chunk_id,
+ chunk_creation_timestamp, NULL,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
+ }
+ }
+
+ if (relayd_id) {
+ struct consumer_relayd_sock_pair *relayd;
+
+ relayd = consumer_find_relayd(*relayd_id);
+ if (relayd) {
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_create_trace_chunk(
+ &relayd->control_sock, published_chunk);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ } else {
+ ERR("Failed to find relay daemon socket: relayd_id = %" PRIu64, *relayd_id);
+ }
+
+ if (!relayd || ret) {
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ close_ret = lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ NULL, path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64 ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
+
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ goto error_unlock;
+ }
+ }
+error_unlock:
+ rcu_read_unlock();
+error:
+ /* Release the reference returned by the "publish" operation. */
+ lttng_trace_chunk_put(published_chunk);
+ lttng_trace_chunk_put(created_chunk);
+ return ret_code;
+}
+
+enum lttcomm_return_code lttng_consumer_close_trace_chunk(
+ const uint64_t *relayd_id, uint64_t session_id,
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path)
+{
+ enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ struct lttng_trace_chunk *chunk;
+ char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
+ const char *relayd_id_str = "(none)";
+ const char *close_command_name = "none";
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_channel *channel;
+ enum lttng_trace_chunk_status chunk_status;
+
+ if (relayd_id) {
+ int ret;
+
+ /* Only used for logging purposes. */
+ ret = snprintf(relayd_id_buffer, sizeof(relayd_id_buffer),
+ "%" PRIu64, *relayd_id);
+ if (ret > 0 && ret < sizeof(relayd_id_buffer)) {
+ relayd_id_str = relayd_id_buffer;
+ } else {
+ relayd_id_str = "(formatting error)";
+ }
+ }
+ if (close_command) {
+ close_command_name = lttng_trace_chunk_command_type_get_name(
+ *close_command);
+ }
+
+ DBG("Consumer close trace chunk command: relayd_id = %s"
+ ", session_id = %" PRIu64 ", chunk_id = %" PRIu64
+ ", close command = %s",
+ relayd_id_str, session_id, chunk_id,
+ close_command_name);
+
+ chunk = lttng_trace_chunk_registry_find_chunk(
+ consumer_data.chunk_registry, session_id, chunk_id);
+ if (!chunk) {
+ ERR("Failed to find chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id, chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;