summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
f3fe2a9)
The "network" sequence number (net_seq_num) is a 64-bit sequence number
tagging each packet sent over the network. The net_seq_num increments
monotonically (+1) for each packet sent from consumer daemon to relay
daemon, on a per-stream basis. It is tagged by the consumer daemon when
sending a trace packet to the relay daemon.
The LTTng kernel and user-space ring buffer "consumed position"
(consumed_pos) and "produced position" (produced_pos) are free-running
counters counting the number of bytes consumed and produced so far by
each stream. Because those counters are updated atomically, they are
limited to a size of 32-bit on 32-bit architectures.
The "packet" sequence number (packet_seq_num) is a sequence number
found in the packet header starting from LTTng 2.8. It is a 64-bit
sequence number assigned by the lttng-modules and lttng-ust ring
buffers. It increments monotonically (+1) for each packet produced
within a given ring buffer (stream).
Using produced_pos as rotation position and comparing it to the
net_seq_num has a few issues:
1) It breaks on 32-bit producers after generating more than 4GB of
data per stream, due to overflow. The net_seq_num is a 64-bit
counter, which does not overflow, but the produced_pos overflows
after 4GB on 32-bit architectures. This can lead to never-completing
rotations.
2) It breaks scenarios where ring buffers are configured in
overwrite mode, and streaming to a relay daemon. Indeed, when
the ring buffer moves the consumed_pos ahead, actually overwriting
data within the ring buffer, it introduces an offset between the
produced_pos and the net_seq_num. Therefore, if producers are
generating a low- (or no-) throughput in some streams, the
rotation may never complete, even on 64-bit architectures.
The solution proposed for this issue is to use the packet_seq_num as
rotation position rather than the net_seq_num. It takes care of
the two problematic scenarios, since the counter is always 64-bit
(even on 32-bit architectures), and because the counter is managed
by the producer, which therefore tracks progress of the ring buffer
overwrites.
This commit introduces changes required at the relayd side. A
separate commit introduces the changes required in the consumerd.
In relayd, one major restriction is the fact that the packet_seq_num
is not sent over the data socket, only through the control socket
receiving the indexes.
Therefore, in order to figure out the pivot position for the data
socket for a given stream, the associated index first needs to be
received. At that point, the corresponding net_seq_num is known,
which provides the pivot position for the data stream. Given that
the data and index sockets provide no ordering guarantees with
respect to their arrival, we handle the fact that data might have
been saved to disk in the wrong (previous) trace chunk by moving
it to the next trace chunk when the pivot position is known.
In order to allow "jumps" in the sequence numbers produced by
overwrite mode buffers, try_rotate_stream_index(), which previously
asserted that each sequence number was received in sequence, now
uses the packet_seq_num pivot position as a lower (inclusive) bound.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Change-Id: I755329e313f0980655a164b7bdb57e4f3d8e944a
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
if (minor_version >= 8) {
index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
if (minor_version >= 8) {
index->index_data.stream_instance_id = htobe64(data->stream_instance_id);
index->index_data.packet_seq_num = htobe64(data->packet_seq_num);
+ } else {
+ uint64_t unset_value = -1ULL;
+
+ index->index_data.stream_instance_id = htobe64(unset_value);
+ index->index_data.packet_seq_num = htobe64(unset_value);
}
return relay_index_set_data(index, &index_data);
}
return relay_index_set_data(index, &index_data);
index_info.stream_instance_id =
be64toh(index_info.stream_instance_id);
index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
index_info.stream_instance_id =
be64toh(index_info.stream_instance_id);
index_info.packet_seq_num = be64toh(index_info.packet_seq_num);
+ } else {
+ index_info.stream_instance_id = -1ULL;
+ index_info.packet_seq_num = -1ULL;
}
stream = stream_get_by_id(index_info.relay_stream_id);
}
stream = stream_get_by_id(index_info.relay_stream_id);
}
if (stream->prev_data_seq == -1ULL ||
}
if (stream->prev_data_seq == -1ULL ||
- stream->prev_data_seq + 1 < stream->ongoing_rotation.value.seq_num) {
+ stream->ongoing_rotation.value.prev_data_net_seq == -1ULL ||
+ stream->prev_data_seq <
+ stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* The next packet that will be written is not part of the next
* chunk yet.
*/
/*
* The next packet that will be written is not part of the next
* chunk yet.
*/
- DBG("Stream %" PRIu64 " not yet ready for rotation (rotate_at_seq_num = %" PRIu64
+ DBG("Stream %" PRIu64 " data not yet ready for rotation "
+ "(rotate_at_index_packet_seq_num = %" PRIu64
+ ", rotate_at_prev_data_net_seq = %" PRIu64
", prev_data_seq = %" PRIu64 ")",
stream->stream_handle,
", prev_data_seq = %" PRIu64 ")",
stream->stream_handle,
- stream->ongoing_rotation.value.seq_num,
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->ongoing_rotation.value.prev_data_net_seq,
stream->prev_data_seq);
goto end;
stream->prev_data_seq);
goto end;
- } else if (stream->prev_data_seq > stream->ongoing_rotation.value.seq_num) {
+ } else if (stream->prev_data_seq > stream->ongoing_rotation.value.prev_data_net_seq) {
/*
* prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
/*
* prev_data_seq is checked here since indexes and rotation
* commands are serialized with respect to each other.
- if (stream->prev_index_seq == -1ULL ||
- stream->prev_index_seq + 1 < stream->ongoing_rotation.value.seq_num) {
- DBG("Stream %" PRIu64 " index not yet ready for rotation (rotate_at_seq_num = %" PRIu64 ", prev_index_seq = %" PRIu64 ")",
+ if (!stream->received_packet_seq_num.is_set ||
+ LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 <
+ stream->ongoing_rotation.value.packet_seq_num) {
+ DBG("Stream %" PRIu64 " index not yet ready for rotation "
+ "(rotate_at_packet_seq_num = %" PRIu64
+ ", received_packet_seq_num = "
+ "(value = %" PRIu64 ", is_set = %" PRIu8 "))",
- stream->ongoing_rotation.value.seq_num,
- stream->prev_index_seq);
+ stream->ongoing_rotation.value.packet_seq_num,
+ stream->received_packet_seq_num.value,
+ stream->received_packet_seq_num.is_set);
- /* The next index belongs to the new trace chunk; rotate. */
- assert(stream->prev_index_seq + 1 ==
- stream->ongoing_rotation.value.seq_num);
+ /*
+ * The next index belongs to the new trace chunk; rotate.
+ * In overwrite mode, the packet seq num may jump over the
+ * rotation position.
+ */
+ assert(LTTNG_OPTIONAL_GET(stream->received_packet_seq_num) + 1 >=
+ stream->ongoing_rotation.value.packet_seq_num);
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
ret = create_index_file(stream,
stream->ongoing_rotation.value.next_trace_chunk);
stream->ongoing_rotation.value.index_rotated = true;
DBG("Rotating stream %" PRIu64 " index file",
stream->stream_handle);
ret = create_index_file(stream,
stream->ongoing_rotation.value.next_trace_chunk);
stream->ongoing_rotation.value.index_rotated = true;
+ /*
+ * Set the rotation pivot position for the data, now that we have the
+ * net_seq_num matching the packet_seq_num index pivot position.
+ */
+ stream->ongoing_rotation.value.prev_data_net_seq =
+ stream->prev_index_seq;
if (stream->ongoing_rotation.value.data_rotated &&
stream->ongoing_rotation.value.index_rotated) {
/* Rotation completed; reset its state. */
if (stream->ongoing_rotation.value.data_rotated &&
stream->ongoing_rotation.value.index_rotated) {
/* Rotation completed; reset its state. */
{
int ret = 0;
const struct relay_stream_rotation rotation = {
{
int ret = 0;
const struct relay_stream_rotation rotation = {
- .seq_num = rotation_sequence_number,
+ .data_rotated = false,
+ .index_rotated = false,
+ .packet_seq_num = rotation_sequence_number,
+ .prev_data_net_seq = -1ULL,
.next_trace_chunk = next_trace_chunk,
};
.next_trace_chunk = next_trace_chunk,
};
}
LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
}
LTTNG_OPTIONAL_SET(&stream->ongoing_rotation, rotation);
- DBG("Setting pending rotation: stream_id = %" PRIu64 ", rotation_seq_num = %" PRIu64,
+ DBG("Setting pending rotation: stream_id = %" PRIu64
+ ", rotate_at_packet_seq_num = %" PRIu64,
stream->stream_handle, rotation_sequence_number);
if (stream->is_metadata) {
/*
stream->stream_handle, rotation_sequence_number);
if (stream->is_metadata) {
/*
stream->ongoing_rotation.value.index_rotated = true;
ret = stream_rotate_data_file(stream);
} else {
stream->ongoing_rotation.value.index_rotated = true;
ret = stream_rotate_data_file(stream);
} else {
- ret = try_rotate_stream_data(stream);
+ ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
if (ret < 0) {
goto end;
}
- ret = try_rotate_stream_index(stream);
+ ret = try_rotate_stream_data(stream);
if (ret < 0) {
goto end;
}
if (ret < 0) {
goto end;
}
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
tracefile_array_file_rotate(stream->tfa, TRACEFILE_ROTATE_READ);
tracefile_array_commit_seq(stream->tfa);
stream->index_received_seqcount++;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ be64toh(index->index_data.packet_seq_num));
*flushed = true;
} else if (ret > 0) {
index->total_size = total_size;
*flushed = true;
} else if (ret > 0) {
index->total_size = total_size;
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
stream->index_received_seqcount++;
stream->pos_after_last_complete_data_index += index->total_size;
stream->prev_index_seq = index_info->net_seq_num;
+ LTTNG_OPTIONAL_SET(&stream->received_packet_seq_num,
+ index_info->packet_seq_num);
ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
ret = try_rotate_stream_index(stream);
if (ret < 0) {
goto end;
}
+ ret = try_rotate_stream_data(stream);
+ if (ret < 0) {
+ goto end;
+ }
} else if (ret > 0) {
/* no flush. */
ret = 0;
} else if (ret > 0) {
/* no flush. */
ret = 0;
bool data_rotated;
bool index_rotated;
/*
bool data_rotated;
bool index_rotated;
/*
- * Sequence number of the first packet of the new trace chunk to which
- * the stream is rotating.
+ * Packet sequence number of the first packet of the new trace chunk to
+ * which the stream is rotating.
+ uint64_t packet_seq_num;
+ /*
+ * Monotonically increasing previous network sequence number of first
+ * data packet of the new trace chunk to which the stream is rotating.
+ */
+ uint64_t prev_data_net_seq;
struct lttng_trace_chunk *next_trace_chunk;
};
struct lttng_trace_chunk *next_trace_chunk;
};
*/
uint64_t index_received_seqcount;
*/
uint64_t index_received_seqcount;
+ /*
+ * Packet sequence number of the last received packet index.
+ * Only populated when interacting with CTF_INDEX 1.1+.
+ */
+ LTTNG_OPTIONAL(uint64_t) received_packet_seq_num;
+
/*
* Tracefile array is an index of the stream trace files,
* indexed by position. It allows keeping track of the oldest
/*
* Tracefile array is an index of the stream trace files,
* indexed by position. It allows keeping track of the oldest
struct relayd_stream_rotation_position {
uint64_t stream_id;
/*
struct relayd_stream_rotation_position {
uint64_t stream_id;
/*
- * Sequence number of the first packet belonging to the new
+ * Packet sequence number of the first packet belonging to the new
* "destination" trace chunk to which the stream is rotating.
*
* Ignored for metadata streams.
* "destination" trace chunk to which the stream is rotating.
*
* Ignored for metadata streams.