X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer-timer.cpp;h=8c9371bae780191207c345e3a39184f4b24c97ab;hb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;hp=4e308383d22e5a1c22e7f9442c5b9a9dc87d1020;hpb=9cc4ae91845c03b141af7ef58a86a2a9689dfafd;p=lttng-tools.git diff --git a/src/common/consumer/consumer-timer.cpp b/src/common/consumer/consumer-timer.cpp index 4e308383d..8c9371bae 100644 --- a/src/common/consumer/consumer-timer.cpp +++ b/src/common/consumer/consumer-timer.cpp @@ -7,25 +7,24 @@ */ #define _LGPL_SOURCE -#include -#include - -#include #include #include -#include -#include #include -#include #include +#include +#include +#include +#include #include -typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream); -typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, - unsigned long *consumed); -typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, - unsigned long *produced); -typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream); +#include +#include +#include + +using sample_positions_cb = int (*)(struct lttng_consumer_stream *); +using get_consumed_cb = int (*)(struct lttng_consumer_stream *, unsigned long *); +using get_produced_cb = int (*)(struct lttng_consumer_stream *, unsigned long *); +using flush_index_cb = int (*)(struct lttng_consumer_stream *); static struct timer_signal_data timer_signal = { .tid = 0, @@ -76,8 +75,7 @@ static int the_channel_monitor_pipe = -1; * while consumer_timer_switch_stop() is called. It would result in * deadlocks. */ -static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, - siginfo_t *si) +static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; @@ -98,7 +96,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * - metadata_socket_lock * - Calling lttng_ustconsumer_recv_metadata(): * - channel->metadata_cache->lock - * - Calling consumer_metadata_cache_flushed(): + * - Calling consumer_wait_metadata_cache_flushed(): * - channel->timer_lock * - channel->metadata_cache->lock * @@ -107,7 +105,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, * they are held while consumer_timer_switch_stop() is * called. */ - ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1); + ret = lttng_ustconsumer_request_metadata(ctx, channel, true, 1); if (ret < 0) { channel->switch_timer_error = 1; } @@ -119,8 +117,7 @@ static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, } } -static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, - uint64_t stream_id) +static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id) { int ret; struct ctf_packet_index index; @@ -175,8 +172,7 @@ end: return ret; } -static int check_stream(struct lttng_consumer_stream *stream, - flush_index_cb flush_index) +static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index) { int ret; @@ -195,14 +191,14 @@ static int check_stream(struct lttng_consumer_stream *stream, ret = pthread_mutex_trylock(&stream->lock); switch (ret) { case 0: - break; /* We have the lock. */ + break; /* We have the lock. */ case EBUSY: pthread_mutex_lock(&stream->metadata_timer_lock); if (stream->waiting_on_metadata) { ret = 0; stream->missed_metadata_flush = true; pthread_mutex_unlock(&stream->metadata_timer_lock); - goto end; /* Bail out. */ + goto end; /* Bail out. */ } pthread_mutex_unlock(&stream->metadata_timer_lock); /* Try again. */ @@ -267,18 +263,16 @@ end: /* * Execute action on a live timer */ -static void live_timer(struct lttng_consumer_local_data *ctx, - siginfo_t *si) +static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si) { int ret; struct lttng_consumer_channel *channel; struct lttng_consumer_stream *stream; struct lttng_ht_iter iter; const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht; - const flush_index_cb flush_index = - ctx->type == LTTNG_CONSUMER_KERNEL ? - consumer_flush_kernel_index : - consumer_flush_ust_index; + const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ? + consumer_flush_kernel_index : + consumer_flush_ust_index; channel = (lttng_consumer_channel *) si->si_value.sival_ptr; LTTNG_ASSERT(channel); @@ -289,26 +283,29 @@ static void live_timer(struct lttng_consumer_local_data *ctx, DBG("Live timer for channel %" PRIu64, channel->key); - rcu_read_lock(); - cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, &iter.iter, - stream, node_channel_id.node) { - ret = check_stream(stream, flush_index); - if (ret < 0) { - goto error_unlock; + { + lttng::urcu::read_lock_guard read_lock; + cds_lfht_for_each_entry_duplicate(ht->ht, + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { + ret = check_stream(stream, flush_index); + if (ret < 0) { + goto error_unlock; + } } } - error_unlock: - rcu_read_unlock(); error: return; } -static -void consumer_timer_signal_thread_qs(unsigned int signr) +static void consumer_timer_signal_thread_qs(unsigned int signr) { sigset_t pending_set; int ret; @@ -365,10 +362,10 @@ void consumer_timer_signal_thread_qs(unsigned int signr) * Returns a negative value on error, 0 if a timer was created, and * a positive value if no timer was created (not an error). */ -static -int consumer_channel_timer_start(timer_t *timer_id, - struct lttng_consumer_channel *channel, - unsigned int timer_interval_us, int signal) +static int consumer_channel_timer_start(timer_t *timer_id, + struct lttng_consumer_channel *channel, + unsigned int timer_interval_us, + int signal) { int ret = 0, delete_ret; struct sigevent sev = {}; @@ -397,7 +394,7 @@ int consumer_channel_timer_start(timer_t *timer_id, its.it_interval.tv_sec = its.it_value.tv_sec; its.it_interval.tv_nsec = its.it_value.tv_nsec; - ret = timer_settime(*timer_id, 0, &its, NULL); + ret = timer_settime(*timer_id, 0, &its, nullptr); if (ret == -1) { PERROR("timer_settime"); goto error_destroy_timer; @@ -412,8 +409,7 @@ error_destroy_timer: goto end; } -static -int consumer_channel_timer_stop(timer_t *timer_id, int signal) +static int consumer_channel_timer_stop(timer_t *timer_id, int signal) { int ret = 0; @@ -424,7 +420,7 @@ int consumer_channel_timer_stop(timer_t *timer_id, int signal) } consumer_timer_signal_thread_qs(signal); - *timer_id = 0; + *timer_id = nullptr; end: return ret; } @@ -433,15 +429,17 @@ end: * Set the channel's switch timer. */ void consumer_timer_switch_start(struct lttng_consumer_channel *channel, - unsigned int switch_timer_interval_us) + unsigned int switch_timer_interval_us) { int ret; LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); - ret = consumer_channel_timer_start(&channel->switch_timer, channel, - switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH); + ret = consumer_channel_timer_start(&channel->switch_timer, + channel, + switch_timer_interval_us, + LTTNG_CONSUMER_SIG_SWITCH); channel->switch_timer_enabled = !!(ret == 0); } @@ -455,8 +453,7 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) LTTNG_ASSERT(channel); - ret = consumer_channel_timer_stop(&channel->switch_timer, - LTTNG_CONSUMER_SIG_SWITCH); + ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH); if (ret == -1) { ERR("Failed to stop switch timer"); } @@ -468,15 +465,15 @@ void consumer_timer_switch_stop(struct lttng_consumer_channel *channel) * Set the channel's live timer. */ void consumer_timer_live_start(struct lttng_consumer_channel *channel, - unsigned int live_timer_interval_us) + unsigned int live_timer_interval_us) { int ret; LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->key); - ret = consumer_channel_timer_start(&channel->live_timer, channel, - live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); + ret = consumer_channel_timer_start( + &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE); channel->live_timer_enabled = !!(ret == 0); } @@ -490,8 +487,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel) LTTNG_ASSERT(channel); - ret = consumer_channel_timer_stop(&channel->live_timer, - LTTNG_CONSUMER_SIG_LIVE); + ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE); if (ret == -1) { ERR("Failed to stop live timer"); } @@ -506,7 +502,7 @@ void consumer_timer_live_stop(struct lttng_consumer_channel *channel) * a positive value if no timer was created (not an error). */ int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, - unsigned int monitor_timer_interval_us) + unsigned int monitor_timer_interval_us) { int ret; @@ -514,8 +510,10 @@ int consumer_timer_monitor_start(struct lttng_consumer_channel *channel, LTTNG_ASSERT(channel->key); LTTNG_ASSERT(!channel->monitor_timer_enabled); - ret = consumer_channel_timer_start(&channel->monitor_timer, channel, - monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR); + ret = consumer_channel_timer_start(&channel->monitor_timer, + channel, + monitor_timer_interval_us, + LTTNG_CONSUMER_SIG_MONITOR); channel->monitor_timer_enabled = !!(ret == 0); return ret; } @@ -530,8 +528,7 @@ int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel) LTTNG_ASSERT(channel); LTTNG_ASSERT(channel->monitor_timer_enabled); - ret = consumer_channel_timer_stop(&channel->monitor_timer, - LTTNG_CONSUMER_SIG_MONITOR); + ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR); if (ret == -1) { ERR("Failed to stop live timer"); goto end; @@ -546,14 +543,14 @@ end: * Block the RT signals for the entire process. It must be called from the * consumer main before creating the threads */ -int consumer_signal_init(void) +int consumer_signal_init() { int ret; sigset_t mask; /* Block signal for entire process, so only our thread processes it. */ setmask(&mask); - ret = pthread_sigmask(SIG_BLOCK, &mask, NULL); + ret = pthread_sigmask(SIG_BLOCK, &mask, nullptr); if (ret) { errno = ret; PERROR("pthread_sigmask"); @@ -562,11 +559,13 @@ int consumer_signal_init(void) return 0; } -static -int sample_channel_positions(struct lttng_consumer_channel *channel, - uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed, - sample_positions_cb sample, get_consumed_cb get_consumed, - get_produced_cb get_produced) +static int sample_channel_positions(struct lttng_consumer_channel *channel, + uint64_t *_highest_use, + uint64_t *_lowest_use, + uint64_t *_total_consumed, + sample_positions_cb sample, + get_consumed_cb get_consumed, + get_produced_cb get_produced) { int ret = 0; struct lttng_ht_iter iter; @@ -577,12 +576,16 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, *_total_consumed = 0; - rcu_read_lock(); + lttng::urcu::read_lock_guard read_lock; cds_lfht_for_each_entry_duplicate(ht->ht, - ht->hash_fct(&channel->key, lttng_ht_seed), - ht->match_fct, &channel->key, - &iter.iter, stream, node_channel_id.node) { + ht->hash_fct(&channel->key, lttng_ht_seed), + ht->match_fct, + &channel->key, + &iter.iter, + stream, + node_channel_id.node) + { unsigned long produced, consumed, usage; empty_channel = false; @@ -594,7 +597,8 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, ret = sample(stream); if (ret) { - ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret); + ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", + ret); pthread_mutex_unlock(&stream->lock); goto end; } @@ -630,7 +634,6 @@ int sample_channel_positions(struct lttng_consumer_channel *channel, *_highest_use = high; *_lowest_use = low; end: - rcu_read_unlock(); if (empty_channel) { ret = -1; } @@ -641,13 +644,13 @@ end: void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel) { int ret; - int channel_monitor_pipe = - consumer_timer_thread_get_channel_monitor_pipe(); + int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe(); struct lttcomm_consumer_channel_monitor_msg msg = { .key = channel->key, + .session_id = channel->session_id, .lowest = 0, .highest = 0, - .total_consumed = 0, + .consumed_since_last_sample = 0, }; sample_positions_cb sample; get_consumed_cb get_consumed; @@ -676,14 +679,15 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel abort(); } - ret = sample_channel_positions(channel, &highest, &lowest, - &total_consumed, sample, get_consumed, get_produced); + ret = sample_channel_positions( + channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced); if (ret) { return; } + msg.highest = highest; msg.lowest = lowest; - msg.total_consumed = total_consumed; + msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent; /* * Writes performed here are assumed to be atomic which is only @@ -698,18 +702,21 @@ void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel if (errno == EAGAIN) { /* Not an error, the sample is merely dropped. */ DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64, - channel->key); + channel->key); } else { PERROR("write to the channel monitor pipe"); } } else { DBG("Sent channel monitoring sample for channel key %" PRIu64 - ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")", - channel->key, msg.highest, msg.lowest); + ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")", + channel->key, + msg.highest, + msg.lowest); + channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample; } } -int consumer_timer_thread_get_channel_monitor_pipe(void) +int consumer_timer_thread_get_channel_monitor_pipe() { return uatomic_read(&the_channel_monitor_pipe); } @@ -754,7 +761,7 @@ void *consumer_timer_thread(void *data) setmask(&mask); CMM_STORE_SHARED(timer_signal.tid, pthread_self()); - while (1) { + while (true) { health_code_update(); health_poll_entry(); @@ -799,5 +806,5 @@ error_testpoint: end: health_unregister(health_consumerd); rcu_unregister_thread(); - return NULL; + return nullptr; }