static struct lttng_uri *live_uri;
-/*
- * Quit pipe for all threads. This permits a single cancellation point
- * for all threads when receiving an event on the pipe.
- */
-static int live_thread_quit_pipe[2] = { -1, -1 };
-
/*
* This pipe is used to inform the worker thread that a command is queued and
* ready to be processed.
/* Stopping all threads */
DBG("Terminating all live threads");
- ret = notify_thread_pipe(live_thread_quit_pipe[1]);
+ ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
}
/* Add quit pipe */
- ret = lttng_poll_add(events, live_thread_quit_pipe[0], LPOLLIN);
+ ret = lttng_poll_add(events, thread_quit_pipe[0], LPOLLIN | LPOLLERR);
if (ret < 0) {
goto error;
}
static
int check_thread_quit_pipe(int fd, uint32_t events)
{
- if (fd == live_thread_quit_pipe[0] && (events & LPOLLIN)) {
+ if (fd == thread_quit_pipe[0] && (events & LPOLLIN)) {
return 1;
}
goto error_sock_control;
}
- /*
- * Pass 3 as size here for the thread quit pipe, control and data socket.
- */
+ /* Pass 2 as size here for the thread quit pipe and control sockets. */
ret = create_thread_poll_set(&events, 2);
if (ret < 0) {
goto error_create_poll;
goto error_poll_add;
}
+ lttng_relay_notify_ready();
+
while (1) {
health_code_update();
int viewer_attach_session(struct relay_command *cmd,
struct lttng_ht *sessions_ht)
{
- int ret, send_streams = 0, nb_streams = 0;
+ int ret, send_streams = 0;
+ uint32_t nb_streams = 0, nb_streams_ready = 0;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct lttng_viewer_stream send_stream;
if (stream->session != cmd->session) {
continue;
}
+ nb_streams++;
/*
- * Don't send streams with no ctf_trace, they are not ready to be
- * read.
+ * Don't send streams with no ctf_trace, they are not
+ * ready to be read.
*/
- if (!stream->ctf_trace) {
+ if (!stream->ctf_trace || !stream->viewer_ready) {
continue;
}
+ nb_streams_ready++;
vstream = live_find_viewer_stream_by_id(stream->stream_handle);
if (!vstream) {
goto end_unlock;
}
}
- nb_streams++;
+ }
+
+ /* We must have the same amount of existing stream and ready stream. */
+ if (nb_streams != nb_streams_ready) {
+ nb_streams = 0;
}
response.streams_count = htobe32(nb_streams);
}
* we need to remove it because we won't detect a EOF for this
* stream.
*/
- if (ret_ref == 1 && vstream->ctf_trace->metadata_stream) {
+ if (ret_ref == 1 && vstream->ctf_trace->viewer_metadata_stream) {
+ delete_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
destroy_viewer_stream(vstream->ctf_trace->viewer_metadata_stream);
vstream->ctf_trace->metadata_stream = NULL;
DBG("Freeing ctf_trace %" PRIu64, vstream->ctf_trace->id);
* main
*/
int live_start_threads(struct lttng_uri *uri,
- struct relay_local_data *relay_ctx, int quit_pipe[2])
+ struct relay_local_data *relay_ctx)
{
int ret = 0;
void *status;
assert(uri);
live_uri = uri;
- live_thread_quit_pipe[0] = quit_pipe[0];
- live_thread_quit_pipe[1] = quit_pipe[1];
-
/* Check if daemon is UID = 0 */
is_root = !getuid();