/* Delete streams that might have been left in the stream list. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
- cds_list_del(&stream->send_node);
/*
* Once a stream is added to this list, the buffers were created so we
* have a guarantee that this call will succeed. Setting the monitor
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream, node.node) {
if (stream->net_seq_idx == net_seq_idx) {
uatomic_set(&stream->endpoint_status, status);
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
+
DBG("Delete flag set to metadata stream %d", stream->wait_fd);
}
}
channel->is_live = is_in_live_session;
pthread_mutex_init(&channel->lock, NULL);
pthread_mutex_init(&channel->timer_lock, NULL);
+ lttng_wait_queue_init(&channel->metadata_pushed_wait_queue);
switch (output) {
case LTTNG_EVENT_SPLICE:
* Send return code to the session daemon.
* If the socket is not defined, we return 0, it is not a fatal error
*/
-int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx, int cmd)
+int lttng_consumer_send_error(struct lttng_consumer_local_data *ctx,
+ enum lttcomm_return_code error_code)
{
if (ctx->consumer_error_socket > 0) {
- return lttcomm_send_unix_sock(ctx->consumer_error_socket, &cmd,
- sizeof(enum lttcomm_sessiond_command));
+ const int32_t comm_code = (int32_t) error_code;
+
+ return lttcomm_send_unix_sock(
+ ctx->consumer_error_socket, &comm_code, sizeof(comm_code));
}
return 0;
* pointer value.
*/
channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&channel->metadata_pushed_wait_queue);
if (channel->metadata_cache) {
pthread_mutex_unlock(&channel->metadata_cache->lock);