From a4baae1b0463bc4ce65c2a458c4a941e7fabc594 Mon Sep 17 00:00:00 2001 From: Julien Desfossez Date: Thu, 9 Jan 2014 12:15:26 -0500 Subject: [PATCH] Fix: race with the viewer and readiness of streams Add a message to inform the relayd that all the streams of a certain channels were sent so it can make them available to the viewer. This fixes a race where the viewer could start reading some streams before having received them all. Signed-off-by: Julien Desfossez Signed-off-by: David Goulet --- src/bin/lttng-relayd/live.c | 17 ++- src/bin/lttng-relayd/lttng-relayd.h | 19 ++- src/bin/lttng-relayd/main.c | 126 +++++++++++++++++++ src/bin/lttng-sessiond/consumer.c | 13 ++ src/bin/lttng-sessiond/consumer.h | 3 + src/bin/lttng-sessiond/kernel-consumer.c | 49 ++++++++ src/bin/lttng-sessiond/kernel-consumer.h | 3 + src/common/consumer.c | 38 ++++++ src/common/consumer.h | 2 + src/common/kernel-consumer/kernel-consumer.c | 56 +++++++++ src/common/relayd/relayd.c | 53 ++++++++ src/common/relayd/relayd.h | 1 + src/common/sessiond-comm/sessiond-comm.h | 8 +- src/common/ust-consumer/ust-consumer.c | 21 ++++ 14 files changed, 402 insertions(+), 7 deletions(-) diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index c312e78e9..6864ff4ec 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -854,7 +854,8 @@ static int viewer_attach_session(struct relay_command *cmd, struct lttng_ht *sessions_ht) { - int ret, send_streams = 0, nb_streams = 0; + int ret, send_streams = 0; + uint32_t nb_streams = 0, nb_streams_ready = 0; struct lttng_viewer_attach_session_request request; struct lttng_viewer_attach_session_response response; struct lttng_viewer_stream send_stream; @@ -961,14 +962,16 @@ int viewer_attach_session(struct relay_command *cmd, if (stream->session != cmd->session) { continue; } + nb_streams++; /* - * Don't send streams with no ctf_trace, they are not ready to be - * read. + * Don't send streams with no ctf_trace, they are not + * ready to be read. */ - if (!stream->ctf_trace) { + if (!stream->ctf_trace || !stream->viewer_ready) { continue; } + nb_streams_ready++; vstream = live_find_viewer_stream_by_id(stream->stream_handle); if (!vstream) { @@ -977,7 +980,11 @@ int viewer_attach_session(struct relay_command *cmd, goto end_unlock; } } - nb_streams++; + } + + /* We must have the same amount of existing stream and ready stream. */ + if (nb_streams != nb_streams_ready) { + nb_streams = 0; } response.streams_count = htobe32(nb_streams); } diff --git a/src/bin/lttng-relayd/lttng-relayd.h b/src/bin/lttng-relayd/lttng-relayd.h index bb5b7a365..d0e0473b2 100644 --- a/src/bin/lttng-relayd/lttng-relayd.h +++ b/src/bin/lttng-relayd/lttng-relayd.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -44,6 +45,16 @@ enum connection_type { RELAY_VIEWER_NOTIFICATION = 4, }; +/* + * When we receive a stream, it gets stored in a list (on a per connection + * basis) until we have all the streams of the same channel and the metadata + * associated with it, then it gets flagged with viewer_ready. + */ +struct relay_stream_recv_handle { + uint64_t id; /* stream handle */ + struct cds_list_head node; +}; + /* * Represents a session for the relay point of view */ @@ -144,6 +155,11 @@ struct relay_stream { * update the oldest_tracefile_id. */ unsigned int tracefile_overwrite:1; + /* + * Can this stream be used by a viewer or are we waiting for additional + * information. + */ + unsigned int viewer_ready:1; }; /* @@ -200,12 +216,13 @@ struct relay_command { struct lttng_ht_node_ulong sock_n; struct rcu_head rcu_node; enum connection_type type; - unsigned int version_check_done:1; /* protocol version to use for this session */ uint32_t major; uint32_t minor; struct lttng_ht *ctf_traces_ht; /* indexed by path name */ uint64_t session_id; + struct cds_list_head recv_head; + unsigned int version_check_done:1; }; struct relay_local_data { diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index b46c16280..7e5733ec8 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1147,6 +1147,70 @@ error: return ret; } +/* + * When we have received all the streams and the metadata for a channel, + * we make them visible to the viewer threads. + */ +static +void set_viewer_ready_flag(struct relay_command *cmd) +{ + struct relay_stream_recv_handle *node, *tmp_node; + + cds_list_for_each_entry_safe(node, tmp_node, &cmd->recv_head, node) { + struct relay_stream *stream; + + rcu_read_lock(); + stream = relay_stream_find_by_id(node->id); + if (!stream) { + /* + * Stream is most probably being cleaned up by the data thread thus + * simply continue to the next one. + */ + continue; + } + + /* + * If any of the streams in the list doesn't have a ctf_trace assigned, + * it means that we never received the metadata stream, so we have to + * wait until it arrives to make the streams available to the viewer. + */ + if (!stream->ctf_trace) { + goto end; + } + + stream->viewer_ready = 1; + rcu_read_unlock(); + + /* Clean stream handle node. */ + cds_list_del(&node->node); + free(node); + } + +end: + return; +} + +/* + * Add a recv handle node to the connection recv list with the given stream + * handle. A new node is allocated thus must be freed when the node is deleted + * from the list. + */ +static void queue_stream_handle(uint64_t handle, struct relay_command *cmd) +{ + struct relay_stream_recv_handle *node; + + assert(cmd); + + node = zmalloc(sizeof(*node)); + if (!node) { + PERROR("zmalloc queue stream handle"); + return; + } + + node->id = handle; + cds_list_add(&node->node, &cmd->recv_head); +} + /* * relay_add_stream: allocate a new stream for a session */ @@ -1239,6 +1303,13 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, ctf_trace_assign(cmd->ctf_traces_ht, stream); stream->ctf_traces_ht = cmd->ctf_traces_ht; + /* + * Add the stream handle in the recv list of the connection. Once the end + * stream message is received, this list is emptied and streams are set + * with the viewer ready flag. + */ + queue_stream_handle(stream->stream_handle, cmd); + lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); lttng_ht_add_unique_ulong(relay_streams_ht, @@ -1987,6 +2058,48 @@ end_no_session: return ret; } +/* + * Receive the streams_sent message. + * + * Return 0 on success else a negative value. + */ +static +int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd) +{ + int ret, send_ret; + struct lttcomm_relayd_generic_reply reply; + + assert(cmd); + + DBG("Relay receiving streams_sent"); + + if (!cmd->session || cmd->version_check_done == 0) { + ERR("Trying to close a stream before version check"); + ret = -1; + goto end_no_session; + } + + /* + * Flag every pending stream in the connection recv list that they are + * ready to be used by the viewer. + */ + set_viewer_ready_flag(cmd); + + reply.ret_code = htobe32(LTTNG_OK); + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (send_ret < 0) { + ERR("Relay sending sent_stream reply"); + ret = send_ret; + } else { + /* Success. */ + ret = 0; + } + +end_no_session: + return ret; +} + /* * Process the commands received on the control socket */ @@ -2030,6 +2143,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_SEND_INDEX: ret = relay_recv_index(recv_hdr, cmd); break; + case RELAYD_STREAMS_SENT: + ret = relay_streams_sent(recv_hdr, cmd); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -2322,6 +2438,7 @@ int relay_add_connection(int fd, struct lttng_poll_event *events, PERROR("read relay cmd pipe"); goto error_read; } + CDS_INIT_LIST_HEAD(&relay_connection->recv_head); /* * Only used by the control side and the reference is copied inside each @@ -2373,8 +2490,17 @@ void relay_del_connection(struct lttng_ht *relay_connections_ht, assert(!ret); if (relay_connection->type == RELAY_CONTROL) { + struct relay_stream_recv_handle *node, *tmp_node; + relay_delete_session(relay_connection, sessions_ht); lttng_ht_destroy(relay_connection->ctf_traces_ht); + + /* Clean up recv list. */ + cds_list_for_each_entry_safe(node, tmp_node, + &relay_connection->recv_head, node) { + cds_list_del(&node->node); + free(node); + } } call_rcu(&relay_connection->rcu_node, deferred_free_connection); diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 18931a8b7..8806e9c32 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -906,6 +906,19 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, msg->u.stream.cpu = cpu; } +void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + uint64_t channel_key, uint64_t net_seq_idx) +{ + assert(msg); + + memset(msg, 0, sizeof(struct lttcomm_consumer_msg)); + + msg->cmd_type = cmd; + msg->u.sent_streams.channel_key = channel_key; + msg->u.sent_streams.net_seq_idx = net_seq_idx; +} + /* * Send stream communication structure to the consumer. */ diff --git a/src/bin/lttng-sessiond/consumer.h b/src/bin/lttng-sessiond/consumer.h index 484d8f7ba..3601ed914 100644 --- a/src/bin/lttng-sessiond/consumer.h +++ b/src/bin/lttng-sessiond/consumer.h @@ -246,6 +246,9 @@ void consumer_init_stream_comm_msg(struct lttcomm_consumer_msg *msg, uint64_t channel_key, uint64_t stream_key, int cpu); +void consumer_init_streams_sent_comm_msg(struct lttcomm_consumer_msg *msg, + enum lttng_consumer_command cmd, + uint64_t channel_key, uint64_t net_seq_idx); void consumer_init_channel_comm_msg(struct lttcomm_consumer_msg *msg, enum lttng_consumer_command cmd, uint64_t channel_key, diff --git a/src/bin/lttng-sessiond/kernel-consumer.c b/src/bin/lttng-sessiond/kernel-consumer.c index 70759fb36..4c069588a 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.c +++ b/src/bin/lttng-sessiond/kernel-consumer.c @@ -262,6 +262,40 @@ error: return ret; } +/* + * Sending the notification that all streams were sent with STREAMS_SENT. + */ +int kernel_consumer_streams_sent(struct consumer_socket *sock, + struct ltt_kernel_session *session, uint64_t channel_key) +{ + int ret; + struct lttcomm_consumer_msg lkm; + struct consumer_output *consumer; + + assert(sock); + assert(session); + + DBG("Sending streams_sent"); + /* Get consumer output pointer */ + consumer = session->consumer; + + /* Prep stream consumer message */ + consumer_init_streams_sent_comm_msg(&lkm, + LTTNG_CONSUMER_STREAMS_SENT, + channel_key, consumer->net_seq_index); + + health_code_update(); + + /* Send stream and file descriptor */ + ret = consumer_send_msg(sock, &lkm); + if (ret < 0) { + goto error; + } + +error: + return ret; +} + /* * Send all stream fds of kernel channel to the consumer. */ @@ -271,6 +305,7 @@ int kernel_consumer_send_channel_stream(struct consumer_socket *sock, { int ret; struct ltt_kernel_stream *stream; + uint64_t channel_key = -1ULL; /* Safety net */ assert(channel); @@ -304,8 +339,22 @@ int kernel_consumer_send_channel_stream(struct consumer_socket *sock, if (ret < 0) { goto error; } + if (channel_key == -1ULL) { + channel_key = channel->fd; + } + } + + if (!monitor || channel_key == -1ULL) { + goto end; + } + + /* Add stream on the kernel consumer side. */ + ret = kernel_consumer_streams_sent(sock, session, channel_key); + if (ret < 0) { + goto error; } +end: error: return ret; } diff --git a/src/bin/lttng-sessiond/kernel-consumer.h b/src/bin/lttng-sessiond/kernel-consumer.h index 5390edafe..b58626c2c 100644 --- a/src/bin/lttng-sessiond/kernel-consumer.h +++ b/src/bin/lttng-sessiond/kernel-consumer.h @@ -44,3 +44,6 @@ int kernel_consumer_destroy_channel(struct consumer_socket *socket, int kernel_consumer_destroy_metadata(struct consumer_socket *socket, struct ltt_kernel_metadata *metadata); + +int kernel_consumer_streams_sent(struct consumer_socket *sock, + struct ltt_kernel_session *session, uint64_t channel_key); diff --git a/src/common/consumer.c b/src/common/consumer.c index 341f8a7fe..f47d8de1b 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -768,6 +768,44 @@ end: return ret; } +/* + * Find a relayd and send the streams sent message + * + * Returns 0 on success, < 0 on error + */ +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) +{ + int ret = 0; + struct consumer_relayd_sock_pair *relayd; + + assert(net_seq_idx != -1ULL); + + /* The stream is not metadata. Get relayd reference if exists. */ + rcu_read_lock(); + relayd = consumer_find_relayd(net_seq_idx); + if (relayd != NULL) { + /* Add stream on the relayd */ + pthread_mutex_lock(&relayd->ctrl_sock_mutex); + ret = relayd_streams_sent(&relayd->control_sock); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + if (ret < 0) { + goto end; + } + } else { + ERR("Relayd ID %" PRIu64 " unknown. Can't send streams_sent.", + net_seq_idx); + ret = -1; + goto end; + } + + ret = 0; + DBG("All streams sent relayd id %" PRIu64, net_seq_idx); + +end: + rcu_read_unlock(); + return ret; +} + /* * Find a relayd and close the stream */ diff --git a/src/common/consumer.h b/src/common/consumer.h index c206970bf..8d7e1d0a4 100644 --- a/src/common/consumer.h +++ b/src/common/consumer.h @@ -57,6 +57,7 @@ enum lttng_consumer_command { LTTNG_CONSUMER_FLUSH_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_CHANNEL, LTTNG_CONSUMER_SNAPSHOT_METADATA, + LTTNG_CONSUMER_STREAMS_SENT, }; /* State of each fd in consumer */ @@ -593,6 +594,7 @@ struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair( uint64_t net_seq_idx); struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key); int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path); +int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index 219da9037..c95355e9a 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -176,6 +176,11 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } + ret = consumer_send_relayd_streams_sent(relayd_id); + if (ret < 0) { + ERR("sending streams sent to relayd"); + goto end_unlock; + } ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { @@ -751,6 +756,57 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, new_stream->name, fd, new_stream->relayd_stream_id); break; } + case LTTNG_CONSUMER_STREAMS_SENT: + { + struct lttng_consumer_channel *channel; + + /* + * Get stream's channel reference. Needed when adding the stream to the + * global hash table. + */ + channel = consumer_find_channel(msg.u.sent_streams.channel_key); + if (!channel) { + /* + * We could not find the channel. Can happen if cpu hotplug + * happens while tearing down. + */ + ERR("Unable to find channel key %" PRIu64, + msg.u.sent_streams.channel_key); + ret_code = LTTNG_ERR_KERN_CHAN_NOT_FOUND; + } + + health_code_update(); + + /* + * Send status code to session daemon. + */ + ret = consumer_send_status_msg(sock, ret_code); + if (ret < 0) { + /* Somehow, the session daemon is not responding anymore. */ + goto end_nosignal; + } + + health_code_update(); + + /* + * We should not send this message if we don't monitor the + * streams in this channel. + */ + if (!channel->monitor) { + break; + } + + health_code_update(); + /* Send stream to relayd if the stream has an ID. */ + if (msg.u.sent_streams.net_seq_idx != (uint64_t) -1ULL) { + ret = consumer_send_relayd_streams_sent( + msg.u.sent_streams.net_seq_idx); + if (ret < 0) { + goto end_nosignal; + } + } + break; + } case LTTNG_CONSUMER_UPDATE_STREAM: { rcu_read_unlock(); diff --git a/src/common/relayd/relayd.c b/src/common/relayd/relayd.c index 448d19e6e..ccb23c065 100644 --- a/src/common/relayd/relayd.c +++ b/src/common/relayd/relayd.c @@ -293,6 +293,59 @@ error: return ret; } +/* + * Inform the relay that all the streams for the current channel has been sent. + * + * On success return 0 else return ret_code negative value. + */ +int relayd_streams_sent(struct lttcomm_relayd_sock *rsock) +{ + int ret; + struct lttcomm_relayd_generic_reply reply; + + /* Code flow error. Safety net. */ + assert(rsock); + + DBG("Relayd sending streams sent."); + + /* This feature was introduced in 2.4, ignore it for earlier versions. */ + if (rsock->minor < 4) { + ret = 0; + goto end; + } + + /* Send command */ + ret = send_command(rsock, RELAYD_STREAMS_SENT, NULL, 0, 0); + if (ret < 0) { + goto error; + } + + /* Waiting for reply */ + ret = recv_reply(rsock, (void *) &reply, sizeof(reply)); + if (ret < 0) { + goto error; + } + + /* Back to host bytes order. */ + reply.ret_code = be32toh(reply.ret_code); + + /* Return session id or negative ret code. */ + if (reply.ret_code != LTTNG_OK) { + ret = -1; + ERR("Relayd streams sent replied error %d", reply.ret_code); + goto error; + } else { + /* Success */ + ret = 0; + } + + DBG("Relayd streams sent success"); + +error: +end: + return ret; +} + /* * Check version numbers on the relayd. * If major versions are compatible, we assign minor_to_use to the diff --git a/src/common/relayd/relayd.h b/src/common/relayd/relayd.h index e61b2fff2..9890efb87 100644 --- a/src/common/relayd/relayd.h +++ b/src/common/relayd/relayd.h @@ -31,6 +31,7 @@ int relayd_create_session(struct lttcomm_relayd_sock *sock, uint64_t *session_id int relayd_add_stream(struct lttcomm_relayd_sock *sock, const char *channel_name, const char *pathname, uint64_t *stream_id, uint64_t tracefile_size, uint64_t tracefile_count); +int relayd_streams_sent(struct lttcomm_relayd_sock *rsock); int relayd_send_close_stream(struct lttcomm_relayd_sock *sock, uint64_t stream_id, uint64_t last_net_seq_num); int relayd_version_check(struct lttcomm_relayd_sock *sock); diff --git a/src/common/sessiond-comm/sessiond-comm.h b/src/common/sessiond-comm/sessiond-comm.h index 6861b0408..96106a77d 100644 --- a/src/common/sessiond-comm/sessiond-comm.h +++ b/src/common/sessiond-comm/sessiond-comm.h @@ -106,8 +106,10 @@ enum lttcomm_relayd_command { RELAYD_ADD_INDEX = 12, RELAYD_SEND_INDEX = 13, RELAYD_CLOSE_INDEX = 14, - /* Live-reading commands. */ + /* Live-reading commands (2.4+). */ RELAYD_LIST_SESSIONS = 15, + /* All streams of the channel have been sent to the relayd (2.4+). */ + RELAYD_STREAMS_SENT = 16, }; /* @@ -433,6 +435,10 @@ struct lttcomm_consumer_msg { uint64_t key; uint64_t max_stream_size; } LTTNG_PACKED snapshot_channel; + struct { + uint64_t channel_key; + uint64_t net_seq_idx; + } LTTNG_PACKED sent_streams; } u; } LTTNG_PACKED; diff --git a/src/common/ust-consumer/ust-consumer.c b/src/common/ust-consumer/ust-consumer.c index a077faafb..af3aca0a5 100644 --- a/src/common/ust-consumer/ust-consumer.c +++ b/src/common/ust-consumer/ust-consumer.c @@ -409,6 +409,7 @@ static int send_sessiond_channel(int sock, { int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS; struct lttng_consumer_stream *stream; + uint64_t net_seq_idx = -1ULL; assert(channel); assert(ctx); @@ -433,6 +434,20 @@ static int send_sessiond_channel(int sock, } ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } + if (net_seq_idx == -1ULL) { + net_seq_idx = stream->net_seq_idx; + } + } + ret = consumer_send_relayd_streams_sent(net_seq_idx); + if (ret < 0) { + /* + * Flag that the relayd was the problem here probably due to a + * communicaton error on the socket. + */ + if (relayd_error) { + *relayd_error = 1; + } + ret_code = LTTNG_ERR_RELAYD_CONNECT_FAIL; } } @@ -939,6 +954,12 @@ static int snapshot_channel(uint64_t key, char *path, uint64_t relayd_id, DBG("UST consumer snapshot stream %s/%s (%" PRIu64 ")", path, stream->name, stream->key); } + if (relayd_id != -1ULL) { + ret = consumer_send_relayd_streams_sent(relayd_id); + if (ret < 0) { + goto error_unlock; + } + } ustctl_flush_buffer(stream->ustream, 1); -- 2.34.1