Fix: consumerd: double unlock on rotate channel error path
[lttng-tools.git] / src / common / consumer / consumer.c
index 6505490cdc04e212c54aadd3d8eb58d569eb523e..093f957f2901ef6e9f543d0202b2b38f07b1cffc 100644 (file)
@@ -3425,6 +3425,9 @@ static enum open_packet_status open_packet(struct lttng_consumer_stream *stream)
        status = produced_pos_before != produced_pos_after ?
                        OPEN_PACKET_STATUS_OPENED :
                        OPEN_PACKET_STATUS_NO_SPACE;
+       if (status == OPEN_PACKET_STATUS_OPENED) {
+               stream->opened_packet_in_current_trace_chunk = true;
+       }
 end:
        return status;
 }
@@ -3565,14 +3568,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                                        ", channel name = %s, session id = %" PRIu64,
                                        stream->key, stream->chan->name,
                                        stream->chan->session_id);
-                       stream->opened_packet_in_current_trace_chunk =
-                                       true;
                        break;
                case OPEN_PACKET_STATUS_NO_SPACE:
                        /*
                         * Can't open a packet as there is no space left.
                         * This means that new events were produced, resulting
-                        * in a packet being opened, which is what we want
+                        * in a packet being opened, which is what we wanted
                         * anyhow.
                         */
                        DBG("No space left to open a packet after consuming a packet: stream id = %" PRIu64
@@ -3588,8 +3589,6 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                default:
                        abort();
                }
-
-               stream->opened_packet_in_current_trace_chunk = true;
        }
 
 sleep_stream:
@@ -4103,11 +4102,15 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
        const bool is_local_trace = relayd_id == -1ULL;
        struct consumer_relayd_sock_pair *relayd = NULL;
        bool rotating_to_new_chunk = true;
+       /* Array of `struct lttng_consumer_stream *` */
+       struct lttng_dynamic_pointer_array streams_packet_to_open;
+       size_t stream_idx;
 
        DBG("Consumer sample rotate position for channel %" PRIu64, key);
 
        lttng_dynamic_array_init(&stream_rotation_positions,
                        sizeof(struct relayd_stream_rotation_position), NULL);
+       lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
 
        rcu_read_lock();
 
@@ -4284,69 +4287,88 @@ int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
                         * packets in this scenario and allows the tracer to
                         * "stamp" the beginning of the new trace chunk at the
                         * earliest possible point.
+                        *
+                        * The packet open is performed after the channel
+                        * rotation to ensure that no attempt to open a packet
+                        * is performed in a stream that has no active trace
+                        * chunk.
                         */
-                       const enum open_packet_status status =
-                               open_packet(stream);
-
-                       switch (status) {
-                       case OPEN_PACKET_STATUS_OPENED:
-                               DBG("Opened a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               stream->opened_packet_in_current_trace_chunk =
-                                               true;
-                               break;
-                       case OPEN_PACKET_STATUS_NO_SPACE:
-                               /*
-                                * Can't open a packet as there is no space left
-                                * in the buffer. A new packet will be opened
-                                * once one has been consumed.
-                                */
-                               DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
-                                               ", channel name = %s, session id = %" PRIu64,
-                                               stream->key, stream->chan->name,
-                                               stream->chan->session_id);
-                               break;
-                       case OPEN_PACKET_STATUS_ERROR:
-                               /* Logged by callee. */
+                       ret = lttng_dynamic_pointer_array_add_pointer(
+                                       &streams_packet_to_open, stream);
+                       if (ret) {
+                               PERROR("Failed to add a stream pointer to array of streams in which to open a packet");
                                ret = -1;
                                goto end_unlock_stream;
-                       default:
-                               abort();
                        }
                }
 
                pthread_mutex_unlock(&stream->lock);
        }
        stream = NULL;
-       pthread_mutex_unlock(&channel->lock);
 
