LTTNG_CONSUMER_ADD_RELAYD_SOCKET,
/* Inform the consumer to kill a specific relayd connection */
LTTNG_CONSUMER_DESTROY_RELAYD,
+ /* Return to the sessiond if there is data pending for a session */
+ LTTNG_CONSUMER_DATA_AVAILABLE,
};
/* State of each fd in consumer */
* uniquely a stream.
*/
struct lttng_consumer_stream {
+ /* HT node used by the data_ht and metadata_ht */
struct lttng_ht_node_ulong node;
- struct lttng_ht_node_ulong waitfd_node;
+ /* HT node used in consumer_data.stream_list_ht */
+ struct lttng_ht_node_ulong node_session_id;
struct lttng_consumer_channel *chan; /* associated channel */
/*
* key is the key used by the session daemon to refer to the
uint64_t relayd_stream_id;
/* Next sequence number to use for trace packet */
uint64_t next_net_seq_num;
+ /* Lock to use the stream FDs since they are used between threads. */
+ pthread_mutex_t lock;
+ /* Tracing session id */
+ uint64_t session_id;
};
/*
/* communication with splice */
int consumer_thread_pipe[2];
int consumer_splice_metadata_pipe[2];
- /* pipe to wake the poll thread when necessary */
- int consumer_poll_pipe[2];
+ /* Data stream poll thread pipe. To transfer data stream to the thread */
+ int consumer_data_pipe[2];
/* to let the signal handler wake up the fd receiver thread */
int consumer_should_quit[2];
/* Metadata poll thread pipe. Transfer metadata stream to it */
pthread_mutex_t lock;
/*
- * Number of streams in the hash table. Protected by consumer_data.lock.
+ * Number of streams in the data stream hash table declared outside.
+ * Protected by consumer_data.lock.
*/
int stream_count;
- /*
- * Hash tables of streams and channels. Protected by consumer_data.lock.
- */
- struct lttng_ht *stream_ht;
+
+ /* Channel hash table protected by consumer_data.lock. */
struct lttng_ht *channel_ht;
/*
* Flag specifying if the local array of FDs needs update in the
* stream has an index which associate the right relayd socket to use.
*/
struct lttng_ht *relayd_ht;
+
+ /*
+ * This hash table contains all streams (metadata and data) indexed by
+ * session id. In other words, the ht is indexed by session id and each
+ * bucket contains the list of associated streams.
+ *
+ * This HT uses the "node_session_id" of the consumer stream.
+ */
+ struct lttng_ht *stream_list_ht;
};
+/* Defined in consumer.c and coupled with explanations */
+extern struct lttng_ht *metadata_ht;
+extern struct lttng_ht *data_ht;
+
/*
* Init consumer data structures.
*/
*/
extern int lttng_consumer_poll_socket(struct pollfd *kconsumer_sockpoll);
-extern int consumer_update_poll_array(
- struct lttng_consumer_local_data *ctx, struct pollfd **pollfd,
- struct lttng_consumer_stream **local_consumer_streams);
-
extern struct lttng_consumer_stream *consumer_allocate_stream(
int channel_key, int stream_key,
int shm_fd, int wait_fd,
gid_t gid,
int net_index,
int metadata_flag,
+ uint64_t session_id,
int *alloc_ret);
-extern int consumer_add_stream(struct lttng_consumer_stream *stream);
extern void consumer_del_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
extern void consumer_del_metadata_stream(struct lttng_consumer_stream *stream,
struct lttng_ht *ht);
-extern void consumer_change_stream_state(int stream_key,
- enum lttng_consumer_stream_state state);
extern void consumer_del_channel(struct lttng_consumer_channel *channel);
extern struct lttng_consumer_channel *consumer_allocate_channel(
int channel_key,
struct consumer_relayd_sock_pair *consumer_find_relayd(int key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
size_t data_size);
+void consumer_steal_stream_key(int key, struct lttng_ht *ht);
extern struct lttng_consumer_local_data *lttng_consumer_create(
enum lttng_consumer_type type,
struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock);
void consumer_flag_relayd_for_destroy(
struct consumer_relayd_sock_pair *relayd);
+int consumer_data_available(uint64_t id);
#endif /* LIB_CONSUMER_H */