From 4c5691ba0756f7c0f4a604c814ee729e967da8e0 Mon Sep 17 00:00:00 2001 From: Jonathan Rajotte Date: Tue, 5 Jun 2018 21:00:28 -0400 Subject: [PATCH] Fix: perform relayd socket pair cleanup on control socket error MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit A reference to the local context for the socket pair is used to "force" an evaluation of the data and metadata streams since we changed the endpoint status. This imitates what is currently done for the data socket. This prevents hitting network timeouts multiple times in a row when an error occurs. For now, there is no mechanism for retry hence "terminating" all communication make sense and prevent unwanted delays on operation. Signed-off-by: Jonathan Rajotte Signed-off-by: Jérémie Galarneau --- src/common/consumer/consumer-stream.c | 17 +++++++++----- src/common/consumer/consumer.c | 33 +++++++++++++++++++-------- src/common/consumer/consumer.h | 2 ++ 3 files changed, 37 insertions(+), 15 deletions(-) diff --git a/src/common/consumer/consumer-stream.c b/src/common/consumer/consumer-stream.c index d0b1ddef2..ca2c4536c 100644 --- a/src/common/consumer/consumer-stream.c +++ b/src/common/consumer/consumer-stream.c @@ -73,12 +73,8 @@ void consumer_stream_relayd_close(struct lttng_consumer_stream *stream, stream->next_net_seq_num - 1); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { - DBG("Unable to close stream on the relayd. Continuing"); - /* - * Continue here. There is nothing we can do for the relayd. - * Chances are that the relayd has closed the socket so we just - * continue cleaning up. - */ + ERR("Relayd send close stream failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } /* Both conditions are met, we destroy the relayd. */ @@ -371,6 +367,15 @@ int consumer_stream_write_index(struct lttng_consumer_stream *stream, pthread_mutex_lock(&relayd->ctrl_sock_mutex); ret = relayd_send_index(&relayd->control_sock, element, stream->relayd_stream_id, stream->next_net_seq_num - 1); + if (ret < 0) { + /* + * Communication error with lttng-relayd, + * perform cleanup now + */ + ERR("Relayd send index failed. Cleaning up relayd %" PRIu64 ".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + ret = -1; + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); } else { ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't write index.", diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 68b63edaa..567babadf 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -465,14 +465,13 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, * If a local data context is available, notify the threads that the streams' * state have changed. */ -static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, - struct lttng_consumer_local_data *ctx) +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd) { uint64_t netidx; assert(relayd); - DBG("Cleaning up relayd sockets"); + DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx); /* Save the net sequence index before destroying the object */ netidx = relayd->net_seq_idx; @@ -492,10 +491,8 @@ static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd, * memory barrier ordering the updates of the end point status from the * read of this status which happens AFTER receiving this notify. */ - if (ctx) { - notify_thread_lttng_pipe(ctx->consumer_data_pipe); - notify_thread_lttng_pipe(ctx->consumer_metadata_pipe); - } + notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe); + notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe); } /* @@ -803,6 +800,8 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } @@ -844,6 +843,8 @@ int consumer_send_relayd_streams_sent(uint64_t net_seq_idx) ret = relayd_streams_sent(&relayd->control_sock); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto end; } } else { @@ -1711,7 +1712,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); } end: @@ -1937,7 +1939,8 @@ write_error: * cleanup the relayd object and all associated streams. */ if (relayd && relayd_hang_up) { - cleanup_relayd(relayd, ctx); + ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); /* Skip splice error so the consumer does not fail */ goto end; } @@ -3523,6 +3526,7 @@ error: * Add relayd socket pair to consumer data hashtable. If object already * exists or on error, the function gracefully returns. */ + relayd->ctx = ctx; add_relayd(relayd); /* All good! */ @@ -3649,6 +3653,8 @@ int consumer_data_pending(uint64_t id) pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { /* Communication error thus the relayd so no data pending. */ + ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } } @@ -3690,6 +3696,13 @@ int consumer_data_pending(uint64_t id) stream->relayd_stream_id, stream->next_net_seq_num - 1); } + if (ret < 0) { + ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); + pthread_mutex_unlock(&relayd->ctrl_sock_mutex); + pthread_mutex_unlock(&stream->lock); + goto data_not_pending; + } pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret == 1) { pthread_mutex_unlock(&stream->lock); @@ -3708,6 +3721,8 @@ int consumer_data_pending(uint64_t id) relayd->relayd_session_id, &is_data_inflight); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { + ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx); + lttng_consumer_cleanup_relayd(relayd); goto data_not_pending; } if (is_data_inflight) { diff --git a/src/common/consumer/consumer.h b/src/common/consumer/consumer.h index 322f1e64d..17e5b55a6 100644 --- a/src/common/consumer/consumer.h +++ b/src/common/consumer/consumer.h @@ -463,6 +463,7 @@ struct consumer_relayd_sock_pair { /* Session id on both sides for the sockets. */ uint64_t relayd_session_id; uint64_t sessiond_session_id; + struct lttng_consumer_local_data *ctx; }; /* @@ -753,5 +754,6 @@ void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); int consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); +void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); #endif /* LIB_CONSUMER_H */ -- 2.34.1