X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.cpp;h=7e7e7d384cd20565e48bf3bad2793a41f31d1ca6;hb=32670d719327feb585374283a50eeb76ce36b962;hp=5f3dc4e621ad303a3b605e134182ee8177448877;hpb=ff9309595a046512269302e98a3859bb3941f6ae;p=lttng-tools.git diff --git a/src/common/consumer/consumer.cpp b/src/common/consumer/consumer.cpp index 5f3dc4e62..7e7e7d384 100644 --- a/src/common/consumer/consumer.cpp +++ b/src/common/consumer/consumer.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -34,6 +35,7 @@ #include #include +#include #include #include #include @@ -301,7 +303,8 @@ static void free_channel_rcu(struct rcu_head *head) ERR("Unknown consumer_data type"); abort(); } - free(channel); + + delete channel; } /* @@ -460,6 +463,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx, cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) { if (stream->net_seq_idx == net_seq_idx) { uatomic_set(&stream->endpoint_status, status); + stream->chan->metadata_pushed_wait_queue.wake_all(); + DBG("Delete flag set to metadata stream %d", stream->wait_fd); } } @@ -1015,9 +1020,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, } } - channel = zmalloc(); - if (channel == nullptr) { - PERROR("malloc struct lttng_consumer_channel"); + try { + channel = new lttng_consumer_channel; + } catch (const std::bad_alloc& e) { + ERR("Failed to allocate lttng_consumer_channel: %s", e.what()); + channel = nullptr; goto end; } @@ -1031,8 +1038,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->monitor = monitor; channel->live_timer_interval = live_timer_interval; channel->is_live = is_in_live_session; - pthread_mutex_init(&channel->lock, nullptr); - pthread_mutex_init(&channel->timer_lock, nullptr); + pthread_mutex_init(&channel->lock, NULL); + pthread_mutex_init(&channel->timer_lock, NULL); switch (output) { case LTTNG_EVENT_SPLICE: @@ -1043,7 +1050,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, break; default: abort(); - free(channel); + delete channel; channel = nullptr; goto end; } @@ -1336,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx) */ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset) { - int ret; int outfd = stream->out_fd; /* @@ -1348,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, if (orig_offset < stream->max_sb_size) { return; } - lttng_sync_file_range(outfd, - orig_offset - stream->max_sb_size, - stream->max_sb_size, - SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE | - SYNC_FILE_RANGE_WAIT_AFTER); - /* - * Give hints to the kernel about how we access the file: - * POSIX_FADV_DONTNEED : we won't re-access data in a near future after - * we write it. - * - * We need to call fadvise again after the file grows because the - * kernel does not seem to apply fadvise to non-existing parts of the - * file. - * - * Call fadvise _after_ having waited for the page writeback to - * complete because the dirty page writeback semantic is not well - * defined. So it can be expected to lead to lower throughput in - * streaming. - */ - ret = posix_fadvise( - outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED); - if (ret && ret != -ENOSYS) { - errno = ret; - PERROR("posix_fadvise on fd %i", outfd); - } + lttng::io::hint_flush_range_dont_need_sync( + outfd, orig_offset - stream->max_sb_size, stream->max_sb_size); } /* @@ -1733,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len); stream->out_fd_offset += write_len; lttng_consumer_sync_trace_file(stream, orig_offset); } @@ -1933,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data /* This call is useless on a socket so better save a syscall. */ if (!relayd) { /* This won't block, but will start writeout asynchronously */ - lttng_sync_file_range( - outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE); + lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice); stream->out_fd_offset += ret_splice; } stream->output_written += ret_splice; @@ -2154,6 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l * pointer value. */ channel->metadata_stream = nullptr; + channel->metadata_pushed_wait_queue.wake_all(); if (channel->metadata_cache) { pthread_mutex_unlock(&channel->metadata_cache->lock);