rcu_read_unlock();
}
-static
-int rotate_notify_sessiond(struct lttng_consumer_local_data *ctx,
- uint64_t key)
-{
- ssize_t ret;
-
- do {
- ret = write(ctx->channel_rotate_pipe, &key, sizeof(key));
- } while (ret == -1 && errno == EINTR);
- if (ret == -1) {
- PERROR("Failed to write to the channel rotation pipe");
- } else {
- DBG("Sent channel rotation notification for channel key %"
- PRIu64, key);
- ret = 0;
- }
-
- return (int) ret;
-}
-
/*
* Perform operations that need to be done after a stream has
* rotated and released the stream lock.
abort();
}
- if (--stream->chan->nr_stream_rotate_pending == 0) {
- DBG("Rotation of channel \"%s\" completed, notifying the session daemon",
- stream->chan->name);
- ret = rotate_notify_sessiond(ctx, stream->chan->key);
- }
pthread_mutex_unlock(&stream->chan->lock);
-
return ret;
}
} else {
ret = rotate_local_stream(ctx, stream);
}
+ stream->trace_archive_id++;
if (ret < 0) {
- ERR("Rotate stream");
+ ERR("Failed to rotate stream, ret = %i", ret);
goto error;
}
}
}
-int lttng_consumer_rotate_pending_relay(uint64_t session_id,
+/* Stream lock must be acquired by the caller. */
+static
+bool check_stream_rotation_pending(const struct lttng_consumer_stream *stream,
+ uint64_t session_id, uint64_t chunk_id)
+{
+ bool pending = false;
+
+ if (stream->session_id != session_id) {
+ /* Skip. */
+ goto end;
+ }
+
+ /*
+ * If the stream's archive_id belongs to the chunk being rotated (or an
+ * even older one), it means that the consumer has not consumed all the
+ * buffers that belong to the chunk being rotated. Therefore, the
+ * rotation is considered as ongoing/pending.
+ */
+ pending = stream->trace_archive_id <= chunk_id;
+end:
+ return pending;
+}
+
+/* RCU read lock must be acquired by the caller. */
+int lttng_consumer_check_rotation_pending_local(uint64_t session_id,
+ uint64_t chunk_id)
+{
+ struct lttng_ht_iter iter;
+ struct lttng_consumer_stream *stream;
+ bool rotation_pending = false;
+
+ /* Start with the metadata streams... */
+ cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+ /* ... followed by the data streams. */
+ cds_lfht_for_each_entry(data_ht->ht, &iter.iter, stream, node.node) {
+ pthread_mutex_lock(&stream->lock);
+ rotation_pending = check_stream_rotation_pending(stream,
+ session_id, chunk_id);
+ pthread_mutex_unlock(&stream->lock);
+ if (rotation_pending) {
+ goto end;
+ }
+ }
+
+end:
+ return !!rotation_pending;
+}
+
+int lttng_consumer_check_rotation_pending_relay(uint64_t session_id,
uint64_t relayd_id, uint64_t chunk_id)
{
int ret;
relayd = consumer_find_relayd(relayd_id);
if (!relayd) {
- ERR("Failed to find relayd");
+ ERR("Failed to find relayd id %" PRIu64, relayd_id);
ret = -1;
goto end;
}