X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.cpp;h=869c88d98b645d26783408efca8fff0cc07ae53a;hb=28ab034a2c3582d07d3423d2d746731f87d3969f;hp=4724a3adaab0bcbfe5d64dfc44245b57c0f202da;hpb=52e345b9ac912d033c2a2c25a170a01cf209839d;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.cpp b/src/bin/lttng-relayd/live.cpp index 4724a3ada..869c88d98 100644 --- a/src/bin/lttng-relayd/live.cpp +++ b/src/bin/lttng-relayd/live.cpp @@ -8,6 +8,36 @@ */ #define _LGPL_SOURCE +#include "cmd.hpp" +#include "connection.hpp" +#include "ctf-trace.hpp" +#include "health-relayd.hpp" +#include "live.hpp" +#include "lttng-relayd.hpp" +#include "session.hpp" +#include "stream.hpp" +#include "testpoint.hpp" +#include "utils.hpp" +#include "viewer-session.hpp" +#include "viewer-stream.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include #include @@ -18,6 +48,7 @@ #include #include #include +#include #include #include #include @@ -29,38 +60,8 @@ #include #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cmd.hpp" -#include "connection.hpp" -#include "ctf-trace.hpp" -#include "health-relayd.hpp" -#include "live.hpp" -#include "lttng-relayd.hpp" -#include "session.hpp" -#include "stream.hpp" -#include "testpoint.hpp" -#include "utils.hpp" -#include "viewer-session.hpp" -#include "viewer-stream.hpp" - -#define SESSION_BUF_DEFAULT_COUNT 16 +#define SESSION_BUF_DEFAULT_COUNT 16 static struct lttng_uri *live_uri; @@ -86,11 +87,9 @@ static pthread_t live_worker_thread; static struct relay_conn_queue viewer_conn_queue; static uint64_t last_relay_viewer_session_id; -static pthread_mutex_t last_relay_viewer_session_id_lock = - PTHREAD_MUTEX_INITIALIZER; +static pthread_mutex_t last_relay_viewer_session_id_lock = PTHREAD_MUTEX_INITIALIZER; -static -const char *lttng_viewer_command_str(lttng_viewer_command cmd) +static const char *lttng_viewer_command_str(lttng_viewer_command cmd) { switch (cmd) { case LTTNG_VIEWER_CONNECT: @@ -116,9 +115,8 @@ const char *lttng_viewer_command_str(lttng_viewer_command cmd) } } -static -const char *lttng_viewer_next_index_return_code_str( - enum lttng_viewer_next_index_return_code code) +static const char * +lttng_viewer_next_index_return_code_str(enum lttng_viewer_next_index_return_code code) { switch (code) { case LTTNG_VIEWER_INDEX_OK: @@ -138,9 +136,7 @@ const char *lttng_viewer_next_index_return_code_str( } } -static -const char *lttng_viewer_attach_return_code_str( - enum lttng_viewer_attach_return_code code) +static const char *lttng_viewer_attach_return_code_str(enum lttng_viewer_attach_return_code code) { switch (code) { case LTTNG_VIEWER_ATTACH_OK: @@ -160,9 +156,8 @@ const char *lttng_viewer_attach_return_code_str( } }; -static -const char *lttng_viewer_get_packet_return_code_str( - enum lttng_viewer_get_packet_return_code code) +static const char * +lttng_viewer_get_packet_return_code_str(enum lttng_viewer_get_packet_return_code code) { switch (code) { case LTTNG_VIEWER_GET_PACKET_OK: @@ -181,8 +176,7 @@ const char *lttng_viewer_get_packet_return_code_str( /* * Cleanup the daemon */ -static -void cleanup_relayd_live(void) +static void cleanup_relayd_live(void) { DBG("Cleaning up"); @@ -196,8 +190,7 @@ void cleanup_relayd_live(void) * Return the size of the received message or else a negative value on error * with errno being set by recvmsg() syscall. */ -static -ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) +static ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; @@ -222,8 +215,7 @@ ssize_t recv_request(struct lttcomm_sock *sock, void *buf, size_t size) * Return the size of the sent message or else a negative value on error with * errno being set by sendmsg() syscall. */ -static -ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) +static ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) { ssize_t ret; @@ -242,8 +234,7 @@ ssize_t send_response(struct lttcomm_sock *sock, void *buf, size_t size) * Returns 1 if new streams got added, 0 if nothing changed, a negative value * on error. */ -static -int check_new_streams(struct relay_connection *conn) +static int check_new_streams(struct relay_connection *conn) { struct relay_session *session; unsigned long current_val; @@ -253,9 +244,9 @@ int check_new_streams(struct relay_connection *conn) goto end; } rcu_read_lock(); - cds_list_for_each_entry_rcu(session, - &conn->viewer_session->session_list, - viewer_session_node) { + cds_list_for_each_entry_rcu( + session, &conn->viewer_session->session_list, viewer_session_node) + { if (!session_get(session)) { continue; } @@ -277,9 +268,8 @@ end: * * Return 0 on success or else a negative value. */ -static -ssize_t send_viewer_streams(struct lttcomm_sock *sock, - uint64_t session_id, unsigned int ignore_sent_flag) +static ssize_t +send_viewer_streams(struct lttcomm_sock *sock, uint64_t session_id, unsigned int ignore_sent_flag) { ssize_t ret; struct lttng_ht_iter iter; @@ -287,8 +277,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, rcu_read_lock(); - cds_lfht_for_each_entry(viewer_streams_ht->ht, &iter.iter, vstream, - stream_n.node) { + cds_lfht_for_each_entry (viewer_streams_ht->ht, &iter.iter, vstream, stream_n.node) { struct ctf_trace *ctf_trace; struct lttng_viewer_stream send_stream = {}; @@ -301,7 +290,7 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, pthread_mutex_lock(&vstream->stream->lock); /* Ignore if not the same session. */ if (vstream->stream->trace->session->id != session_id || - (!ignore_sent_flag && vstream->sent_flag)) { + (!ignore_sent_flag && vstream->sent_flag)) { pthread_mutex_unlock(&vstream->stream->lock); viewer_stream_put(vstream); continue; @@ -310,26 +299,25 @@ ssize_t send_viewer_streams(struct lttcomm_sock *sock, ctf_trace = vstream->stream->trace; send_stream.id = htobe64(vstream->stream->stream_handle); send_stream.ctf_trace_id = htobe64(ctf_trace->id); - send_stream.metadata_flag = htobe32( - vstream->stream->is_metadata); - if (lttng_strncpy(send_stream.path_name, vstream->path_name, - sizeof(send_stream.path_name))) { + send_stream.metadata_flag = htobe32(vstream->stream->is_metadata); + if (lttng_strncpy(send_stream.path_name, + vstream->path_name, + sizeof(send_stream.path_name))) { pthread_mutex_unlock(&vstream->stream->lock); viewer_stream_put(vstream); - ret = -1; /* Error. */ + ret = -1; /* Error. */ goto end_unlock; } if (lttng_strncpy(send_stream.channel_name, - vstream->channel_name, - sizeof(send_stream.channel_name))) { + vstream->channel_name, + sizeof(send_stream.channel_name))) { pthread_mutex_unlock(&vstream->stream->lock); viewer_stream_put(vstream); - ret = -1; /* Error. */ + ret = -1; /* Error. */ goto end_unlock; } - DBG("Sending stream %" PRIu64 " to viewer", - vstream->stream->stream_handle); + DBG("Sending stream %" PRIu64 " to viewer", vstream->stream->stream_handle); vstream->sent_flag = 1; pthread_mutex_unlock(&vstream->stream->lock); @@ -359,12 +347,12 @@ end_unlock: * Return 0 on success or else a negative value. */ static int make_viewer_streams(struct relay_session *relay_session, - struct relay_viewer_session *viewer_session, - enum lttng_viewer_seek seek_t, - uint32_t *nb_total, - uint32_t *nb_unsent, - uint32_t *nb_created, - bool *closed) + struct relay_viewer_session *viewer_session, + enum lttng_viewer_seek seek_t, + uint32_t *nb_total, + uint32_t *nb_unsent, + uint32_t *nb_created, + bool *closed) { int ret; struct lttng_ht_iter iter; @@ -383,8 +371,8 @@ static int make_viewer_streams(struct relay_session *relay_session, * used for a the given session id only. */ rcu_read_lock(); - cds_lfht_for_each_entry (relay_session->ctf_traces_ht->ht, &iter.iter, - ctf_trace, node.node) { + cds_lfht_for_each_entry ( + relay_session->ctf_traces_ht->ht, &iter.iter, ctf_trace, node.node) { bool trace_has_metadata_stream = false; health_code_update(); @@ -397,8 +385,7 @@ static int make_viewer_streams(struct relay_session *relay_session, * Iterate over all the streams of the trace to see if we have a * metadata stream. */ - cds_list_for_each_entry_rcu(relay_stream, - &ctf_trace->stream_list, stream_node) + cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) { bool is_metadata_stream; @@ -419,14 +406,12 @@ static int make_viewer_streams(struct relay_session *relay_session, * and we never sent one to the viewer, skip the trace. We * accept that the viewer will not see this trace at all. */ - if (!trace_has_metadata_stream && - !ctf_trace->metadata_stream_sent_to_viewer) { + if (!trace_has_metadata_stream && !ctf_trace->metadata_stream_sent_to_viewer) { ctf_trace_put(ctf_trace); continue; } - cds_list_for_each_entry_rcu(relay_stream, - &ctf_trace->stream_list, stream_node) + cds_list_for_each_entry_rcu(relay_stream, &ctf_trace->stream_list, stream_node) { struct relay_viewer_stream *viewer_stream; @@ -441,8 +426,7 @@ static int make_viewer_streams(struct relay_session *relay_session, if (!relay_stream->published) { goto next; } - viewer_stream = viewer_stream_get_by_id( - relay_stream->stream_handle); + viewer_stream = viewer_stream_get_by_id(relay_stream->stream_handle); if (!viewer_stream) { struct lttng_trace_chunk *viewer_stream_trace_chunk = NULL; @@ -464,10 +448,10 @@ static int make_viewer_streams(struct relay_session *relay_session, * chunk can be used safely. */ if ((relay_stream->ongoing_rotation.is_set || - session_has_ongoing_rotation(relay_session)) && - relay_stream->trace_chunk) { - viewer_stream_trace_chunk = lttng_trace_chunk_copy( - relay_stream->trace_chunk); + session_has_ongoing_rotation(relay_session)) && + relay_stream->trace_chunk) { + viewer_stream_trace_chunk = + lttng_trace_chunk_copy(relay_stream->trace_chunk); if (!viewer_stream_trace_chunk) { ret = -1; ctf_trace_put(ctf_trace); @@ -475,14 +459,14 @@ static int make_viewer_streams(struct relay_session *relay_session, } } else { /* - * Transition the viewer session into the newest trace chunk available. + * Transition the viewer session into the newest trace chunk + * available. */ - if (!lttng_trace_chunk_ids_equal(viewer_session->current_trace_chunk, - relay_stream->trace_chunk)) { - + if (!lttng_trace_chunk_ids_equal( + viewer_session->current_trace_chunk, + relay_stream->trace_chunk)) { ret = viewer_session_set_trace_chunk_copy( - viewer_session, - relay_stream->trace_chunk); + viewer_session, relay_stream->trace_chunk); if (ret) { ret = -1; ctf_trace_put(ctf_trace); @@ -503,19 +487,18 @@ static int make_viewer_streams(struct relay_session *relay_session, * clear against a stopped * session). */ - const bool reference_acquired = lttng_trace_chunk_get( + const bool reference_acquired = + lttng_trace_chunk_get( viewer_session->current_trace_chunk); LTTNG_ASSERT(reference_acquired); viewer_stream_trace_chunk = - viewer_session->current_trace_chunk; + viewer_session->current_trace_chunk; } } viewer_stream = viewer_stream_create( - relay_stream, - viewer_stream_trace_chunk, - seek_t); + relay_stream, viewer_stream_trace_chunk, seek_t); lttng_trace_chunk_put(viewer_stream_trace_chunk); viewer_stream_trace_chunk = NULL; if (!viewer_stream) { @@ -546,15 +529,14 @@ static int make_viewer_streams(struct relay_session *relay_session, if (nb_total) { if (relay_stream->is_metadata) { if (!relay_stream->closed || - relay_stream->metadata_received > - viewer_stream->metadata_sent) { + relay_stream->metadata_received > + viewer_stream->metadata_sent) { (*nb_total)++; } } else { if (!relay_stream->closed || - !(((int64_t)(relay_stream->prev_data_seq - - relay_stream->last_net_seq_num)) >= - 0)) { + !(((int64_t) (relay_stream->prev_data_seq - + relay_stream->last_net_seq_num)) >= 0)) { (*nb_total)++; } } @@ -590,8 +572,7 @@ int relayd_live_stop(void) return 0; } -static -int create_sock(void *data, int *out_fd) +static int create_sock(void *data, int *out_fd) { int ret; struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -606,8 +587,7 @@ end: return ret; } -static -int close_sock(void *data, int *in_fd __attribute__((unused))) +static int close_sock(void *data, int *in_fd __attribute__((unused))) { struct lttcomm_sock *sock = (lttcomm_sock *) data; @@ -631,16 +611,14 @@ end: return ret; } -static -struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, - const char *name) +static struct lttcomm_sock *accept_live_sock(struct lttcomm_sock *listening_sock, const char *name) { int out_fd, ret; struct lttcomm_sock *socks[2] = { listening_sock, NULL }; struct lttcomm_sock *new_sock = NULL; - ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &out_fd, - (const char **) &name, 1, accept_sock, &socks); + ret = fd_tracker_open_unsuspendable_fd( + the_fd_tracker, &out_fd, (const char **) &name, 1, accept_sock, &socks); if (ret) { goto end; } @@ -653,8 +631,7 @@ end: /* * Create and init socket from uri. */ -static -struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) +static struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) { int ret, sock_fd; struct lttcomm_sock *sock = NULL; @@ -674,19 +651,21 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = uri_to_str_url(uri, uri_str, sizeof(uri_str)); uri_str[sizeof(uri_str) - 1] = '\0'; if (ret >= 0) { - ret = asprintf(&formated_name, "%s socket @ %s", name, - uri_str); + ret = asprintf(&formated_name, "%s socket @ %s", name, uri_str); if (ret < 0) { formated_name = NULL; } } - ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, &sock_fd, - (const char **) (formated_name ? &formated_name : NULL), - 1, create_sock, sock); + ret = fd_tracker_open_unsuspendable_fd(the_fd_tracker, + &sock_fd, + (const char **) (formated_name ? &formated_name : + NULL), + 1, + create_sock, + sock); if (ret) { - PERROR("Failed to create \"%s\" socket", - formated_name ?: "Unknown"); + PERROR("Failed to create \"%s\" socket", formated_name ?: "Unknown"); goto error; } DBG("Listening on %s socket %d", name, sock->fd); @@ -700,7 +679,6 @@ struct lttcomm_sock *init_socket(struct lttng_uri *uri, const char *name) ret = sock->ops->listen(sock, -1); if (ret < 0) { goto error; - } free(formated_name); @@ -717,8 +695,7 @@ error: /* * This thread manages the listening for new connections on the network */ -static -void *thread_listener(void *data __attribute__((unused))) +static void *thread_listener(void *data __attribute__((unused))) { int i, ret, err = -1; uint32_t nb_fd; @@ -738,8 +715,7 @@ void *thread_listener(void *data __attribute__((unused))) } /* Pass 2 as size here for the thread quit pipe and control sockets. */ - ret = create_named_thread_poll_set(&events, 2, - "Live listener thread epoll"); + ret = create_named_thread_poll_set(&events, 2, "Live listener thread epoll"); if (ret < 0) { goto error_create_poll; } @@ -761,7 +737,7 @@ void *thread_listener(void *data __attribute__((unused))) DBG("Listener accepting live viewers connections"); -restart: + restart: health_poll_entry(); ret = lttng_poll_wait(&events, -1); health_poll_exit(); @@ -803,15 +779,15 @@ restart: struct lttcomm_sock *newsock; newsock = accept_live_sock(live_control_sock, - "Live socket to client"); + "Live socket to client"); if (!newsock) { PERROR("accepting control sock"); goto error; } DBG("Relay viewer connection accepted socket %d", newsock->fd); - ret = setsockopt(newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, - sizeof(val)); + ret = setsockopt( + newsock->fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); if (ret < 0) { PERROR("setsockopt inet"); lttcomm_destroy_sock(newsock); @@ -828,8 +804,7 @@ restart: /* Enqueue request for the dispatcher thread. */ cds_wfcq_head_ptr_t head; head.h = &viewer_conn_queue.head; - cds_wfcq_enqueue(head, &viewer_conn_queue.tail, - &new_conn->qnode); + cds_wfcq_enqueue(head, &viewer_conn_queue.tail, &new_conn->qnode); /* * Wake the dispatch queue futex. @@ -856,9 +831,8 @@ error_create_poll: if (live_control_sock->fd >= 0) { int sock_fd = live_control_sock->fd; - ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, - &sock_fd, 1, close_sock, - live_control_sock); + ret = fd_tracker_close_unsuspendable_fd( + the_fd_tracker, &sock_fd, 1, close_sock, live_control_sock); if (ret) { PERROR("close"); } @@ -882,8 +856,7 @@ error_sock_control: /* * This thread manages the dispatching of the requests to worker threads */ -static -void *thread_dispatcher(void *data __attribute__((unused))) +static void *thread_dispatcher(void *data __attribute__((unused))) { int err = -1; ssize_t ret; @@ -918,13 +891,12 @@ void *thread_dispatcher(void *data __attribute__((unused))) &viewer_conn_queue.tail); if (node == NULL) { DBG("Woken up but nothing in the live-viewer " - "relay command queue"); + "relay command queue"); /* Continue thread execution */ break; } conn = lttng::utils::container_of(node, &relay_connection::qnode); - DBG("Dispatching viewer request waiting on sock %d", - conn->sock->fd); + DBG("Dispatching viewer request waiting on sock %d", conn->sock->fd); /* * Inform worker thread of the new request. This @@ -968,8 +940,7 @@ error_testpoint: * * Return 0 on success or else negative value. */ -static -int viewer_connect(struct relay_connection *conn) +static int viewer_connect(struct relay_connection *conn) { int ret; struct lttng_viewer_connect reply, msg; @@ -992,7 +963,8 @@ int viewer_connect(struct relay_connection *conn) /* Major versions must be the same */ if (reply.major != be32toh(msg.major)) { DBG("Incompatible major versions ([relayd] %u vs [client] %u)", - reply.major, be32toh(msg.major)); + reply.major, + be32toh(msg.major)); ret = -1; goto end; } @@ -1053,8 +1025,7 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_list_sessions(struct relay_connection *conn) +static int viewer_list_sessions(struct relay_connection *conn) { int ret = 0; struct lttng_viewer_list_sessions session_list; @@ -1070,8 +1041,7 @@ int viewer_list_sessions(struct relay_connection *conn) } rcu_read_lock(); - cds_lfht_for_each_entry(sessions_ht->ht, &iter.iter, session, - session_n.node) { + cds_lfht_for_each_entry (sessions_ht->ht, &iter.iter, session, session_n.node) { struct lttng_viewer_session *send_session; health_code_update(); @@ -1086,8 +1056,8 @@ int viewer_list_sessions(struct relay_connection *conn) struct lttng_viewer_session *newbuf; uint32_t new_buf_count = buf_count << 1; - newbuf = (lttng_viewer_session *) realloc(send_session_buf, - new_buf_count * sizeof(*send_session_buf)); + newbuf = (lttng_viewer_session *) realloc( + send_session_buf, new_buf_count * sizeof(*send_session_buf)); if (!newbuf) { ret = -1; goto break_loop; @@ -1097,13 +1067,14 @@ int viewer_list_sessions(struct relay_connection *conn) } send_session = &send_session_buf[count]; if (lttng_strncpy(send_session->session_name, - session->session_name, - sizeof(send_session->session_name))) { + session->session_name, + sizeof(send_session->session_name))) { ret = -1; goto break_loop; } - if (lttng_strncpy(send_session->hostname, session->hostname, - sizeof(send_session->hostname))) { + if (lttng_strncpy(send_session->hostname, + session->hostname, + sizeof(send_session->hostname))) { ret = -1; goto break_loop; } @@ -1139,8 +1110,7 @@ int viewer_list_sessions(struct relay_connection *conn) health_code_update(); - ret = send_response(conn->sock, send_session_buf, - count * sizeof(*send_session_buf)); + ret = send_response(conn->sock, send_session_buf, count * sizeof(*send_session_buf)); if (ret < 0) { goto end_free; } @@ -1155,8 +1125,7 @@ end_free: /* * Send the viewer the list of current streams. */ -static -int viewer_get_new_streams(struct relay_connection *conn) +static int viewer_get_new_streams(struct relay_connection *conn) { int ret, send_streams = 0; uint32_t nb_created = 0, nb_unsent = 0, nb_streams = 0, nb_total = 0; @@ -1214,9 +1183,12 @@ int viewer_get_new_streams(struct relay_connection *conn) goto send_reply_unlock; } ret = make_viewer_streams(session, - conn->viewer_session, - LTTNG_VIEWER_SEEK_BEGINNING, &nb_total, &nb_unsent, - &nb_created, &closed); + conn->viewer_session, + LTTNG_VIEWER_SEEK_BEGINNING, + &nb_total, + &nb_unsent, + &nb_created, + &closed); if (ret < 0) { /* * This is caused by an internal error; propagate the negative @@ -1283,8 +1255,7 @@ error: /* * Send the viewer the list of current sessions. */ -static -int viewer_attach_session(struct relay_connection *conn) +static int viewer_attach_session(struct relay_connection *conn) { int send_streams = 0; ssize_t ret; @@ -1316,7 +1287,7 @@ int viewer_attach_session(struct relay_connection *conn) if (!conn->viewer_session) { viewer_attach_status = LTTNG_VIEWER_ATTACH_NO_SESSION; DBG("Client trying to attach before creating a live viewer session, returning status=%s", - lttng_viewer_attach_return_code_str(viewer_attach_status)); + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1324,8 +1295,8 @@ int viewer_attach_session(struct relay_connection *conn) if (!session) { viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; DBG("Relay session %" PRIu64 " not found, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } DBG("Attach relay session ID %" PRIu64 " received", session_id); @@ -1334,18 +1305,17 @@ int viewer_attach_session(struct relay_connection *conn) if (session->live_timer == 0) { viewer_attach_status = LTTNG_VIEWER_ATTACH_NOT_LIVE; DBG("Relay session ID %" PRIu64 " is not a live session, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } send_streams = 1; - viewer_attach_status = viewer_session_attach(conn->viewer_session, - session); + viewer_attach_status = viewer_session_attach(conn->viewer_session, session); if (viewer_attach_status != LTTNG_VIEWER_ATTACH_OK) { DBG("Error attaching to relay session %" PRIu64 ", returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1356,9 +1326,9 @@ int viewer_attach_session(struct relay_connection *conn) seek_type = (lttng_viewer_seek) be32toh(request.seek); break; default: - ERR("Wrong seek parameter for relay session %" PRIu64 - ", returning status=%s", session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + ERR("Wrong seek parameter for relay session %" PRIu64 ", returning status=%s", + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); viewer_attach_status = LTTNG_VIEWER_ATTACH_SEEK_ERR; send_streams = 0; goto send_reply; @@ -1375,9 +1345,8 @@ int viewer_attach_session(struct relay_connection *conn) goto send_reply; } - ret = make_viewer_streams(session, - conn->viewer_session, seek_type, - &nb_streams, NULL, NULL, &closed); + ret = make_viewer_streams( + session, conn->viewer_session, seek_type, &nb_streams, NULL, NULL, &closed); if (ret < 0) { goto end_put_session; } @@ -1397,8 +1366,8 @@ int viewer_attach_session(struct relay_connection *conn) response.streams_count = 0; viewer_attach_status = LTTNG_VIEWER_ATTACH_UNK; ERR("Session %" PRIu64 " is closed, returning status=%s", - session_id, - lttng_viewer_attach_return_code_str(viewer_attach_status)); + session_id, + lttng_viewer_attach_return_code_str(viewer_attach_status)); goto send_reply; } @@ -1447,8 +1416,7 @@ error: * * Called with rstream lock held. */ -static int try_open_index(struct relay_viewer_stream *vstream, - struct relay_stream *rstream) +static int try_open_index(struct relay_viewer_stream *vstream, struct relay_stream *rstream) { int ret = 0; const uint32_t connection_major = rstream->trace->session->major; @@ -1462,19 +1430,21 @@ static int try_open_index(struct relay_viewer_stream *vstream, /* * First time, we open the index file and at least one index is ready. */ - if (rstream->index_received_seqcount == 0 || - !vstream->stream_file.trace_chunk) { + if (rstream->index_received_seqcount == 0 || !vstream->stream_file.trace_chunk) { ret = -ENOENT; goto end; } chunk_status = lttng_index_file_create_from_trace_chunk_read_only( - vstream->stream_file.trace_chunk, rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, - lttng_to_index_major(connection_major, connection_minor), - lttng_to_index_minor(connection_major, connection_minor), - true, &vstream->index_file); + vstream->stream_file.trace_chunk, + rstream->path_name, + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + lttng_to_index_major(connection_major, connection_minor), + lttng_to_index_minor(connection_major, connection_minor), + true, + &vstream->index_file); if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { ret = -ENOENT; @@ -1499,37 +1469,37 @@ end: * Called with rstream lock held. */ static int check_index_status(struct relay_viewer_stream *vstream, - struct relay_stream *rstream, struct ctf_trace *trace, - struct lttng_viewer_index *index) + struct relay_stream *rstream, + struct ctf_trace *trace, + struct lttng_viewer_index *index) { int ret; DBG("Check index status: index_received_seqcount %" PRIu64 " " - "index_sent_seqcount %" PRIu64 " " - "for stream %" PRIu64, - rstream->index_received_seqcount, - vstream->index_sent_seqcount, - vstream->stream->stream_handle); - if ((trace->session->connection_closed || rstream->closed) - && rstream->index_received_seqcount - == vstream->index_sent_seqcount) { + "index_sent_seqcount %" PRIu64 " " + "for stream %" PRIu64, + rstream->index_received_seqcount, + vstream->index_sent_seqcount, + vstream->stream->stream_handle); + if ((trace->session->connection_closed || rstream->closed) && + rstream->index_received_seqcount == vstream->index_sent_seqcount) { /* * Last index sent and session connection or relay * stream are closed. */ index->status = LTTNG_VIEWER_INDEX_HUP; DBG("Check index status: Connection or stream are closed, stream %" PRIu64 - ",connection-closed=%d, relay-stream-closed=%d, returning status=%s", - vstream->stream->stream_handle, - trace->session->connection_closed, rstream->closed, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + ",connection-closed=%d, relay-stream-closed=%d, returning status=%s", + vstream->stream->stream_handle, + trace->session->connection_closed, + rstream->closed, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } else if (rstream->beacon_ts_end != -1ULL && - (rstream->index_received_seqcount == 0 || - (vstream->index_sent_seqcount != 0 && - rstream->index_received_seqcount - <= vstream->index_sent_seqcount))) { + (rstream->index_received_seqcount == 0 || + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount <= vstream->index_sent_seqcount))) { /* * We've received a synchronization beacon and the last index * available has been sent, the index for now is inactive. @@ -1549,15 +1519,14 @@ static int check_index_status(struct relay_viewer_stream *vstream, index->timestamp_end = htobe64(rstream->beacon_ts_end); index->stream_id = htobe64(rstream->ctf_stream_id); DBG("Check index status: inactive with beacon, for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (rstream->index_received_seqcount == 0 || - (vstream->index_sent_seqcount != 0 && - rstream->index_received_seqcount - <= vstream->index_sent_seqcount)) { + (vstream->index_sent_seqcount != 0 && + rstream->index_received_seqcount <= vstream->index_sent_seqcount)) { /* * This checks whether received <= sent seqcount. In * this case, we have not received a beacon. Therefore, @@ -1571,32 +1540,29 @@ static int check_index_status(struct relay_viewer_stream *vstream, */ index->status = LTTNG_VIEWER_INDEX_RETRY; DBG("Check index status:" - "did not received beacon for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "did not received beacon for stream %" PRIu64 ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } else if (!tracefile_array_seq_in_file(rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)) { + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { /* * The next index we want to send cannot be read either * because we need to perform a rotation, or due to * the producer having overwritten its trace file. */ - DBG("Viewer stream %" PRIu64 " rotation", - vstream->stream->stream_handle); + DBG("Viewer stream %" PRIu64 " rotation", vstream->stream->stream_handle); ret = viewer_stream_rotate(vstream); if (ret == 1) { /* EOF across entire stream. */ index->status = LTTNG_VIEWER_INDEX_HUP; DBG("Check index status:" - "reached end of file for stream %" PRIu64 - ", returning status=%s", - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "reached end of file for stream %" PRIu64 ", returning status=%s", + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto hup; } /* @@ -1614,24 +1580,21 @@ static int check_index_status(struct relay_viewer_stream *vstream, * still unavailable. */ if (rstream->tracefile_count == 1 && - !tracefile_array_seq_in_file( - rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)) { + !tracefile_array_seq_in_file(rstream->tfa, + vstream->current_tracefile_id, + vstream->index_sent_seqcount)) { index->status = LTTNG_VIEWER_INDEX_RETRY; DBG("Check index status:" - "tracefile array sequence number %" PRIu64 - " not in file for stream %" PRIu64 - ", returning status=%s", - vstream->index_sent_seqcount, - vstream->stream->stream_handle, - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) index->status)); + "tracefile array sequence number %" PRIu64 + " not in file for stream %" PRIu64 ", returning status=%s", + vstream->index_sent_seqcount, + vstream->stream->stream_handle, + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) index->status)); goto index_ready; } - LTTNG_ASSERT(tracefile_array_seq_in_file(rstream->tfa, - vstream->current_tracefile_id, - vstream->index_sent_seqcount)); + LTTNG_ASSERT(tracefile_array_seq_in_file( + rstream->tfa, vstream->current_tracefile_id, vstream->index_sent_seqcount)); } /* ret == 0 means successful so we continue. */ ret = 0; @@ -1643,15 +1606,13 @@ index_ready: return 1; } -static -void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, - struct lttng_trace_chunk *new_trace_chunk) +static void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, + struct lttng_trace_chunk *new_trace_chunk) { lttng_trace_chunk_put(vstream->stream_file.trace_chunk); if (new_trace_chunk) { - const bool acquired_reference = lttng_trace_chunk_get( - new_trace_chunk); + const bool acquired_reference = lttng_trace_chunk_get(new_trace_chunk); LTTNG_ASSERT(acquired_reference); } @@ -1666,8 +1627,7 @@ void viewer_stream_rotate_to_trace_chunk(struct relay_viewer_stream *vstream, * * Return 0 on success or else a negative value. */ -static -int viewer_get_next_index(struct relay_connection *conn) +static int viewer_get_next_index(struct relay_connection *conn) { int ret; struct lttng_viewer_get_next_index request_index; @@ -1695,10 +1655,10 @@ int viewer_get_next_index(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(request_index.stream_id)); if (!vstream) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - DBG("Client requested index of unknown stream id %" PRIu64", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index of unknown stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1707,8 +1667,7 @@ int viewer_get_next_index(struct relay_connection *conn) ctf_trace = rstream->trace; /* metadata_viewer_stream may be NULL. */ - metadata_viewer_stream = - ctf_trace_get_viewer_metadata_stream(ctf_trace); + metadata_viewer_stream = ctf_trace_get_viewer_metadata_stream(ctf_trace); /* * Hold the session lock to protect against concurrent changes @@ -1724,51 +1683,52 @@ int viewer_get_next_index(struct relay_connection *conn) */ if (rstream->is_metadata) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; - DBG("Client requested index of a metadata stream id %" PRIu64", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index of a metadata stream id %" PRIu64 + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } if (rstream->ongoing_rotation.is_set) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Client requested index for stream id %" PRIu64" while a stream rotation is ongoing, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index for stream id %" PRIu64 + " while a stream rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } if (session_has_ongoing_rotation(rstream->trace->session)) { /* Rotation is ongoing, try again later. */ viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Client requested index for stream id %" PRIu64" while a session rotation is ongoing, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Client requested index for stream id %" PRIu64 + " while a session rotation is ongoing, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } /* * Transition the viewer session into the newest trace chunk available. */ - if (!lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - rstream->trace_chunk)) { + if (!lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + rstream->trace_chunk)) { DBG("Relay stream and viewer chunk ids differ"); - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - rstream->trace_chunk); + ret = viewer_session_set_trace_chunk_copy(conn->viewer_session, + rstream->trace_chunk); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; ERR("Error copying trace chunk for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } } @@ -1785,53 +1745,49 @@ int viewer_get_next_index(struct relay_connection *conn) * after a session's destruction. */ if (vstream->stream_file.trace_chunk) { - status = lttng_trace_chunk_get_id( - vstream->stream_file.trace_chunk, - &stream_file_chunk_id); + status = lttng_trace_chunk_get_id(vstream->stream_file.trace_chunk, + &stream_file_chunk_id); LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); } if (conn->viewer_session->current_trace_chunk) { - status = lttng_trace_chunk_get_id( - conn->viewer_session->current_trace_chunk, - &viewer_session_chunk_id); + status = lttng_trace_chunk_get_id(conn->viewer_session->current_trace_chunk, + &viewer_session_chunk_id); LTTNG_ASSERT(status == LTTNG_TRACE_CHUNK_STATUS_OK); } viewer_stream_and_session_in_same_chunk = lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + conn->viewer_session->current_trace_chunk, vstream->stream_file.trace_chunk); viewer_stream_one_rotation_behind = rstream->completed_rotation_count == - vstream->last_seen_rotation_count + 1; + vstream->last_seen_rotation_count + 1; if (viewer_stream_and_session_in_same_chunk) { DBG("Transition to latest chunk check (%s -> %s): Same chunk, no need to rotate", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); } else if (viewer_stream_one_rotation_behind && !rstream->trace_chunk) { DBG("Transition to latest chunk check (%s -> %s): One chunk behind relay stream which is being destroyed, no need to rotate", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); } else { DBG("Transition to latest chunk check (%s -> %s): Viewer stream chunk ID and viewer session chunk ID differ, rotating viewer stream", - vstream->stream_file.trace_chunk ? - std::to_string(stream_file_chunk_id).c_str() : - "None", - conn->viewer_session->current_trace_chunk ? - std::to_string(viewer_session_chunk_id).c_str() : - "None"); + vstream->stream_file.trace_chunk ? + std::to_string(stream_file_chunk_id).c_str() : + "None", + conn->viewer_session->current_trace_chunk ? + std::to_string(viewer_session_chunk_id).c_str() : + "None"); viewer_stream_rotate_to_trace_chunk(vstream, - conn->viewer_session->current_trace_chunk); - vstream->last_seen_rotation_count = - rstream->completed_rotation_count; + conn->viewer_session->current_trace_chunk); + vstream->last_seen_rotation_count = rstream->completed_rotation_count; } ret = check_index_status(vstream, rstream, ctf_trace, &viewer_index); @@ -1850,31 +1806,29 @@ int viewer_get_next_index(struct relay_connection *conn) /* Try to open an index if one is needed for that stream. */ ret = try_open_index(vstream, rstream); if (ret == -ENOENT) { - if (rstream->closed) { + if (rstream->closed) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; DBG("Cannot open index for stream id %" PRIu64 - "stream is closed, returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + "stream is closed, returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; - } else { + } else { viewer_index.status = LTTNG_VIEWER_INDEX_RETRY; - DBG("Cannot open index for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Cannot open index for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; - } + } } if (ret < 0) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Error opening index for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ERR("Error opening index for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } @@ -1890,9 +1844,12 @@ int viewer_get_next_index(struct relay_connection *conn) struct fs_handle *fs_handle; ret = utils_stream_file_path(rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + NULL, + file_path, + sizeof(file_path)); if (ret < 0) { goto error_put; } @@ -1903,17 +1860,16 @@ int viewer_get_next_index(struct relay_connection *conn) * per-pid buffers) and a clear command has been performed. */ status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fs_handle, true); + vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { - if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && - rstream->closed) { + if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE && rstream->closed) { viewer_index.status = LTTNG_VIEWER_INDEX_HUP; DBG("Cannot find trace chunk file and stream is closed for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) + viewer_index.status)); goto send_reply; } PERROR("Failed to open trace file for viewer stream"); @@ -1926,10 +1882,10 @@ int viewer_get_next_index(struct relay_connection *conn) if (ret < 0) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; ERR("Error checking for new streams before sending new index to stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else if (ret == 1) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; @@ -1938,19 +1894,17 @@ int viewer_get_next_index(struct relay_connection *conn) ret = lttng_index_file_read(vstream->index_file, &packet_index); if (ret) { viewer_index.status = LTTNG_VIEWER_INDEX_ERR; - ERR("Relay error reading index file for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + ERR("Relay error reading index file for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); goto send_reply; } else { viewer_index.status = LTTNG_VIEWER_INDEX_OK; - DBG("Read index file for stream id %" PRIu64 - ", returning status=%s", - (uint64_t) be64toh(request_index.stream_id), - lttng_viewer_next_index_return_code_str( - (enum lttng_viewer_next_index_return_code) viewer_index.status)); + DBG("Read index file for stream id %" PRIu64 ", returning status=%s", + (uint64_t) be64toh(request_index.stream_id), + lttng_viewer_next_index_return_code_str( + (enum lttng_viewer_next_index_return_code) viewer_index.status)); vstream->index_sent_seqcount++; } @@ -1958,8 +1912,8 @@ int viewer_get_next_index(struct relay_connection *conn) * Indexes are stored in big endian, no need to switch before sending. */ DBG("Sending viewer index for stream %" PRIu64 " offset %" PRIu64, - rstream->stream_handle, - (uint64_t) be64toh(packet_index.offset)); + rstream->stream_handle, + (uint64_t) be64toh(packet_index.offset)); viewer_index.offset = packet_index.offset; viewer_index.packet_size = packet_index.packet_size; viewer_index.content_size = packet_index.content_size; @@ -1976,13 +1930,12 @@ send_reply: if (metadata_viewer_stream) { pthread_mutex_lock(&metadata_viewer_stream->stream->lock); - DBG("get next index metadata check: recv %" PRIu64 - " sent %" PRIu64, - metadata_viewer_stream->stream->metadata_received, - metadata_viewer_stream->metadata_sent); + DBG("get next index metadata check: recv %" PRIu64 " sent %" PRIu64, + metadata_viewer_stream->stream->metadata_received, + metadata_viewer_stream->metadata_sent); if (!metadata_viewer_stream->stream->metadata_received || - metadata_viewer_stream->stream->metadata_received > - metadata_viewer_stream->metadata_sent) { + metadata_viewer_stream->stream->metadata_received > + metadata_viewer_stream->metadata_sent) { viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_METADATA; } pthread_mutex_unlock(&metadata_viewer_stream->stream->lock); @@ -2000,8 +1953,8 @@ send_reply: if (vstream) { DBG("Index %" PRIu64 " for stream %" PRIu64 " sent", - vstream->index_sent_seqcount, - vstream->stream->stream_handle); + vstream->index_sent_seqcount, + vstream->stream->stream_handle); } end: if (metadata_viewer_stream) { @@ -2027,8 +1980,7 @@ error_put: * * Return 0 on success or else a negative value. */ -static -int viewer_get_packet(struct relay_connection *conn) +static int viewer_get_packet(struct relay_connection *conn) { int ret; off_t lseek_ret; @@ -2044,8 +1996,7 @@ int viewer_get_packet(struct relay_connection *conn) health_code_update(); - ret = recv_request(conn->sock, &get_packet_info, - sizeof(get_packet_info)); + ret = recv_request(conn->sock, &get_packet_info, sizeof(get_packet_info)); if (ret < 0) { goto end; } @@ -2058,9 +2009,9 @@ int viewer_get_packet(struct relay_connection *conn) vstream = viewer_stream_get_by_id(stream_id); if (!vstream) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; - DBG("Client requested packet of unknown stream id %" PRIu64 - ", returning status=%s", stream_id, - lttng_viewer_get_packet_return_code_str(get_packet_status)); + DBG("Client requested packet of unknown stream id %" PRIu64 ", returning status=%s", + stream_id, + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto send_reply_nolock; } else { packet_data_len = be32toh(get_packet_info.len); @@ -2071,29 +2022,31 @@ int viewer_get_packet(struct relay_connection *conn) if (!reply) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Falled to allocate reply, returning status=%s", - lttng_viewer_get_packet_return_code_str(get_packet_status)); + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } pthread_mutex_lock(&vstream->stream->lock); - lseek_ret = fs_handle_seek(vstream->stream_file.handle, - be64toh(get_packet_info.offset), SEEK_SET); + lseek_ret = fs_handle_seek( + vstream->stream_file.handle, be64toh(get_packet_info.offset), SEEK_SET); if (lseek_ret < 0) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to seek file system handle of viewer stream %" PRIu64 - " to offset %" PRIu64", returning status=%s", stream_id, - (uint64_t) be64toh(get_packet_info.offset), - lttng_viewer_get_packet_return_code_str(get_packet_status)); + " to offset %" PRIu64 ", returning status=%s", + stream_id, + (uint64_t) be64toh(get_packet_info.offset), + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } - read_len = fs_handle_read(vstream->stream_file.handle, - reply + sizeof(reply_header), packet_data_len); + read_len = fs_handle_read( + vstream->stream_file.handle, reply + sizeof(reply_header), packet_data_len); if (read_len < packet_data_len) { get_packet_status = LTTNG_VIEWER_GET_PACKET_ERR; PERROR("Failed to read from file system handle of viewer stream id %" PRIu64 - ", offset: %" PRIu64 ", returning status=%s", stream_id, + ", offset: %" PRIu64 ", returning status=%s", + stream_id, (uint64_t) be64toh(get_packet_info.offset), - lttng_viewer_get_packet_return_code_str(get_packet_status)); + lttng_viewer_get_packet_return_code_str(get_packet_status)); goto error; } @@ -2119,8 +2072,7 @@ send_reply_nolock: ret = send_response(conn->sock, reply, reply_size); } else { /* No reply to send. */ - ret = send_response(conn->sock, &reply_header, - reply_size); + ret = send_response(conn->sock, &reply_header, reply_size); } health_code_update(); @@ -2145,8 +2097,7 @@ end: * * Return 0 on success else a negative value. */ -static -int viewer_get_metadata(struct relay_connection *conn) +static int viewer_get_metadata(struct relay_connection *conn) { int ret = 0; int fd = -1; @@ -2180,7 +2131,7 @@ int viewer_get_metadata(struct relay_connection *conn) * find it. */ DBG("Client requested metadata of unknown stream id %" PRIu64, - (uint64_t) be64toh(request.stream_id)); + (uint64_t) be64toh(request.stream_id)); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } @@ -2222,15 +2173,13 @@ int viewer_get_metadata(struct relay_connection *conn) } if (vstream->stream->trace_chunk && - !lttng_trace_chunk_ids_equal( - conn->viewer_session->current_trace_chunk, - vstream->stream->trace_chunk)) { + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream->trace_chunk)) { /* A rotation has occurred on the relay stream. */ DBG("Metadata relay stream and viewer chunk ids differ"); - ret = viewer_session_set_trace_chunk_copy( - conn->viewer_session, - vstream->stream->trace_chunk); + ret = viewer_session_set_trace_chunk_copy(conn->viewer_session, + vstream->stream->trace_chunk); if (ret) { reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; @@ -2238,19 +2187,19 @@ int viewer_get_metadata(struct relay_connection *conn) } if (conn->viewer_session->current_trace_chunk && - !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk)) { + !lttng_trace_chunk_ids_equal(conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk)) { bool acquired_reference; DBG("Viewer session and viewer stream chunk differ: " - "vsession chunk %p vstream chunk %p", - conn->viewer_session->current_trace_chunk, - vstream->stream_file.trace_chunk); + "vsession chunk %p vstream chunk %p", + conn->viewer_session->current_trace_chunk, + vstream->stream_file.trace_chunk); lttng_trace_chunk_put(vstream->stream_file.trace_chunk); - acquired_reference = lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); + acquired_reference = + lttng_trace_chunk_get(conn->viewer_session->current_trace_chunk); LTTNG_ASSERT(acquired_reference); - vstream->stream_file.trace_chunk = - conn->viewer_session->current_trace_chunk; + vstream->stream_file.trace_chunk = conn->viewer_session->current_trace_chunk; viewer_stream_close_files(vstream); } @@ -2260,8 +2209,7 @@ int viewer_get_metadata(struct relay_connection *conn) reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); len = 0; goto send_reply; - } else if (vstream->stream_file.trace_chunk && - !vstream->stream_file.handle && len > 0) { + } else if (vstream->stream_file.trace_chunk && !vstream->stream_file.handle && len > 0) { /* * Either this is the first time the metadata file is read, or a * rotation of the corresponding relay stream has occurred. @@ -2272,9 +2220,12 @@ int viewer_get_metadata(struct relay_connection *conn) struct relay_stream *rstream = vstream->stream; ret = utils_stream_file_path(rstream->path_name, - rstream->channel_name, rstream->tracefile_size, - vstream->current_tracefile_id, NULL, file_path, - sizeof(file_path)); + rstream->channel_name, + rstream->tracefile_size, + vstream->current_tracefile_id, + NULL, + file_path, + sizeof(file_path)); if (ret < 0) { goto error; } @@ -2285,8 +2236,7 @@ int viewer_get_metadata(struct relay_connection *conn) * per-pid buffers) and a clear command has been performed. */ status = lttng_trace_chunk_open_fs_handle( - vstream->stream_file.trace_chunk, - file_path, O_RDONLY, 0, &fs_handle, true); + vstream->stream_file.trace_chunk, file_path, O_RDONLY, 0, &fs_handle, true); if (status != LTTNG_TRACE_CHUNK_STATUS_OK) { if (status == LTTNG_TRACE_CHUNK_STATUS_NO_FILE) { reply.status = htobe32(LTTNG_VIEWER_NO_NEW_METADATA); @@ -2317,12 +2267,12 @@ int viewer_get_metadata(struct relay_connection *conn) * safe to assume that * `metadata_received` > `metadata_sent`. */ - const off_t seek_ret = fs_handle_seek(fs_handle, - vstream->metadata_sent, SEEK_SET); + const off_t seek_ret = + fs_handle_seek(fs_handle, vstream->metadata_sent, SEEK_SET); if (seek_ret < 0) { PERROR("Failed to seek metadata viewer stream file to `sent` position: pos = %" PRId64, - vstream->metadata_sent); + vstream->metadata_sent); reply.status = htobe32(LTTNG_VIEWER_METADATA_ERR); goto send_reply; } @@ -2359,12 +2309,12 @@ int viewer_get_metadata(struct relay_connection *conn) * attempt to parse an incomplete (incoherent) metadata * stream, which would result in an error. */ - const off_t seek_ret = fs_handle_seek( - vstream->stream_file.handle, -read_len, - SEEK_CUR); + const off_t seek_ret = + fs_handle_seek(vstream->stream_file.handle, -read_len, SEEK_CUR); DBG("Failed to read metadata: requested = %" PRIu64 ", got = %zd", - len, read_len); + len, + read_len); read_len = 0; len = 0; if (seek_ret < 0) { @@ -2400,8 +2350,9 @@ send_reply: } } - DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, len, - (uint64_t) be64toh(request.stream_id)); + DBG("Sent %" PRIu64 " bytes of metadata for stream %" PRIu64, + len, + (uint64_t) be64toh(request.stream_id)); DBG("Metadata sent"); @@ -2419,8 +2370,7 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_create_session(struct relay_connection *conn) +static int viewer_create_session(struct relay_connection *conn) { int ret; struct lttng_viewer_create_session_response resp; @@ -2452,8 +2402,7 @@ end: * * Return 0 on success or else a negative value. */ -static -int viewer_detach_session(struct relay_connection *conn) +static int viewer_detach_session(struct relay_connection *conn) { int ret; struct lttng_viewer_detach_session_response response; @@ -2485,8 +2434,7 @@ int viewer_detach_session(struct relay_connection *conn) session = session_get_by_id(be64toh(request.session_id)); if (!session) { - DBG("Relay session %" PRIu64 " not found", - (uint64_t) be64toh(request.session_id)); + DBG("Relay session %" PRIu64 " not found", (uint64_t) be64toh(request.session_id)); response.status = htobe32(LTTNG_VIEWER_DETACH_SESSION_UNK); goto send_reply; } @@ -2521,8 +2469,7 @@ end: /* * live_relay_unknown_command: send -1 if received unknown command */ -static -void live_relay_unknown_command(struct relay_connection *conn) +static void live_relay_unknown_command(struct relay_connection *conn) { struct lttcomm_relayd_generic_reply reply; @@ -2534,13 +2481,10 @@ void live_relay_unknown_command(struct relay_connection *conn) /* * Process the commands received on the control socket */ -static -int process_control(struct lttng_viewer_cmd *recv_hdr, - struct relay_connection *conn) +static int process_control(struct lttng_viewer_cmd *recv_hdr, struct relay_connection *conn) { int ret = 0; - lttng_viewer_command cmd = - (lttng_viewer_command) be32toh(recv_hdr->cmd); + lttng_viewer_command cmd = (lttng_viewer_command) be32toh(recv_hdr->cmd); /* * Make sure we've done the version check before any command other then @@ -2548,13 +2492,15 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, */ if (cmd != LTTNG_VIEWER_CONNECT && !conn->version_check_done) { ERR("Viewer on connection %d requested %s command before version check", - conn->sock->fd, lttng_viewer_command_str(cmd)); + conn->sock->fd, + lttng_viewer_command_str(cmd)); ret = -1; goto end; } DBG("Processing %s viewer command from connection %d", - lttng_viewer_command_str(cmd), conn->sock->fd); + lttng_viewer_command_str(cmd), + conn->sock->fd); switch (cmd) { case LTTNG_VIEWER_CONNECT: @@ -2585,8 +2531,7 @@ int process_control(struct lttng_viewer_cmd *recv_hdr, ret = viewer_detach_session(conn); break; default: - ERR("Received unknown viewer command (%u)", - be32toh(recv_hdr->cmd)); + ERR("Received unknown viewer command (%u)", be32toh(recv_hdr->cmd)); live_relay_unknown_command(conn); ret = -1; goto end; @@ -2596,15 +2541,14 @@ end: return ret; } -static -void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) +static void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) { int ret; (void) lttng_poll_del(events, pollfd); - ret = fd_tracker_close_unsuspendable_fd(the_fd_tracker, &pollfd, 1, - fd_tracker_util_close_fd, NULL); + ret = fd_tracker_close_unsuspendable_fd( + the_fd_tracker, &pollfd, 1, fd_tracker_util_close_fd, NULL); if (ret < 0) { ERR("Closing pollfd %d", pollfd); } @@ -2613,8 +2557,7 @@ void cleanup_connection_pollfd(struct lttng_poll_event *events, int pollfd) /* * This thread does the actual work */ -static -void *thread_worker(void *data __attribute__((unused))) +static void *thread_worker(void *data __attribute__((unused))) { int ret, err = -1; uint32_t nb_fd; @@ -2640,8 +2583,7 @@ void *thread_worker(void *data __attribute__((unused))) goto viewer_connections_ht_error; } - ret = create_named_thread_poll_set(&events, 2, - "Live viewer worker thread epoll"); + ret = create_named_thread_poll_set(&events, 2, "Live viewer worker thread epoll"); if (ret < 0) { goto error_poll_create; } @@ -2698,14 +2640,12 @@ restart: if (revents & LPOLLIN) { struct relay_connection *conn; - ret = lttng_read(live_conn_pipe[0], - &conn, sizeof(conn)); + ret = lttng_read(live_conn_pipe[0], &conn, sizeof(conn)); if (ret < 0) { goto error; } - ret = lttng_poll_add(&events, - conn->sock->fd, - LPOLLIN | LPOLLRDHUP); + ret = lttng_poll_add( + &events, conn->sock->fd, LPOLLIN | LPOLLRDHUP); if (ret) { ERR("Failed to add new live connection file descriptor to poll set"); goto error; @@ -2716,7 +2656,9 @@ restart: ERR("Relay live pipe error"); goto error; } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for sock %d", + revents, + pollfd); goto error; } } else { @@ -2729,8 +2671,8 @@ restart: } if (revents & LPOLLIN) { - ret = conn->sock->ops->recvmsg(conn->sock, &recv_hdr, - sizeof(recv_hdr), 0); + ret = conn->sock->ops->recvmsg( + conn->sock, &recv_hdr, sizeof(recv_hdr), 0); if (ret <= 0) { /* Connection closed. */ cleanup_connection_pollfd(&events, pollfd); @@ -2744,7 +2686,8 @@ restart: cleanup_connection_pollfd(&events, pollfd); /* Put "create" ownership reference. */ connection_put(conn); - DBG("Viewer connection closed with %d", pollfd); + DBG("Viewer connection closed with %d", + pollfd); } } } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) { @@ -2752,7 +2695,9 @@ restart: /* Put "create" ownership reference. */ connection_put(conn); } else { - ERR("Unexpected poll events %u for sock %d", revents, pollfd); + ERR("Unexpected poll events %u for sock %d", + revents, + pollfd); connection_put(conn); goto error; } @@ -2768,9 +2713,7 @@ error: /* Cleanup remaining connection object. */ rcu_read_lock(); - cds_lfht_for_each_entry(viewer_connections_ht->ht, &iter.iter, - destroy_conn, - sock_n.node) { + cds_lfht_for_each_entry (viewer_connections_ht->ht, &iter.iter, destroy_conn, sock_n.node) { health_code_update(); connection_put(destroy_conn); } @@ -2803,8 +2746,8 @@ error_testpoint: */ static int create_conn_pipe(void) { - return fd_tracker_util_pipe_open_cloexec(the_fd_tracker, - "Live connection pipe", live_conn_pipe); + return fd_tracker_util_pipe_open_cloexec( + the_fd_tracker, "Live connection pipe", live_conn_pipe); } int relayd_live_join(void) @@ -2880,8 +2823,8 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the dispatcher thread */ - ret = pthread_create(&live_dispatcher_thread, default_pthread_attr(), - thread_dispatcher, (void *) NULL); + ret = pthread_create( + &live_dispatcher_thread, default_pthread_attr(), thread_dispatcher, (void *) NULL); if (ret) { errno = ret; PERROR("pthread_create viewer dispatcher"); @@ -2890,8 +2833,7 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the worker thread */ - ret = pthread_create(&live_worker_thread, default_pthread_attr(), - thread_worker, NULL); + ret = pthread_create(&live_worker_thread, default_pthread_attr(), thread_worker, NULL); if (ret) { errno = ret; PERROR("pthread_create viewer worker"); @@ -2900,8 +2842,8 @@ int relayd_live_create(struct lttng_uri *uri) } /* Setup the listener thread */ - ret = pthread_create(&live_listener_thread, default_pthread_attr(), - thread_listener, (void *) NULL); + ret = pthread_create( + &live_listener_thread, default_pthread_attr(), thread_listener, (void *) NULL); if (ret) { errno = ret; PERROR("pthread_create viewer listener");