#include <urcu/list.h>
#include <signal.h>
+#include <bin/lttng-consumerd/health-consumerd.h>
#include <common/common.h>
#include <common/sessiond-comm/sessiond-comm.h>
#include <common/relayd/relayd.h>
#include <common/consumer-stream.h>
#include <common/consumer-timer.h>
#include <common/utils.h>
+#include <common/index/index.h>
#include "ust-consumer.h"
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
cds_list_del(&stream->send_node);
ustctl_destroy_stream(stream->ustream);
free(stream);
const char *pathname, const char *name, uid_t uid, gid_t gid,
uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
uint64_t tracefile_size, uint64_t tracefile_count,
- uint64_t session_id_per_pid, unsigned int monitor)
+ uint64_t session_id_per_pid, unsigned int monitor,
+ unsigned int live_timer_interval)
{
assert(pathname);
assert(name);
return consumer_allocate_channel(key, session_id, pathname, name, uid,
gid, relayd_id, output, tracefile_size,
- tracefile_count, session_id_per_pid, monitor);
+ tracefile_count, session_id_per_pid, monitor, live_timer_interval);
}
/*
int wait_fd;
int ust_metadata_pipe[2];
+ health_code_update();
+
if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
if (ret < 0) {
if (channel->relayd_id != (uint64_t) -1ULL) {
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Try to send the stream to the relayd if one is available. */
ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
if (ret < 0) {
/* The channel was sent successfully to the sessiond at this point. */
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Send stream to session daemon. */
ret = send_sessiond_stream(sock, stream);
if (ret < 0) {
/* Send streams to the corresponding thread. */
cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
send_node) {
+
+ health_code_update();
+
/* Sending the stream to the thread. */
ret = send_stream_to_thread(stream, ctx);
if (ret < 0) {
cds_lfht_for_each_entry_duplicate(ht->ht,
ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
&channel->key, &iter.iter, stream, node_channel_id.node) {
+
+ health_code_update();
+
ustctl_flush_buffer(stream->ustream, 1);
}
error:
}
assert(!metadata_channel->monitor);
+ health_code_update();
+
/*
* Ask the sessiond if we have new metadata waiting and update the
* consumer metadata cache.
*/
- ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0);
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
if (ret < 0) {
goto error;
}
+ health_code_update();
+
/*
* The metadata stream is NOT created in no monitor mode when the channel
* is created on a sessiond ask channel command.
ret = utils_create_stream_file(path, metadata_stream->name,
metadata_stream->chan->tracefile_size,
metadata_stream->tracefile_count_current,
- metadata_stream->uid, metadata_stream->gid);
+ metadata_stream->uid, metadata_stream->gid, NULL);
if (ret < 0) {
goto error_stream;
}
metadata_stream->tracefile_size_current = 0;
}
- pthread_mutex_lock(&metadata_channel->metadata_cache->lock);
-
do {
+ health_code_update();
+
ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
if (ret < 0) {
- goto error_unlock;
+ goto error_stream;
}
} while (ret > 0);
-error_unlock:
- pthread_mutex_unlock(&metadata_channel->metadata_cache->lock);
-
error_stream:
/*
* Clean up the stream completly because the next snapshot will use a new
DBG("UST consumer snapshot channel %" PRIu64, key);
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
+
+ health_code_update();
+
/* Lock stream because we are about to change its state. */
pthread_mutex_lock(&stream->lock);
stream->net_seq_idx = relayd_id;
ret = utils_create_stream_file(path, stream->name,
stream->chan->tracefile_size,
stream->tracefile_count_current,
- stream->uid, stream->gid);
+ stream->uid, stream->gid, NULL);
if (ret < 0) {
goto error_unlock;
}
ssize_t read_len;
unsigned long len, padded_len;
+ health_code_update();
+
DBG("UST consumer taking snapshot at pos %lu", consumed_pos);
ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
}
read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len,
- padded_len - len);
+ padded_len - len, NULL);
if (use_relayd) {
if (read_len != len) {
ret = -EPERM;
*/
int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
uint64_t len, struct lttng_consumer_channel *channel,
- int timer)
+ int timer, int wait)
{
int ret, ret_code = LTTNG_OK;
char *metadata_str;
goto end;
}
+ health_code_update();
+
/* Receive metadata string. */
ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
if (ret < 0) {
goto end_free;
}
+ health_code_update();
+
pthread_mutex_lock(&channel->metadata_cache->lock);
ret = consumer_metadata_cache_write(channel, offset, len, metadata_str);
if (ret < 0) {
}
pthread_mutex_unlock(&channel->metadata_cache->lock);
+ if (!wait) {
+ goto end_free;
+ }
while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
DBG("Waiting for metadata to be flushed");
+
+ health_code_update();
+
usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
}
struct lttcomm_consumer_msg msg;
struct lttng_consumer_channel *channel = NULL;
+ health_code_update();
+
ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
if (ret != sizeof(msg)) {
DBG("Consumer received unexpected message size %zd (expects %zu)",
}
return ret;
}
+
+ health_code_update();
+
if (msg.cmd_type == LTTNG_CONSUMER_STOP) {
/*
* Notify the session daemon that the command is completed.
return -ENOENT;
}
+ health_code_update();
+
/* relayd needs RCU read-side lock */
rcu_read_lock();
/* Session daemon status message are handled in the following call. */
ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
- &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id);
+ &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
+ msg.u.relayd_sock.relayd_session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_DESTROY_RELAYD:
msg.u.ask_channel.tracefile_size,
msg.u.ask_channel.tracefile_count,
msg.u.ask_channel.session_id_per_pid,
- msg.u.ask_channel.monitor);
+ msg.u.ask_channel.monitor,
+ msg.u.ask_channel.live_timer_interval);
if (!channel) {
goto end_channel_error;
}
goto error_fatal;
};
+ health_code_update();
+
ret = ask_channel(ctx, sock, channel, &attr);
if (ret < 0) {
goto end_channel_error;
}
consumer_timer_switch_start(channel, attr.switch_timer_interval);
attr.switch_timer_interval = 0;
+ } else {
+ consumer_timer_live_start(channel,
+ msg.u.ask_channel.live_timer_interval);
}
+ health_code_update();
+
/*
* Add the channel to the internal state AFTER all streams were created
* and successfully sent to session daemon. This way, all streams must
}
consumer_metadata_cache_destroy(channel);
}
+ if (channel->live_timer_enabled == 1) {
+ consumer_timer_live_stop(channel);
+ }
goto end_channel_error;
}
+ health_code_update();
+
/*
* Channel and streams are now created. Inform the session daemon that
* everything went well and should wait to receive the channel and
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Send everything to sessiond. */
ret = send_sessiond_channel(sock, channel, ctx, &relayd_err);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/*
* In no monitor mode, the streams ownership is kept inside the channel
* so don't send them to the data thread.
goto end_msg_sessiond;
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(sock, LTTNG_OK);
if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
/* Wait for more data. */
- if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) {
+ health_poll_entry();
+ ret = lttng_consumer_poll_socket(consumer_sockpoll);
+ health_poll_exit();
+ if (ret < 0) {
goto error_fatal;
}
+ health_code_update();
+
ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
- len, channel, 0);
+ len, channel, 0, 1);
if (ret < 0) {
/* error receiving from sessiond */
goto error_fatal;
}
}
+ health_code_update();
ret = consumer_send_status_msg(sock, ret_code);
if (ret < 0) {
/* Somehow, the session daemon is not responding anymore. */
goto end_nosignal;
}
+ health_code_update();
break;
}
default:
end_nosignal:
rcu_read_unlock();
+ health_code_update();
+
/*
* Return 1 to indicate success since the 0 value can be a socket
* shutdown during the recv() or send() call.
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
end_channel_error:
if (channel) {
goto error_fatal;
}
rcu_read_unlock();
+
+ health_code_update();
+
return 1;
error_fatal:
rcu_read_unlock();
return ustctl_snapshot_get_consumed(stream->ustream, pos);
}
+void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
+ int producer)
+{
+ assert(stream);
+ assert(stream->ustream);
+
+ ustctl_flush_buffer(stream->ustream, producer);
+}
+
+int lttng_ustconsumer_get_current_timestamp(
+ struct lttng_consumer_stream *stream, uint64_t *ts)
+{
+ assert(stream);
+ assert(stream->ustream);
+ assert(ts);
+
+ return ustctl_get_current_timestamp(stream->ustream, ts);
+}
+
/*
* Called when the stream signal the consumer that it has hang up.
*/
ustctl_destroy_stream(stream->ustream);
}
+/*
+ * Populate index values of a UST stream. Values are set in big endian order.
+ *
+ * Return 0 on success or else a negative value.
+ */
+static int get_index_values(struct lttng_packet_index *index,
+ struct ustctl_consumer_stream *ustream)
+{
+ int ret;
+
+ ret = ustctl_get_timestamp_begin(ustream, &index->timestamp_begin);
+ if (ret < 0) {
+ PERROR("ustctl_get_timestamp_begin");
+ goto error;
+ }
+ index->timestamp_begin = htobe64(index->timestamp_begin);
+
+ ret = ustctl_get_timestamp_end(ustream, &index->timestamp_end);
+ if (ret < 0) {
+ PERROR("ustctl_get_timestamp_end");
+ goto error;
+ }
+ index->timestamp_end = htobe64(index->timestamp_end);
+
+ ret = ustctl_get_events_discarded(ustream, &index->events_discarded);
+ if (ret < 0) {
+ PERROR("ustctl_get_events_discarded");
+ goto error;
+ }
+ index->events_discarded = htobe64(index->events_discarded);
+
+ ret = ustctl_get_content_size(ustream, &index->content_size);
+ if (ret < 0) {
+ PERROR("ustctl_get_content_size");
+ goto error;
+ }
+ index->content_size = htobe64(index->content_size);
+
+ ret = ustctl_get_packet_size(ustream, &index->packet_size);
+ if (ret < 0) {
+ PERROR("ustctl_get_packet_size");
+ goto error;
+ }
+ index->packet_size = htobe64(index->packet_size);
+
+ ret = ustctl_get_stream_id(ustream, &index->stream_id);
+ if (ret < 0) {
+ PERROR("ustctl_get_stream_id");
+ goto error;
+ }
+ index->stream_id = htobe64(index->stream_id);
+
+error:
+ return ret;
+}
+
+/*
+ * Write up to one packet from the metadata cache to the channel.
+ *
+ * Returns the number of bytes pushed in the cache, or a negative value
+ * on error.
+ */
+static
+int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
+{
+ ssize_t write_len;
+ int ret;
+
+ pthread_mutex_lock(&stream->chan->metadata_cache->lock);
+ if (stream->chan->metadata_cache->contiguous
+ == stream->ust_metadata_pushed) {
+ ret = 0;
+ goto end;
+ }
+
+ write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
+ &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
+ stream->chan->metadata_cache->contiguous
+ - stream->ust_metadata_pushed);
+ assert(write_len != 0);
+ if (write_len < 0) {
+ ERR("Writing one metadata packet");
+ ret = -1;
+ goto end;
+ }
+ stream->ust_metadata_pushed += write_len;
+
+ assert(stream->chan->metadata_cache->contiguous >=
+ stream->ust_metadata_pushed);
+ ret = write_len;
+
+end:
+ pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
+ return ret;
+}
+
+
+/*
+ * Sync metadata meaning request them to the session daemon and snapshot to the
+ * metadata thread can consumer them.
+ *
+ * Metadata stream lock MUST be acquired.
+ *
+ * Return 0 if new metadatda is available, EAGAIN if the metadata stream
+ * is empty or a negative value on error.
+ */
+int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
+ struct lttng_consumer_stream *metadata)
+{
+ int ret;
+ int retry = 0;
+
+ assert(ctx);
+ assert(metadata);
+
+ /*
+ * Request metadata from the sessiond, but don't wait for the flush
+ * because we locked the metadata thread.
+ */
+ ret = lttng_ustconsumer_request_metadata(ctx, metadata->chan, 0, 0);
+ if (ret < 0) {
+ goto end;
+ }
+
+ ret = commit_one_metadata_packet(metadata);
+ if (ret <= 0) {
+ goto end;
+ } else if (ret > 0) {
+ retry = 1;
+ }
+
+ ustctl_flush_buffer(metadata->ustream, 1);
+ ret = ustctl_snapshot(metadata->ustream);
+ if (ret < 0) {
+ if (errno != EAGAIN) {
+ ERR("Sync metadata, taking UST snapshot");
+ goto end;
+ }
+ DBG("No new metadata when syncing them.");
+ /* No new metadata, exit. */
+ ret = ENODATA;
+ goto end;
+ }
+
+ /*
+ * After this flush, we still need to extract metadata.
+ */
+ if (retry) {
+ ret = EAGAIN;
+ }
+
+end:
+ return ret;
+}
+
+/*
+ * Read subbuffer from the given stream.
+ *
+ * Stream lock MUST be acquired.
+ *
+ * Return 0 on success else a negative value.
+ */
int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx)
{
unsigned long len, subbuf_size, padding;
- int err;
+ int err, write_index = 1;
long ret = 0;
char dummy;
struct ustctl_consumer_stream *ustream;
+ struct lttng_packet_index index;
assert(stream);
assert(stream->ustream);
/* Ease our life for what's next. */
ustream = stream->ustream;
- /* We can consume the 1 byte written into the wait_fd by UST */
+ /*
+ * We can consume the 1 byte written into the wait_fd by UST.
+ * Don't trigger error if we cannot read this one byte (read
+ * returns 0), or if the error is EAGAIN or EWOULDBLOCK.
+ */
if (stream->monitor && !stream->hangup_flush_done) {
ssize_t readlen;
- do {
- readlen = read(stream->wait_fd, &dummy, 1);
- } while (readlen == -1 && errno == EINTR);
- if (readlen == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+ readlen = lttng_read(stream->wait_fd, &dummy, 1);
+ if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
ret = readlen;
goto end;
}
* already been read.
*/
if (stream->metadata_flag) {
- ssize_t write_len;
-
- if (stream->chan->metadata_cache->contiguous
- == stream->ust_metadata_pushed) {
- ret = 0;
- goto end;
- }
-
- write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
- &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
- stream->chan->metadata_cache->contiguous
- - stream->ust_metadata_pushed);
- assert(write_len != 0);
- if (write_len < 0) {
- ERR("Writing one metadata packet");
- ret = -1;
+ ret = commit_one_metadata_packet(stream);
+ if (ret <= 0) {
goto end;
}
- stream->ust_metadata_pushed += write_len;
ustctl_flush_buffer(stream->ustream, 1);
goto retry;
}
goto end;
}
assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
+
+ if (!stream->metadata_flag) {
+ index.offset = htobe64(stream->out_fd_offset);
+ ret = get_index_values(&index, ustream);
+ if (ret < 0) {
+ goto end;
+ }
+ } else {
+ write_index = 0;
+ }
+
/* Get the full padded subbuffer size */
err = ustctl_get_padded_subbuf_size(ustream, &len);
assert(err == 0);
padding = len - subbuf_size;
/* write the subbuffer to the tracefile */
- ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding);
+ ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, padding, &index);
/*
* The mmap operation should write subbuf_size amount of data when network
* streaming or the full padding (len) size when we are _not_ streaming.
DBG("Error writing to tracefile "
"(ret: %ld != len: %lu != subbuf_size: %lu)",
ret, len, subbuf_size);
+ write_index = 0;
}
err = ustctl_put_next_subbuf(ustream);
assert(err == 0);
+ /* Write index if needed. */
+ if (!write_index) {
+ goto end;
+ }
+
+ if (stream->chan->live_timer_interval && !stream->metadata_flag) {
+ /*
+ * In live, block until all the metadata is sent.
+ */
+ err = consumer_stream_sync_metadata(ctx, stream->session_id);
+ if (err < 0) {
+ goto end;
+ }
+ }
+
+ assert(!stream->metadata_flag);
+ err = consumer_stream_write_index(stream, &index);
+ if (err < 0) {
+ goto end;
+ }
+
end:
return ret;
}
if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor) {
ret = utils_create_stream_file(stream->chan->pathname, stream->name,
stream->chan->tracefile_size, stream->tracefile_count_current,
- stream->uid, stream->gid);
+ stream->uid, stream->gid, NULL);
if (ret < 0) {
goto error;
}
stream->out_fd = ret;
stream->tracefile_size_current = 0;
+
+ if (!stream->metadata_flag) {
+ ret = index_create_file(stream->chan->pathname,
+ stream->name, stream->uid, stream->gid,
+ stream->chan->tracefile_size,
+ stream->tracefile_count_current);
+ if (ret < 0) {
+ goto error;
+ }
+ stream->index_fd = ret;
+ }
}
ret = 0;
rcu_read_lock();
cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
node.node) {
+
+ health_code_update();
+
pthread_mutex_lock(&stream->chan->lock);
/*
* Whatever returned value, we must continue to try to close everything
* introduces deadlocks.
*/
int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
- struct lttng_consumer_channel *channel, int timer)
+ struct lttng_consumer_channel *channel, int timer, int wait)
{
struct lttcomm_metadata_request_msg request;
struct lttcomm_consumer_msg msg;
request.key);
pthread_mutex_lock(&ctx->metadata_socket_lock);
+
+ health_code_update();
+
ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
sizeof(request));
if (ret < 0) {
goto end;
}
+ health_code_update();
+
/* Receive the metadata from sessiond */
ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
sizeof(msg));
goto end;
}
+ health_code_update();
+
if (msg.cmd_type == LTTNG_ERR_UND) {
/* No registry found */
(void) consumer_send_status_msg(ctx->consumer_metadata_socket,
DBG("No new metadata to receive for key %" PRIu64, key);
}
+ health_code_update();
+
/* Tell session daemon we are ready to receive the metadata. */
ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
LTTNG_OK);
goto end;
}
+ health_code_update();
+
ret_code = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
- key, offset, len, channel, timer);
+ key, offset, len, channel, timer, wait);
if (ret_code >= 0) {
/*
* Only send the status msg if the sessiond is alive meaning a positive
ret = 0;
end:
+ health_code_update();
+
pthread_mutex_unlock(&ctx->metadata_socket_lock);
return ret;
}