waiter: modernize the waiter interface
[lttng-tools.git] / src / common / consumer / consumer.cpp
index 5f3dc4e621ad303a3b605e134182ee8177448877..7e7e7d384cd20565e48bf3bad2793a41f31d1ca6 100644 (file)
@@ -20,6 +20,7 @@
 #include <common/dynamic-array.hpp>
 #include <common/index/ctf-index.hpp>
 #include <common/index/index.hpp>
+#include <common/io-hint.hpp>
 #include <common/kernel-consumer/kernel-consumer.hpp>
 #include <common/kernel-ctl/kernel-ctl.hpp>
 #include <common/relayd/relayd.hpp>
@@ -34,6 +35,7 @@
 #include <common/utils.hpp>
 
 #include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
 #include <inttypes.h>
 #include <poll.h>
 #include <pthread.h>
@@ -301,7 +303,8 @@ static void free_channel_rcu(struct rcu_head *head)
                ERR("Unknown consumer_data type");
                abort();
        }
-       free(channel);
+
+       delete channel;
 }
 
 /*
@@ -460,6 +463,8 @@ static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
        cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
                if (stream->net_seq_idx == net_seq_idx) {
                        uatomic_set(&stream->endpoint_status, status);
+                       stream->chan->metadata_pushed_wait_queue.wake_all();
+
                        DBG("Delete flag set to metadata stream %d", stream->wait_fd);
                }
        }
@@ -1015,9 +1020,11 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                }
        }
 
-       channel = zmalloc<lttng_consumer_channel>();
-       if (channel == nullptr) {
-               PERROR("malloc struct lttng_consumer_channel");
+       try {
+               channel = new lttng_consumer_channel;
+       } catch (const std::bad_alloc& e) {
+               ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+               channel = nullptr;
                goto end;
        }
 
@@ -1031,8 +1038,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
        channel->monitor = monitor;
        channel->live_timer_interval = live_timer_interval;
        channel->is_live = is_in_live_session;
-       pthread_mutex_init(&channel->lock, nullptr);
-       pthread_mutex_init(&channel->timer_lock, nullptr);
+       pthread_mutex_init(&channel->lock, NULL);
+       pthread_mutex_init(&channel->timer_lock, NULL);
 
        switch (output) {
        case LTTNG_EVENT_SPLICE:
@@ -1043,7 +1050,7 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
                break;
        default:
                abort();
-               free(channel);
+               delete channel;
                channel = nullptr;
                goto end;
        }
@@ -1336,7 +1343,6 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx)
  */
 static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
 {
-       int ret;
        int outfd = stream->out_fd;
 
        /*
@@ -1348,31 +1354,8 @@ static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream,
        if (orig_offset < stream->max_sb_size) {
                return;
        }
-       lttng_sync_file_range(outfd,
-                             orig_offset - stream->max_sb_size,
-                             stream->max_sb_size,
-                             SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
-                                     SYNC_FILE_RANGE_WAIT_AFTER);
-       /*
-        * Give hints to the kernel about how we access the file:
-        * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
-        * we write it.
-        *
-        * We need to call fadvise again after the file grows because the
-        * kernel does not seem to apply fadvise to non-existing parts of the
-        * file.
-        *
-        * Call fadvise _after_ having waited for the page writeback to
-        * complete because the dirty page writeback semantic is not well
-        * defined. So it can be expected to lead to lower throughput in
-        * streaming.
-        */
-       ret = posix_fadvise(
-               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
-       if (ret && ret != -ENOSYS) {
-               errno = ret;
-               PERROR("posix_fadvise on fd %i", outfd);
-       }
+       lttng::io::hint_flush_range_dont_need_sync(
+               outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
 }
 
 /*
@@ -1733,8 +1716,7 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stre
        /* This call is useless on a socket so better save a syscall. */
        if (!relayd) {
                /* This won't block, but will start writeout asynchronously */
-               lttng_sync_file_range(
-                       outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+               lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
                stream->out_fd_offset += write_len;
                lttng_consumer_sync_trace_file(stream, orig_offset);
        }
@@ -1933,8 +1915,7 @@ ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data
                /* This call is useless on a socket so better save a syscall. */
                if (!relayd) {
                        /* This won't block, but will start writeout asynchronously */
-                       lttng_sync_file_range(
-                               outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+                       lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
                        stream->out_fd_offset += ret_splice;
                }
                stream->output_written += ret_splice;
@@ -2154,6 +2135,7 @@ void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct l
         * pointer value.
         */
        channel->metadata_stream = nullptr;
+       channel->metadata_pushed_wait_queue.wake_all();
 
        if (channel->metadata_cache) {
                pthread_mutex_unlock(&channel->metadata_cache->lock);
This page took 0.02491 seconds and 4 git commands to generate.