-       if (is_local_trace) {
-               ret = 0;
-               goto end;
-       }
+       if (!is_local_trace) {
+               relayd = consumer_find_relayd(relayd_id);
+               if (!relayd) {
+                       ERR("Failed to find relayd %" PRIu64, relayd_id);
+                       ret = -1;
+                       goto end_unlock_channel;
+               }
 
-       relayd = consumer_find_relayd(relayd_id);
-       if (!relayd) {
-               ERR("Failed to find relayd %" PRIu64, relayd_id);
-               ret = -1;
-               goto end;
+               pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+               ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+                               rotating_to_new_chunk ? &next_chunk_id : NULL,
+                               (const struct relayd_stream_rotation_position *)
+                                               stream_rotation_positions.buffer
+                                                               .data);
+               pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+               if (ret < 0) {
+                       ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+                                       relayd->net_seq_idx);
+                       lttng_consumer_cleanup_relayd(relayd);
+                       goto end_unlock_channel;
+               }
        }
 
-       pthread_mutex_lock(&relayd->ctrl_sock_mutex);
-       ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
-                       rotating_to_new_chunk ? &next_chunk_id : NULL,
-                       (const struct relayd_stream_rotation_position *)
-                                       stream_rotation_positions.buffer.data);
-       pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
-       if (ret < 0) {
-               ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
-                               relayd->net_seq_idx);
-               lttng_consumer_cleanup_relayd(relayd);
-               goto end;
+       for (stream_idx = 0;
+                       stream_idx < lttng_dynamic_pointer_array_get_count(
+                               &streams_packet_to_open);
+                       stream_idx++) {
+               enum open_packet_status status;
+
+               stream = lttng_dynamic_pointer_array_get_pointer(
+                               &streams_packet_to_open, stream_idx);
+
+               pthread_mutex_lock(&stream->lock);
+               status = open_packet(stream);
+               pthread_mutex_unlock(&stream->lock);
+               switch (status) {
+               case OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case OPEN_PACKET_STATUS_NO_SPACE:
+                       /*
+                        * Can't open a packet as there is no space left
+                        * in the buffer. A new packet will be opened
+                        * once one has been consumed.
+                        */
+                       DBG("No space left to open a packet after a rotation: stream id = %" PRIu64
+                           ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case OPEN_PACKET_STATUS_ERROR:
+                       /* Logged by callee. */
+                       ret = -1;
+                       goto end_unlock_channel;
+               default:
+                       abort();
+               }
        }
 
+       pthread_mutex_unlock(&channel->lock);
        ret = 0;
        goto end;
 
@@ -4357,6 +4379,7 @@ end_unlock_channel:
 end:
        rcu_read_unlock();
        lttng_dynamic_array_reset(&stream_rotation_positions);
+       lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
        return ret;
 }
 
@@ -5172,3 +5195,70 @@ int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel)
 end:
        return ret;
 }
+
+enum lttcomm_return_code lttng_consumer_open_channel_packets(
+               struct lttng_consumer_channel *channel)
+{
+       struct lttng_consumer_stream *stream;
+       enum lttcomm_return_code ret = LTTCOMM_CONSUMERD_SUCCESS;
+
+       if (channel->metadata_stream) {
+               ERR("Open channel packets command attempted on a metadata channel");
+               ret = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+               goto end;
+       }
+
+       rcu_read_lock();
+       cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+               enum open_packet_status status;
+
+               pthread_mutex_lock(&stream->lock);
+               if (cds_lfht_is_node_deleted(&stream->node.node)) {
+                       goto next;
+               }
+
+               status = open_packet(stream);
+               switch (status) {
+               case OPEN_PACKET_STATUS_OPENED:
+                       DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       stream->opened_packet_in_current_trace_chunk = true;
+                       break;
+               case OPEN_PACKET_STATUS_NO_SPACE:
+                       DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel name = %s, session id = %" PRIu64,
+                                       stream->key, stream->chan->name,
+                                       stream->chan->session_id);
+                       break;
+               case OPEN_PACKET_STATUS_ERROR:
+                       /*
+                        * Only unexpected internal errors can lead to this
+                        * failing. Report an unknown error.
+                        */
+                       ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+                                       ", channel id = %" PRIu64
+                                       ", channel name = %s"
+                                       ", session id = %" PRIu64,
+                                       stream->key, channel->key,
+                                       channel->name, channel->session_id);
+                       ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+                       goto error_unlock;
+               default:
+                       abort();
+               }
+
+       next:
+               pthread_mutex_unlock(&stream->lock);
+       }
+
+end_rcu_unlock:
+       rcu_read_unlock();
+end:
+       return ret;
+
+error_unlock:
+       pthread_mutex_unlock(&stream->lock);
+       goto end_rcu_unlock;
+}
This page took 0.026679 seconds and 4 git commands to generate.