X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Flive.c;h=c9a4575eb4e9d2f328a81164c6d2f9d0f1722d07;hb=6c5f583c3b31f5e3c240c9e4befa6b86c8b438e2;hp=78ea95cc247652185e5ab0fc218c259458e629c6;hpb=1b580372f5d528c07a2b2ad078594817a8391697;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/live.c b/src/bin/lttng-relayd/live.c index 78ea95cc2..c9a4575eb 100644 --- a/src/bin/lttng-relayd/live.c +++ b/src/bin/lttng-relayd/live.c @@ -339,7 +339,10 @@ int make_viewer_streams(struct relay_session *session, * Ensure a self-reference is preserved even * after we have put our local reference. */ - viewer_stream_get(vstream); + if (!viewer_stream_get(vstream)) { + ERR("Unable to get self-reference on viewer stream, logic error."); + abort(); + } } else { if (!vstream->sent_flag && nb_unsent) { /* Update number of unsent stream counter. */ @@ -651,12 +654,16 @@ void *thread_dispatcher(void *data) health_code_update(); - while (!CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + for (;;) { health_code_update(); /* Atomically prepare the queue futex */ futex_nto1_prepare(&viewer_conn_queue.futex); + if (CMM_LOAD_SHARED(live_dispatch_thread_exit)) { + break; + } + do { health_code_update(); @@ -1119,8 +1126,8 @@ error: /* * Open the index file if needed for the given vstream. * - * If an index file is successfully opened, the vstream index_fd set with - * it. + * If an index file is successfully opened, the vstream will set it as its + * current index file. * * Return 0 on success, a negative value on error (-ENOENT if not ready yet). * @@ -1131,7 +1138,7 @@ static int try_open_index(struct relay_viewer_stream *vstream, { int ret = 0; - if (vstream->index_fd) { + if (vstream->index_file) { goto end; } @@ -1142,20 +1149,12 @@ static int try_open_index(struct relay_viewer_stream *vstream, ret = -ENOENT; goto end; } - ret = index_open(vstream->path_name, vstream->channel_name, + vstream->index_file = lttng_index_file_open(vstream->path_name, + vstream->channel_name, vstream->stream->tracefile_count, vstream->current_tracefile_id); - if (ret >= 0) { - vstream->index_fd = stream_fd_create(ret); - if (!vstream->index_fd) { - if (close(ret)) { - PERROR("close"); - } - ret = -1; - } else { - ret = 0; - } - goto end; + if (!vstream->index_file) { + ret = -1; } end: @@ -1274,7 +1273,6 @@ static int viewer_get_next_index(struct relay_connection *conn) { int ret; - ssize_t read_ret; struct lttng_viewer_get_next_index request_index; struct lttng_viewer_index viewer_index; struct ctf_packet_index packet_index; @@ -1397,11 +1395,10 @@ int viewer_get_next_index(struct relay_connection *conn) viewer_index.flags |= LTTNG_VIEWER_FLAG_NEW_STREAM; } - read_ret = lttng_read(vstream->index_fd->fd, &packet_index, - sizeof(packet_index)); - if (read_ret < sizeof(packet_index)) { - ERR("Relay reading index file %d returned %zd", - vstream->index_fd->fd, read_ret); + ret = lttng_index_file_read(vstream->index_file, &packet_index); + if (ret) { + ERR("Relay error reading index file %d", + vstream->index_file->fd); viewer_index.status = htobe32(LTTNG_VIEWER_INDEX_ERR); goto send_reply; } else { @@ -1482,13 +1479,15 @@ error_put: static int viewer_get_packet(struct relay_connection *conn) { - int ret, send_data = 0; + int ret; char *data = NULL; - uint32_t len = 0; - ssize_t read_len; struct lttng_viewer_get_packet get_packet_info; struct lttng_viewer_trace_packet reply; struct relay_viewer_stream *vstream = NULL; + bool skip_send_data = false; + uint32_t send_len = sizeof(reply); + uint32_t packet_data_len = 0; + ssize_t read_len; DBG2("Relay get data packet"); @@ -1506,21 +1505,26 @@ int viewer_get_packet(struct relay_connection *conn) vstream = viewer_stream_get_by_id(be64toh(get_packet_info.stream_id)); if (!vstream) { + skip_send_data = true; DBG("Client requested packet of unknown stream id %" PRIu64, be64toh(get_packet_info.stream_id)); reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_ERR); - goto send_reply_nolock; + } else { + packet_data_len = be32toh(get_packet_info.len); + send_len += packet_data_len; } - pthread_mutex_lock(&vstream->stream->lock); - - len = be32toh(get_packet_info.len); - data = zmalloc(len); + data = zmalloc(send_len); if (!data) { PERROR("relay data zmalloc"); goto error; } + if (skip_send_data) { + goto send_reply_nolock; + } + + pthread_mutex_lock(&vstream->stream->lock); ret = lseek(vstream->stream_fd->fd, be64toh(get_packet_info.offset), SEEK_SET); if (ret < 0) { @@ -1528,16 +1532,17 @@ int viewer_get_packet(struct relay_connection *conn) be64toh(get_packet_info.offset)); goto error; } - read_len = lttng_read(vstream->stream_fd->fd, data, len); - if (read_len < len) { + read_len = lttng_read(vstream->stream_fd->fd, + data + sizeof(reply), + packet_data_len); + if (read_len < packet_data_len) { PERROR("Relay reading trace file, fd: %d, offset: %" PRIu64, vstream->stream_fd->fd, be64toh(get_packet_info.offset)); goto error; } reply.status = htobe32(LTTNG_VIEWER_GET_PACKET_OK); - reply.len = htobe32(len); - send_data = 1; + reply.len = htobe32(packet_data_len); goto send_reply; error: @@ -1548,26 +1553,19 @@ send_reply: pthread_mutex_unlock(&vstream->stream->lock); } send_reply_nolock: - reply.flags = htobe32(reply.flags); health_code_update(); - ret = send_response(conn->sock, &reply, sizeof(reply)); + memcpy(data, &reply, sizeof(reply)); + health_code_update(); + ret = send_response(conn->sock, data, send_len); + health_code_update(); if (ret < 0) { + PERROR("sendmsg of packet data failed"); goto end_free; } - health_code_update(); - - if (send_data) { - health_code_update(); - ret = send_response(conn->sock, data, len); - if (ret < 0) { - goto end_free; - } - health_code_update(); - } - DBG("Sent %u bytes for stream %" PRIu64, len, + DBG("Sent %u bytes for stream %" PRIu64, send_len, be64toh(get_packet_info.stream_id)); end_free: