X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.hpp;fp=src%2Fcommon%2Fconsumer%2Fconsumer.hpp;h=1efbc4eeeec33e6a57ebeb9748cf3d51f2e28907;hb=28f23191dcbf047429d51950a337a57d7a3f866a;hp=a28bc160959277431e8555256e1a0d71c559689a;hpb=f250b40e2179eccdb83766bf4abef5a35036c47b;p=lttng-tools.git diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index a28bc1609..1efbc4eee 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -11,25 +11,25 @@ #ifndef LIB_CONSUMER_H #define LIB_CONSUMER_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include #include #include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - struct lttng_consumer_local_data; /* Commands for consumer */ @@ -39,7 +39,7 @@ enum lttng_consumer_command { /* pause, delete, active depending on fd state */ LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ - LTTNG_CONSUMER_STOP, /* deprecated */ + LTTNG_CONSUMER_STOP, /* deprecated */ LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, @@ -82,13 +82,13 @@ enum consumer_endpoint_status { }; enum consumer_channel_output { - CONSUMER_CHANNEL_MMAP = 0, - CONSUMER_CHANNEL_SPLICE = 1, + CONSUMER_CHANNEL_MMAP = 0, + CONSUMER_CHANNEL_SPLICE = 1, }; enum consumer_channel_type { - CONSUMER_CHANNEL_TYPE_METADATA = 0, - CONSUMER_CHANNEL_TYPE_DATA = 1, + CONSUMER_CHANNEL_TYPE_METADATA = 0, + CONSUMER_CHANNEL_TYPE_DATA = 1, }; enum sync_metadata_status { @@ -160,7 +160,7 @@ struct lttng_consumer_channel { enum consumer_channel_type type; /* For UST */ - uid_t ust_app_uid; /* Application UID. */ + uid_t ust_app_uid; /* Application UID. */ struct lttng_ust_ctl_consumer_channel *uchan; unsigned char uuid[LTTNG_UUID_STR_LEN]; /* @@ -721,8 +721,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -814,7 +814,7 @@ struct lttng_consumer_global_data { * This is nested OUTSIDE the stream lock. * This is nested OUTSIDE the consumer_relayd_sock_pair lock. */ - pthread_mutex_t lock {}; + pthread_mutex_t lock{}; /* * Number of streams in the data stream hash table declared outside. @@ -879,8 +879,7 @@ extern int consumer_quit; LTTNG_EXPORT extern int data_consumption_paused; /* Return a human-readable consumer type string that is suitable for logging. */ -static inline -const char *lttng_consumer_type_str(enum lttng_consumer_type type) +static inline const char *lttng_consumer_type_str(enum lttng_consumer_type type) { switch (type) { case LTTNG_CONSUMER_UNKNOWN: @@ -904,14 +903,12 @@ int lttng_consumer_init(); /* * Set the error socket for communication with a session daemon. */ -void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, - int sock); +void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock); /* * Set the command socket path for communication with a session daemon. */ -void lttng_consumer_set_command_sock_path( - struct lttng_consumer_local_data *ctx, char *sock); +void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock); /* * Send return code to session daemon. @@ -941,43 +938,39 @@ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); * Copy the fields from the channel that need to be accessed (read-only) * directly from the stream. */ -void consumer_stream_update_channel_attributes( - struct lttng_consumer_stream *stream, - struct lttng_consumer_channel *channel); - -struct lttng_consumer_stream *consumer_allocate_stream( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - const char *channel_name, - uint64_t relayd_id, - uint64_t session_id, - struct lttng_trace_chunk *trace_chunk, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor); +void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel); + +struct lttng_consumer_stream *consumer_allocate_stream(struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, - uint64_t session_id, - const uint64_t *chunk_id, - const char *pathname, - const char *name, - uint64_t relayd_id, - enum lttng_event_output output, - uint64_t tracefile_size, - uint64_t tracefile_count, - uint64_t session_id_per_pid, - unsigned int monitor, - unsigned int live_timer_interval, - bool is_in_live_session, - const char *root_shm_path, - const char *shm_path); -void consumer_del_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); -void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); + uint64_t session_id, + const uint64_t *chunk_id, + const char *pathname, + const char *name, + uint64_t relayd_id, + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count, + uint64_t session_id_per_pid, + unsigned int monitor, + unsigned int live_timer_interval, + bool is_in_live_session, + const char *root_shm_path, + const char *shm_path); +void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); +void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); int consumer_add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx); void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ @@ -986,33 +979,29 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); -int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, - size_t data_size); +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); void consumer_steal_stream_key(int key, struct lttng_ht *ht); -struct lttng_consumer_local_data *lttng_consumer_create( - enum lttng_consumer_type type, - ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller), - int (*recv_channel)(struct lttng_consumer_channel *channel), - int (*recv_stream)(struct lttng_consumer_stream *stream), - int (*update_stream)(uint64_t sessiond_key, uint32_t state)); +struct lttng_consumer_local_data * +lttng_consumer_create(enum lttng_consumer_type type, + ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); -ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_stream *stream, - const struct lttng_buffer_view *buffer, - unsigned long padding); -ssize_t lttng_consumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream, + const struct lttng_buffer_view *buffer, + unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long len, + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); -int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); -int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); +int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); @@ -1020,68 +1009,69 @@ void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); void *consumer_thread_channel_poll(void *data); int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll); + int sock, + struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, - int sock_type, - struct lttng_consumer_local_data *ctx, - int sock, - struct pollfd *consumer_sockpoll, - uint64_t sessiond_id, - uint64_t relayd_session_id, - uint32_t relayd_version_major, - uint32_t relayd_version_minor, - enum lttcomm_sock_proto relayd_socket_protocol); -void consumer_flag_relayd_for_destroy( - struct consumer_relayd_sock_pair *relayd); + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, + struct pollfd *consumer_sockpoll, + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol); +void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); int consumer_send_status_msg(int sock, int ret_code); -int consumer_send_status_channel(int sock, - struct lttng_consumer_channel *channel); -void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, - uint64_t key); +int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel); +void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key); void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd); unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t nb_packets_per_stream, - uint64_t max_sb_size); + unsigned long produced_pos, + uint64_t nb_packets_per_stream, + uint64_t max_sb_size); void consumer_add_data_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id); + uint64_t key, + uint64_t relayd_id); int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream); -int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key); +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); -enum lttcomm_return_code lttng_consumer_create_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, - time_t chunk_creation_timestamp, - const char *chunk_override_name, - const struct lttng_credentials *credentials, - struct lttng_directory_handle *chunk_directory_handle); -enum lttcomm_return_code lttng_consumer_close_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command, - char *path); -enum lttcomm_return_code lttng_consumer_trace_chunk_exists( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id); +enum lttcomm_return_code +lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle); +enum lttcomm_return_code +lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command, + char *path); +enum lttcomm_return_code lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); -enum lttcomm_return_code lttng_consumer_init_command( - struct lttng_consumer_local_data *ctx, - const lttng_uuid& sessiond_uuid); +enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx, + const lttng_uuid& sessiond_uuid); int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); -enum lttcomm_return_code lttng_consumer_open_channel_packets( - struct lttng_consumer_channel *channel); +enum lttcomm_return_code +lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel); int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); void lttng_consumer_sigbus_handle(void *addr); void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel);