#include <urcu/list.h>
#include <signal.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
cds_list_del(&stream->send_node);
ustctl_destroy_stream(stream->ustream);
free(stream);
int wait_fd;
int ust_metadata_pipe[2];
+ health_code_update();
+
if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
if (ret < 0) {
if (channel->relayd_id != (uint64_t) -1ULL) {
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Try to send the stream to the relayd if one is available. */
ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
if (ret < 0) {
/* The channel was sent successfully to the sessiond at this point. */
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Send stream to session daemon. */
ret = send_sessiond_stream(sock, stream);
if (ret < 0) {
/* Send streams to the corresponding thread. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
/* Sending the stream to the thread. */
ret = send_stream_to_thread(stream, ctx);
if (ret < 0) {
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
&channel->key, &iter.iter, stream, node_channel_id.node) {
+
+ health_code_update();
+
ustctl_flush_buffer(stream->ustream, 1);
}
error:
}
assert(!metadata_channel->monitor);
+ health_code_update();
+
/*
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
if (ret < 0) {
goto error;
}
+ health_code_update();
+
/*
* The metadata stream is NOT created in no monitor mode when the channel
* is created on a sessiond ask channel command.
metadata_stream->tracefile_size_current = 0;
}
- pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-
do {
+ health_code_update();
+
ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
if (ret < 0) {
- goto error_unlock;
+ goto error_stream;
}
} while (ret > 0);
-error_unlock:
- pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
-
error_stream:
/*
* Clean up the stream completly because the next snapshot will use a new
DBG("UST consumer snapshot channel %" PRIu64, key);
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Lock stream because we are about to change its state. */
pthread_mutex_lock(&stream->lock);
stream->net_seq_idx = relayd_id;
ssize_t read_len;
unsigned long len, padded_len;
+ health_code_update();
+
DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, struct lttng_consumer_channel *channel,
- int timer)
+ int timer, int wait)
{
int ret, ret_code = LTTNG_OK;
char *metadata_str;
goto end;
}
+ health_code_update();
+
/* Receive metadata string. */
ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
if (ret < 0) {
goto end_free;
}
+ health_code_update();
+
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ if (!wait) {
+ goto end_free;
+ }
while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
+
+ health_code_update();
+
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
struct lttcomm_consumer_msg msg;
struct lttng_consumer_channel *channel = NULL;
+ health_code_update();
+
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
DBG("Consumer received unexpected message size %zd (expects %zu)",
}
return ret;
}
+
+ health_code_update();
+
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
/*
* Notify the session daemon that the command is completed.
return -ENOENT;
}
+ health_code_update();
+
/* relayd needs RCU read-side lock */
rcu_read_lock();
goto error_fatal;
};
+ health_code_update();
+
ret = ask_channel(ctx, sock, channel, &attr);
if (ret < 0) {
goto end_channel_error;
}
consumer_timer_switch_start(channel, attr.switch_timer_interval);
attr.switch_timer_interval = 0;
+ } else {
+ consumer_timer_live_start(channel,
+ msg.u.ask_channel.live_timer_interval);
}
- consumer_timer_live_start(channel, msg.u.ask_channel.live_timer_interval);
+ health_code_update();
/*
* Add the channel to the internal state AFTER all streams were created
goto end_channel_error;
}
+ health_code_update();
+
/*
* Channel and streams are now created. Inform the session daemon that
* everything went well and should wait to receive the channel and
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Send everything to sessiond. */
ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/*
* In no monitor mode, the streams ownership is kept inside the channel
* so don't send them to the data thread.
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(sock, LTTNG_OK);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/* Wait for more data. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel, 0);
+ len, channel, 0, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
}
}
+ health_code_update();
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+ health_code_update();
break;
}
default:
end_nosignal:
rcu_read_unlock();
+ health_code_update();
+
/*
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
end_channel_error:
if (channel) {
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
error_fatal:
rcu_read_unlock();
return ustctl_snapshot_get_consumed(stream->ustream, pos);
}
+void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ ustctl_flush_buffer(stream->ustream, producer);
+}
+
+int lttng_ustconsumer_get_current_timestamp(
+ struct lttng_consumer_stream *stream, uint64_t *ts)
+{
+ assert(stream);
+ assert(stream->ustream);
+ assert(ts);
+
+ return ustctl_get_current_timestamp(stream->ustream, ts);
+}
+
/*
* Called when the stream signal the consumer that it has hang up.
*/
return ret;
}
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ if (stream->chan->metadata_cache->contiguous
+ == stream->ust_metadata_pushed) {
+ ret = 0;
+ goto end;
+ }
+ write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->contiguous
+ - stream->ust_metadata_pushed);
+ assert(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = -1;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ assert(stream->chan->metadata_cache->contiguous >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ int ret;
+ int retry = 0;
+
+ assert(ctx);
+ assert(metadata);
+
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = commit_one_metadata_packet(metadata);
+ if (ret <= 0) {
+ goto end;
+ } else if (ret > 0) {
+ retry = 1;
+ }
+
+ ustctl_flush_buffer(metadata->ustream, 1);
+ ret = ustctl_snapshot(metadata->ustream);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking UST snapshot");
+ goto end;
+ }
+ DBG("No new metadata when syncing them.");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+ /*
+ * After this flush, we still need to extract metadata.
+ */
+ if (retry) {
+ ret = EAGAIN;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
/* Ease our life for what's next. */
ustream = stream->ustream;
- /* We can consume the 1 byte written into the wait_fd by UST */
+ /*
+ * We can consume the 1 byte written into the wait_fd by UST.
+ * Don't trigger error if we cannot read this one byte (read
+ * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+ */
if (stream->monitor && !stream->hangup_flush_done) {
ssize_t readlen;
- do {
- readlen = read(stream->wait_fd, &dummy, 1);
- } while (readlen == -1 && errno == EINTR);
- if (readlen == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ readlen = lttng_read(stream->wait_fd, &dummy, 1);
+ if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
ret = readlen;
goto end;
}
* already been read.
*/
if (stream->metadata_flag) {
- ssize_t write_len;
-
- if (stream->chan->metadata_cache->contiguous
- == stream->ust_metadata_pushed) {
- ret = 0;
+ ret = commit_one_metadata_packet(stream);
+ if (ret <= 0) {
goto end;
}
-
- write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
- &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
- stream->chan->metadata_cache->contiguous
- - stream->ust_metadata_pushed);
- assert(write_len != 0);
- if (write_len < 0) {
- ERR("Writing one metadata packet");
- ret = -1;
- goto end;
- }
- stream->ust_metadata_pushed += write_len;
ustctl_flush_buffer(stream->ustream, 1);
goto retry;
}
goto end;
}
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+ if (err < 0) {
+ goto end;
+ }
+ }
+
assert(!stream->metadata_flag);
err = consumer_stream_write_index(stream, &index);
if (err < 0) {
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
node.node) {
+
+ health_code_update();
+
pthread_mutex_lock(&stream->chan->lock);
/*
* Whatever returned value, we must continue to try to close everything
* introduces deadlocks.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer)
+ struct lttng_consumer_channel *channel, int timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
request.key);
pthread_mutex_lock(&ctx->metadata_socket_lock);
+
+ health_code_update();
+
ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
sizeof(request));
if (ret < 0) {
goto end;
}
+ health_code_update();
+
/* Receive the metadata from sessiond */
ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
sizeof(msg));
goto end;
}
+ health_code_update();
+
if (msg.cmd_type == LTTNG_ERR_UND) {
/* No registry found */
(void) consumer_send_status_msg(ctx->consumer_metadata_socket,
DBG("No new metadata to receive for key %" PRIu64, key);
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
LTTNG_OK);
goto end;
}
+ health_code_update();
+
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel, timer);
+ key, offset, len, channel, timer, wait);
if (ret_code >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
ret = 0;
end:
+ health_code_update();
+
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}