health_code_update();
- while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ for (;;) {
health_code_update();
/* Atomically prepare the queue futex */
futex_nto1_prepare(&viewer_conn_queue.futex);
+ if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) {
+ break;
+ }
+
do {
health_code_update();
/*
* Open the index file if needed for the given vstream.
*
- * If an index file is successfully opened, the vstream index_fd set with
- * it.
+ * If an index file is successfully opened, the vstream will set it as its
+ * current index file.
*
* Return 0 on success, a negative value on error (-ENOENT if not ready yet).
*
{
int ret = 0;
- if (vstream->index_fd) {
+ if (vstream->index_file) {
goto end;
}
ret = -ENOENT;
goto end;
}
- ret = index_open(vstream->path_name, vstream->channel_name,
+ vstream->index_file = lttng_index_file_open(vstream->path_name,
+ vstream->channel_name,
vstream->stream->tracefile_count,
vstream->current_tracefile_id);
- if (ret >= 0) {
- vstream->index_fd = stream_fd_create(ret);
- if (!vstream->index_fd) {
- if (close(ret)) {
- PERROR("close");
- }
- ret = -1;
- } else {
- ret = 0;
- }
- goto end;
+ if (!vstream->index_file) {
+ ret = -1;
}
end:
{
int ret;
- if (trace->session->connection_closed
+ if ((trace->session->connection_closed || rstream->closed)
&& rstream->index_received_seqcount
== vstream->index_sent_seqcount) {
- /* Last index sent and session connection is closed. */
+ /*
+ * Last index sent and session connection or relay
+ * stream are closed.
+ */
index->status = htobe32(LTTNG_VIEWER_INDEX_HUP);
goto hup;
} else if (rstream->beacon_ts_end != -1ULL &&
int viewer_get_next_index(struct relay_connection *conn)
{
int ret;
- ssize_t read_ret;
struct lttng_viewer_get_next_index request_index;
struct lttng_viewer_index viewer_index;
struct ctf_packet_index packet_index;
viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM;
}
- read_ret = lttng_read(vstream->index_fd->fd, &packet_index,
- sizeof(packet_index));
- if (read_ret < sizeof(packet_index)) {
- ERR("Relay reading index file %d returned %zd",
- vstream->index_fd->fd, read_ret);
+ ret = lttng_index_file_read(vstream->index_file, &packet_index);
+ if (ret) {
+ ERR("Relay error reading index file %d",
+ vstream->index_file->fd);
viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR);
goto send_reply;
} else {
}
/* Setup the dispatcher thread */
- ret = pthread_create(&live_dispatcher_thread, NULL,
+ ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(),
thread_dispatcher, (void *) NULL);
if (ret) {
errno = ret;
}
/* Setup the worker thread */
- ret = pthread_create(&live_worker_thread, NULL,
+ ret = pthread_create(&live_worker_thread, default_pthread_attr(),
thread_worker, NULL);
if (ret) {
errno = ret;
}
/* Setup the listener thread */
- ret = pthread_create(&live_listener_thread, NULL,
+ ret = pthread_create(&live_listener_thread, default_pthread_attr(),
thread_listener, (void *) NULL);
if (ret) {
errno = ret;