#include <common/dynamic-array.hpp>
#include <common/index/ctf-index.hpp>
#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/relayd/relayd.hpp>
#include <common/utils.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
ERR("Unknown consumer_data type");
abort();
}
- free(channel);
+
+ delete channel;
}
/*
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ stream->chan->metadata_pushed_wait_queue.wake_all();
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
}
}
- channel = zmalloc<lttng_consumer_channel>();
- if (channel == nullptr) {
- PERROR("malloc struct lttng_consumer_channel");
+ try {
+ channel = new lttng_consumer_channel;
+ } catch (const std::bad_alloc& e) {
+ ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+ channel = nullptr;
goto end;
}
channel->monitor = monitor;
channel->live_timer_interval = live_timer_interval;
channel->is_live = is_in_live_session;
- pthread_mutex_init(&channel->lock, nullptr);
- pthread_mutex_init(&channel->timer_lock, nullptr);
+ pthread_mutex_init(&channel->lock, NULL);
+ pthread_mutex_init(&channel->timer_lock, NULL);
switch (output) {
case LTTNG_EVENT_SPLICE:
break;
default:
abort();
- free(channel);
+ delete channel;
channel = nullptr;
goto end;
}
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
int outfd = stream->out_fd;
/*
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd,
- orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(
- outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
* pointer value.
*/
channel->metadata_stream = nullptr;
+ channel->metadata_pushed_wait_queue.wake_all();
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);