#include <common/trace-chunk.h>
#include <common/trace-chunk-registry.h>
#include <common/string-utils/format.h>
+#include <common/dynamic-array.h>
struct lttng_consumer_global_data consumer_data = {
.stream_count = 0,
data_hdr.stream_id = htobe64(stream->relayd_stream_id);
data_hdr.data_size = htobe32(data_size);
data_hdr.padding_size = htobe32(padding);
+
/*
* Note that net_seq_num below is assigned with the *current* value of
* next_net_seq_num and only after that the next_net_seq_num will be
/* RCU lock for the relayd pointer */
rcu_read_lock();
-
- assert(stream->chan->trace_chunk);
+ assert(stream->net_seq_idx != (uint64_t) -1ULL ||
+ stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_dynamic_array stream_rotation_positions;
+ uint64_t next_chunk_id, stream_count = 0;
+ enum lttng_trace_chunk_status chunk_status;
+ const bool is_local_trace = relayd_id == -1ULL;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool rotating_to_new_chunk = true;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position), NULL);
+
rcu_read_lock();
pthread_mutex_lock(&channel->lock);
+ assert(channel->trace_chunk);
+ chunk_status = lttng_trace_chunk_get_id(channel->trace_chunk,
+ &next_chunk_id);
+ if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = -1;
+ goto end_unlock_channel;
+ }
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
*/
pthread_mutex_lock(&stream->lock);
+ if (stream->trace_chunk == stream->chan->trace_chunk) {
+ rotating_to_new_chunk = false;
+ }
+
ret = lttng_consumer_sample_snapshot_positions(stream);
if (ret < 0) {
ERR("Failed to sample snapshot position during channel rotation");
stream->rotate_ready = true;
}
+ /*
+ * Active flush; has no effect if the production position
+ * is at a packet boundary.
+ */
ret = consumer_flush_buffer(stream, 1);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel rotation",
goto end_unlock_stream;
}
+ if (!is_local_trace) {
+ /*
+ * The relay daemon control protocol expects a rotation
+ * position as "the sequence number of the first packet
+ * _after_ the current trace chunk.
+ *
+ * At the moment when the positions of the buffers are
+ * sampled, the production position does not necessarily
+ * sit at a packet boundary. The 'active' flush
+ * operation above will push the production position to
+ * the next packet boundary _if_ it is not already
+ * sitting at such a boundary.
+ *
+ * Assuming a current production position that is not
+ * on the bound of a packet, the 'target' sequence
+ * number is
+ * (consumed_pos / subbuffer_size) + 1
+ * Note the '+ 1' to ensure the current packet is
+ * part of the current trace chunk.
+ *
+ * However, if the production position is already at
+ * a packet boundary, the '+ 1' is not necessary as the
+ * last packet of the current chunk is already
+ * 'complete'.
+ */
+ const struct relayd_stream_rotation_position position = {
+ .stream_id = stream->relayd_stream_id,
+ .rotate_at_seq_num = (stream->rotate_position / stream->max_sb_size) +
+ !!(stream->rotate_position % stream->max_sb_size),
+ };
+
+ ret = lttng_dynamic_array_add_element(
+ &stream_rotation_positions,
+ &position);
+ if (ret) {
+ ERR("Failed to allocate stream rotation position");
+ goto end_unlock_stream;
+ }
+ stream_count++;
+ }
pthread_mutex_unlock(&stream->lock);
}
+ stream = NULL;
pthread_mutex_unlock(&channel->lock);
+ if (is_local_trace) {
+ ret = 0;
+ goto end;
+ }
+
+ relayd = consumer_find_relayd(relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, relayd_id);
+ ret = -1;
+ goto end;
+ }
+
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_rotate_streams(&relayd->control_sock, stream_count,
+ rotating_to_new_chunk ? &next_chunk_id : NULL,
+ (const struct relayd_stream_rotation_position *)
+ stream_rotation_positions.buffer.data);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64,
+ relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ goto end;
+ }
+
ret = 0;
goto end;
end_unlock_stream:
pthread_mutex_unlock(&stream->lock);
+end_unlock_channel:
pthread_mutex_unlock(&channel->lock);
end:
rcu_read_unlock();
+ lttng_dynamic_array_reset(&stream_rotation_positions);
return ret;
}
return ret;
}
-/*
- * Perform the rotation a stream file on the relay.
- */
-int rotate_relay_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
-{
- int ret;
- struct consumer_relayd_sock_pair *relayd;
- uint64_t chunk_id;
- enum lttng_trace_chunk_status chunk_status;
-
- DBG("Rotate relay stream");
- relayd = consumer_find_relayd(stream->net_seq_idx);
- if (!relayd) {
- ERR("Failed to find relayd");
- ret = -1;
- goto end;
- }
-
- chunk_status = lttng_trace_chunk_get_id(stream->chan->trace_chunk,
- &chunk_id);
- if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
- ERR("Failed to retrieve the id of the current trace chunk of channel \"%s\"",
- stream->chan->name);
- ret = -1;
- goto end;
- }
-
- pthread_mutex_lock(&relayd->ctrl_sock_mutex);
- ret = relayd_rotate_stream(&relayd->control_sock,
- stream->relayd_stream_id,
- chunk_id,
- stream->last_sequence_number);
- pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
- if (ret < 0) {
- ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
- lttng_consumer_cleanup_relayd(relayd);
- }
- if (ret) {
- ERR("Rotate relay stream");
- }
-
-end:
- return ret;
-}
-
/*
* Performs the stream rotation for the rotate session feature if needed.
* It must be called with the channel and stream locks held.
stream->trace_chunk = stream->chan->trace_chunk;
}
- if (stream->net_seq_idx != (uint64_t) -1ULL) {
- ret = rotate_relay_stream(ctx, stream);
- } else {
+ if (stream->net_seq_idx == (uint64_t) -1ULL) {
ret = rotate_local_stream(ctx, stream);
- }
- if (ret < 0) {
- ERR("Failed to rotate stream, ret = %i", ret);
- goto error;
+ if (ret < 0) {
+ ERR("Failed to rotate stream, ret = %i", ret);
+ goto error;
+ }
}
if (stream->metadata_flag && stream->trace_chunk) {
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id)
{
+ int ret;
enum lttcomm_return_code ret_code;
struct lttng_trace_chunk *chunk;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
+ const bool is_local_trace = !relayd_id;
+ struct consumer_relayd_sock_pair *relayd = NULL;
+ bool chunk_exists_remote;
if (relayd_id) {
int ret;
}
DBG("Consumer trace chunk exists command: relayd_id = %s"
- ", session_id = %" PRIu64
", chunk_id = %" PRIu64, relayd_id_str,
- session_id, chunk_id);
+ chunk_id);
chunk = lttng_trace_chunk_registry_find_chunk(
consumer_data.chunk_registry, session_id,
chunk_id);
DBG("Trace chunk %s locally", chunk ? "exists" : "does not exist");
- ret_code = chunk ? LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL :
+ if (chunk) {
+ ret_code = LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_LOCAL;
+ lttng_trace_chunk_put(chunk);
+ goto end;
+ } else if (is_local_trace) {
+ ret_code = LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ goto end;
+ }
+
+ rcu_read_lock();
+ relayd = consumer_find_relayd(*relayd_id);
+ if (!relayd) {
+ ERR("Failed to find relayd %" PRIu64, *relayd_id);
+ ret_code = LTTCOMM_CONSUMERD_INVALID_PARAMETERS;
+ goto end_rcu_unlock;
+ }
+ DBG("Looking up existence of trace chunk on relay daemon");
+ pthread_mutex_lock(&relayd->ctrl_sock_mutex);
+ ret = relayd_trace_chunk_exists(&relayd->control_sock, chunk_id,
+ &chunk_exists_remote);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Failed to look-up the existence of trace chunk on relay daemon");
+ ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
+ goto end_rcu_unlock;
+ }
+
+ ret_code = chunk_exists_remote ?
+ LTTCOMM_CONSUMERD_TRACE_CHUNK_EXISTS_REMOTE :
LTTCOMM_CONSUMERD_UNKNOWN_TRACE_CHUNK;
+ DBG("Trace chunk %s on relay daemon",
+ chunk_exists_remote ? "exists" : "does not exist");
- lttng_trace_chunk_put(chunk);
+end_rcu_unlock:
+ rcu_read_unlock();
+end:
return ret_code;
}