health_code_update();
- cds_list_del(&stream->send_node);
+ cds_list_del_init(&stream->send_node);
lttng_ust_ctl_destroy_stream(stream->ustream);
lttng_trace_chunk_put(stream->trace_chunk);
free(stream);
* global.
*/
stream->globally_visible = 1;
- cds_list_del(&stream->send_node);
+ cds_list_del_init(&stream->send_node);
ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
if (ret < 0) {
* will make sure to clean that list.
*/
consumer_stream_destroy(metadata->metadata_stream, NULL);
- cds_list_del(&metadata->metadata_stream->send_node);
metadata->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata->metadata_pushed_wait_queue);
+
send_streams_error:
error_no_stream:
end:
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 1);
if (ret < 0) {
goto error;
}
* new metadata stream.
*/
consumer_stream_destroy(metadata_stream, NULL);
- cds_list_del(&metadata_stream->send_node);
metadata_channel->metadata_stream = NULL;
+ lttng_wait_queue_wake_all(&metadata_channel->metadata_pushed_wait_queue);
error:
rcu_read_unlock();
if (use_relayd) {
ret = consumer_send_relayd_stream(stream, path);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
} else {
ret = consumer_stream_create_output_files(stream,
false);
if (ret < 0) {
- goto error_unlock;
+ goto error_close_stream;
}
DBG("UST consumer snapshot stream (%" PRIu64 ")",
stream->key);
ret = lttng_ustconsumer_take_snapshot(stream);
if (ret < 0) {
ERR("Taking UST snapshot");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
if (ret < 0) {
ERR("Produced UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
if (ret < 0) {
ERR("Consumerd UST snapshot position");
- goto error_unlock;
+ goto error_close_stream;
}
/*
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, uint64_t version,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
char *metadata_str;
if (!wait) {
goto end_free;
}
- while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
- DBG("Waiting for metadata to be flushed");
- health_code_update();
-
- usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
- }
+ consumer_wait_metadata_cache_flushed(channel, offset + len, invoked_by_timer);
end_free:
free(metadata_str);
switch (msg.cmd_type) {
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
+ uint32_t major = msg.u.relayd_sock.major;
+ uint32_t minor = msg.u.relayd_sock.minor;
+ enum lttcomm_sock_proto protocol =
+ (enum lttcomm_sock_proto) msg.u.relayd_sock
+ .relayd_socket_protocol;
+
/* Session daemon status message are handled in the following call. */
consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
- msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id);
+ msg.u.relayd_sock.type, ctx, sock,
+ consumer_sockpoll, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id, major,
+ minor, protocol);
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
health_code_update();
ret = lttng_ustconsumer_recv_metadata(sock, key, offset, len,
- version, found_channel, 0, 1);
+ version, found_channel, false, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_push_metadata_fatal;
stream->quiescent = true;
}
}
- pthread_mutex_unlock(&stream->lock);
+
stream->hangup_flush_done = 1;
+ pthread_mutex_unlock(&stream->lock);
}
void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
goto end;
}
stream->ust_metadata_pushed += write_len;
+ lttng_wait_queue_wake_all(&stream->chan->metadata_pushed_wait_queue);
assert(stream->chan->metadata_cache->contents.size >=
stream->ust_metadata_pushed);
* Request metadata from the sessiond, but don't wait for the flush
* because we locked the metadata thread.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, false, 0);
pthread_mutex_lock(&metadata_stream->lock);
if (ret < 0) {
status = SYNC_METADATA_STATUS_ERROR;
* pushed out due to concurrent interaction with the session daemon.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer, int wait)
+ struct lttng_consumer_channel *channel, bool invoked_by_timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
health_code_update();
ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, version, channel, timer, wait);
+ key, offset, len, version, channel, invoked_by_timer, wait);
if (ret >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive