X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.c;h=7ae40c57c564a10c2bffbdff5c99618cdf37fff1;hb=5f3ea0040a33e37c9db44494f7ad64a3f78f9c6b;hp=7911a5b4e8d4d0701d40e88d3ebdd6841c4061af;hpb=fb83fe64f250bec7416f18891a8264450c61ead3;p=lttng-tools.git diff --git a/src/common/consumer/consumer.c b/src/common/consumer/consumer.c index 7911a5b4e..7ae40c57c 100644 --- a/src/common/consumer/consumer.c +++ b/src/common/consumer/consumer.c @@ -47,6 +47,7 @@ #include #include #include +#include struct lttng_consumer_global_data consumer_data = { .stream_count = 0, @@ -319,6 +320,7 @@ static void free_relayd_rcu(struct rcu_head *head) (void) relayd_close(&relayd->control_sock); (void) relayd_close(&relayd->data_sock); + pthread_mutex_destroy(&relayd->ctrl_sock_mutex); free(relayd); } @@ -570,7 +572,7 @@ struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key, stream->session_id = session_id; stream->monitor = monitor; stream->endpoint_status = CONSUMER_ENDPOINT_ACTIVE; - stream->index_fd = -1; + stream->index_file = NULL; stream->last_sequence_number = -1ULL; pthread_mutex_init(&stream->lock, NULL); pthread_mutex_init(&stream->metadata_timer_lock, NULL); @@ -1021,7 +1023,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, CDS_INIT_LIST_HEAD(&channel->streams.head); - DBG("Allocated channel (key %" PRIu64 ")", channel->key) + DBG("Allocated channel (key %" PRIu64 ")", channel->key); end: return channel; @@ -1070,7 +1072,7 @@ int consumer_add_channel(struct lttng_consumer_channel *channel, */ static int update_poll_array(struct lttng_consumer_local_data *ctx, struct pollfd **pollfd, struct lttng_consumer_stream **local_stream, - struct lttng_ht *ht) + struct lttng_ht *ht, int *nb_inactive_fd) { int i = 0; struct lttng_ht_iter iter; @@ -1082,6 +1084,7 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, assert(local_stream); DBG("Updating poll fd array"); + *nb_inactive_fd = 0; rcu_read_lock(); cds_lfht_for_each_entry(ht->ht, &iter.iter, stream, node.node) { /* @@ -1092,9 +1095,14 @@ static int update_poll_array(struct lttng_consumer_local_data *ctx, * just after the check. However, this is OK since the stream(s) will * be deleted once the thread is notified that the end point state has * changed where this function will be called back again. + * + * We track the number of inactive FDs because they still need to be + * closed by the polling thread after a wakeup on the data_pipe or + * metadata_pipe. */ if (stream->state != LTTNG_CONSUMER_ACTIVE_STREAM || stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) { + (*nb_inactive_fd)++; continue; } /* @@ -1229,9 +1237,15 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) DBG("Consumer flag that it should quit"); } + +/* + * Flush pending writes to trace output disk file. + */ +static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { + int ret; int outfd = stream->out_fd; /* @@ -1262,8 +1276,12 @@ void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, * defined. So it can be expected to lead to lower throughput in * streaming. */ - posix_fadvise(outfd, orig_offset - stream->max_sb_size, + ret = posix_fadvise(outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); + if (ret && ret != -ENOSYS) { + errno = ret; + PERROR("posix_fadvise on fd %i", outfd); + } } /* @@ -1520,7 +1538,6 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( mmap_base = stream->mmap_base; ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); if (ret < 0) { - ret = -errno; PERROR("tracer ctl get_mmap_read_offset"); goto end; } @@ -1556,6 +1573,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( if (stream->metadata_flag) { /* Metadata requires the control socket. */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } netlen += sizeof(struct lttcomm_relayd_metadata_payload); } @@ -1579,6 +1606,15 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } + /* * Check if we need to change the tracefile before writing the packet. */ @@ -1596,21 +1632,16 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } outfd = stream->out_fd; - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret < 0) { - PERROR("Closing index"); - goto end; - } - stream->index_fd = -1; - ret = index_create_file(stream->chan->pathname, + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { goto end; } - stream->index_fd = ret; } /* Reset current size because we just perform a rotation. */ @@ -1663,8 +1694,8 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( lttng_sync_file_range(outfd, stream->out_fd_offset, len, SYNC_FILE_RANGE_WRITE); stream->out_fd_offset += len; + lttng_consumer_sync_trace_file(stream, orig_offset); } - lttng_consumer_sync_trace_file(stream, orig_offset); write_error: /* @@ -1744,6 +1775,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); + if (stream->reset_metadata_flag) { + ret = relayd_reset_metadata(&relayd->control_sock, + stream->relayd_stream_id, + stream->metadata_version); + if (ret < 0) { + relayd_hang_up = 1; + goto write_error; + } + stream->reset_metadata_flag = 0; + } ret = write_relayd_metadata_id(splice_pipe[1], stream, relayd, padding); if (ret < 0) { @@ -1767,6 +1808,14 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( /* No streaming, we have to set the len with the full padding */ len += padding; + if (stream->metadata_flag && stream->reset_metadata_flag) { + ret = utils_truncate_stream_file(stream->out_fd, 0); + if (ret < 0) { + ERR("Reset metadata file"); + goto end; + } + stream->reset_metadata_flag = 0; + } /* * Check if we need to change the tracefile before writing the packet. */ @@ -1785,22 +1834,16 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } outfd = stream->out_fd; - if (stream->index_fd >= 0) { - ret = close(stream->index_fd); - if (ret < 0) { - PERROR("Closing index"); - goto end; - } - stream->index_fd = -1; - ret = index_create_file(stream->chan->pathname, + if (stream->index_file) { + lttng_index_file_put(stream->index_file); + stream->index_file = lttng_index_file_create(stream->chan->pathname, stream->name, stream->uid, stream->gid, stream->chan->tracefile_size, - stream->tracefile_count_current); - if (ret < 0) { - written = ret; + stream->tracefile_count_current, + CTF_INDEX_MAJOR, CTF_INDEX_MINOR); + if (!stream->index_file) { goto end; } - stream->index_fd = ret; } /* Reset current size because we just perform a rotation. */ @@ -1875,7 +1918,9 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( stream->output_written += ret_splice; written += ret_splice; } - lttng_consumer_sync_trace_file(stream, orig_offset); + if (!relayd) { + lttng_consumer_sync_trace_file(stream, orig_offset); + } goto end; write_error: @@ -2015,6 +2060,10 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, pthread_mutex_lock(&consumer_data.lock); pthread_mutex_lock(&stream->chan->lock); pthread_mutex_lock(&stream->lock); + if (stream->chan->metadata_cache) { + /* Only applicable to userspace consumers. */ + pthread_mutex_lock(&stream->chan->metadata_cache->lock); + } /* Remove any reference to that stream. */ consumer_stream_delete(stream, ht); @@ -2038,6 +2087,9 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, */ stream->chan->metadata_stream = NULL; + if (stream->chan->metadata_cache) { + pthread_mutex_unlock(&stream->chan->metadata_cache->lock); + } pthread_mutex_unlock(&stream->lock); pthread_mutex_unlock(&stream->chan->lock); pthread_mutex_unlock(&consumer_data.lock); @@ -2100,7 +2152,7 @@ int consumer_add_metadata_stream(struct lttng_consumer_stream *stream) lttng_ht_add_unique_u64(ht, &stream->node); - lttng_ht_add_unique_u64(consumer_data.stream_per_chan_id_ht, + lttng_ht_add_u64(consumer_data.stream_per_chan_id_ht, &stream->node_channel_id); /* @@ -2224,10 +2276,10 @@ restart: DBG("Metadata poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Metadata event catched in thread"); + DBG("Metadata event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) { @@ -2325,7 +2377,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get - * a negative len, it means an error occured thus we + * a negative len, it means an error occurred thus we * simply remove it from the poll set and free the * stream. */ @@ -2352,7 +2404,7 @@ restart: len = ctx->on_buffer_ready(stream, ctx); /* * We don't check the return value here since if we get - * a negative len, it means an error occured thus we + * a negative len, it means an error occurred thus we * simply remove it from the poll set and free the * stream. */ @@ -2404,6 +2456,8 @@ void *consumer_thread_data_poll(void *data) struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL; /* local view of consumer_data.fds_count */ int nb_fd = 0; + /* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */ + int nb_inactive_fd = 0; struct lttng_consumer_local_data *ctx = data; ssize_t len; @@ -2460,7 +2514,7 @@ void *consumer_thread_data_poll(void *data) goto end; } ret = update_poll_array(ctx, &pollfd, local_stream, - data_ht); + data_ht, &nb_inactive_fd); if (ret < 0) { ERR("Error in allocating pollfd or local_outfds"); lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_POLL_ERROR); @@ -2473,7 +2527,7 @@ void *consumer_thread_data_poll(void *data) pthread_mutex_unlock(&consumer_data.lock); /* No FDs and consumer_quit, consumer_cleanup the thread */ - if (nb_fd == 0 && consumer_quit == 1) { + if (nb_fd == 0 && consumer_quit == 1 && nb_inactive_fd == 0) { err = 0; /* All is OK */ goto end; } @@ -2807,10 +2861,10 @@ restart: DBG("Channel poll return from wait with %d fd(s)", LTTNG_POLL_GETNB(&events)); health_poll_exit(); - DBG("Channel event catched in thread"); + DBG("Channel event caught in thread"); if (ret < 0) { if (errno == EINTR) { - ERR("Poll EINTR catched"); + ERR("Poll EINTR caught"); goto restart; } if (LTTNG_POLL_GETNB(&events) == 0) {