/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
*
*
*/
-#include "common/index/ctf-index.h"
#define _LGPL_SOURCE
+#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
+#include <signal.h>
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
-#include <inttypes.h>
-#include <signal.h>
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/utils.h>
-#include <common/time.h>
-#include <common/compat/poll.h>
-#include <common/compat/endian.h>
-#include <common/index/index.h>
-#include <common/kernel-ctl/kernel-ctl.h>
-#include <common/sessiond-comm/relayd.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/kernel-consumer/kernel-consumer.h>
-#include <common/relayd/relayd.h>
-#include <common/ust-consumer/ust-consumer.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/consumer/consumer.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-testpoint.h>
-#include <common/align.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/trace-chunk.h>
-#include <common/trace-chunk-registry.h>
-#include <common/string-utils/format.h>
-#include <common/dynamic-array.h>
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/align.hpp>
+#include <common/common.hpp>
+#include <common/compat/endian.hpp>
+#include <common/compat/poll.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-testpoint.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/index/index.hpp>
+#include <common/kernel-consumer/kernel-consumer.hpp>
+#include <common/kernel-ctl/kernel-ctl.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/sessiond-comm/relayd.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/string-utils/format.hpp>
+#include <common/time.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/trace-chunk.hpp>
+#include <common/ust-consumer/ust-consumer.hpp>
+#include <common/utils.hpp>
lttng_consumer_global_data the_consumer_data;
CONSUMER_CHANNEL_QUIT,
};
+namespace {
struct consumer_channel_msg {
enum consumer_channel_action action;
struct lttng_consumer_channel *chan; /* add */
uint64_t key; /* del */
};
+/*
+ * Global hash table containing respectively metadata and data streams. The
+ * stream element in this ht should only be updated by the metadata poll thread
+ * for the metadata and the data poll thread for the data.
+ */
+struct lttng_ht *metadata_ht;
+struct lttng_ht *data_ht;
+} /* namespace */
+
/* Flag used to temporarily pause data consumption from testpoints. */
int data_consumption_paused;
*/
int consumer_quit;
-/*
- * Global hash table containing respectively metadata and data streams. The
- * stream element in this ht should only be updated by the metadata poll thread
- * for the metadata and the data poll thread for the data.
- */
-static struct lttng_ht *metadata_ht;
-static struct lttng_ht *data_ht;
-
static const char *get_consumer_domain(void)
{
switch (the_consumer_data.type) {
/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
- cds_list_del(&stream->send_node);
/*
* Once a stream is added to this list, the buffers were created so we
* have a guarantee that this call will succeed. Setting the monitor
lttng_ht_lookup(ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
- stream = caa_container_of(node, struct lttng_consumer_stream, node);
+ stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
}
rcu_read_unlock();
struct lttng_ht_node_u64 *node;
struct lttng_consumer_channel *channel = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
return NULL;
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
- channel = caa_container_of(node, struct lttng_consumer_channel, node);
+ channel = lttng::utils::container_of(node, <tng_consumer_channel::node);
}
return channel;
static void free_channel_rcu(struct rcu_head *head)
{
struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+ lttng::utils::container_of(head, <tng_ht_node_u64::head);
struct lttng_consumer_channel *channel =
- caa_container_of(node, struct lttng_consumer_channel, node);
+ lttng::utils::container_of(node, <tng_consumer_channel::node);
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
static void free_relayd_rcu(struct rcu_head *head)
{
struct lttng_ht_node_u64 *node =
- caa_container_of(head, struct lttng_ht_node_u64, head);
+ lttng::utils::container_of(head, <tng_ht_node_u64::head);
struct consumer_relayd_sock_pair *relayd =
- caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
/*
* Close all sockets. This is done in the call RCU since we don't want the
consumer_timer_monitor_stop(channel);
}
+ /*
+ * Send a last buffer statistics sample to the session daemon
+ * to ensure it tracks the amount of data consumed by this channel.
+ */
+ sample_and_send_channel_buffer_stats(channel);
+
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
break;
void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd)
{
LTTNG_ASSERT(relayd);
+ ASSERT_RCU_READ_LOCKED();
/* Set destroy flag for this object */
uatomic_set(&relayd->destroy_flag, 1);
struct lttng_ht_iter iter;
LTTNG_ASSERT(relayd);
+ ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx,
&iter);
goto error;
}
- obj = (consumer_relayd_sock_pair *) zmalloc(sizeof(struct consumer_relayd_sock_pair));
+ obj = zmalloc<consumer_relayd_sock_pair>();
if (obj == NULL) {
PERROR("zmalloc relayd sock");
goto error;
struct lttng_ht_node_u64 *node;
struct consumer_relayd_sock_pair *relayd = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* Negative keys are lookup failures */
if (key == (uint64_t) -1ULL) {
goto error;
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
if (node != NULL) {
- relayd = caa_container_of(node, struct consumer_relayd_sock_pair, node);
+ relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
error:
}
}
- channel = (lttng_consumer_channel *) zmalloc(sizeof(*channel));
+ channel = zmalloc<lttng_consumer_channel>();
if (channel == NULL) {
PERROR("malloc struct lttng_consumer_channel");
goto end;
the_consumer_data.type == type);
the_consumer_data.type = type;
- ctx = (lttng_consumer_local_data *) zmalloc(sizeof(struct lttng_consumer_local_data));
+ ctx = zmalloc<lttng_consumer_local_data>();
if (ctx == NULL) {
PERROR("allocating context");
goto error;
consumer_stream_delete(stream, ht);
/* Close down everything including the relayd if one. */
- consumer_stream_close(stream);
+ consumer_stream_close_output(stream);
/* Destroy tracer buffers of the stream. */
consumer_stream_destroy_buffers(stream);
health_code_update();
- local_stream = (lttng_consumer_stream **) zmalloc(sizeof(struct lttng_consumer_stream *));
+ local_stream = zmalloc<lttng_consumer_stream *>();
if (local_stream == NULL) {
PERROR("local_stream malloc");
goto end;
local_stream = NULL;
/* Allocate for all fds */
- pollfd = (struct pollfd *) zmalloc((the_consumer_data.stream_count +
- nb_pipes_fd) *
- sizeof(struct pollfd));
+ pollfd = calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
if (pollfd == NULL) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
}
- local_stream = (lttng_consumer_stream **) zmalloc((the_consumer_data.stream_count +
- nb_pipes_fd) *
- sizeof(struct lttng_consumer_stream *));
+ local_stream = calloc<lttng_consumer_stream *>(the_consumer_data.stream_count + nb_pipes_fd);
if (local_stream == NULL) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
}
}
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
} else if (len > 0) {
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
}
}
pollfd[i].fd);
lttng_ustconsumer_on_stream_hangup(local_stream[i]);
/* Attempt read again, for the data we just flushed. */
- local_stream[i]->data_read = 1;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 1;
}
/*
+ * When a stream's pipe dies (hup/err/nval), an "inactive producer" flush is
+ * performed. This type of flush ensures that a new packet is produced no
+ * matter the consumed/produced positions are.
+ *
+ * This, in turn, causes the next pass to see that data available for the
+ * stream. When we come back here, we can be assured that all available
+ * data has been consumed and we can finally destroy the stream.
+ *
* If the poll flag is HUP/ERR/NVAL and we have
* read no data in this pass, we can remove the
* stream from its hash table.
*/
if ((pollfd[i].revents & POLLHUP)) {
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
- if (!local_stream[i]->data_read) {
+ if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
local_stream[i] = NULL;
num_hup++;
}
}
if (local_stream[i] != NULL) {
- local_stream[i]->data_read = 0;
+ local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
}
}
}
*/
if (stream->rotate_ready) {
DBG("Rotate stream before consuming data");
- ret = lttng_consumer_rotate_stream(ctx, stream);
+ ret = lttng_consumer_rotate_stream(stream);
if (ret < 0) {
ERR("Stream rotation error before consuming data");
goto end;
*/
rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
if (rotation_ret == 1) {
- rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
+ rotation_ret = lttng_consumer_rotate_stream(stream);
if (rotation_ret < 0) {
ret = rotation_ret;
ERR("Stream rotation error after consuming data");
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
- void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
- struct lttng_consumer_local_data *ctx, int sock,
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
- uint64_t relayd_session_id)
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
LTTNG_ASSERT(ctx);
- LTTNG_ASSERT(relayd_sock);
+ LTTNG_ASSERT(sock >= 0);
+ ASSERT_RCU_READ_LOCKED();
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close");
- }
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->control_sock.sock, fd,
+ relayd_socket_protocol);
- /* Assign new file descriptor */
- relayd->control_sock.sock.fd = fd;
/* Assign version values. */
- relayd->control_sock.major = relayd_sock->major;
- relayd->control_sock.minor = relayd_sock->minor;
+ relayd->control_sock.major = relayd_version_major;
+ relayd->control_sock.minor = relayd_version_minor;
relayd->relayd_session_id = relayd_session_id;
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close");
- }
-
- /* Assign new file descriptor */
- relayd->data_sock.sock.fd = fd;
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->data_sock.sock, fd,
+ relayd_socket_protocol);
/* Assign version values. */
- relayd->data_sock.major = relayd_sock->major;
- relayd->data_sock.minor = relayd_sock->minor;
+ relayd->data_sock.major = relayd_version_major;
+ relayd->data_sock.minor = relayd_version_minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
goto error;
}
+ if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* Iterate over all relayd since they are indexed by net_seq_idx. */
cds_lfht_for_each_entry(the_consumer_data.relayd_ht->ht, &iter.iter,
relayd, node.node) {
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
- uint64_t key, uint64_t relayd_id, uint32_t metadata,
- struct lttng_consumer_local_data *ctx)
+ uint64_t key, uint64_t relayd_id)
{
int ret;
struct lttng_consumer_stream *stream;
struct lttng_dynamic_pointer_array streams_packet_to_open;
size_t stream_idx;
+ ASSERT_RCU_READ_LOCKED();
+
DBG("Consumer sample rotate position for channel %" PRIu64, key);
lttng_dynamic_array_init(&stream_rotation_positions,
* Perform the rotation a local stream file.
*/
static
-int rotate_local_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int rotate_local_stream(struct lttng_consumer_stream *stream)
{
int ret = 0;
*
* Return 0 on success, a negative number of error.
*/
-int lttng_consumer_rotate_stream(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream)
+int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream)
{
int ret;
}
if (stream->net_seq_idx == (uint64_t) -1ULL) {
- ret = rotate_local_stream(ctx, stream);
+ ret = rotate_local_stream(stream);
if (ret < 0) {
ERR("Failed to rotate stream, ret = %i", ret);
goto error;
* Returns 0 on success, < 0 on error
*/
int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
- uint64_t key, struct lttng_consumer_local_data *ctx)
+ uint64_t key)
{
int ret;
struct lttng_consumer_stream *stream;
struct lttng_ht_iter iter;
struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
+ ASSERT_RCU_READ_LOCKED();
+
rcu_read_lock();
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
}
DBG("Consumer rotate ready stream %" PRIu64, stream->key);
- ret = lttng_consumer_rotate_stream(ctx, stream);
+ ret = lttng_consumer_rotate_stream(stream);
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
if (ret) {
enum lttcomm_return_code lttng_consumer_init_command(
struct lttng_consumer_local_data *ctx,
- const lttng_uuid sessiond_uuid)
+ const lttng_uuid& sessiond_uuid)
{
enum lttcomm_return_code ret;
char uuid_str[LTTNG_UUID_STR_LEN];
}
ctx->sessiond_uuid.is_set = true;
- memcpy(ctx->sessiond_uuid.value, sessiond_uuid, sizeof(lttng_uuid));
+ ctx->sessiond_uuid.value = sessiond_uuid;
ret = LTTCOMM_CONSUMERD_SUCCESS;
lttng_uuid_to_str(sessiond_uuid, uuid_str);
DBG("Received session daemon UUID: %s", uuid_str);