#include <common/dynamic-array.hpp>
#include <common/index/ctf-index.hpp>
#include <common/index/index.hpp>
+#include <common/io-hint.hpp>
#include <common/kernel-consumer/kernel-consumer.hpp>
#include <common/kernel-ctl/kernel-ctl.hpp>
#include <common/relayd/relayd.hpp>
#include <common/time.hpp>
#include <common/trace-chunk-registry.hpp>
#include <common/trace-chunk.hpp>
+#include <common/urcu.hpp>
#include <common/ust-consumer/ust-consumer.hpp>
#include <common/utils.hpp>
#include <bin/lttng-consumerd/health-consumerd.hpp>
+#include <fcntl.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
+#include <type_traits>
#include <unistd.h>
lttng_consumer_global_data the_consumer_data;
*/
int consumer_quit;
-static const char *get_consumer_domain(void)
+static const char *get_consumer_domain()
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*/
static void notify_thread_lttng_pipe(struct lttng_pipe *pipe)
{
- struct lttng_consumer_stream *null_stream = NULL;
+ struct lttng_consumer_stream *null_stream = nullptr;
LTTNG_ASSERT(pipe);
- (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream));
+ (void) lttng_pipe_write(pipe, &null_stream, sizeof(null_stream)); /* NOLINT sizeof used on a
+ pointer. */
}
static void notify_health_quit_pipe(int *pipe)
void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key)
{
- notify_channel_pipe(ctx, NULL, key, CONSUMER_CHANNEL_DEL);
+ notify_channel_pipe(ctx, nullptr, key, CONSUMER_CHANNEL_DEL);
}
static int read_channel_pipe(struct lttng_consumer_local_data *ctx,
* global hash table.
*/
stream->monitor = 0;
- consumer_stream_destroy(stream, NULL);
+ consumer_stream_destroy(stream, nullptr);
}
}
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct lttng_consumer_stream *stream = NULL;
+ struct lttng_consumer_stream *stream = nullptr;
LTTNG_ASSERT(ht);
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
- return NULL;
+ return nullptr;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_lookup(ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
stream = lttng::utils::container_of(node, <tng_consumer_stream::node);
}
- rcu_read_unlock();
-
return stream;
}
{
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
stream = find_stream(key, ht);
if (stream) {
stream->key = (uint64_t) -1ULL;
*/
stream->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
/*
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct lttng_consumer_channel *channel = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
ASSERT_RCU_READ_LOCKED();
/* -1ULL keys are lookup failures */
if (key == (uint64_t) -1ULL) {
- return NULL;
+ return nullptr;
}
lttng_ht_lookup(the_consumer_data.channel_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
channel = lttng::utils::container_of(node, <tng_consumer_channel::node);
}
{
struct lttng_consumer_channel *channel;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
channel = consumer_find_channel(key);
if (channel) {
channel->key = (uint64_t) -1ULL;
*/
channel->node.key = (uint64_t) -1ULL;
}
- rcu_read_unlock();
}
static void free_channel_rcu(struct rcu_head *head)
ERR("Unknown consumer_data type");
abort();
}
- free(channel);
+
+ delete channel;
}
/*
int ret;
struct lttng_ht_iter iter;
- if (relayd == NULL) {
+ if (relayd == nullptr) {
return;
}
}
lttng_trace_chunk_put(channel->trace_chunk);
- channel->trace_chunk = NULL;
+ channel->trace_chunk = nullptr;
if (channel->is_published) {
int ret;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
iter.iter.node = &channel->node.node;
ret = lttng_ht_del(the_consumer_data.channel_ht, &iter);
LTTNG_ASSERT(!ret);
iter.iter.node = &channel->channels_by_session_id_ht_node.node;
ret = lttng_ht_del(the_consumer_data.channels_by_session_id_ht, &iter);
LTTNG_ASSERT(!ret);
- rcu_read_unlock();
}
channel->is_deleted = true;
* Iterate over the relayd hash table and destroy each element. Finally,
* destroy the whole hash table.
*/
-static void cleanup_relayd_ht(void)
+static void cleanup_relayd_ht()
{
struct lttng_ht_iter iter;
struct consumer_relayd_sock_pair *relayd;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
- consumer_destroy_relayd(relayd);
+ cds_lfht_for_each_entry (
+ the_consumer_data.relayd_ht->ht, &iter.iter, relayd, node.node) {
+ consumer_destroy_relayd(relayd);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.relayd_ht);
}
DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Let's begin with metadata */
cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ stream->chan->metadata_pushed_wait_queue.wake_all();
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
DBG("Delete flag set to data stream %d", stream->wait_fd);
}
}
- rcu_read_unlock();
}
/*
pthread_mutex_lock(&stream->chan->lock);
pthread_mutex_lock(&stream->chan->timer_lock);
pthread_mutex_lock(&stream->lock);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Steal stream identifier to avoid having streams with the same key */
steal_stream_key(stream->key, ht);
the_consumer_data.stream_count++;
the_consumer_data.need_update = 1;
- rcu_read_unlock();
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
pthread_mutex_unlock(&stream->chan->lock);
lttng_ht_lookup(the_consumer_data.relayd_ht, &relayd->net_seq_idx, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
goto end;
}
lttng_ht_add_unique_u64(the_consumer_data.relayd_ht, &relayd->node);
*/
static struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(uint64_t net_seq_idx)
{
- struct consumer_relayd_sock_pair *obj = NULL;
+ struct consumer_relayd_sock_pair *obj = nullptr;
/* net sequence index of -1 is a failure */
if (net_seq_idx == (uint64_t) -1ULL) {
}
obj = zmalloc<consumer_relayd_sock_pair>();
- if (obj == NULL) {
+ if (obj == nullptr) {
PERROR("zmalloc relayd sock");
goto error;
}
obj->control_sock.sock.fd = -1;
obj->data_sock.sock.fd = -1;
lttng_ht_node_init_u64(&obj->node, obj->net_seq_idx);
- pthread_mutex_init(&obj->ctrl_sock_mutex, NULL);
+ pthread_mutex_init(&obj->ctrl_sock_mutex, nullptr);
error:
return obj;
{
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
ASSERT_RCU_READ_LOCKED();
lttng_ht_lookup(the_consumer_data.relayd_ht, &key, &iter);
node = lttng_ht_iter_get_node_u64(&iter);
- if (node != NULL) {
+ if (node != nullptr) {
relayd = lttng::utils::container_of(node, &consumer_relayd_sock_pair::node);
}
LTTNG_ASSERT(path);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd != NULL) {
+ if (relayd != nullptr) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock,
stream->net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
LTTNG_ASSERT(net_seq_idx != -1ULL);
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(net_seq_idx);
- if (relayd != NULL) {
+ if (relayd != nullptr) {
/* Add stream on the relayd */
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_streams_sent(&relayd->control_sock);
DBG("All streams sent relayd id %" PRIu64, net_seq_idx);
end:
- rcu_read_unlock();
return ret;
}
struct consumer_relayd_sock_pair *relayd;
/* The stream is not metadata. Get relayd reference if exists. */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd) {
consumer_stream_relayd_close(stream, relayd);
}
- rcu_read_unlock();
}
/*
const char *root_shm_path,
const char *shm_path)
{
- struct lttng_consumer_channel *channel = NULL;
- struct lttng_trace_chunk *trace_chunk = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
+ struct lttng_trace_chunk *trace_chunk = nullptr;
if (chunk_id) {
trace_chunk = lttng_trace_chunk_registry_find_chunk(
}
}
- channel = zmalloc<lttng_consumer_channel>();
- if (channel == NULL) {
- PERROR("malloc struct lttng_consumer_channel");
+ try {
+ channel = new lttng_consumer_channel;
+ } catch (const std::bad_alloc& e) {
+ ERR("Failed to allocate lttng_consumer_channel: %s", e.what());
+ channel = nullptr;
goto end;
}
break;
default:
abort();
- free(channel);
- channel = NULL;
+ delete channel;
+ channel = nullptr;
goto end;
}
return channel;
error:
consumer_del_channel(channel);
- channel = NULL;
+ channel = nullptr;
goto end;
}
*/
steal_channel_key(channel->key);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(the_consumer_data.channel_ht, &channel->node);
lttng_ht_add_u64(the_consumer_data.channels_by_session_id_ht,
&channel->channels_by_session_id_ht_node);
- rcu_read_unlock();
channel->is_published = true;
pthread_mutex_unlock(&channel->timer_lock);
DBG("Updating poll fd array");
*nb_inactive_fd = 0;
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Only active streams with an active end point can be added to the
- * poll set and local stream storage of the thread.
- *
- * There is a potential race here for endpoint_status to be updated
- * just after the check. However, this is OK since the stream(s) will
- * be deleted once the thread is notified that the end point state has
- * changed where this function will be called back again.
- *
- * We track the number of inactive FDs because they still need to be
- * closed by the polling thread after a wakeup on the data_pipe or
- * metadata_pipe.
- */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
- (*nb_inactive_fd)++;
- continue;
+
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Only active streams with an active end point can be added to the
+ * poll set and local stream storage of the thread.
+ *
+ * There is a potential race here for endpoint_status to be updated
+ * just after the check. However, this is OK since the stream(s) will
+ * be deleted once the thread is notified that the end point state has
+ * changed where this function will be called back again.
+ *
+ * We track the number of inactive FDs because they still need to be
+ * closed by the polling thread after a wakeup on the data_pipe or
+ * metadata_pipe.
+ */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_INACTIVE) {
+ (*nb_inactive_fd)++;
+ continue;
+ }
+
+ (*pollfd)[i].fd = stream->wait_fd;
+ (*pollfd)[i].events = POLLIN | POLLPRI;
+ local_stream[i] = stream;
+ i++;
}
- /*
- * This clobbers way too much the debug output. Uncomment that if you
- * need it for debugging purposes.
- */
- (*pollfd)[i].fd = stream->wait_fd;
- (*pollfd)[i].events = POLLIN | POLLPRI;
- local_stream[i] = stream;
- i++;
}
- rcu_read_unlock();
/*
* Insert the consumer_data_pipe at the end of the array and don't
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+ enum lttcomm_return_code error_code)
{
if (ctx->consumer_error_socket > 0) {
+ const std::int32_t comm_code = std::int32_t(error_code);
+
+ static_assert(
+ sizeof(comm_code) >= sizeof(std::underlying_type<lttcomm_return_code>),
+ "Fixed-size communication type too small to accomodate lttcomm_return_code");
return lttcomm_send_unix_sock(
- ctx->consumer_error_socket, &cmd, sizeof(enum lttcomm_sessiond_command));
+ ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
}
return 0;
* Close all the tracefiles and stream fds and MUST be called when all
* instances are destroyed i.e. when all threads were joined and are ended.
*/
-void lttng_consumer_cleanup(void)
+void lttng_consumer_cleanup()
{
struct lttng_ht_iter iter;
struct lttng_consumer_channel *channel;
unsigned int trace_chunks_left;
- rcu_read_lock();
+ {
+ lttng::urcu::read_lock_guard read_lock;
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- consumer_del_channel(channel);
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ consumer_del_channel(channel);
+ }
}
- rcu_read_unlock();
-
lttng_ht_destroy(the_consumer_data.channel_ht);
lttng_ht_destroy(the_consumer_data.channels_by_session_id_ht);
*/
static void lttng_consumer_sync_trace_file(struct lttng_consumer_stream *stream, off_t orig_offset)
{
- int ret;
int outfd = stream->out_fd;
/*
if (orig_offset < stream->max_sb_size) {
return;
}
- lttng_sync_file_range(outfd,
- orig_offset - stream->max_sb_size,
- stream->max_sb_size,
- SYNC_FILE_RANGE_WAIT_BEFORE | SYNC_FILE_RANGE_WRITE |
- SYNC_FILE_RANGE_WAIT_AFTER);
- /*
- * Give hints to the kernel about how we access the file:
- * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
- * we write it.
- *
- * We need to call fadvise again after the file grows because the
- * kernel does not seem to apply fadvise to non-existing parts of the
- * file.
- *
- * Call fadvise _after_ having waited for the page writeback to
- * complete because the dirty page writeback semantic is not well
- * defined. So it can be expected to lead to lower throughput in
- * streaming.
- */
- ret = posix_fadvise(
- outfd, orig_offset - stream->max_sb_size, stream->max_sb_size, POSIX_FADV_DONTNEED);
- if (ret && ret != -ENOSYS) {
- errno = ret;
- PERROR("posix_fadvise on fd %i", outfd);
- }
+ lttng::io::hint_flush_range_dont_need_sync(
+ outfd, orig_offset - stream->max_sb_size, stream->max_sb_size);
}
/*
the_consumer_data.type = type;
ctx = zmalloc<lttng_consumer_local_data>();
- if (ctx == NULL) {
+ if (ctx == nullptr) {
PERROR("allocating context");
goto error;
}
ctx->consumer_error_socket = -1;
ctx->consumer_metadata_socket = -1;
- pthread_mutex_init(&ctx->metadata_socket_lock, NULL);
+ pthread_mutex_init(&ctx->metadata_socket_lock, nullptr);
/* assign the callbacks */
ctx->on_buffer_ready = buffer_ready;
ctx->on_recv_channel = recv_channel;
error_poll_pipe:
free(ctx);
error:
- return NULL;
+ return nullptr;
}
/*
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
- /*
- * Ignore return value since we are currently cleaning up so any error
- * can't be handled.
- */
- (void) consumer_del_metadata_stream(stream, ht);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, stream, node.node) {
+ /*
+ * Ignore return value since we are currently cleaning up so any error
+ * can't be handled.
+ */
+ (void) consumer_del_metadata_stream(stream, ht);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
off_t orig_offset = stream->out_fd_offset;
/* Default is on the disk */
int outfd = stream->out_fd;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
unsigned int relayd_hang_up = 0;
const size_t subbuf_content_size = buffer->size - padding;
size_t write_len;
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
LTTNG_ASSERT(stream->net_seq_idx != (uint64_t) -1ULL || stream->trace_chunk);
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
ret = -EPIPE;
goto end;
}
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, write_len, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, write_len);
stream->out_fd_offset += write_len;
lttng_consumer_sync_trace_file(stream, orig_offset);
}
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return ret;
}
int fd = stream->wait_fd;
/* Default is on the disk */
int outfd = stream->out_fd;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
int *splice_pipe;
unsigned int relayd_hang_up = 0;
}
/* RCU lock for the relayd pointer */
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/* Flag that the current stream if set for network streaming. */
if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
written = -ret;
goto end;
}
fd,
splice_pipe[1]);
ret_splice = splice(
- fd, &offset, splice_pipe[1], NULL, len, SPLICE_F_MOVE | SPLICE_F_MORE);
+ fd, &offset, splice_pipe[1], nullptr, len, SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("splice chan to pipe, ret %zd", ret_splice);
if (ret_splice < 0) {
ret = errno;
/* Splice data out */
ret_splice = splice(splice_pipe[0],
- NULL,
+ nullptr,
outfd,
- NULL,
+ nullptr,
ret_splice,
SPLICE_F_MOVE | SPLICE_F_MORE);
DBG("Consumer splice pipe to file (out_fd: %d), ret %zd", outfd, ret_splice);
/* This call is useless on a socket so better save a syscall. */
if (!relayd) {
/* This won't block, but will start writeout asynchronously */
- lttng_sync_file_range(
- outfd, stream->out_fd_offset, ret_splice, SYNC_FILE_RANGE_WRITE);
+ lttng::io::hint_flush_range_async(outfd, stream->out_fd_offset, ret_splice);
stream->out_fd_offset += ret_splice;
}
stream->output_written += ret_splice;
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
}
- rcu_read_unlock();
return written;
}
}
}
-static void lttng_consumer_close_all_metadata(void)
+static void lttng_consumer_close_all_metadata()
{
switch (the_consumer_data.type) {
case LTTNG_CONSUMER_KERNEL:
*/
void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht)
{
- struct lttng_consumer_channel *channel = NULL;
+ struct lttng_consumer_channel *channel = nullptr;
bool free_channel = false;
LTTNG_ASSERT(stream);
/* Go for channel deletion! */
free_channel = true;
}
- stream->chan = NULL;
+ stream->chan = nullptr;
/*
* Nullify the stream reference so it is not used after deletion. The
* channel lock MUST be acquired before being able to check for a NULL
* pointer value.
*/
- channel->metadata_stream = NULL;
+ channel->metadata_stream = nullptr;
+ channel->metadata_pushed_wait_queue.wake_all();
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);
}
lttng_trace_chunk_put(stream->trace_chunk);
- stream->trace_chunk = NULL;
+ stream->trace_chunk = nullptr;
consumer_stream_free(stream);
}
* after this point.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
/*
* Lookup the stream just to make sure it does not exist in our internal
*/
lttng_ht_add_u64(the_consumer_data.stream_list_ht, &stream->node_session_id);
- rcu_read_unlock();
-
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&stream->chan->lock);
pthread_mutex_unlock(&stream->chan->timer_lock);
/*
* Delete data stream that are flagged for deletion (endpoint_status).
*/
-static void validate_endpoint_status_data_stream(void)
+static void validate_endpoint_status_data_stream()
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
DBG("Consumer delete flagged data stream");
- rcu_read_lock();
- cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (data_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /* Delete it right now */
+ consumer_del_stream(stream, data_ht);
}
- /* Delete it right now */
- consumer_del_stream(stream, data_ht);
}
- rcu_read_unlock();
}
/*
LTTNG_ASSERT(pollset);
- rcu_read_lock();
- cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
- /* Validate delete flag of the stream */
- if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
- continue;
- }
- /*
- * Remove from pollset so the metadata thread can continue without
- * blocking on a deleted stream.
- */
- lttng_poll_del(pollset, stream->wait_fd);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (metadata_ht->ht, &iter.iter, stream, node.node) {
+ /* Validate delete flag of the stream */
+ if (stream->endpoint_status == CONSUMER_ENDPOINT_ACTIVE) {
+ continue;
+ }
+ /*
+ * Remove from pollset so the metadata thread can continue without
+ * blocking on a deleted stream.
+ */
+ lttng_poll_del(pollset, stream->wait_fd);
- /* Delete it right now */
- consumer_del_metadata_stream(stream, metadata_ht);
+ /* Delete it right now */
+ consumer_del_metadata_stream(stream, metadata_ht);
+ }
}
- rcu_read_unlock();
}
/*
{
int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
- struct lttng_consumer_stream *stream = NULL;
+ struct lttng_consumer_stream *stream = nullptr;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
/* Main loop */
DBG("Metadata main loop started");
- while (1) {
+ while (true) {
restart:
health_code_update();
health_poll_entry();
pipe_len = lttng_pipe_read(ctx->consumer_metadata_pipe,
&stream,
- sizeof(stream));
- if (pipe_len < sizeof(stream)) {
+ sizeof(stream)); /* NOLINT sizeof
+ used on a
+ pointer. */
+ if (pipe_len < sizeof(stream)) { /* NOLINT sizeof used on a
+ pointer. */
if (pipe_len < 0) {
PERROR("read metadata stream");
}
}
/* A NULL stream means that the state has changed. */
- if (stream == NULL) {
+ if (stream == nullptr) {
/* Check for deleted streams. */
validate_endpoint_status_metadata_stream(&events);
goto restart;
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
consumer_del_metadata_stream(stream, metadata_ht);
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the stream looked up */
- rcu_read_unlock();
}
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
/*
*/
void *consumer_thread_data_poll(void *data)
{
- int num_rdy, num_hup, high_prio, ret, i, err = -1;
- struct pollfd *pollfd = NULL;
+ int num_rdy, high_prio, ret, i, err = -1;
+ struct pollfd *pollfd = nullptr;
/* local view of the streams */
- struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
+ struct lttng_consumer_stream **local_stream = nullptr, *new_stream = nullptr;
/* local view of consumer_data.fds_count */
int nb_fd = 0;
/* 2 for the consumer_data_pipe and wake up pipe */
health_code_update();
local_stream = zmalloc<lttng_consumer_stream *>();
- if (local_stream == NULL) {
+ if (local_stream == nullptr) {
PERROR("local_stream malloc");
goto end;
}
- while (1) {
+ while (true) {
health_code_update();
high_prio = 0;
- num_hup = 0;
/*
* the fds set has been updated, we need to update our
pthread_mutex_lock(&the_consumer_data.lock);
if (the_consumer_data.need_update) {
free(pollfd);
- pollfd = NULL;
+ pollfd = nullptr;
free(local_stream);
- local_stream = NULL;
+ local_stream = nullptr;
/* Allocate for all fds */
pollfd =
calloc<struct pollfd>(the_consumer_data.stream_count + nb_pipes_fd);
- if (pollfd == NULL) {
+ if (pollfd == nullptr) {
PERROR("pollfd malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
local_stream = calloc<lttng_consumer_stream *>(
the_consumer_data.stream_count + nb_pipes_fd);
- if (local_stream == NULL) {
+ if (local_stream == nullptr) {
PERROR("local_stream malloc");
pthread_mutex_unlock(&the_consumer_data.lock);
goto end;
ssize_t pipe_readlen;
DBG("consumer_data_pipe wake up");
- pipe_readlen = lttng_pipe_read(
- ctx->consumer_data_pipe, &new_stream, sizeof(new_stream));
- if (pipe_readlen < sizeof(new_stream)) {
+ pipe_readlen = lttng_pipe_read(ctx->consumer_data_pipe,
+ &new_stream,
+ sizeof(new_stream)); /* NOLINT sizeof used on
+ a pointer. */
+ if (pipe_readlen < sizeof(new_stream)) { /* NOLINT sizeof used on a pointer.
+ */
PERROR("Consumer data pipe");
/* Continue so we can at least handle the current stream(s). */
continue;
* the sessiond poll thread changed the consumer_quit state and is
* waking us up to test it.
*/
- if (new_stream == NULL) {
+ if (new_stream == nullptr) {
validate_endpoint_status_data_stream();
continue;
}
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
if (pollfd[i].revents & POLLPRI) {
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
+ local_stream[i] = nullptr;
} else if (len > 0) {
local_stream[i]->has_data_left_to_be_read_before_teardown =
1;
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
if ((pollfd[i].revents & POLLIN) || local_stream[i]->hangup_flush_done ||
if (len < 0 && len != -EAGAIN && len != -ENODATA) {
/* Clean the stream and free it. */
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
+ local_stream[i] = nullptr;
} else if (len > 0) {
local_stream[i]->has_data_left_to_be_read_before_teardown =
1;
for (i = 0; i < nb_fd; i++) {
health_code_update();
- if (local_stream[i] == NULL) {
+ if (local_stream[i] == nullptr) {
continue;
}
if (!local_stream[i]->hangup_flush_done &&
DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
} else if (pollfd[i].revents & POLLERR) {
ERR("Error returned in polling fd %d.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
} else if (pollfd[i].revents & POLLNVAL) {
ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
if (!local_stream[i]->has_data_left_to_be_read_before_teardown) {
consumer_del_stream(local_stream[i], data_ht);
- local_stream[i] = NULL;
- num_hup++;
+ local_stream[i] = nullptr;
}
}
- if (local_stream[i] != NULL) {
+ if (local_stream[i] != nullptr) {
local_stream[i]->has_data_left_to_be_read_before_teardown = 0;
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
/*
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
}
static void destroy_channel_ht(struct lttng_ht *ht)
struct lttng_consumer_channel *channel;
int ret;
- if (ht == NULL) {
+ if (ht == nullptr) {
return;
}
- rcu_read_lock();
- cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
- ret = lttng_ht_del(ht, &iter);
- LTTNG_ASSERT(ret != 0);
+ {
+ lttng::urcu::read_lock_guard read_lock;
+
+ cds_lfht_for_each_entry (ht->ht, &iter.iter, channel, wait_fd_node.node) {
+ ret = lttng_ht_del(ht, &iter);
+ LTTNG_ASSERT(ret != 0);
+ }
}
- rcu_read_unlock();
lttng_ht_destroy(ht);
}
{
int ret, i, pollfd, err = -1;
uint32_t revents, nb_fd;
- struct lttng_consumer_channel *chan = NULL;
+ struct lttng_consumer_channel *chan = nullptr;
struct lttng_ht_iter iter;
struct lttng_ht_node_u64 *node;
struct lttng_poll_event events;
/* Main loop */
DBG("Channel main loop started");
- while (1) {
+ while (true) {
restart:
health_code_update();
DBG("Channel poll wait");
switch (action) {
case CONSUMER_CHANNEL_ADD:
+ {
DBG("Adding channel %d to poll set", chan->wait_fd);
lttng_ht_node_init_u64(&chan->wait_fd_node,
chan->wait_fd);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
lttng_ht_add_unique_u64(channel_ht,
&chan->wait_fd_node);
- rcu_read_unlock();
/* Add channel to the global poll events list */
// FIXME: Empty flag on a pipe pollset, this might
// hang on FreeBSD.
lttng_poll_add(&events, chan->wait_fd, 0);
break;
+ }
case CONSUMER_CHANNEL_DEL:
{
/*
* GET_CHANNEL failed.
*/
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
chan = consumer_find_channel(key);
if (!chan) {
- rcu_read_unlock();
ERR("UST consumer get channel key %" PRIu64
" not found for del channel",
key);
if (!uatomic_sub_return(&chan->refcount, 1)) {
consumer_del_channel(chan);
}
- rcu_read_unlock();
goto restart;
}
case CONSUMER_CHANNEL_QUIT:
continue;
}
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
{
uint64_t tmp_id = (uint64_t) pollfd;
}
} else {
ERR("Unexpected poll events %u for sock %d", revents, pollfd);
- rcu_read_unlock();
goto end;
}
/* Release RCU lock for the channel looked up */
- rcu_read_unlock();
}
}
}
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static int set_metadata_socket(struct lttng_consumer_local_data *ctx,
consumer_sockpoll[1].fd = sock;
consumer_sockpoll[1].events = POLLIN | POLLPRI;
- while (1) {
+ while (true) {
health_code_update();
health_poll_entry();
*/
notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_channel_pipe(ctx, NULL, -1, CONSUMER_CHANNEL_QUIT);
+ notify_channel_pipe(ctx, nullptr, -1, CONSUMER_CHANNEL_QUIT);
notify_health_quit_pipe(health_quit_pipe);
health_unregister(health_consumerd);
rcu_unregister_thread();
- return NULL;
+ return nullptr;
}
static int post_consume(struct lttng_consumer_stream *stream,
/*
* Allocate and set consumer data hash tables.
*/
-int lttng_consumer_init(void)
+int lttng_consumer_init()
{
the_consumer_data.channel_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!the_consumer_data.channel_ht) {
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
LTTNG_ASSERT(ctx);
LTTNG_ASSERT(sock >= 0);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
LTTNG_ASSERT(sock_type == LTTNG_STREAM_CONTROL);
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
- if (relayd == NULL) {
+ if (relayd == nullptr) {
ret_code = LTTCOMM_CONSUMERD_ENOMEM;
goto error;
} else {
static struct consumer_relayd_sock_pair *find_relayd_by_session_id(uint64_t id)
{
struct lttng_ht_iter iter;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
ASSERT_RCU_READ_LOCKED();
}
}
- return NULL;
+ return nullptr;
found:
return relayd;
struct lttng_ht_iter iter;
struct lttng_ht *ht;
struct lttng_consumer_stream *stream;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
int (*data_pending)(struct lttng_consumer_stream *);
DBG("Consumer data pending command on session id %" PRIu64, id);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&the_consumer_data.lock);
switch (the_consumer_data.type) {
data_not_pending:
/* Data is available to be read by a viewer. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 0;
data_pending:
/* Data is still being extracted from buffers. */
pthread_mutex_unlock(&the_consumer_data.lock);
- rcu_read_unlock();
return 1;
}
uint64_t next_chunk_id, stream_count = 0;
enum lttng_trace_chunk_status chunk_status;
const bool is_local_trace = relayd_id == -1ULL;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
bool rotating_to_new_chunk = true;
/* Array of `struct lttng_consumer_stream *` */
struct lttng_dynamic_pointer_array streams_packet_to_open;
DBG("Consumer sample rotate position for channel %" PRIu64, key);
- lttng_dynamic_array_init(
- &stream_rotation_positions, sizeof(struct relayd_stream_rotation_position), NULL);
- lttng_dynamic_pointer_array_init(&streams_packet_to_open, NULL);
+ lttng_dynamic_array_init(&stream_rotation_positions,
+ sizeof(struct relayd_stream_rotation_position),
+ nullptr);
+ lttng_dynamic_pointer_array_init(&streams_packet_to_open, nullptr);
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
LTTNG_ASSERT(channel->trace_chunk);
chunk_status = lttng_trace_chunk_get_name(
stream->trace_chunk,
&trace_chunk_name,
- NULL);
+ nullptr);
if (chunk_status == LTTNG_TRACE_CHUNK_STATUS_NONE) {
trace_chunk_name = "none";
}
pthread_mutex_unlock(&stream->lock);
}
- stream = NULL;
+ stream = nullptr;
if (!is_local_trace) {
relayd = consumer_find_relayd(relayd_id);
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_rotate_streams(&relayd->control_sock,
stream_count,
- rotating_to_new_chunk ? &next_chunk_id : NULL,
+ rotating_to_new_chunk ? &next_chunk_id : nullptr,
(const struct relayd_stream_rotation_position *)
stream_rotation_positions.buffer.data);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end_unlock_channel:
pthread_mutex_unlock(&channel->lock);
end:
- rcu_read_unlock();
lttng_dynamic_array_reset(&stream_rotation_positions);
lttng_dynamic_pointer_array_reset(&streams_packet_to_open);
return ret;
{
int ret;
- ret = consumer_stream_flush_buffer(stream, 1);
+ ret = consumer_stream_flush_buffer(stream, true);
if (ret < 0) {
ERR("Failed to flush stream %" PRIu64 " during channel clear", stream->key);
ret = LTTCOMM_CONSUMERD_FATAL;
int ret;
struct lttng_consumer_stream *stream;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
pthread_mutex_lock(&channel->lock);
cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
health_code_update();
pthread_mutex_unlock(&stream->lock);
}
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return 0;
error_unlock:
pthread_mutex_unlock(&stream->lock);
pthread_mutex_unlock(&channel->lock);
- rcu_read_unlock();
return ret;
}
if (stream->index_file) {
lttng_index_file_put(stream->index_file);
- stream->index_file = NULL;
+ stream->index_file = nullptr;
}
if (!stream->trace_chunk) {
* parent channel, becomes part of no chunk and can't output
* anything until a new trace chunk is created.
*/
- stream->trace_chunk = NULL;
+ stream->trace_chunk = nullptr;
} else if (stream->chan->trace_chunk && !lttng_trace_chunk_get(stream->chan->trace_chunk)) {
ERR("Failed to acquire a reference to channel's trace chunk during stream rotation");
ret = -1;
ASSERT_RCU_READ_LOCKED();
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
DBG("Consumer rotate ready streams in channel %" PRIu64, key);
ret = 0;
end:
- rcu_read_unlock();
return ret;
}
{
int ret;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
- struct lttng_trace_chunk *created_chunk = NULL, *published_chunk = NULL;
+ struct lttng_trace_chunk *created_chunk = nullptr, *published_chunk = nullptr;
enum lttng_trace_chunk_status chunk_status;
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
char creation_timestamp_buffer[ISO8601_STR_LEN];
* the session daemon through the LTTNG_CONSUMER_CREATE_TRACE_CHUNK
* and LTTNG_CONSUMER_DESTROY_TRACE_CHUNK commands.
*/
- created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, NULL);
+ created_chunk = lttng_trace_chunk_create(chunk_id, chunk_creation_timestamp, nullptr);
if (!created_chunk) {
ERR("Failed to create trace chunk");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
* directory.
*/
chunk_status = lttng_trace_chunk_set_as_user(created_chunk, chunk_directory_handle);
- chunk_directory_handle = NULL;
+ chunk_directory_handle = nullptr;
if (chunk_status != LTTNG_TRACE_CHUNK_STATUS_OK) {
ERR("Failed to set trace chunk's directory handle");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
published_chunk = lttng_trace_chunk_registry_publish_chunk(
the_consumer_data.chunk_registry, session_id, created_chunk);
lttng_trace_chunk_put(created_chunk);
- created_chunk = NULL;
+ created_chunk = nullptr;
if (!published_chunk) {
ERR("Failed to publish trace chunk");
ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
goto error;
}
- rcu_read_lock();
- cds_lfht_for_each_entry_duplicate(
- the_consumer_data.channels_by_session_id_ht->ht,
- the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id, lttng_ht_seed),
- the_consumer_data.channels_by_session_id_ht->match_fct,
- &session_id,
- &iter.iter,
- channel,
- channels_by_session_id_ht_node.node)
{
- ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
- if (ret) {
- /*
- * Roll-back the creation of this chunk.
- *
- * This is important since the session daemon will
- * assume that the creation of this chunk failed and
- * will never ask for it to be closed, resulting
- * in a leak and an inconsistent state for some
- * channels.
- */
- enum lttcomm_return_code close_ret;
- char path[LTTNG_PATH_MAX];
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry_duplicate(
+ the_consumer_data.channels_by_session_id_ht->ht,
+ the_consumer_data.channels_by_session_id_ht->hash_fct(&session_id,
+ lttng_ht_seed),
+ the_consumer_data.channels_by_session_id_ht->match_fct,
+ &session_id,
+ &iter.iter,
+ channel,
+ channels_by_session_id_ht_node.node)
+ {
+ ret = lttng_consumer_channel_set_trace_chunk(channel, published_chunk);
+ if (ret) {
+ /*
+ * Roll-back the creation of this chunk.
+ *
+ * This is important since the session daemon will
+ * assume that the creation of this chunk failed and
+ * will never ask for it to be closed, resulting
+ * in a leak and an inconsistent state for some
+ * channels.
+ */
+ enum lttcomm_return_code close_ret;
+ char path[LTTNG_PATH_MAX];
+
+ DBG("Failed to set new trace chunk on existing channels, rolling back");
+ close_ret =
+ lttng_consumer_close_trace_chunk(relayd_id,
+ session_id,
+ chunk_id,
+ chunk_creation_timestamp,
+ nullptr,
+ path);
+ if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
+ ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
+ ", chunk_id = %" PRIu64,
+ session_id,
+ chunk_id);
+ }
- DBG("Failed to set new trace chunk on existing channels, rolling back");
- close_ret = lttng_consumer_close_trace_chunk(relayd_id,
- session_id,
- chunk_id,
- chunk_creation_timestamp,
- NULL,
- path);
- if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
- ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
- ", chunk_id = %" PRIu64,
- session_id,
- chunk_id);
+ ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
+ break;
}
-
- ret_code = LTTCOMM_CONSUMERD_CREATE_TRACE_CHUNK_FAILED;
- break;
}
}
session_id,
chunk_id,
chunk_creation_timestamp,
- NULL,
+ nullptr,
path);
if (close_ret != LTTCOMM_CONSUMERD_SUCCESS) {
ERR("Failed to roll-back the creation of new chunk: session_id = %" PRIu64
}
}
error_unlock:
- rcu_read_unlock();
error:
/* Release the reference returned by the "publish" operation. */
lttng_trace_chunk_put(published_chunk);
* it; it is only kept around to compare it (by address) to the
* current chunk found in the session's channels.
*/
- rcu_read_lock();
- cds_lfht_for_each_entry (the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
- int ret;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_lfht_for_each_entry (
+ the_consumer_data.channel_ht->ht, &iter.iter, channel, node.node) {
+ int ret;
- /*
- * Only change the channel's chunk to NULL if it still
- * references the chunk being closed. The channel may
- * reference a newer channel in the case of a session
- * rotation. When a session rotation occurs, the "next"
- * chunk is created before the "current" chunk is closed.
- */
- if (channel->trace_chunk != chunk) {
- continue;
- }
- ret = lttng_consumer_channel_set_trace_chunk(channel, NULL);
- if (ret) {
/*
- * Attempt to close the chunk on as many channels as
- * possible.
+ * Only change the channel's chunk to NULL if it still
+ * references the chunk being closed. The channel may
+ * reference a newer channel in the case of a session
+ * rotation. When a session rotation occurs, the "next"
+ * chunk is created before the "current" chunk is closed.
*/
- ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ if (channel->trace_chunk != chunk) {
+ continue;
+ }
+ ret = lttng_consumer_channel_set_trace_chunk(channel, nullptr);
+ if (ret) {
+ /*
+ * Attempt to close the chunk on as many channels as
+ * possible.
+ */
+ ret_code = LTTCOMM_CONSUMERD_CLOSE_TRACE_CHUNK_FAILED;
+ }
}
}
-
if (relayd_id) {
int ret;
struct consumer_relayd_sock_pair *relayd;
}
}
error_unlock:
- rcu_read_unlock();
end:
/*
* Release the reference returned by the "find" operation and
char relayd_id_buffer[MAX_INT_DEC_LEN(*relayd_id)];
const char *relayd_id_str = "(none)";
const bool is_local_trace = !relayd_id;
- struct consumer_relayd_sock_pair *relayd = NULL;
+ struct consumer_relayd_sock_pair *relayd = nullptr;
bool chunk_exists_local, chunk_exists_remote;
+ lttng::urcu::read_lock_guard read_lock;
if (relayd_id) {
/* Only used for logging purposes. */
goto end;
}
- rcu_read_lock();
relayd = consumer_find_relayd(*relayd_id);
if (!relayd) {
ERR("Failed to find relayd %" PRIu64, *relayd_id);
DBG("Trace chunk %s on relay daemon", chunk_exists_remote ? "exists" : "does not exist");
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret_code;
}
ht = the_consumer_data.stream_per_chan_id_ht;
- rcu_read_lock();
+ lttng::urcu::read_lock_guard read_lock;
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed),
ht->match_fct,
next:
pthread_mutex_unlock(&stream->lock);
}
- rcu_read_unlock();
return LTTCOMM_CONSUMERD_SUCCESS;
error_unlock:
pthread_mutex_unlock(&stream->lock);
- rcu_read_unlock();
return ret;
}
goto end;
}
- rcu_read_lock();
- cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
- enum consumer_stream_open_packet_status status;
+ {
+ lttng::urcu::read_lock_guard read_lock;
+ cds_list_for_each_entry (stream, &channel->streams.head, send_node) {
+ enum consumer_stream_open_packet_status status;
- pthread_mutex_lock(&stream->lock);
- if (cds_lfht_is_node_deleted(&stream->node.node)) {
- goto next;
- }
+ pthread_mutex_lock(&stream->lock);
+ if (cds_lfht_is_node_deleted(&stream->node.node)) {
+ goto next;
+ }
- status = consumer_stream_open_packet(stream);
- switch (status) {
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
- DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- stream->opened_packet_in_current_trace_chunk = true;
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
- DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
- ", channel name = %s, session id = %" PRIu64,
- stream->key,
- stream->chan->name,
- stream->chan->session_id);
- break;
- case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
- /*
- * Only unexpected internal errors can lead to this
- * failing. Report an unknown error.
- */
- ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
- ", channel id = %" PRIu64 ", channel name = %s"
- ", session id = %" PRIu64,
- stream->key,
- channel->key,
- channel->name,
- channel->session_id);
- ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
- goto error_unlock;
- default:
- abort();
- }
+ status = consumer_stream_open_packet(stream);
+ switch (status) {
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_OPENED:
+ DBG("Opened a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ stream->opened_packet_in_current_trace_chunk = true;
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_NO_SPACE:
+ DBG("No space left to open a packet in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel name = %s, session id = %" PRIu64,
+ stream->key,
+ stream->chan->name,
+ stream->chan->session_id);
+ break;
+ case CONSUMER_STREAM_OPEN_PACKET_STATUS_ERROR:
+ /*
+ * Only unexpected internal errors can lead to this
+ * failing. Report an unknown error.
+ */
+ ERR("Failed to flush empty buffer in \"open channel packets\" command: stream id = %" PRIu64
+ ", channel id = %" PRIu64 ", channel name = %s"
+ ", session id = %" PRIu64,
+ stream->key,
+ channel->key,
+ channel->name,
+ channel->session_id);
+ ret = LTTCOMM_CONSUMERD_UNKNOWN_ERROR;
+ goto error_unlock;
+ default:
+ abort();
+ }
- next:
- pthread_mutex_unlock(&stream->lock);
+ next:
+ pthread_mutex_unlock(&stream->lock);
+ }
}
-
end_rcu_unlock:
- rcu_read_unlock();
end:
return ret;