X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=9d149a09f6f227ca455467e4a2bda0e5eecc3d3e;hb=7404cce212bf90ec908d4f197fd8607b591368a2;hp=2ffa75078779b7d0b9986696f5643267ca5ea7aa;hpb=451baac3a0cc0da7a73d20a2f1c8f4f133164e5f;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 2ffa75078..9d149a09f 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -1404,7 +1404,7 @@ static int relay_close_stream(const struct lttcomm_relayd_hdr *recv_hdr, vstream = viewer_stream_get_by_id(stream->stream_handle); if (vstream) { - if (vstream->metadata_sent == stream->metadata_received) { + if (stream->no_new_metadata_notified) { /* * Since all the metadata has been sent to the * viewer and that we have a request to close @@ -2071,6 +2071,9 @@ static int relay_recv_index(const struct lttcomm_relayd_hdr *recv_hdr, index_info.stream_instance_id = be64toh(index_info.stream_instance_id); index_info.packet_seq_num = be64toh(index_info.packet_seq_num); + } else { + index_info.stream_instance_id = -1ULL; + index_info.packet_seq_num = -1ULL; } stream = stream_get_by_id(index_info.relay_stream_id); @@ -2282,6 +2285,7 @@ static int relay_rotate_session_streams( } reply_code = LTTNG_OK; + ret = 0; end: if (stream) { stream_put(stream); @@ -2295,8 +2299,6 @@ end: send_ret); ret = -1; } - - ret = 0; end_no_reply: lttng_trace_chunk_put(next_trace_chunk); return ret; @@ -2469,7 +2471,7 @@ static int relay_close_trace_chunk(const struct lttcomm_relayd_hdr *recv_hdr, struct relay_connection *conn, const struct lttng_buffer_view *payload) { - int ret = 0; + int ret = 0, buf_ret; ssize_t send_ret; struct relay_session *session = conn->session; struct lttcomm_relayd_close_trace_chunk *msg; @@ -2630,17 +2632,17 @@ end_unlock_session: end: reply.generic.ret_code = htobe32((uint32_t) reply_code); reply.path_length = htobe32((uint32_t) path_length); - ret = lttng_dynamic_buffer_append( + buf_ret = lttng_dynamic_buffer_append( &reply_payload, &reply, sizeof(reply)); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply header to payload buffer"); goto end_no_reply; } if (reply_code == LTTNG_OK) { - ret = lttng_dynamic_buffer_append(&reply_payload, + buf_ret = lttng_dynamic_buffer_append(&reply_payload, closed_trace_chunk_path, path_length); - if (ret) { + if (buf_ret) { ERR("Failed to append \"close trace chunk\" command reply path to payload buffer"); goto end_no_reply; } @@ -2675,8 +2677,8 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_trace_chunk_exists *msg; struct lttcomm_relayd_trace_chunk_exists_reply reply = {}; struct lttng_buffer_view header_view; - struct lttng_trace_chunk *chunk = NULL; uint64_t chunk_id; + bool chunk_exists; if (!session || !conn->version_check_done) { ERR("Trying to close a trace chunk before version check"); @@ -2701,25 +2703,29 @@ static int relay_trace_chunk_exists(const struct lttcomm_relayd_hdr *recv_hdr, msg = (typeof(msg)) header_view.data; chunk_id = be64toh(msg->chunk_id); - chunk = sessiond_trace_chunk_registry_get_chunk( + ret = sessiond_trace_chunk_registry_chunk_exists( sessiond_trace_chunk_registry, conn->session->sessiond_uuid, conn->session->id, - chunk_id); - - reply = (typeof(reply)) { - .generic.ret_code = htobe32((uint32_t) LTTNG_OK), - .trace_chunk_exists = !!chunk, + chunk_id, &chunk_exists); + /* + * If ret is not 0, send the reply and report the error to the caller. + * It is a protocol (or internal) error and the session/connection + * should be torn down. + */ + reply = (typeof(reply)){ + .generic.ret_code = htobe32((uint32_t) + (ret == 0 ? LTTNG_OK : LTTNG_ERR_INVALID_PROTOCOL)), + .trace_chunk_exists = ret == 0 ? chunk_exists : 0, }; - send_ret = conn->sock->ops->sendmsg(conn->sock, - &reply, sizeof(reply), 0); + send_ret = conn->sock->ops->sendmsg( + conn->sock, &reply, sizeof(reply), 0); if (send_ret < (ssize_t) sizeof(reply)) { ERR("Failed to send \"create trace chunk\" command reply (ret = %zd)", send_ret); ret = -1; } end_no_reply: - lttng_trace_chunk_put(chunk); return ret; } @@ -3400,8 +3406,13 @@ restart: if (ret < 0) { goto error; } - lttng_poll_add(&events, conn->sock->fd, + ret = lttng_poll_add(&events, + conn->sock->fd, LPOLLIN | LPOLLRDHUP); + if (ret) { + ERR("Failed to add new connection file descriptor to poll set"); + goto error; + } connection_ht_add(relay_connections_ht, conn); DBG("Connection socket %d added", conn->sock->fd); } else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {