X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer%2Fconsumer.hpp;h=9de8d05a3cb188b98c12e8c0c5ac1a476500591a;hb=f40b76aed659ff694cf948bf8ebd1d4b5741c986;hp=4bcfe5d9529cc6460f293dc7fbc47e0e04728e2a;hpb=c9e313bc594f40a86eed237dce222c0fc99c957f;p=lttng-tools.git diff --git a/src/common/consumer/consumer.hpp b/src/common/consumer/consumer.hpp index 4bcfe5d95..9de8d05a3 100644 --- a/src/common/consumer/consumer.hpp +++ b/src/common/consumer/consumer.hpp @@ -11,25 +11,25 @@ #ifndef LIB_CONSUMER_H #define LIB_CONSUMER_H +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + #include #include #include #include #include -#include - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - struct lttng_consumer_local_data; /* Commands for consumer */ @@ -39,7 +39,7 @@ enum lttng_consumer_command { /* pause, delete, active depending on fd state */ LTTNG_CONSUMER_UPDATE_STREAM, /* inform the consumer to quit when all fd has hang up */ - LTTNG_CONSUMER_STOP, /* deprecated */ + LTTNG_CONSUMER_STOP, /* deprecated */ LTTNG_CONSUMER_ADD_RELAYD_SOCKET, /* Inform the consumer to kill a specific relayd connection */ LTTNG_CONSUMER_DESTROY_RELAYD, @@ -82,13 +82,13 @@ enum consumer_endpoint_status { }; enum consumer_channel_output { - CONSUMER_CHANNEL_MMAP = 0, - CONSUMER_CHANNEL_SPLICE = 1, + CONSUMER_CHANNEL_MMAP = 0, + CONSUMER_CHANNEL_SPLICE = 1, }; enum consumer_channel_type { - CONSUMER_CHANNEL_TYPE_METADATA = 0, - CONSUMER_CHANNEL_TYPE_DATA = 1, + CONSUMER_CHANNEL_TYPE_METADATA = 0, + CONSUMER_CHANNEL_TYPE_DATA = 1, }; enum sync_metadata_status { @@ -160,7 +160,7 @@ struct lttng_consumer_channel { enum consumer_channel_type type; /* For UST */ - uid_t ust_app_uid; /* Application UID. */ + uid_t ust_app_uid; /* Application UID. */ struct lttng_ust_ctl_consumer_channel *uchan; unsigned char uuid[LTTNG_UUID_STR_LEN]; /* @@ -185,6 +185,11 @@ struct lttng_consumer_channel { /* Metadata cache is metadata channel */ struct consumer_metadata_cache *metadata_cache; + /* + * Wait queue awaiting updates to metadata stream's flushed position. + */ + struct lttng_wait_queue metadata_pushed_wait_queue; + /* For UST metadata periodical flush */ int switch_timer_enabled; timer_t switch_timer; @@ -253,6 +258,7 @@ struct lttng_consumer_channel { uint64_t lost_packets; bool streams_sent_to_relayd; + uint64_t last_consumed_size_sample_sent; }; struct stream_subbuffer { @@ -312,7 +318,7 @@ enum get_next_subbuffer_status { * * Stream and channel locks are acquired during this call. */ -typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); +using on_wake_up_cb = int (*)(struct lttng_consumer_stream *); /* * Perform any operation required before a consumer stream is put @@ -320,16 +326,15 @@ typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef int (*on_sleep_cb)(struct lttng_consumer_stream *, - struct lttng_consumer_local_data *); +using on_sleep_cb = int (*)(struct lttng_consumer_stream *, struct lttng_consumer_local_data *); /* * Acquire the subbuffer at the current 'consumed' position. * * Stream and channel locks are acquired during this call. */ -typedef enum get_next_subbuffer_status (*get_next_subbuffer_cb)( - struct lttng_consumer_stream *, struct stream_subbuffer *); +using get_next_subbuffer_cb = enum get_next_subbuffer_status (*)(struct lttng_consumer_stream *, + struct stream_subbuffer *); /* * Populate the stream_subbuffer's info member. The info to populate @@ -337,25 +342,25 @@ typedef enum get_next_subbuffer_status (*get_next_subbuffer_cb)( * * Stream and channel locks are acquired during this call. */ -typedef int (*extract_subbuffer_info_cb)( - struct lttng_consumer_stream *, struct stream_subbuffer *); +using extract_subbuffer_info_cb = int (*)(struct lttng_consumer_stream *, + struct stream_subbuffer *); /* * Invoked after a subbuffer's info has been filled. * * Stream and channel locks are acquired during this call. */ -typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *, - const struct stream_subbuffer *); +using pre_consume_subbuffer_cb = int (*)(struct lttng_consumer_stream *, + const struct stream_subbuffer *); /* * Consume subbuffer contents. * * Stream and channel locks are acquired during this call. */ -typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, - struct lttng_consumer_stream *, - const struct stream_subbuffer *); +using consume_subbuffer_cb = ssize_t (*)(struct lttng_consumer_local_data *, + struct lttng_consumer_stream *, + const struct stream_subbuffer *); /* * Release the current subbuffer and advance the 'consumed' position by @@ -363,31 +368,30 @@ typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *, * * Stream and channel locks are acquired during this call. */ -typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *, - struct stream_subbuffer *); +using put_next_subbuffer_cb = int (*)(struct lttng_consumer_stream *, struct stream_subbuffer *); /* * Invoked after consuming a subbuffer. * * Stream and channel locks are acquired during this call. */ -typedef int (*post_consume_cb)(struct lttng_consumer_stream *, - const struct stream_subbuffer *, - struct lttng_consumer_local_data *); +using post_consume_cb = int (*)(struct lttng_consumer_stream *, + const struct stream_subbuffer *, + struct lttng_consumer_local_data *); /* * Send a live beacon if no data is available. * * Stream and channel locks are acquired during this call. */ -typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *); +using send_live_beacon_cb = int (*)(struct lttng_consumer_stream *); /* * Lock the stream and channel locks and any other stream-type specific * lock that need to be acquired during the processing of an * availability notification. */ -typedef void (*lock_cb)(struct lttng_consumer_stream *); +using lock_cb = void (*)(struct lttng_consumer_stream *); /* * Unlock the stream and channel locks and any other stream-type specific @@ -395,14 +399,14 @@ typedef void (*lock_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef void (*unlock_cb)(struct lttng_consumer_stream *); +using unlock_cb = void (*)(struct lttng_consumer_stream *); /* * Assert that the stream and channel lock and any other stream type specific * lock that need to be acquired during the processing of a read_subbuffer * operation is acquired. */ -typedef void (*assert_locked_cb)(struct lttng_consumer_stream *); +using assert_locked_cb = void (*)(struct lttng_consumer_stream *); /* * Invoked when a subbuffer's metadata version does not match the last @@ -410,7 +414,7 @@ typedef void (*assert_locked_cb)(struct lttng_consumer_stream *); * * Stream and channel locks are acquired during this call. */ -typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *); +using reset_metadata_cb = void (*)(struct lttng_consumer_stream *); /* * Internal representation of the streams, sessiond_key is used to identify @@ -449,7 +453,11 @@ struct lttng_consumer_stream { /* Amount of bytes written to the output */ uint64_t output_written; int shm_fd_is_copy; - int data_read; + /* + * When a stream's pipe is hung up, a final flush is performed (see hangup_flush_done). This + * indicates whether or not the data resulting from this flush is still to be consumed. + */ + int has_data_left_to_be_read_before_teardown; int hangup_flush_done; /* @@ -718,8 +726,8 @@ struct lttng_consumer_local_data { * Returns the number of bytes read, or negative error value. */ ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); /* * function to call when we receive a new channel, it receives a * newly allocated channel, depending on the return code of this @@ -811,7 +819,7 @@ struct lttng_consumer_global_data { * This is nested OUTSIDE the stream lock. * This is nested OUTSIDE the consumer_relayd_sock_pair lock. */ - pthread_mutex_t lock {}; + pthread_mutex_t lock{}; /* * Number of streams in the data stream hash table declared outside. @@ -876,8 +884,7 @@ extern int consumer_quit; LTTNG_EXPORT extern int data_consumption_paused; /* Return a human-readable consumer type string that is suitable for logging. */ -static inline -const char *lttng_consumer_type_str(enum lttng_consumer_type type) +static inline const char *lttng_consumer_type_str(enum lttng_consumer_type type) { switch (type) { case LTTNG_CONSUMER_UNKNOWN: @@ -896,19 +903,17 @@ const char *lttng_consumer_type_str(enum lttng_consumer_type type) /* * Init consumer data structures. */ -int lttng_consumer_init(void); +int lttng_consumer_init(); /* * Set the error socket for communication with a session daemon. */ -void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, - int sock); +void lttng_consumer_set_error_sock(struct lttng_consumer_local_data *ctx, int sock); /* * Set the command socket path for communication with a session daemon. */ -void lttng_consumer_set_command_sock_path( - struct lttng_consumer_local_data *ctx, char *sock); +void lttng_consumer_set_command_sock_path(struct lttng_consumer_local_data *ctx, char *sock); /* * Send return code to session daemon. @@ -926,7 +931,7 @@ void lttng_consumer_should_exit(struct lttng_consumer_local_data *ctx); /* * Cleanup the daemon's socket on exit. */ -void lttng_consumer_cleanup(void); +void lttng_consumer_cleanup(); /* * Poll on the should_quit pipe and the command socket return -1 on error and @@ -938,43 +943,39 @@ int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll); * Copy the fields from the channel that need to be accessed (read-only) * directly from the stream. */ -void consumer_stream_update_channel_attributes( - struct lttng_consumer_stream *stream, - struct lttng_consumer_channel *channel); - -struct lttng_consumer_stream *consumer_allocate_stream( - struct lttng_consumer_channel *channel, - uint64_t channel_key, - uint64_t stream_key, - const char *channel_name, - uint64_t relayd_id, - uint64_t session_id, - struct lttng_trace_chunk *trace_chunk, - int cpu, - int *alloc_ret, - enum consumer_channel_type type, - unsigned int monitor); +void consumer_stream_update_channel_attributes(struct lttng_consumer_stream *stream, + struct lttng_consumer_channel *channel); + +struct lttng_consumer_stream *consumer_allocate_stream(struct lttng_consumer_channel *channel, + uint64_t channel_key, + uint64_t stream_key, + const char *channel_name, + uint64_t relayd_id, + uint64_t session_id, + struct lttng_trace_chunk *trace_chunk, + int cpu, + int *alloc_ret, + enum consumer_channel_type type, + unsigned int monitor); struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, - uint64_t session_id, - const uint64_t *chunk_id, - const char *pathname, - const char *name, - uint64_t relayd_id, - enum lttng_event_output output, - uint64_t tracefile_size, - uint64_t tracefile_count, - uint64_t session_id_per_pid, - unsigned int monitor, - unsigned int live_timer_interval, - bool is_in_live_session, - const char *root_shm_path, - const char *shm_path); -void consumer_del_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); -void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, - struct lttng_ht *ht); + uint64_t session_id, + const uint64_t *chunk_id, + const char *pathname, + const char *name, + uint64_t relayd_id, + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count, + uint64_t session_id_per_pid, + unsigned int monitor, + unsigned int live_timer_interval, + bool is_in_live_session, + const char *root_shm_path, + const char *shm_path); +void consumer_del_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); +void consumer_del_metadata_stream(struct lttng_consumer_stream *stream, struct lttng_ht *ht); int consumer_add_channel(struct lttng_consumer_channel *channel, - struct lttng_consumer_local_data *ctx); + struct lttng_consumer_local_data *ctx); void consumer_del_channel(struct lttng_consumer_channel *channel); /* lttng-relayd consumer command */ @@ -983,33 +984,29 @@ int consumer_send_relayd_stream(struct lttng_consumer_stream *stream, char *path int consumer_send_relayd_streams_sent(uint64_t net_seq_idx); void close_relayd_stream(struct lttng_consumer_stream *stream); struct lttng_consumer_channel *consumer_find_channel(uint64_t key); -int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, - size_t data_size); +int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream, size_t data_size); void consumer_steal_stream_key(int key, struct lttng_ht *ht); -struct lttng_consumer_local_data *lttng_consumer_create( - enum lttng_consumer_type type, - ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller), - int (*recv_channel)(struct lttng_consumer_channel *channel), - int (*recv_stream)(struct lttng_consumer_stream *stream), - int (*update_stream)(uint64_t sessiond_key, uint32_t state)); +struct lttng_consumer_local_data * +lttng_consumer_create(enum lttng_consumer_type type, + ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream, + struct lttng_consumer_local_data *ctx, + bool locked_by_caller), + int (*recv_channel)(struct lttng_consumer_channel *channel), + int (*recv_stream)(struct lttng_consumer_stream *stream), + int (*update_stream)(uint64_t sessiond_key, uint32_t state)); void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx); -ssize_t lttng_consumer_on_read_subbuffer_mmap( - struct lttng_consumer_stream *stream, - const struct lttng_buffer_view *buffer, - unsigned long padding); -ssize_t lttng_consumer_on_read_subbuffer_splice( - struct lttng_consumer_local_data *ctx, - struct lttng_consumer_stream *stream, unsigned long len, - unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_mmap(struct lttng_consumer_stream *stream, + const struct lttng_buffer_view *buffer, + unsigned long padding); +ssize_t lttng_consumer_on_read_subbuffer_splice(struct lttng_consumer_local_data *ctx, + struct lttng_consumer_stream *stream, + unsigned long len, + unsigned long padding); int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream); int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream); -int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); -int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, - unsigned long *pos); +int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); +int lttng_consumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, unsigned long *pos); int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream); int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream); void *consumer_thread_metadata_poll(void *data); @@ -1017,69 +1014,71 @@ void *consumer_thread_data_poll(void *data); void *consumer_thread_sessiond_poll(void *data); void *consumer_thread_channel_poll(void *data); int lttng_consumer_recv_cmd(struct lttng_consumer_local_data *ctx, - int sock, struct pollfd *consumer_sockpoll); + int sock, + struct pollfd *consumer_sockpoll); ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx, - bool locked_by_caller); + struct lttng_consumer_local_data *ctx, + bool locked_by_caller); int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream); void consumer_add_relayd_socket(uint64_t net_seq_idx, - int sock_type, - struct lttng_consumer_local_data *ctx, - int sock, - struct pollfd *consumer_sockpoll, - uint64_t sessiond_id, - uint64_t relayd_session_id, - uint32_t relayd_version_major, - uint32_t relayd_version_minor, - enum lttcomm_sock_proto relayd_socket_protocol); -void consumer_flag_relayd_for_destroy( - struct consumer_relayd_sock_pair *relayd); + int sock_type, + struct lttng_consumer_local_data *ctx, + int sock, + struct pollfd *consumer_sockpoll, + uint64_t sessiond_id, + uint64_t relayd_session_id, + uint32_t relayd_version_major, + uint32_t relayd_version_minor, + enum lttcomm_sock_proto relayd_socket_protocol); +void consumer_flag_relayd_for_destroy(struct consumer_relayd_sock_pair *relayd); int consumer_data_pending(uint64_t id); int consumer_send_status_msg(int sock, int ret_code); -int consumer_send_status_channel(int sock, - struct lttng_consumer_channel *channel); -void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, - uint64_t key); +int consumer_send_status_channel(int sock, struct lttng_consumer_channel *channel); +void notify_thread_del_channel(struct lttng_consumer_local_data *ctx, uint64_t key); void consumer_destroy_relayd(struct consumer_relayd_sock_pair *relayd); unsigned long consumer_get_consume_start_pos(unsigned long consumed_pos, - unsigned long produced_pos, uint64_t nb_packets_per_stream, - uint64_t max_sb_size); + unsigned long produced_pos, + uint64_t nb_packets_per_stream, + uint64_t max_sb_size); void consumer_add_data_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_data(struct lttng_consumer_stream *stream); void consumer_add_metadata_stream(struct lttng_consumer_stream *stream); void consumer_del_stream_for_metadata(struct lttng_consumer_stream *stream); int consumer_create_index_file(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_channel(struct lttng_consumer_channel *channel, - uint64_t key, uint64_t relayd_id); + uint64_t key, + uint64_t relayd_id); int lttng_consumer_stream_is_rotate_ready(struct lttng_consumer_stream *stream); int lttng_consumer_rotate_stream(struct lttng_consumer_stream *stream); -int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, - uint64_t key); +int lttng_consumer_rotate_ready_streams(struct lttng_consumer_channel *channel, uint64_t key); void lttng_consumer_reset_stream_rotate_state(struct lttng_consumer_stream *stream); -enum lttcomm_return_code lttng_consumer_create_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, - time_t chunk_creation_timestamp, - const char *chunk_override_name, - const struct lttng_credentials *credentials, - struct lttng_directory_handle *chunk_directory_handle); -enum lttcomm_return_code lttng_consumer_close_trace_chunk( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id, time_t chunk_close_timestamp, - const enum lttng_trace_chunk_command_type *close_command, - char *path); -enum lttcomm_return_code lttng_consumer_trace_chunk_exists( - const uint64_t *relayd_id, uint64_t session_id, - uint64_t chunk_id); +enum lttcomm_return_code +lttng_consumer_create_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_creation_timestamp, + const char *chunk_override_name, + const struct lttng_credentials *credentials, + struct lttng_directory_handle *chunk_directory_handle); +enum lttcomm_return_code +lttng_consumer_close_trace_chunk(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id, + time_t chunk_close_timestamp, + const enum lttng_trace_chunk_command_type *close_command, + char *path); +enum lttcomm_return_code lttng_consumer_trace_chunk_exists(const uint64_t *relayd_id, + uint64_t session_id, + uint64_t chunk_id); void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd); -enum lttcomm_return_code lttng_consumer_init_command( - struct lttng_consumer_local_data *ctx, - const lttng_uuid sessiond_uuid); +enum lttcomm_return_code lttng_consumer_init_command(struct lttng_consumer_local_data *ctx, + const lttng_uuid& sessiond_uuid); int lttng_consumer_clear_channel(struct lttng_consumer_channel *channel); -enum lttcomm_return_code lttng_consumer_open_channel_packets( - struct lttng_consumer_channel *channel); +enum lttcomm_return_code +lttng_consumer_open_channel_packets(struct lttng_consumer_channel *channel); int consumer_metadata_wakeup_pipe(const struct lttng_consumer_channel *channel); void lttng_consumer_sigbus_handle(void *addr); +void sample_and_send_channel_buffer_stats(struct lttng_consumer_channel *channel); #endif /* LIB_CONSUMER_H */