#include <unistd.h>
#include <inttypes.h>
-#include <common/common.h>
-#include <common/defaults.h>
-#include <common/uri.h>
-#include <common/relayd/relayd.h>
-#include <common/string-utils/format.h>
-
-#include "consumer.h"
-#include "health-sessiond.h"
-#include "ust-app.h"
-#include "utils.h"
-#include "lttng-sessiond.h"
+#include <common/common.hpp>
+#include <common/defaults.hpp>
+#include <common/uri.hpp>
+#include <common/relayd/relayd.hpp>
+#include <common/string-utils/format.hpp>
+
+#include "consumer.hpp"
+#include "health-sessiond.hpp"
+#include "ust-app.hpp"
+#include "utils.hpp"
+#include "lttng-sessiond.hpp"
/*
* Return allocated full pathname of the session using the consumer trace path
* Allocate the string ourself to make sure we never exceed
* LTTNG_PATH_MAX.
*/
- pathname = (char *) zmalloc(LTTNG_PATH_MAX);
+ pathname = calloc<char>(LTTNG_PATH_MAX);
if (!pathname) {
goto error;
}
int consumer_fd;
struct consumer_socket *socket = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
switch (bits) {
case 64:
consumer_fd = uatomic_read(&the_ust_consumerd64_fd);
struct lttng_ht_node_ulong *node;
struct consumer_socket *socket = NULL;
+ ASSERT_RCU_READ_LOCKED();
+
/* Negative keys are lookup failures */
if (key < 0 || consumer == NULL) {
return NULL;
LTTNG_ASSERT(fd);
- socket = (consumer_socket *) zmalloc(sizeof(struct consumer_socket));
+ socket = zmalloc<consumer_socket>();
if (socket == NULL) {
PERROR("zmalloc consumer socket");
goto error;
{
LTTNG_ASSERT(sock);
LTTNG_ASSERT(consumer);
+ ASSERT_RCU_READ_LOCKED();
lttng_ht_add_unique_ulong(consumer->socks, &sock->node);
}
LTTNG_ASSERT(sock);
LTTNG_ASSERT(consumer);
+ ASSERT_RCU_READ_LOCKED();
iter.iter.node = &sock->node.node;
ret = lttng_ht_del(consumer->socks, &iter);
}
/*
- * Destroy and free socket pointer in a call RCU. Read side lock must be
- * acquired before calling this function.
+ * Destroy and free socket pointer in a call RCU. The call must either:
+ * - have acquired the read side lock before calling this function, or
+ * - guarantee the validity of the `struct consumer_socket` object for the
+ * duration of the call.
*/
void consumer_destroy_socket(struct consumer_socket *sock)
{
{
struct consumer_output *output = NULL;
- output = (consumer_output *) zmalloc(sizeof(struct consumer_output));
+ output = zmalloc<consumer_output>();
if (output == NULL) {
PERROR("zmalloc consumer_output");
goto error;
/*
* Delete the consumer_output object from the list and free the ptr.
- *
- * Should *NOT* be called with RCU read-side lock held.
*/
static void consumer_release_output(struct urcu_ref *ref)
{
if (obj->socks) {
/* Finally destroy HT */
- ht_cleanup_push(obj->socks);
+ lttng_ht_destroy(obj->socks);
}
free(obj);
/*
* Put the consumer_output object.
- *
- * Should *NOT* be called with RCU read-side lock held.
*/
void consumer_output_put(struct consumer_output *obj)
{
/*
* Copy consumer output and returned the newly allocated copy.
- *
- * Should *NOT* be called with RCU read-side lock held.
*/
struct consumer_output *consumer_copy_output(struct consumer_output *src)
{
const char *name,
uint64_t relayd_id,
uint64_t key,
- unsigned char *uuid,
+ const lttng_uuid& uuid,
uint32_t chan_id,
uint64_t tracefile_size,
uint64_t tracefile_count,
msg->u.ask_channel.ust_app_uid = ust_app_uid;
msg->u.ask_channel.blocking_timeout = blocking_timeout;
- memcpy(msg->u.ask_channel.uuid, uuid, sizeof(msg->u.ask_channel.uuid));
+ std::copy(uuid.begin(), uuid.end(), msg->u.ask_channel.uuid);
if (pathname) {
strncpy(msg->u.ask_channel.pathname, pathname,
uint64_t channel_key,
uint64_t session_id,
const char *pathname,
- uid_t uid,
- gid_t gid,
uint64_t relayd_id,
const char *name,
unsigned int nb_init_streams,
msg.u.relayd_sock.net_index = consumer->net_seq_index;
msg.u.relayd_sock.type = type;
msg.u.relayd_sock.session_id = session_id;
- memcpy(&msg.u.relayd_sock.sock, rsock, sizeof(msg.u.relayd_sock.sock));
+ msg.u.relayd_sock.major = rsock->major;
+ msg.u.relayd_sock.minor = rsock->minor;
+ msg.u.relayd_sock.relayd_socket_protocol = rsock->sock.proto;
DBG3("Sending relayd sock info to consumer on %d", *consumer_sock->fd_ptr);
ret = consumer_send_msg(consumer_sock, &msg);
struct lttcomm_consumer_msg msg;
LTTNG_ASSERT(socket);
+ ASSERT_RCU_READ_LOCKED();
DBG2("Consumer push metadata to consumer socket %d", *socket->fd_ptr);
*/
enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
uint64_t key, const struct consumer_output *output, int metadata,
- uid_t uid, gid_t gid, const char *channel_path, int wait,
+ const char *channel_path,
uint64_t nb_packets_per_stream)
{
int ret;
* chunk each stream is currently writing to (for the rotate_pending operation).
*/
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
- uid_t uid, gid_t gid, struct consumer_output *output,
+ struct consumer_output *output,
bool is_metadata_channel)
{
int ret;
int ret;
lttcomm_consumer_msg msg = {
.cmd_type = LTTNG_CONSUMER_OPEN_CHANNEL_PACKETS,
+ .u = {},
};
msg.u.open_channel_packets.key = key;
}
int consumer_init(struct consumer_socket *socket,
- const lttng_uuid sessiond_uuid)
+ const lttng_uuid& sessiond_uuid)
{
int ret;
struct lttcomm_consumer_msg msg = {
.cmd_type = LTTNG_CONSUMER_INIT,
+ .u = {},
};
LTTNG_ASSERT(socket);
DBG("Sending consumer initialization command");
- lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid);
+ std::copy(sessiond_uuid.begin(), sessiond_uuid.end(), msg.u.init.sessiond_uuid);
health_code_update();
ret = consumer_send_msg(socket, &msg);
enum lttng_trace_chunk_status tc_status;
struct lttcomm_consumer_msg msg = {
.cmd_type = LTTNG_CONSUMER_CREATE_TRACE_CHUNK,
+ .u = {},
};
msg.u.create_trace_chunk.session_id = session_id;
enum lttng_trace_chunk_status chunk_status;
lttcomm_consumer_msg msg = {
.cmd_type = LTTNG_CONSUMER_CLOSE_TRACE_CHUNK,
+ .u = {},
};
msg.u.close_trace_chunk.session_id = session_id;
enum lttng_trace_chunk_status chunk_status;
lttcomm_consumer_msg msg = {
.cmd_type = LTTNG_CONSUMER_TRACE_CHUNK_EXISTS,
+ .u = {},
};
msg.u.trace_chunk_exists.session_id = session_id;