uint64_t session_id, unsigned int ignore_sent_flag)
{
ssize_t ret;
- struct lttng_viewer_stream send_stream;
struct lttng_ht_iter iter;
struct relay_viewer_stream *vstream;
cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream,
stream_n.node) {
struct ctf_trace *ctf_trace;
+ struct lttng_viewer_stream send_stream = {};
health_code_update();
ret = fd_tracker_util_poll_create(the_fd_tracker,
name, events, 1, LTTNG_CLOEXEC);
+ if (ret) {
+ PERROR("Failed to create \"%s\" poll file descriptor", name);
+ goto error;
+ }
/* Add quit pipe */
ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd,
(const char **) (formated_name ? &formated_name : NULL),
1, create_sock, sock);
- free(formated_name);
+ if (ret) {
+ PERROR("Failed to create \"%s\" socket",
+ formated_name ?: "Unknown");
+ goto error;
+ }
DBG("Listening on %s socket %d", name, sock->fd);
ret = sock->ops->bind(sock);
}
+ free(formated_name);
return sock;
error:
if (sock) {
lttcomm_destroy_sock(sock);
}
+ free(formated_name);
return NULL;
}
goto send_reply;
}
- if (rstream->trace_chunk) {
- uint64_t rchunk_id, vchunk_id;
+ if (rstream->trace_chunk && !lttng_trace_chunk_ids_equal(
+ conn->viewer_session->current_trace_chunk,
+ rstream->trace_chunk)) {
+ DBG("Metadata relay stream and viewer chunk ids differ");
- /*
- * If the relay stream is not yet closed, ensure the viewer
- * chunk matches the relay chunk after clear.
- */
- if (lttng_trace_chunk_get_id(rstream->trace_chunk,
- &rchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
- viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
- goto send_reply;
- }
- if (lttng_trace_chunk_get_id(
- conn->viewer_session->current_trace_chunk,
- &vchunk_id) != LTTNG_TRACE_CHUNK_STATUS_OK) {
+ ret = viewer_session_set_trace_chunk_copy(
+ conn->viewer_session,
+ rstream->trace_chunk);
+ if (ret) {
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
}
-
- if (rchunk_id != vchunk_id) {
- DBG("Relay and viewer chunk ids differ: "
- "rchunk_id %" PRIu64 " vchunk_id %" PRIu64,
- rchunk_id, vchunk_id);
-
- lttng_trace_chunk_put(
- conn->viewer_session->current_trace_chunk);
- conn->viewer_session->current_trace_chunk = NULL;
- ret = viewer_session_set_trace_chunk_copy(
- conn->viewer_session,
- rstream->trace_chunk);
- if (ret) {
- viewer_index.status =
- htobe32(LTTNG_VIEWER_INDEX_ERR);
- goto send_reply;
- }
- }
}
if (conn->viewer_session->current_trace_chunk !=
vstream->stream_file.trace_chunk) {
goto send_reply;
}
+ if (vstream->stream->trace_chunk &&
+ !lttng_trace_chunk_ids_equal(
+ conn->viewer_session->current_trace_chunk,
+ vstream->stream->trace_chunk)) {
+ /* A rotation has occurred on the relay stream. */
+ DBG("Metadata relay stream and viewer chunk ids differ");
+
+ ret = viewer_session_set_trace_chunk_copy(
+ conn->viewer_session,
+ vstream->stream->trace_chunk);
+ if (ret) {
+ reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
+ goto send_reply;
+ }
+ }
+
+ if (conn->viewer_session->current_trace_chunk !=
+ vstream->stream_file.trace_chunk) {
+ bool acquired_reference;
+
+ DBG("Viewer session and viewer stream chunk differ: "
+ "vsession chunk %p vstream chunk %p",
+ conn->viewer_session->current_trace_chunk,
+ vstream->stream_file.trace_chunk);
+ lttng_trace_chunk_put(vstream->stream_file.trace_chunk);
+ acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk);
+ assert(acquired_reference);
+ vstream->stream_file.trace_chunk =
+ conn->viewer_session->current_trace_chunk;
+ viewer_stream_close_files(vstream);
+ }
+
len = vstream->stream->metadata_received - vstream->metadata_sent;
- /* first time, we open the metadata file */
- if (!vstream->stream_file.handle) {
+ /*
+ * Either this is the first time the metadata file is read, or a
+ * rotation of the corresponding relay stream has occured.
+ */
+ if (!vstream->stream_file.handle && len > 0) {
struct fs_handle *fs_handle;
char file_path[LTTNG_PATH_MAX];
enum lttng_trace_chunk_status status;
goto error;
}
vstream->stream_file.handle = fs_handle;
+
+ if (vstream->metadata_sent != 0) {
+ /*
+ * The client does not expect to receive any metadata
+ * it has received and metadata files in successive
+ * chunks must be a strict superset of one another.
+ *
+ * Skip the first `metadata_sent` bytes to ensure
+ * they are not sent a second time to the client.
+ *
+ * Baring a block layer error or an internal error,
+ * this seek should not fail as
+ * `vstream->stream->metadata_received` is reset when
+ * a relay stream is rotated. If this is reached, it is
+ * safe to assume that
+ * `metadata_received` > `metadata_sent`.
+ */
+ const off_t seek_ret = fs_handle_seek(fs_handle,
+ vstream->metadata_sent, SEEK_SET);
+
+ if (seek_ret < 0) {
+ PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64,
+ vstream->metadata_sent);
+ reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR);
+ goto send_reply;
+ }
+ }
}
reply.len = htobe64(len);
fs_handle_put_fd(vstream->stream_file.handle);
fd = -1;
if (read_len < len) {
- PERROR("Relay reading metadata file");
- goto error;
+ if (read_len < 0) {
+ PERROR("Failed to read metadata file");
+ goto error;
+ } else {
+ /*
+ * A clear has been performed which prevents the relay
+ * from sending `len` bytes of metadata.
+ *
+ * It is important not to send any metadata if we
+ * couldn't read all the available metadata in one shot:
+ * sending partial metadata can cause the client to
+ * attempt to parse an incomplete (incoherent) metadata
+ * stream, which would result in an error.
+ */
+ const off_t seek_ret = fs_handle_seek(
+ vstream->stream_file.handle, -read_len,
+ SEEK_CUR);
+
+ DBG("Failed to read metadata: requested = %zd, got = %zd",
+ len, read_len);
+ read_len = 0;
+ len = 0;
+ if (seek_ret < 0) {
+ PERROR("Failed to restore metadata file position after partial read");
+ ret = -1;
+ goto error;
+ }
+ }
}
vstream->metadata_sent += read_len;
reply.status = htobe32(LTTNG_VIEWER_METADATA_OK);