#include <common/index/ctf-index.h>
#include <common/trace-chunk-registry.h>
#include <common/credentials.h>
+#include <common/buffer-view.h>
+
+struct lttng_consumer_local_data;
/* Commands for consumer */
enum lttng_consumer_command {
struct lttng_consumer_channel {
/* Is the channel published in the channel hash tables? */
bool is_published;
+ /*
+ * Was the channel deleted (logically) and waiting to be reclaimed?
+ * If this flag is set, no modification that is not cleaned-up by the
+ * RCU reclamation callback should be made
+ */
+ bool is_deleted;
/* HT node used for consumer_data.channel_ht */
struct lttng_ht_node_u64 node;
/* HT node used for consumer_data.channels_by_session_id_ht */
* a session with per-PID buffers.
*/
uint64_t session_id_per_pid;
- /* Channel trace file path name. */
+ /*
+ * In the case of local streams, this field contains the channel's
+ * output path; a path relative to the session's output path.
+ * e.g. ust/uid/1000/64-bit
+ *
+ * In the case of remote streams, the contents of this field depends
+ * on the version of the relay daemon peer. For 2.11+ peers, the
+ * contents are the same as in the local case. However, for legacy
+ * peers, this contains a path of the form:
+ * /hostname/session_path/ust/uid/1000/64-bit
+ */
char pathname[PATH_MAX];
/* Channel name. */
char name[LTTNG_SYMBOL_NAME_LEN];
int live_timer_enabled;
timer_t live_timer;
int live_timer_error;
+ /* Channel is part of a live session ? */
+ bool is_live;
/* For channel monitoring timer. */
int monitor_timer_enabled;
bool streams_sent_to_relayd;
};
+struct stream_subbuffer {
+ union {
+ /*
+ * CONSUMER_CHANNEL_SPLICE
+ * No ownership assumed.
+ */
+ int fd;
+ /* CONSUMER_CHANNEL_MMAP */
+ struct lttng_buffer_view buffer;
+ } buffer;
+ union {
+ /*
+ * Common members are fine to access through either
+ * union entries (as per C11, Common Initial Sequence).
+ */
+ struct {
+ unsigned long subbuf_size;
+ unsigned long padded_subbuf_size;
+ uint64_t version;
+ /*
+ * Left unset when unsupported.
+ *
+ * Indicates that this is the last sub-buffer of
+ * a series of sub-buffer that makes-up a coherent
+ * (parseable) unit of metadata.
+ */
+ LTTNG_OPTIONAL(bool) coherent;
+ } metadata;
+ struct {
+ unsigned long subbuf_size;
+ unsigned long padded_subbuf_size;
+ uint64_t packet_size;
+ uint64_t content_size;
+ uint64_t timestamp_begin;
+ uint64_t timestamp_end;
+ uint64_t events_discarded;
+ /* Left unset when unsupported. */
+ LTTNG_OPTIONAL(uint64_t) sequence_number;
+ uint64_t stream_id;
+ /* Left unset when unsupported. */
+ LTTNG_OPTIONAL(uint64_t) stream_instance_id;
+ } data;
+ } info;
+};
+
+/*
+ * Perform any operation required to acknowledge
+ * the wake-up of a consumer stream (e.g. consume a byte on a wake-up pipe).
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_wake_up_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Perform any operation required before a consumer stream is put
+ * to sleep before awaiting a data availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*on_sleep_cb)(struct lttng_consumer_stream *,
+ struct lttng_consumer_local_data *);
+
+/*
+ * Acquire the subbuffer at the current 'consumed' position.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*get_next_subbuffer_cb)(struct lttng_consumer_stream *,
+ struct stream_subbuffer *);
+
+/*
+ * Populate the stream_subbuffer's info member. The info to populate
+ * depends on the type (metadata/data) of the stream.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*extract_subbuffer_info_cb)(
+ struct lttng_consumer_stream *, struct stream_subbuffer *);
+
+/*
+ * Invoked after a subbuffer's info has been filled.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*pre_consume_subbuffer_cb)(struct lttng_consumer_stream *,
+ const struct stream_subbuffer *);
+
+/*
+ * Consume subbuffer contents.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef ssize_t (*consume_subbuffer_cb)(struct lttng_consumer_local_data *,
+ struct lttng_consumer_stream *,
+ const struct stream_subbuffer *);
+
+/*
+ * Release the current subbuffer and advance the 'consumed' position by
+ * one subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*put_next_subbuffer_cb)(struct lttng_consumer_stream *,
+ struct stream_subbuffer *);
+
+/*
+ * Invoked after consuming a subbuffer.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*post_consume_cb)(struct lttng_consumer_stream *,
+ const struct stream_subbuffer *,
+ struct lttng_consumer_local_data *);
+
+/*
+ * Send a live beacon if no data is available.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef int (*send_live_beacon_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Lock the stream and channel locks and any other stream-type specific
+ * lock that need to be acquired during the processing of an
+ * availability notification.
+ */
+typedef void (*lock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Unlock the stream and channel locks and any other stream-type specific
+ * lock before sleeping until the next availability notification.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*unlock_cb)(struct lttng_consumer_stream *);
+
+/*
+ * Invoked when a subbuffer's metadata version does not match the last
+ * known metadata version.
+ *
+ * Stream and channel locks are acquired during this call.
+ */
+typedef void (*reset_metadata_cb)(struct lttng_consumer_stream *);
+
/*
* Internal representation of the streams, sessiond_key is used to identify
* uniquely a stream.
*/
bool quiescent;
+ /*
+ * True if the sequence number is not available (lttng-modules < 2.8).
+ */
+ bool sequence_number_unavailable;
+
/*
* metadata_timer_lock protects flags waiting_on_metadata and
* missed_metadata_flush.
bool missed_metadata_flush;
enum lttng_event_output output;
- /* Maximum subbuffer size. */
+ /* Maximum subbuffer size (in bytes). */
unsigned long max_sb_size;
/*
pthread_mutex_t metadata_rdv_lock;
/*
- * rotate_position represents the position in the ring-buffer that has to
- * be flushed to disk to complete the ongoing rotation. When that position
- * is reached, this tracefile can be closed and a new one is created in
- * channel_read_only_attributes.path.
+ * rotate_position represents the packet sequence number of the last
+ * packet which belongs to the current trace chunk prior to the rotation.
+ * When that position is reached, this tracefile can be closed and a
+ * new one is created in channel_read_only_attributes.path.
*/
- unsigned long rotate_position;
+ uint64_t rotate_position;
/*
* Read-only copies of channel values. We cannot safely access the
* file before writing in it (regeneration).
*/
unsigned int reset_metadata_flag:1;
+ struct {
+ /*
+ * Invoked in the order of declaration.
+ * See callback type definitions.
+ */
+ lock_cb lock;
+ on_wake_up_cb on_wake_up;
+ get_next_subbuffer_cb get_next_subbuffer;
+ extract_subbuffer_info_cb extract_subbuffer_info;
+ pre_consume_subbuffer_cb pre_consume_subbuffer;
+ reset_metadata_cb reset_metadata;
+ consume_subbuffer_cb consume_subbuffer;
+ put_next_subbuffer_cb put_next_subbuffer;
+ post_consume_cb post_consume;
+ send_live_beacon_cb send_live_beacon;
+ on_sleep_cb on_sleep;
+ unlock_cb unlock;
+ } read_subbuffer_ops;
+ struct metadata_bucket *metadata_bucket;
};
/*
* Returns the number of bytes read, or negative error value.
*/
ssize_t (*on_buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
/*
* function to call when we receive a new channel, it receives a
* newly allocated channel, depending on the return code of this
struct lttng_consumer_stream *stream,
struct lttng_consumer_channel *channel);
-struct lttng_consumer_stream *consumer_allocate_stream(uint64_t channel_key,
+struct lttng_consumer_stream *consumer_allocate_stream(
+ struct lttng_consumer_channel *channel,
+ uint64_t channel_key,
uint64_t stream_key,
const char *channel_name,
uint64_t relayd_id,
uint64_t session_id_per_pid,
unsigned int monitor,
unsigned int live_timer_interval,
+ bool is_in_live_session,
const char *root_shm_path,
const char *shm_path);
void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
ssize_t (*buffer_ready)(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx),
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller),
int (*recv_channel)(struct lttng_consumer_channel *channel),
int (*recv_stream)(struct lttng_consumer_stream *stream),
int (*update_stream)(uint64_t sessiond_key, uint32_t state));
void lttng_consumer_destroy(struct lttng_consumer_local_data *ctx);
ssize_t lttng_consumer_on_read_subbuffer_mmap(
- struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding,
- struct ctf_packet_index *index);
+ struct lttng_consumer_stream *stream,
+ const struct lttng_buffer_view *buffer,
+ unsigned long padding);
ssize_t lttng_consumer_on_read_subbuffer_splice(
struct lttng_consumer_local_data *ctx,
struct lttng_consumer_stream *stream, unsigned long len,
- unsigned long padding,
- struct ctf_packet_index *index);
+ unsigned long padding);
int lttng_consumer_sample_snapshot_positions(struct lttng_consumer_stream *stream);
int lttng_consumer_take_snapshot(struct lttng_consumer_stream *stream);
int lttng_consumer_get_produced_snapshot(struct lttng_consumer_stream *stream,
int sock, struct pollfd *consumer_sockpoll);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
- struct lttng_consumer_local_data *ctx);
+ struct lttng_consumer_local_data *ctx,
+ bool locked_by_caller);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct lttng_directory_handle *chunk_directory_handle);
enum lttcomm_return_code lttng_consumer_close_trace_chunk(
const uint64_t *relayd_id, uint64_t session_id,
- uint64_t chunk_id, time_t chunk_close_timestamp);
+ uint64_t chunk_id, time_t chunk_close_timestamp,
+ const enum lttng_trace_chunk_command_type *close_command,
+ char *path);
enum lttcomm_return_code lttng_consumer_trace_chunk_exists(
const uint64_t *relayd_id, uint64_t session_id,
uint64_t chunk_id);