/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2017 Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
#include <stdbool.h>
#include <stdint.h>
-#include <bin/lttng-consumerd/health-consumerd.h>
-#include <common/common.h>
-#include <common/sessiond-comm/sessiond-comm.h>
-#include <common/relayd/relayd.h>
-#include <common/compat/fcntl.h>
-#include <common/compat/endian.h>
-#include <common/consumer/consumer-metadata-cache.h>
-#include <common/consumer/consumer-stream.h>
-#include <common/consumer/consumer-timer.h>
-#include <common/utils.h>
-#include <common/index/index.h>
-#include <common/consumer/consumer.h>
-#include <common/shm.h>
-#include <common/optional.h>
-
-#include "ust-consumer.h"
+#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <common/common.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/compat/endian.hpp>
+#include <common/consumer/consumer-metadata-cache.hpp>
+#include <common/consumer/consumer-stream.hpp>
+#include <common/consumer/consumer-timer.hpp>
+#include <common/utils.hpp>
+#include <common/index/index.hpp>
+#include <common/consumer/consumer.hpp>
+#include <common/shm.hpp>
+#include <common/optional.hpp>
+
+#include "ust-consumer.hpp"
#define INT_MAX_STR_LEN 12 /* includes \0 */
LTTNG_EXPORT DEFINE_LTTNG_UST_SIGBUS_STATE();
-/*
- * Free channel object and all streams associated with it. This MUST be used
- * only and only if the channel has _NEVER_ been added to the global channel
- * hash table.
- */
-static void destroy_channel(struct lttng_consumer_channel *channel)
-{
- struct lttng_consumer_stream *stream, *stmp;
-
- LTTNG_ASSERT(channel);
-
- DBG("UST consumer cleaning stream list");
-
- cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
- send_node) {
-
- health_code_update();
-
- cds_list_del(&stream->send_node);
- lttng_ust_ctl_destroy_stream(stream->ustream);
- lttng_trace_chunk_put(stream->trace_chunk);
- free(stream);
- }
-
- /*
- * If a channel is available meaning that was created before the streams
- * were, delete it.
- */
- if (channel->uchan) {
- lttng_ustconsumer_del_channel(channel);
- lttng_ustconsumer_free_channel(channel);
- }
-
- if (channel->trace_chunk) {
- lttng_trace_chunk_put(channel->trace_chunk);
- }
-
- free(channel);
-}
-
/*
* Add channel to internal consumer state.
*
* global.
*/
stream->globally_visible = 1;
- cds_list_del(&stream->send_node);
+ cds_list_del_init(&stream->send_node);
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
nr_stream_fds = 1;
else
nr_stream_fds = lttng_ust_ctl_get_nr_stream_per_channel();
- stream_fds = (int *) zmalloc(nr_stream_fds * sizeof(*stream_fds));
+ stream_fds = calloc<int>(nr_stream_fds);
if (!stream_fds) {
ret = -1;
goto error_alloc;
int ret;
struct lttng_consumer_channel *metadata;
+ ASSERT_RCU_READ_LOCKED();
+
DBG("UST consumer setup metadata key %" PRIu64, key);
metadata = consumer_find_channel(key);
* will make sure to clean that list.
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
- cds_list_del(&metadata->metadata_stream->send_node);
metadata->metadata_stream = NULL;
send_streams_error:
error_no_stream:
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
+ ASSERT_RCU_READ_LOCKED();
DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
key, path);
metadata_stream = metadata_channel->metadata_stream;
LTTNG_ASSERT(metadata_stream);
- pthread_mutex_lock(&metadata_stream->lock);
+ metadata_stream->read_subbuffer_ops.lock(metadata_stream);
if (relayd_id != (uint64_t) -1ULL) {
metadata_stream->net_seq_idx = relayd_id;
ret = consumer_send_relayd_stream(metadata_stream, path);
ret = consumer_stream_create_output_files(metadata_stream,
false);
}
- pthread_mutex_unlock(&metadata_stream->lock);
if (ret < 0) {
goto error_stream;
}
do {
health_code_update();
-
ret = lttng_consumer_read_subbuffer(metadata_stream, ctx, true);
if (ret < 0) {
goto error_stream;
} while (ret > 0);
error_stream:
+ metadata_stream->read_subbuffer_ops.unlock(metadata_stream);
/*
- * Clean up the stream completly because the next snapshot will use a new
- * metadata stream.
+ * Clean up the stream completely because the next snapshot will use a
+ * new metadata stream.
*/
consumer_stream_destroy(metadata_stream, NULL);
- cds_list_del(&metadata_stream->send_node);
metadata_channel->metadata_stream = NULL;
error:
LTTNG_ASSERT(path);
LTTNG_ASSERT(ctx);
+ ASSERT_RCU_READ_LOCKED();
rcu_read_lock();
DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
- metadata_str = (char *) zmalloc(len * sizeof(char));
+ metadata_str = calloc<char>(len);
if (!metadata_str) {
PERROR("zmalloc metadata string");
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
* metadata position to ensure the metadata poll thread consumes
* the whole cache.
*/
- pthread_mutex_lock(&channel->metadata_stream->lock);
- metadata_stream_reset_cache_consumed_position(
- channel->metadata_stream);
- pthread_mutex_unlock(&channel->metadata_stream->lock);
+
+ /*
+ * channel::metadata_stream can be null when the metadata
+ * channel is under a snapshot session type. No need to update
+ * the stream position in that scenario.
+ */
+ if (channel->metadata_stream != NULL) {
+ pthread_mutex_lock(&channel->metadata_stream->lock);
+ metadata_stream_reset_cache_consumed_position(
+ channel->metadata_stream);
+ pthread_mutex_unlock(&channel->metadata_stream->lock);
+ } else {
+ /* Validate we are in snapshot mode. */
+ LTTNG_ASSERT(!channel->monitor);
+ }
/* Fall-through. */
case CONSUMER_METADATA_CACHE_WRITE_STATUS_APPENDED_CONTENT:
/*
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ uint32_t major = msg.u.relayd_sock.major;
+ uint32_t minor = msg.u.relayd_sock.minor;
+ enum lttcomm_sock_proto protocol =
+ (enum lttcomm_sock_proto) msg.u.relayd_sock
+ .relayd_socket_protocol;
+
/* Session daemon status message are handled in the following call. */
consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
- msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id);
+ msg.u.relayd_sock.type, ctx, sock,
+ consumer_sockpoll, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id, major,
+ minor, protocol);
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
*/
rotate_channel = lttng_consumer_rotate_channel(
found_channel, key,
- msg.u.rotate_channel.relayd_id,
- msg.u.rotate_channel.metadata, ctx);
+ msg.u.rotate_channel.relayd_id);
if (rotate_channel < 0) {
ERR("Rotate channel failed");
ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
ret_rotate_read_streams =
lttng_consumer_rotate_ready_streams(
- found_channel, key,
- ctx);
+ found_channel, key);
if (ret_rotate_read_streams < 0) {
ERR("Rotate channel failed");
}
case LTTNG_CONSUMER_INIT:
{
int ret_send_status;
+ lttng_uuid sessiond_uuid;
- ret_code = lttng_consumer_init_command(ctx,
- msg.u.init.sessiond_uuid);
+ std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid),
+ sessiond_uuid.begin());
+ ret_code = lttng_consumer_init_command(ctx, sessiond_uuid);
health_code_update();
ret_send_status = consumer_send_status_msg(sock, ret_code);
if (ret_send_status < 0) {
end_channel_error:
if (channel) {
- /*
- * Free channel here since no one has a reference to it. We don't
- * free after that because a stream can store this pointer.
- */
- destroy_channel(channel);
+ consumer_del_channel(channel);
}
/* We have to send a status channel message indicating an error. */
{
* if needed), the stream is "quiescent" after this commit.
*/
if (lttng_ust_ctl_flush_buffer(stream->ustream, 1)) {
- ERR("Failed to flush buffer while commiting one metadata packet");
+ ERR("Failed to flush buffer while committing one metadata packet");
ret = -EIO;
} else {
stream->quiescent = true;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(metadata_stream);
+ ASSERT_RCU_READ_LOCKED();
metadata_channel = metadata_stream->chan;
pthread_mutex_unlock(&metadata_stream->lock);
}
static int put_next_subbuffer(struct lttng_consumer_stream *stream,
- struct stream_subbuffer *subbuffer)
+ struct stream_subbuffer *subbuffer __attribute__((unused)))
{
const int ret = lttng_ust_ctl_put_next_subbuf(stream->ustream);
}
static int signal_metadata(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx)
+ struct lttng_consumer_local_data *ctx __attribute__((unused)))
{
ASSERT_LOCKED(stream->metadata_rdv_lock);
return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;