static
int notify_thread_pipe(int wpipe)
{
- int ret;
+ ssize_t ret;
- do {
- ret = write(wpipe, "!", 1);
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret != 1) {
+ ret = lttng_write(wpipe, "!", 1);
+ if (ret < 1) {
PERROR("write poll pipe");
}
- return ret;
+ return (int) ret;
}
/*
static
void *thread_dispatcher(void *data)
{
- int ret, err = -1;
+ int err = -1;
+ ssize_t ret;
struct cds_wfq_node *node;
struct relay_command *relay_cmd = NULL;
* so we can be assured that the data will be read at some point in
* time or wait to the end of the world :)
*/
- do {
- ret = write(live_relay_cmd_pipe[1], relay_cmd,
- sizeof(*relay_cmd));
- } while (ret < 0 && errno == EINTR);
+ ret = lttng_write(live_relay_cmd_pipe[1], relay_cmd,
+ sizeof(*relay_cmd));
free(relay_cmd);
- if (ret < 0 || ret != sizeof(struct relay_command)) {
+ if (ret < sizeof(struct relay_command)) {
PERROR("write cmd pipe");
goto error;
}
char fullpath[PATH_MAX];
struct lttng_packet_index_file_hdr hdr;
- if (stream->tracefile_size > 0) {
- /* For now we don't support on-disk ring buffer. */
- ret = -1;
- goto end;
+ if (stream->tracefile_count > 0) {
+ ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s_%"
+ PRIu64 DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
+ stream->channel_name, stream->tracefile_count_current);
+ } else {
+ ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s"
+ DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
+ stream->channel_name);
}
-
- ret = snprintf(fullpath, sizeof(fullpath), "%s/" DEFAULT_INDEX_DIR "/%s"
- DEFAULT_INDEX_FILE_SUFFIX, stream->path_name,
- stream->channel_name);
if (ret < 0) {
PERROR("snprintf index path");
goto error;
stream->index_read_fd = ret;
DBG("Opening index file %s in read only, (fd: %d)", fullpath, ret);
- do {
- health_code_update();
- ret = read(stream->index_read_fd, &hdr, sizeof(hdr));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0) {
+ ret = lttng_read(stream->index_read_fd, &hdr, sizeof(hdr));
+ if (ret < sizeof(hdr)) {
PERROR("Reading index header");
goto error;
}
ret = 0;
error:
-end:
return ret;
}
ret = -1;
goto error;
}
-
- viewer_stream->read_fd = -1;
- viewer_stream->index_read_fd = -1;
viewer_stream->session_id = stream->session->id;
viewer_stream->stream_handle = stream->stream_handle;
viewer_stream->path_name = strndup(stream->path_name,
LTTNG_VIEWER_PATH_MAX);
viewer_stream->channel_name = strndup(stream->channel_name,
LTTNG_VIEWER_NAME_MAX);
- viewer_stream->total_index_received = stream->total_index_received;
- viewer_stream->tracefile_size = stream->tracefile_size;
viewer_stream->tracefile_count = stream->tracefile_count;
viewer_stream->metadata_flag = stream->metadata_flag;
+ if (seek_last) {
+ viewer_stream->tracefile_count_current =
+ stream->tracefile_count_current;
+ } else {
+ viewer_stream->tracefile_count_current =
+ stream->oldest_tracefile_id;
+ }
+
+ /*
+ * The deletion of this ctf_trace object is only done in a call RCU of the
+ * relay stream making it valid as long as we have the read side lock.
+ */
+ viewer_stream->ctf_trace = stream->ctf_trace;
+ uatomic_inc(&viewer_stream->ctf_trace->refcount);
+
+ lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle);
+ lttng_ht_add_unique_u64(viewer_streams_ht, &viewer_stream->stream_n);
+
+ viewer_stream->index_read_fd = -1;
+ viewer_stream->read_fd = -1;
+
+ /*
+ * This is to avoid a race between the initialization of this object and
+ * the close of the given stream. If the stream is unable to find this
+ * viewer stream when closing, this copy will at least take the latest
+ * value.
+ * We also need that for the seek_last.
+ */
+ viewer_stream->total_index_received = stream->total_index_received;
- if (seek_last && viewer_stream->total_index_received > 0) {
+ /*
+ * If we never received an index for the current stream, delay
+ * the opening of the index, otherwise open it right now.
+ */
+ if (viewer_stream->tracefile_count_current ==
+ stream->tracefile_count_current &&
+ viewer_stream->total_index_received == 0) {
+ viewer_stream->index_read_fd = -1;
+ } else {
ret = open_index(viewer_stream);
if (ret < 0) {
goto error;
}
+ }
+
+ if (seek_last && viewer_stream->index_read_fd > 0) {
ret = lseek(viewer_stream->index_read_fd,
viewer_stream->total_index_received *
sizeof(struct lttng_packet_index),
viewer_stream->total_index_received;
}
- /*
- * This is to avoid a race between the initialization of this object and
- * the close of the given stream. If the stream is unable to find this
- * viewer stream when closing, this copy will at least take the latest
- * value.
- */
- viewer_stream->total_index_received = stream->total_index_received;
+ ret = 0;
+error:
+ return ret;
+}
+
+/*
+ * Rotate a stream to the next tracefile.
+ *
+ * Returns 0 on success, a negative value on error.
+ */
+static
+int rotate_viewer_stream(struct relay_viewer_stream *viewer_stream,
+ struct relay_stream *stream)
+{
+ int ret;
+ uint64_t tracefile_id;
+
+ assert(viewer_stream);
+
+ tracefile_id = (viewer_stream->tracefile_count_current + 1) %
+ viewer_stream->tracefile_count;
+
+ if (stream) {
+ pthread_mutex_lock(&stream->viewer_stream_rotation_lock);
+ }
/*
- * The deletion of this ctf_trace object is only done in a call RCU of the
- * relay stream making it valid as long as we have the read side lock.
+ * The writer and the reader are not working in the same
+ * tracefile, we can read up to EOF, we don't care about the
+ * total_index_received.
*/
- viewer_stream->ctf_trace = stream->ctf_trace;
- uatomic_inc(&viewer_stream->ctf_trace->refcount);
+ if (!stream || (stream->tracefile_count_current != tracefile_id)) {
+ viewer_stream->close_write_flag = 1;
+ } else {
+ /*
+ * We are opening a file that is still open in write, make
+ * sure we limit our reading to the number of indexes
+ * received.
+ */
+ viewer_stream->close_write_flag = 0;
+ if (stream) {
+ viewer_stream->total_index_received =
+ stream->total_index_received;
+ }
+ }
+ viewer_stream->tracefile_count_current = tracefile_id;
- lttng_ht_node_init_u64(&viewer_stream->stream_n, stream->stream_handle);
- lttng_ht_add_unique_u64(viewer_streams_ht, &viewer_stream->stream_n);
+ if (viewer_stream->abort_flag == 0) {
+ if (viewer_stream->index_read_fd > 0) {
+ ret = close(viewer_stream->index_read_fd);
+ if (ret < 0) {
+ PERROR("close index file %d",
+ viewer_stream->index_read_fd);
+ }
+ viewer_stream->index_read_fd = -1;
+ }
+ if (viewer_stream->read_fd > 0) {
+ ret = close(viewer_stream->read_fd);
+ if (ret < 0) {
+ PERROR("close tracefile %d",
+ viewer_stream->read_fd);
+ }
+ viewer_stream->read_fd = -1;
+ }
+ } else {
+ viewer_stream->abort_flag = 0;
+ }
+
+ viewer_stream->index_read_fd = -1;
+ viewer_stream->read_fd = -1;
+
+ if (stream) {
+ pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
+ }
+ ret = open_index(viewer_stream);
+ if (ret < 0) {
+ goto error;
+ }
ret = 0;
rstream = relay_stream_find_by_id(vstream->stream_handle);
if (rstream) {
- if (rstream->beacon_ts_end != -1ULL &&
- vstream->last_sent_index == rstream->total_index_received) {
- viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
- viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
- goto send_reply;
+ if (vstream->abort_flag) {
+ /* Rotate on abort (overwrite). */
+ DBG("Viewer rotate because of overwrite");
+ ret = rotate_viewer_stream(vstream, rstream);
+ if (ret < 0) {
+ goto end_unlock;
+ }
}
-
- if (rstream->total_index_received <= vstream->last_sent_index) {
- /* No new index to send, retry later. */
- viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
- goto send_reply;
+ pthread_mutex_lock(&rstream->viewer_stream_rotation_lock);
+ if (rstream->tracefile_count_current == vstream->tracefile_count_current) {
+ if (rstream->beacon_ts_end != -1ULL &&
+ vstream->last_sent_index == rstream->total_index_received) {
+ viewer_index.status = htobe32(VIEWER_INDEX_INACTIVE);
+ viewer_index.timestamp_end = htobe64(rstream->beacon_ts_end);
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ goto send_reply;
+ /*
+ * Reader and writer are working in the same tracefile, so we care
+ * about the number of index received and sent. Otherwise, we read
+ * up to EOF.
+ */
+ } else if (rstream->total_index_received <= vstream->last_sent_index
+ && !vstream->close_write_flag) {
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ /* No new index to send, retry later. */
+ viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ goto send_reply;
+ }
}
- } else if (!rstream &&
+ pthread_mutex_unlock(&rstream->viewer_stream_rotation_lock);
+ } else if (!rstream && vstream->close_write_flag &&
vstream->total_index_received == vstream->last_sent_index) {
- /* Last index sent and stream closed */
+ /* Last index sent and current tracefile closed in write */
viewer_index.status = htobe32(VIEWER_INDEX_HUP);
goto send_reply;
+ } else {
+ vstream->close_write_flag = 1;
}
if (!vstream->ctf_trace->metadata_received ||
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA;
}
- do {
- health_code_update();
- ret = read(vstream->index_read_fd, &packet_index,
- sizeof(packet_index));
- } while (ret < 0 && errno == EINTR);
+ pthread_mutex_lock(&vstream->overwrite_lock);
+ if (vstream->abort_flag) {
+ /*
+ * The file is being overwritten by the writer, we cannot
+ * use it.
+ */
+ viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ pthread_mutex_unlock(&vstream->overwrite_lock);
+ ret = rotate_viewer_stream(vstream, rstream);
+ if (ret < 0) {
+ goto end_unlock;
+ }
+ goto send_reply;
+ }
+ ret = lttng_read(vstream->index_read_fd, &packet_index,
+ sizeof(packet_index));
+ pthread_mutex_unlock(&vstream->overwrite_lock);
if (ret < sizeof(packet_index)) {
- PERROR("Relay reading index file");
- viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+ /*
+ * The tracefile is closed in write, so we read up to EOF.
+ */
+ if (vstream->close_write_flag == 1) {
+ viewer_index.status = htobe32(VIEWER_INDEX_RETRY);
+ /* Rotate on normal EOF */
+ ret = rotate_viewer_stream(vstream, rstream);
+ if (ret < 0) {
+ goto end_unlock;
+ }
+ } else {
+ PERROR("Relay reading index file %d",
+ vstream->index_read_fd);
+ viewer_index.status = htobe32(VIEWER_INDEX_ERR);
+ }
+ goto send_reply;
} else {
viewer_index.status = htobe32(VIEWER_INDEX_OK);
vstream->last_sent_index++;
if (stream->read_fd < 0) {
char fullpath[PATH_MAX];
- ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
- stream->channel_name);
+ if (stream->tracefile_count > 0) {
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s_%" PRIu64, stream->path_name,
+ stream->channel_name,
+ stream->tracefile_count_current);
+ } else {
+ ret = snprintf(fullpath, PATH_MAX, "%s/%s", stream->path_name,
+ stream->channel_name);
+ }
if (ret < 0) {
goto error;
}
ret = lseek(stream->read_fd, be64toh(get_packet_info.offset), SEEK_SET);
if (ret < 0) {
- PERROR("lseek");
- goto error;
+ /*
+ * If the read fd was closed by the streaming side, the
+ * abort_flag will be set to 1, otherwise it is an error.
+ */
+ if (stream->abort_flag == 0) {
+ PERROR("lseek");
+ goto error;
+ }
+ reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+ goto send_reply;
}
- read_len = read(stream->read_fd, data, len);
- if (read_len < (ssize_t) len) {
- PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
- stream->read_fd, be64toh(get_packet_info.offset));
- goto error;
+ read_len = lttng_read(stream->read_fd, data, len);
+ if (read_len < len) {
+ /*
+ * If the read fd was closed by the streaming side, the
+ * abort_flag will be set to 1, otherwise it is an error.
+ */
+ if (stream->abort_flag == 0) {
+ PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64,
+ stream->read_fd,
+ be64toh(get_packet_info.offset));
+ goto error;
+ } else {
+ reply.status = htobe32(VIEWER_GET_PACKET_EOF);
+ goto send_reply;
+ }
}
reply.status = htobe32(VIEWER_GET_PACKET_OK);
reply.len = htobe32(len);
goto error;
}
- read_len = read(stream->read_fd, data, len);
- if (read_len < (ssize_t) len) {
+ read_len = lttng_read(stream->read_fd, data, len);
+ if (read_len < len) {
PERROR("Relay reading metadata file");
goto error;
}
goto error;
}
- do {
- health_code_update();
- ret = read(fd, relay_connection, sizeof(*relay_connection));
- } while (ret < 0 && errno == EINTR);
- if (ret < 0 || ret < sizeof(*relay_connection)) {
+ ret = lttng_read(fd, relay_connection, sizeof(*relay_connection));
+ if (ret < sizeof(*relay_connection)) {
PERROR("read relay cmd pipe");
goto error_read;
}
continue;
}
- if (stream->read_fd > 0) {
+ if (stream->read_fd >= 0) {
ret = close(stream->read_fd);
if (ret < 0) {
PERROR("close read_fd");
}
}
- if (stream->index_read_fd > 0) {
+ if (stream->index_read_fd >= 0) {
ret = close(stream->index_read_fd);
if (ret < 0) {
PERROR("close index_read_fd");
/* connection closed */
if (ret <= 0) {
cleanup_poll_connection(&events, pollfd);
- del_connection( relay_connections_ht, &iter,
+ del_connection(relay_connections_ht, &iter,
relay_connection);
DBG("Viewer control connection closed with %d",
pollfd);
return ret;
}
-void live_stop_threads()
+void live_stop_threads(void)
{
int ret;
void *status;