+ break;
+ }
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ goto end_msg_sessiond;
+ }
+ case LTTNG_CONSUMER_ROTATE_CHANNEL:
+ {
+ /*
+ * Sample the rotate position of all the streams in this channel.
+ */
+ ret = lttng_consumer_rotate_channel(msg.u.rotate_channel.key,
+ msg.u.rotate_channel.pathname,
+ msg.u.rotate_channel.relayd_id,
+ msg.u.rotate_channel.metadata,
+ msg.u.rotate_channel.new_chunk_id,
+ ctx);
+ if (ret < 0) {
+ ERR("Rotate channel failed");
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ /*
+ * Rotate the streams that are ready right now.
+ * FIXME: this is a second consecutive iteration over the
+ * streams in a channel, there is probably a better way to
+ * handle this, but it needs to be after the
+ * consumer_send_status_msg() call.
+ */
+ ret = lttng_consumer_rotate_ready_streams(
+ msg.u.rotate_channel.key, ctx);
+ if (ret < 0) {
+ ERR("Rotate channel failed");
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_ROTATE_RENAME:
+ {
+ DBG("Consumer rename session %" PRIu64 " after rotation",
+ msg.u.rotate_rename.session_id);
+ ret = lttng_consumer_rotate_rename(msg.u.rotate_rename.old_path,
+ msg.u.rotate_rename.new_path,
+ msg.u.rotate_rename.uid,
+ msg.u.rotate_rename.gid,
+ msg.u.rotate_rename.relayd_id);
+ if (ret < 0) {
+ ERR("Rotate rename failed");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL:
+ {
+ int pending;
+ uint32_t pending_reply;
+
+ DBG("Perform local check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_local.session_id);
+ pending = lttng_consumer_check_rotation_pending_local(
+ msg.u.check_rotation_pending_local.session_id,
+ msg.u.check_rotation_pending_local.chunk_id);
+ if (pending < 0) {
+ ERR("Local rotation pending check failed with code %i", pending);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!pending;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ if (pending < 0) {
+ /*
+ * An error occured while running the command;
+ * don't send the 'pending' flag as the sessiond
+ * will not read it.
+ */
+ break;
+ }
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("Failed to send rotation pending return code");
+ goto error_fatal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY:
+ {
+ int pending;
+ uint32_t pending_reply;
+
+ DBG("Perform relayd check of pending rotation for session id %" PRIu64,
+ msg.u.check_rotation_pending_relay.session_id);
+ pending = lttng_consumer_check_rotation_pending_relay(
+ msg.u.check_rotation_pending_relay.session_id,
+ msg.u.check_rotation_pending_relay.relayd_id,
+ msg.u.check_rotation_pending_relay.chunk_id);
+ if (pending < 0) {
+ ERR("Relayd rotation pending check failed with code %i", pending);
+ ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
+ } else {
+ pending_reply = !!pending;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+
+ if (pending < 0) {
+ /*
+ * An error occured while running the command;
+ * don't send the 'pending' flag as the sessiond
+ * will not read it.
+ */
+ break;
+ }
+
+ /* Send back returned value to session daemon */
+ ret = lttcomm_send_unix_sock(sock, &pending_reply,
+ sizeof(pending_reply));
+ if (ret < 0) {
+ PERROR("Failed to send rotation pending return code");
+ goto error_fatal;
+ }
+ break;
+ }
+ case LTTNG_CONSUMER_MKDIR:
+ {
+ DBG("Consumer mkdir %s in session %" PRIu64,
+ msg.u.mkdir.path,
+ msg.u.mkdir.session_id);
+ ret = lttng_consumer_mkdir(msg.u.mkdir.path,
+ msg.u.mkdir.uid,
+ msg.u.mkdir.gid,
+ msg.u.mkdir.relayd_id);
+ if (ret < 0) {
+ ERR("consumer mkdir failed");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ }
+
+ health_code_update();
+
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ /* Somehow, the session daemon is not responding anymore. */
+ goto end_nosignal;
+ }
+ break;
+ }
+ default:
+ break;
+ }
+
+end_nosignal:
+ rcu_read_unlock();
+
+ health_code_update();
+
+ /*
+ * Return 1 to indicate success since the 0 value can be a socket
+ * shutdown during the recv() or send() call.
+ */
+ return 1;
+
+end_msg_sessiond:
+ /*
+ * The returned value here is not useful since either way we'll return 1 to
+ * the caller because the session daemon socket management is done
+ * elsewhere. Returning a negative code or 0 will shutdown the consumer.
+ */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ rcu_read_unlock();
+
+ health_code_update();
+
+ return 1;
+end_channel_error:
+ if (channel) {
+ /*
+ * Free channel here since no one has a reference to it. We don't
+ * free after that because a stream can store this pointer.
+ */
+ destroy_channel(channel);
+ }
+ /* We have to send a status channel message indicating an error. */
+ ret = consumer_send_status_channel(sock, NULL);
+ if (ret < 0) {
+ /* Stop everything if session daemon can not be notified. */
+ goto error_fatal;
+ }
+ rcu_read_unlock();
+
+ health_code_update();
+
+ return 1;
+error_fatal:
+ rcu_read_unlock();
+ /* This will issue a consumer stop. */
+ return -1;
+}
+
+/*
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
+ */
+int lttng_ustctl_get_mmap_read_offset(struct lttng_consumer_stream *stream,
+ unsigned long *off)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_get_mmap_read_offset(stream->ustream, off);
+}
+
+/*
+ * Wrapper over the mmap() read offset from ust-ctl library. Since this can be
+ * compiled out, we isolate it in this library.
+ */
+void *lttng_ustctl_get_mmap_base(struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ return ustctl_get_mmap_base(stream->ustream);
+}
+
+void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer_active)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ ustctl_flush_buffer(stream->ustream, producer_active);
+}
+
+/*
+ * Take a snapshot for a specific stream.
+ *