/*
- * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
- * David Goulet <dgoulet@efficios.com>
+ * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
+ * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
*
- * This program is free software; you can redistribute it and/or modify it
- * under the terms of the GNU General Public License, version 2 only, as
- * published by the Free Software Foundation.
+ * SPDX-License-Identifier: GPL-2.0-only
*
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
- * more details.
- *
- * You should have received a copy of the GNU General Public License along with
- * this program; if not, write to the Free Software Foundation, Inc., 51
- * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
#define _LGPL_SOURCE
-#include <assert.h>
#include <inttypes.h>
#include <signal.h>
-#include <bin/lttng-sessiond/ust-ctl.h>
#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/compat/endian.h>
unsigned long *consumed);
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
unsigned long *produced);
+typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
static struct timer_signal_data timer_signal = {
.tid = 0,
}
}
-static int channel_monitor_pipe = -1;
+static int the_channel_monitor_pipe = -1;
/*
* Execute action on a timer switch.
* deadlocks.
*/
static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
channel = si->si_value.sival_ptr;
- assert(channel);
+ LTTNG_ASSERT(channel);
if (channel->switch_timer_error) {
return;
break;
case LTTNG_CONSUMER_KERNEL:
case LTTNG_CONSUMER_UNKNOWN:
- assert(0);
+ abort();
break;
}
}
return ret;
}
-static int check_kernel_stream(struct lttng_consumer_stream *stream)
+static int check_stream(struct lttng_consumer_stream *stream,
+ flush_index_cb flush_index)
{
int ret;
}
break;
}
- ret = consumer_flush_kernel_index(stream);
+ ret = flush_index(stream);
pthread_mutex_unlock(&stream->lock);
end:
return ret;
ERR("Failed to get the current timestamp");
goto end;
}
- lttng_ustconsumer_flush_buffer(stream, 1);
+ ret = lttng_ustconsumer_flush_buffer(stream, 1);
+ if (ret < 0) {
+ ERR("Failed to flush buffer while flushing index");
+ goto end;
+ }
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
if (ret != -EAGAIN) {
}
ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
if (ret < 0) {
- PERROR("ustctl_get_stream_id");
+ PERROR("lttng_ust_ctl_get_stream_id");
goto end;
}
DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
return ret;
}
-static int check_ust_stream(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
- assert(stream->ustream);
- /*
- * While holding the stream mutex, try to take a snapshot, if it
- * succeeds, it means that data is ready to be sent, just let the data
- * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
- * means that there is no data to read after the flush, so we can
- * safely send the empty index.
- *
- * Doing a trylock and checking if waiting on metadata if
- * trylock fails. Bail out of the stream is indeed waiting for
- * metadata to be pushed. Busy wait on trylock otherwise.
- */
- for (;;) {
- ret = pthread_mutex_trylock(&stream->lock);
- switch (ret) {
- case 0:
- break; /* We have the lock. */
- case EBUSY:
- pthread_mutex_lock(&stream->metadata_timer_lock);
- if (stream->waiting_on_metadata) {
- ret = 0;
- stream->missed_metadata_flush = true;
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- goto end; /* Bail out. */
- }
- pthread_mutex_unlock(&stream->metadata_timer_lock);
- /* Try again. */
- caa_cpu_relax();
- continue;
- default:
- ERR("Unexpected pthread_mutex_trylock error %d", ret);
- ret = -1;
- goto end;
- }
- break;
- }
- ret = consumer_flush_ust_index(stream);
- pthread_mutex_unlock(&stream->lock);
-end:
- return ret;
-}
-
/*
* Execute action on a live timer
*/
static void live_timer(struct lttng_consumer_local_data *ctx,
- int sig, siginfo_t *si)
+ siginfo_t *si)
{
int ret;
struct lttng_consumer_channel *channel;
struct lttng_consumer_stream *stream;
- struct lttng_ht *ht;
struct lttng_ht_iter iter;
+ const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ const flush_index_cb flush_index =
+ ctx->type == LTTNG_CONSUMER_KERNEL ?
+ consumer_flush_kernel_index :
+ consumer_flush_ust_index;
channel = si->si_value.sival_ptr;
- assert(channel);
+ LTTNG_ASSERT(channel);
if (channel->switch_timer_error) {
goto error;
}
- ht = consumer_data.stream_per_chan_id_ht;
DBG("Live timer for channel %" PRIu64, channel->key);
rcu_read_lock();
- switch (ctx->type) {
- case LTTNG_CONSUMER32_UST:
- case LTTNG_CONSUMER64_UST:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_ust_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
- }
- break;
- case LTTNG_CONSUMER_KERNEL:
- cds_lfht_for_each_entry_duplicate(ht->ht,
- ht->hash_fct(&channel->key, lttng_ht_seed),
- ht->match_fct, &channel->key, &iter.iter,
- stream, node_channel_id.node) {
- ret = check_kernel_stream(stream);
- if (ret < 0) {
- goto error_unlock;
- }
+ cds_lfht_for_each_entry_duplicate(ht->ht,
+ ht->hash_fct(&channel->key, lttng_ht_seed),
+ ht->match_fct, &channel->key, &iter.iter,
+ stream, node_channel_id.node) {
+ ret = check_stream(stream, flush_index);
+ if (ret < 0) {
+ goto error_unlock;
}
- break;
- case LTTNG_CONSUMER_UNKNOWN:
- assert(0);
- break;
}
error_unlock:
unsigned int timer_interval_us, int signal)
{
int ret = 0, delete_ret;
- struct sigevent sev;
+ struct sigevent sev = {};
struct itimerspec its;
- assert(channel);
- assert(channel->key);
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
if (timer_interval_us == 0) {
/* No creation needed; not an error. */
{
int ret;
- assert(channel);
- assert(channel->key);
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
ret = consumer_channel_timer_start(&channel->switch_timer, channel,
switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
{
int ret;
- assert(channel);
+ LTTNG_ASSERT(channel);
ret = consumer_channel_timer_stop(&channel->switch_timer,
LTTNG_CONSUMER_SIG_SWITCH);
{
int ret;
- assert(channel);
- assert(channel->key);
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
ret = consumer_channel_timer_start(&channel->live_timer, channel,
live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
{
int ret;
- assert(channel);
+ LTTNG_ASSERT(channel);
ret = consumer_channel_timer_stop(&channel->live_timer,
LTTNG_CONSUMER_SIG_LIVE);
{
int ret;
- assert(channel);
- assert(channel->key);
- assert(!channel->monitor_timer_enabled);
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->key);
+ LTTNG_ASSERT(!channel->monitor_timer_enabled);
ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
{
int ret;
- assert(channel);
- assert(channel->monitor_timer_enabled);
+ LTTNG_ASSERT(channel);
+ LTTNG_ASSERT(channel->monitor_timer_enabled);
ret = consumer_channel_timer_stop(&channel->monitor_timer,
LTTNG_CONSUMER_SIG_MONITOR);
sample_positions_cb sample, get_consumed_cb get_consumed,
get_produced_cb get_produced)
{
- int ret;
+ int ret = 0;
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
bool empty_channel = true;
uint64_t high = 0, low = UINT64_MAX;
- struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
+ struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
*_total_consumed = 0;
* Execute action on a monitor timer.
*/
static
-void monitor_timer(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel)
+void monitor_timer(struct lttng_consumer_channel *channel)
{
int ret;
int channel_monitor_pipe =
sample_positions_cb sample;
get_consumed_cb get_consumed;
get_produced_cb get_produced;
+ uint64_t lowest = 0, highest = 0, total_consumed = 0;
- assert(channel);
+ LTTNG_ASSERT(channel);
if (channel_monitor_pipe < 0) {
return;
}
- switch (consumer_data.type) {
+ switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
sample = lttng_kconsumer_sample_snapshot_positions;
get_consumed = lttng_kconsumer_get_consumed_snapshot;
abort();
}
- ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
- &msg.total_consumed, sample, get_consumed, get_produced);
+ ret = sample_channel_positions(channel, &highest, &lowest,
+ &total_consumed, sample, get_consumed, get_produced);
if (ret) {
return;
}
+ msg.highest = highest;
+ msg.lowest = lowest;
+ msg.total_consumed = total_consumed;
/*
* Writes performed here are assumed to be atomic which is only
* guaranteed for sizes < than PIPE_BUF.
*/
- assert(sizeof(msg) <= PIPE_BUF);
+ LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
do {
ret = write(channel_monitor_pipe, &msg, sizeof(msg));
int consumer_timer_thread_get_channel_monitor_pipe(void)
{
- return uatomic_read(&channel_monitor_pipe);
+ return uatomic_read(&the_channel_monitor_pipe);
}
int consumer_timer_thread_set_channel_monitor_pipe(int fd)
{
int ret;
- ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
+ ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
if (ret != -1) {
ret = -1;
goto end;
}
continue;
} else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
- metadata_switch_timer(ctx, info.si_signo, &info);
+ metadata_switch_timer(ctx, &info);
} else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
cmm_smp_mb();
CMM_STORE_SHARED(timer_signal.qs_done, 1);
cmm_smp_mb();
DBG("Signal timer metadata thread teardown");
} else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
- live_timer(ctx, info.si_signo, &info);
+ live_timer(ctx, &info);
} else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
struct lttng_consumer_channel *channel;
channel = info.si_value.sival_ptr;
- monitor_timer(ctx, channel);
+ monitor_timer(channel);
} else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
- assert(CMM_LOAD_SHARED(consumer_quit));
+ LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
goto end;
} else {
ERR("Unexpected signal %d\n", info.si_signo);