X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=129203fa5a9bb8ca655ae22c428d9531a1368adb;hb=3628d5bd420ccb6e7937506a2dbadb95bd3d2d0b;hp=cba4a605a4f8d119d4f324427fe81761b605c240;hpb=f263b7fd113e51d0737554e8232b8669e142a260;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index cba4a605a..129203fa5 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -1302,12 +1302,6 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_quit_pipe; } - ret = pipe(ctx->consumer_thread_pipe); - if (ret < 0) { - PERROR("Error creating thread pipe"); - goto error_thread_pipe; - } - ret = pipe(ctx->consumer_channel_pipe); if (ret < 0) { PERROR("Error creating channel pipe"); @@ -1319,20 +1313,11 @@ struct lttng_consumer_local_data *lttng_consumer_create( goto error_metadata_pipe; } - ret = utils_create_pipe(ctx->consumer_splice_metadata_pipe); - if (ret < 0) { - goto error_splice_pipe; - } - return ctx; -error_splice_pipe: - lttng_pipe_destroy(ctx->consumer_metadata_pipe); error_metadata_pipe: utils_close_pipe(ctx->consumer_channel_pipe); error_channel_pipe: - utils_close_pipe(ctx->consumer_thread_pipe); -error_thread_pipe: utils_close_pipe(ctx->consumer_should_quit); error_quit_pipe: lttng_pipe_destroy(ctx->consumer_wakeup_pipe); @@ -1404,6 +1389,10 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) DBG("Consumer destroying it. Closing everything."); + if (!ctx) { + return; + } + destroy_data_stream_ht(data_ht); destroy_metadata_stream_ht(metadata_ht); @@ -1415,13 +1404,11 @@ void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx) if (ret) { PERROR("close"); } - utils_close_pipe(ctx->consumer_thread_pipe); utils_close_pipe(ctx->consumer_channel_pipe); lttng_pipe_destroy(ctx->consumer_data_pipe); lttng_pipe_destroy(ctx->consumer_metadata_pipe); lttng_pipe_destroy(ctx->consumer_wakeup_pipe); utils_close_pipe(ctx->consumer_should_quit); - utils_close_pipe(ctx->consumer_splice_metadata_pipe); unlink(ctx->consumer_command_sock_path); free(ctx); @@ -1714,17 +1701,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( goto end; } } - - /* - * Choose right pipe for splice. Metadata and trace data are handled by - * different threads hence the use of two pipes in order not to race or - * corrupt the written data. - */ - if (stream->metadata_flag) { - splice_pipe = ctx->consumer_splice_metadata_pipe; - } else { - splice_pipe = ctx->consumer_thread_pipe; - } + splice_pipe = stream->splice_pipe; /* Write metadata stream id before payload */ if (relayd) { @@ -1830,7 +1807,8 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* Splice data out */ ret_splice = splice(splice_pipe[0], NULL, outfd, NULL, ret_splice, SPLICE_F_MOVE | SPLICE_F_MORE); - DBG("Consumer splice pipe to file, ret %zd", ret_splice); + DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", + outfd, ret_splice); if (ret_splice < 0) { ret = errno; written = -ret; @@ -2211,9 +2189,12 @@ void *consumer_thread_metadata_poll(void *data) } restart: - DBG("Metadata poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); health_poll_entry(); + DBG("Metadata poll wait"); ret = lttng_poll_wait(&events, -1); + DBG("Metadata poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Metadata event catched in thread"); if (ret < 0) { @@ -2233,6 +2214,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); + if (!revents) { + /* No activity for this FD (poll implementation). */ + continue; + } + if (pollfd == lttng_pipe_get_readfd(ctx->consumer_metadata_pipe)) { if (revents & (LPOLLERR | LPOLLHUP )) { DBG("Metadata thread pipe hung up"); @@ -2781,9 +2767,12 @@ void *consumer_thread_channel_poll(void *data) } restart: - DBG("Channel poll wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); + health_code_update(); + DBG("Channel poll wait"); health_poll_entry(); ret = lttng_poll_wait(&events, -1); + DBG("Channel poll return from wait with %d fd(s)", + LTTNG_POLL_GETNB(&events)); health_poll_exit(); DBG("Channel event catched in thread"); if (ret < 0) { @@ -2803,10 +2792,11 @@ restart: revents = LTTNG_POLL_GETEV(&events, i); pollfd = LTTNG_POLL_GETFD(&events, i); - /* Just don't waste time if no returned events for the fd */ if (!revents) { + /* No activity for this FD (poll implementation). */ continue; } + if (pollfd == ctx->consumer_channel_pipe[0]) { if (revents & (LPOLLERR | LPOLLHUP)) { DBG("Channel thread pipe hung up"); @@ -3569,15 +3559,6 @@ int consumer_data_pending(uint64_t id) */ ret = cds_lfht_is_node_deleted(&stream->node.node); if (!ret) { - /* - * An empty output file is not valid. We need at least one packet - * generated per stream, even if it contains no event, so it - * contains at least one packet header. - */ - if (stream->output_written == 0) { - pthread_mutex_unlock(&stream->lock); - goto data_pending; - } /* Check the stream if there is data in the buffers. */ ret = data_pending(stream); if (ret == 1) {