#ifndef LIB_CONSUMER_H
#define LIB_CONSUMER_H
+#include <common/buffer-view.hpp>
+#include <common/compat/fcntl.hpp>
+#include <common/credentials.hpp>
+#include <common/dynamic-array.hpp>
+#include <common/hashtable/hashtable.hpp>
+#include <common/index/ctf-index.hpp>
+#include <common/pipe.hpp>
+#include <common/sessiond-comm/sessiond-comm.hpp>
+#include <common/trace-chunk-registry.hpp>
+#include <common/uuid.hpp>
+
+#include <lttng/lttng.h>
+
#include <limits.h>
#include <poll.h>
#include <stdint.h>
#include <unistd.h>
#include <urcu/list.h>
-#include <lttng/lttng.h>
-
-#include <common/hashtable/hashtable.hpp>
-#include <common/compat/fcntl.hpp>
-#include <common/uuid.hpp>
-#include <common/sessiond-comm/sessiond-comm.hpp>
-#include <common/pipe.hpp>
-#include <common/index/ctf-index.hpp>
-#include <common/trace-chunk-registry.hpp>
-#include <common/credentials.hpp>
-#include <common/buffer-view.hpp>
-#include <common/dynamic-array.hpp>
-
struct lttng_consumer_local_data;
/* Commands for consumer */
/* pause, delete, active depending on fd state */
LTTNG_CONSUMER_UPDATE_STREAM,
/* inform the consumer to quit when all fd has hang up */
- LTTNG_CONSUMER_STOP, /* deprecated */
+ LTTNG_CONSUMER_STOP, /* deprecated */
LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
};
enum consumer_channel_output {
- CONSUMER_CHANNEL_MMAP = 0,
- CONSUMER_CHANNEL_SPLICE = 1,
+ CONSUMER_CHANNEL_MMAP = 0,
+ CONSUMER_CHANNEL_SPLICE = 1,
};
enum consumer_channel_type {
- CONSUMER_CHANNEL_TYPE_METADATA = 0,
- CONSUMER_CHANNEL_TYPE_DATA = 1,
+ CONSUMER_CHANNEL_TYPE_METADATA = 0,
+ CONSUMER_CHANNEL_TYPE_DATA = 1,
};
enum sync_metadata_status {
enum consumer_channel_type type;
/* For UST */
- uid_t ust_app_uid; /* Application UID. */
+ uid_t ust_app_uid; /* Application UID. */
struct lttng_ust_ctl_consumer_channel *uchan;
unsigned char uuid[LTTNG_UUID_STR_LEN];
/*
uint64_t lost_packets;
bool streams_sent_to_relayd;
+ uint64_t last_consumed_size_sample_sent;
};
struct stream_subbuffer {
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *);
+using on_wake_up_cb = int (*)(struct lttng_consumer_stream *);
/*
* Perform any operation required before a consumer stream is put
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*on_sleep_cb)(struct lttng_consumer_stream *,
- struct lttng_consumer_local_data *);
+using on_sleep_cb = int (*)(struct lttng_consumer_stream *, struct lttng_consumer_local_data *);
/*
* Acquire the subbuffer at the current 'consumed' position.
*
* Stream and channel locks are acquired during this call.
*/
-typedef enum get_next_subbuffer_status (*get_next_subbuffer_cb)(
- struct lttng_consumer_stream *, struct stream_subbuffer *);
+using get_next_subbuffer_cb = enum get_next_subbuffer_status (*)(struct lttng_consumer_stream *,
+ struct stream_subbuffer *);
/*
* Populate the stream_subbuffer's info member. The info to populate
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*extract_subbuffer_info_cb)(
- struct lttng_consumer_stream *, struct stream_subbuffer *);
+using extract_subbuffer_info_cb = int (*)(struct lttng_consumer_stream *,
+ struct stream_subbuffer *);
/*
* Invoked after a subbuffer's info has been filled.
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *,
- const struct stream_subbuffer *);
+using pre_consume_subbuffer_cb = int (*)(struct lttng_consumer_stream *,
+ const struct stream_subbuffer *);
/*
* Consume subbuffer contents.
*
* Stream and channel locks are acquired during this call.
*/
-typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *,
- struct lttng_consumer_stream *,
- const struct stream_subbuffer *);
+using consume_subbuffer_cb = ssize_t (*)(struct lttng_consumer_local_data *,
+ struct lttng_consumer_stream *,
+ const struct stream_subbuffer *);
/*
* Release the current subbuffer and advance the 'consumed' position by
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *,
- struct stream_subbuffer *);
+using put_next_subbuffer_cb = int (*)(struct lttng_consumer_stream *, struct stream_subbuffer *);
/*
* Invoked after consuming a subbuffer.
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*post_consume_cb)(struct lttng_consumer_stream *,
- const struct stream_subbuffer *,
- struct lttng_consumer_local_data *);
+using post_consume_cb = int (*)(struct lttng_consumer_stream *,
+ const struct stream_subbuffer *,
+ struct lttng_consumer_local_data *);
/*
* Send a live beacon if no data is available.
*
* Stream and channel locks are acquired during this call.
*/
-typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *);
+using send_live_beacon_cb = int (*)(struct lttng_consumer_stream *);
/*
* Lock the stream and channel locks and any other stream-type specific
* lock that need to be acquired during the processing of an
* availability notification.
*/
-typedef void (*lock_cb)(struct lttng_consumer_stream *);
+using lock_cb = void (*)(struct lttng_consumer_stream *);
/*
* Unlock the stream and channel locks and any other stream-type specific
*
* Stream and channel locks are acquired during this call.
*/
-typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+using unlock_cb = void (*)(struct lttng_consumer_stream *);
/*
* Assert that the stream and channel lock and any other stream type specific
* lock that need to be acquired during the processing of a read_subbuffer
* operation is acquired.
*/
-typedef void (*assert_locked_cb)(struct lttng_consumer_stream *);
+using assert_locked_cb = void (*)(struct lttng_consumer_stream *);
/*
* Invoked when a subbuffer's metadata version does not match the last
*
* Stream and channel locks are acquired during this call.
*/
-typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *);
+using reset_metadata_cb = void (*)(struct lttng_consumer_stream *);
/*
* Internal representation of the streams, sessiond_key is used to identify
* Returns the number of bytes read, or negative error value.
*/
ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
/*
* function to call when we receive a new channel, it receives a
* newly allocated channel, depending on the return code of this
* This is nested OUTSIDE the stream lock.
* This is nested OUTSIDE the consumer_relayd_sock_pair lock.
*/
- pthread_mutex_t lock {};
+ pthread_mutex_t lock{};
/*
* Number of streams in the data stream hash table declared outside.
LTTNG_EXPORT extern int data_consumption_paused;
/* Return a human-readable consumer type string that is suitable for logging. */
-static inline
-const char *lttng_consumer_type_str(enum lttng_consumer_type type)
+static inline const char *lttng_consumer_type_str(enum lttng_consumer_type type)
{
switch (type) {
case LTTNG_CONSUMER_UNKNOWN:
/*
* Init consumer data structures.
*/
-int lttng_consumer_init(void);
+int lttng_consumer_init();
/*
* Set the error socket for communication with a session daemon.
*/
-void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx,
- int sock);
+void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock);
/*
* Set the command socket path for communication with a session daemon.
*/
-void lttng_consumer_set_command_sock_path(
- struct lttng_consumer_local_data *ctx, char *sock);
+void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock);
/*
* Send return code to session daemon.
/*
* Cleanup the daemon's socket on exit.
*/
-void lttng_consumer_cleanup(void);
+void lttng_consumer_cleanup();
/*
* Poll on the should_quit pipe and the command socket return -1 on error and
* Copy the fields from the channel that need to be accessed (read-only)
* directly from the stream.
*/
-void consumer_stream_update_channel_attributes(
- struct lttng_consumer_stream *stream,
- struct lttng_consumer_channel *channel);
-
-struct lttng_consumer_stream *consumer_allocate_stream(
- struct lttng_consumer_channel *channel,
- uint64_t channel_key,
- uint64_t stream_key,
- const char *channel_name,
- uint64_t relayd_id,
- uint64_t session_id,
- struct lttng_trace_chunk *trace_chunk,
- int cpu,
- int *alloc_ret,
- enum consumer_channel_type type,
- unsigned int monitor);
+void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_channel *channel);
+
+struct lttng_consumer_stream *consumer_allocate_stream(struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
+ uint64_t stream_key,
+ const char *channel_name,
+ uint64_t relayd_id,
+ uint64_t session_id,
+ struct lttng_trace_chunk *trace_chunk,
+ int cpu,
+ int *alloc_ret,
+ enum consumer_channel_type type,
+ unsigned int monitor);
struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key,
- uint64_t session_id,
- const uint64_t *chunk_id,
- const char *pathname,
- const char *name,
- uint64_t relayd_id,
- enum lttng_event_output output,
- uint64_t tracefile_size,
- uint64_t tracefile_count,
- uint64_t session_id_per_pid,
- unsigned int monitor,
- unsigned int live_timer_interval,
- bool is_in_live_session,
- const char *root_shm_path,
- const char *shm_path);
-void consumer_del_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht);
-void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
- struct lttng_ht *ht);
+ uint64_t session_id,
+ const uint64_t *chunk_id,
+ const char *pathname,
+ const char *name,
+ uint64_t relayd_id,
+ enum lttng_event_output output,
+ uint64_t tracefile_size,
+ uint64_t tracefile_count,
+ uint64_t session_id_per_pid,
+ unsigned int monitor,
+ unsigned int live_timer_interval,
+ bool is_in_live_session,
+ const char *root_shm_path,
+ const char *shm_path);
+void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht);
+void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht);
int consumer_add_channel(struct lttng_consumer_channel *channel,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx);
void consumer_del_channel(struct lttng_consumer_channel *channel);
/* lttng-relayd consumer command */
int consumer_send_relayd_streams_sent(uint64_t net_seq_idx);
void close_relayd_stream(struct lttng_consumer_stream *stream);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
-int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
- size_t data_size);
+int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size);
void consumer_steal_stream_key(int key, struct lttng_ht *ht);
-struct lttng_consumer_local_data *lttng_consumer_create(
- enum lttng_consumer_type type,
- ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller),
- int (*recv_channel)(struct lttng_consumer_channel *channel),
- int (*recv_stream)(struct lttng_consumer_stream *stream),
- int (*update_stream)(uint64_t sessiond_key, uint32_t state));
+struct lttng_consumer_local_data *
+lttng_consumer_create(enum lttng_consumer_type type,
+ ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller),
+ int (*recv_channel)(struct lttng_consumer_channel *channel),
+ int (*recv_stream)(struct lttng_consumer_stream *stream),
+ int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
-ssize_t lttng_consumer_on_read_subbuffer_mmap(
- struct lttng_consumer_stream *stream,
- const struct lttng_buffer_view *buffer,
- unsigned long padding);
-ssize_t lttng_consumer_on_read_subbuffer_splice(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding);
+ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream,
+ const struct lttng_buffer_view *buffer,
+ unsigned long padding);
+ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *stream,
+ unsigned long len,
+ unsigned long padding);
int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
-int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos);
-int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream,
- unsigned long *pos);
+int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos);
+int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos);
int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream);
int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream);
void *consumer_thread_metadata_poll(void *data);
void *consumer_thread_sessiond_poll(void *data);
void *consumer_thread_channel_poll(void *data);
int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx,
- int sock, struct pollfd *consumer_sockpoll);
+ int sock,
+ struct pollfd *consumer_sockpoll);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx,
- bool locked_by_caller);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
void consumer_add_relayd_socket(uint64_t net_seq_idx,
- int sock_type,
- struct lttng_consumer_local_data *ctx,
- int sock,
- struct pollfd *consumer_sockpoll,
- uint64_t sessiond_id,
- uint64_t relayd_session_id,
- uint32_t relayd_version_major,
- uint32_t relayd_version_minor,
- enum lttcomm_sock_proto relayd_socket_protocol);
-void consumer_flag_relayd_for_destroy(
- struct consumer_relayd_sock_pair *relayd);
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
+ struct pollfd *consumer_sockpoll,
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol);
+void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd);
int consumer_data_pending(uint64_t id);
int consumer_send_status_msg(int sock, int ret_code);
-int consumer_send_status_channel(int sock,
- struct lttng_consumer_channel *channel);
-void notify_thread_del_channel(struct lttng_consumer_local_data *ctx,
- uint64_t key);
+int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel);
+void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key);
void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd);
unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos,
- unsigned long produced_pos, uint64_t nb_packets_per_stream,
- uint64_t max_sb_size);
+ unsigned long produced_pos,
+ uint64_t nb_packets_per_stream,
+ uint64_t max_sb_size);
void consumer_add_data_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_data(struct lttng_consumer_stream *stream);
void consumer_add_metadata_stream(struct lttng_consumer_stream *stream);
void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream);
int consumer_create_index_file(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel,
- uint64_t key, uint64_t relayd_id);
+ uint64_t key,
+ uint64_t relayd_id);
int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream);
int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream);
-int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel,
- uint64_t key);
+int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key);
void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream);
-enum lttcomm_return_code lttng_consumer_create_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id,
- time_t chunk_creation_timestamp,
- const char *chunk_override_name,
- const struct lttng_credentials *credentials,
- struct lttng_directory_handle *chunk_directory_handle);
-enum lttcomm_return_code lttng_consumer_close_trace_chunk(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp,
- const enum lttng_trace_chunk_command_type *close_command,
- char *path);
-enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
- const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id);
+enum lttcomm_return_code
+lttng_consumer_create_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_creation_timestamp,
+ const char *chunk_override_name,
+ const struct lttng_credentials *credentials,
+ struct lttng_directory_handle *chunk_directory_handle);
+enum lttcomm_return_code
+lttng_consumer_close_trace_chunk(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id,
+ time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path);
+enum lttcomm_return_code lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id,
+ uint64_t session_id,
+ uint64_t chunk_id);
void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd);
-enum lttcomm_return_code lttng_consumer_init_command(
- struct lttng_consumer_local_data *ctx,
- const lttng_uuid& sessiond_uuid);
+enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx,
+ const lttng_uuid& sessiond_uuid);
int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel);
-enum lttcomm_return_code lttng_consumer_open_channel_packets(
- struct lttng_consumer_channel *channel);
+enum lttcomm_return_code
+lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel);
int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel);
void lttng_consumer_sigbus_handle(void *addr);
+void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel);
#endif /* LIB_CONSUMER_H */