*/
#define _LGPL_SOURCE
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+
+#include <bin/lttng-consumerd/health-consumerd.hpp>
#include <inttypes.h>
#include <signal.h>
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/compat/endian.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer-testpoint.h>
-#include <common/ust-consumer/ust-consumer.h>
-
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
-typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
- unsigned long *consumed);
-typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
- unsigned long *produced);
+typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream, unsigned long *consumed);
+typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream, unsigned long *produced);
typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
static struct timer_signal_data timer_signal = {
* while consumer_timer_switch_stop() is called. It would result in
* deadlocks.
*/
-static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
- siginfo_t *si)
+static void metadata_switch_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
}
}
-static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
- uint64_t stream_id)
+static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts, uint64_t stream_id)
{
int ret;
struct ctf_packet_index index;
return ret;
}
-static int check_stream(struct lttng_consumer_stream *stream,
- flush_index_cb flush_index)
+static int check_stream(struct lttng_consumer_stream *stream, flush_index_cb flush_index)
{
int ret;
ret = pthread_mutex_trylock(&stream->lock);
switch (ret) {
case 0:
- break; /* We have the lock. */
+ break; /* We have the lock. */
case EBUSY:
pthread_mutex_lock(&stream->metadata_timer_lock);
if (stream->waiting_on_metadata) {
ret = 0;
stream->missed_metadata_flush = true;
pthread_mutex_unlock(&stream->metadata_timer_lock);
- goto end; /* Bail out. */
+ goto end; /* Bail out. */
}
pthread_mutex_unlock(&stream->metadata_timer_lock);
/* Try again. */
/*
* Execute action on a live timer
*/
-static void live_timer(struct lttng_consumer_local_data *ctx,
- siginfo_t *si)
+static void live_timer(struct lttng_consumer_local_data *ctx, siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
- const flush_index_cb flush_index =
- ctx->type == LTTNG_CONSUMER_KERNEL ?
- consumer_flush_kernel_index :
- consumer_flush_ust_index;
+ const flush_index_cb flush_index = ctx->type == LTTNG_CONSUMER_KERNEL ?
+ consumer_flush_kernel_index :
+ consumer_flush_ust_index;
channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
LTTNG_ASSERT(channel);
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
ret = check_stream(stream, flush_index);
if (ret < 0) {
goto error_unlock;
return;
}
-static
-void consumer_timer_signal_thread_qs(unsigned int signr)
+static void consumer_timer_signal_thread_qs(unsigned int signr)
{
sigset_t pending_set;
int ret;
* Returns a negative value on error, 0 if a timer was created, and
* a positive value if no timer was created (not an error).
*/
-static
-int consumer_channel_timer_start(timer_t *timer_id,
- struct lttng_consumer_channel *channel,
- unsigned int timer_interval_us, int signal)
+static int consumer_channel_timer_start(timer_t *timer_id,
+ struct lttng_consumer_channel *channel,
+ unsigned int timer_interval_us,
+ int signal)
{
int ret = 0, delete_ret;
struct sigevent sev = {};
goto end;
}
-static
-int consumer_channel_timer_stop(timer_t *timer_id, int signal)
+static int consumer_channel_timer_stop(timer_t *timer_id, int signal)
{
int ret = 0;
* Set the channel's switch timer.
*/
void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
- unsigned int switch_timer_interval_us)
+ unsigned int switch_timer_interval_us)
{
int ret;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(channel->key);
- ret = consumer_channel_timer_start(&channel->switch_timer, channel,
- switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
+ ret = consumer_channel_timer_start(&channel->switch_timer,
+ channel,
+ switch_timer_interval_us,
+ LTTNG_CONSUMER_SIG_SWITCH);
channel->switch_timer_enabled = !!(ret == 0);
}
LTTNG_ASSERT(channel);
- ret = consumer_channel_timer_stop(&channel->switch_timer,
- LTTNG_CONSUMER_SIG_SWITCH);
+ ret = consumer_channel_timer_stop(&channel->switch_timer, LTTNG_CONSUMER_SIG_SWITCH);
if (ret == -1) {
ERR("Failed to stop switch timer");
}
* Set the channel's live timer.
*/
void consumer_timer_live_start(struct lttng_consumer_channel *channel,
- unsigned int live_timer_interval_us)
+ unsigned int live_timer_interval_us)
{
int ret;
LTTNG_ASSERT(channel);
LTTNG_ASSERT(channel->key);
- ret = consumer_channel_timer_start(&channel->live_timer, channel,
- live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
+ ret = consumer_channel_timer_start(
+ &channel->live_timer, channel, live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
channel->live_timer_enabled = !!(ret == 0);
}
LTTNG_ASSERT(channel);
- ret = consumer_channel_timer_stop(&channel->live_timer,
- LTTNG_CONSUMER_SIG_LIVE);
+ ret = consumer_channel_timer_stop(&channel->live_timer, LTTNG_CONSUMER_SIG_LIVE);
if (ret == -1) {
ERR("Failed to stop live timer");
}
* a positive value if no timer was created (not an error).
*/
int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
- unsigned int monitor_timer_interval_us)
+ unsigned int monitor_timer_interval_us)
{
int ret;
LTTNG_ASSERT(channel->key);
LTTNG_ASSERT(!channel->monitor_timer_enabled);
- ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
- monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
+ ret = consumer_channel_timer_start(&channel->monitor_timer,
+ channel,
+ monitor_timer_interval_us,
+ LTTNG_CONSUMER_SIG_MONITOR);
channel->monitor_timer_enabled = !!(ret == 0);
return ret;
}
LTTNG_ASSERT(channel);
LTTNG_ASSERT(channel->monitor_timer_enabled);
- ret = consumer_channel_timer_stop(&channel->monitor_timer,
- LTTNG_CONSUMER_SIG_MONITOR);
+ ret = consumer_channel_timer_stop(&channel->monitor_timer, LTTNG_CONSUMER_SIG_MONITOR);
if (ret == -1) {
ERR("Failed to stop live timer");
goto end;
return 0;
}
-static
-int sample_channel_positions(struct lttng_consumer_channel *channel,
- uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
- sample_positions_cb sample, get_consumed_cb get_consumed,
- get_produced_cb get_produced)
+static int sample_channel_positions(struct lttng_consumer_channel *channel,
+ uint64_t *_highest_use,
+ uint64_t *_lowest_use,
+ uint64_t *_total_consumed,
+ sample_positions_cb sample,
+ get_consumed_cb get_consumed,
+ get_produced_cb get_produced)
{
int ret = 0;
struct lttng_ht_iter iter;
rcu_read_lock();
cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key,
- &iter.iter, stream, node_channel_id.node) {
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct,
+ &channel->key,
+ &iter.iter,
+ stream,
+ node_channel_id.node)
+ {
unsigned long produced, consumed, usage;
empty_channel = false;
ret = sample(stream);
if (ret) {
- ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
+ ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)",
+ ret);
pthread_mutex_unlock(&stream->lock);
goto end;
}
return ret;
}
-/*
- * Execute action on a monitor timer.
- */
-static
-void monitor_timer(struct lttng_consumer_channel *channel)
+/* Sample and send channel buffering statistics to the session daemon. */
+void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel)
{
int ret;
- int channel_monitor_pipe =
- consumer_timer_thread_get_channel_monitor_pipe();
+ int channel_monitor_pipe = consumer_timer_thread_get_channel_monitor_pipe();
struct lttcomm_consumer_channel_monitor_msg msg = {
.key = channel->key,
+ .session_id = channel->session_id,
+ .lowest = 0,
+ .highest = 0,
+ .consumed_since_last_sample = 0,
};
sample_positions_cb sample;
get_consumed_cb get_consumed;
abort();
}
- ret = sample_channel_positions(channel, &highest, &lowest,
- &total_consumed, sample, get_consumed, get_produced);
+ ret = sample_channel_positions(
+ channel, &highest, &lowest, &total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
+
msg.highest = highest;
msg.lowest = lowest;
- msg.total_consumed = total_consumed;
+ msg.consumed_since_last_sample = total_consumed - channel->last_consumed_size_sample_sent;
/*
* Writes performed here are assumed to be atomic which is only
if (errno == EAGAIN) {
/* Not an error, the sample is merely dropped. */
DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
- channel->key);
+ channel->key);
} else {
PERROR("write to the channel monitor pipe");
}
} else {
DBG("Sent channel monitoring sample for channel key %" PRIu64
- ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
- channel->key, msg.highest, msg.lowest);
+ ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
+ channel->key,
+ msg.highest,
+ msg.lowest);
+ channel->last_consumed_size_sample_sent = msg.consumed_since_last_sample;
}
}
struct lttng_consumer_channel *channel;
channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
- monitor_timer(channel);
+ sample_and_send_channel_buffer_stats(channel);
} else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
goto end;