/*
* Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
* Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
+ * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License, version 2 only,
extern struct lttng_consumer_global_data consumer_data;
extern int consumer_poll_timeout;
-extern volatile int consumer_quit;
/*
* Take a snapshot for a specific fd
int infd = stream->wait_fd;
ret = kernctl_snapshot(infd);
- if (ret != 0) {
+ /*
+ * -EAGAIN is not an error, it just means that there is no data to
+ * be read.
+ */
+ if (ret != 0 && ret != -EAGAIN) {
PERROR("Getting sub-buffer snapshot.");
}
return ret;
}
+/*
+ * Sample consumed and produced positions for a specific fd.
+ *
+ * Returns 0 on success, < 0 on error.
+ */
+int lttng_kconsumer_sample_snapshot_positions(
+ struct lttng_consumer_stream *stream)
+{
+ assert(stream);
+
+ return kernctl_snapshot_sample_positions(stream->wait_fd);
+}
+
/*
* Get the produced position
*
}
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
- /* Are we at a position _before_ the first available packet ? */
- bool before_first_packet = true;
unsigned long consumed_pos, produced_pos;
health_code_update();
DBG("Kernel consumer snapshot stream %s/%s (%" PRIu64 ")",
path, stream->name, stream->key);
}
- if (relayd_id != -1ULL) {
- ret = consumer_send_relayd_streams_sent(relayd_id);
+
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
+ if (ret < 0) {
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
if (ret < 0) {
- ERR("sending streams sent to relayd");
+ ERR("Failed to flush kernel stream");
goto end_unlock;
}
- }
-
- ret = kernctl_buffer_flush(stream->wait_fd);
- if (ret < 0) {
- ERR("Failed to flush kernel stream");
goto end_unlock;
}
produced_pos, nb_packets_per_stream,
stream->max_sb_size);
- while (consumed_pos < produced_pos) {
+ while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;
- int lost_packet = 0;
health_code_update();
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
-
- /*
- * Start accounting lost packets only when we
- * already have extracted packets (to match the
- * content of the final snapshot).
- */
- if (!before_first_packet) {
- lost_packet = 1;
- }
+ stream->chan->lost_packets++;
continue;
}
goto end_unlock;
}
consumed_pos += stream->max_sb_size;
-
- /*
- * Only account lost packets located between
- * succesfully extracted packets (do not account before
- * and after since they are not visible in the
- * resulting snapshot).
- */
- stream->chan->lost_packets += lost_packet;
- lost_packet = 0;
- before_first_packet = false;
}
if (relayd_id == (uint64_t) -1ULL) {
*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
+static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
{
int ret, use_relayd = 0;
if (!metadata_channel) {
ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
ret = -1;
- goto error;
+ goto error_no_channel;
}
metadata_stream = metadata_channel->metadata_stream;
assert(metadata_stream);
+ pthread_mutex_lock(&metadata_stream->lock);
/* Flag once that we have a valid relayd for the stream. */
if (relayd_id != (uint64_t) -1ULL) {
if (use_relayd) {
ret = consumer_send_relayd_stream(metadata_stream, path);
if (ret < 0) {
- goto error;
+ goto error_snapshot;
}
} else {
ret = utils_create_stream_file(path, metadata_stream->name,
metadata_stream->tracefile_count_current,
metadata_stream->uid, metadata_stream->gid, NULL);
if (ret < 0) {
- goto error;
+ goto error_snapshot;
}
metadata_stream->out_fd = ret;
}
if (ret_read != -EAGAIN) {
ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
ret_read);
- goto error;
+ ret = ret_read;
+ goto error_snapshot;
}
/* ret_read is negative at this point so we will exit the loop. */
continue;
}
ret = 0;
-
+error_snapshot:
+ pthread_mutex_unlock(&metadata_stream->lock);
cds_list_del(&metadata_stream->send_node);
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
-error:
+error_no_channel:
rcu_read_unlock();
return ret;
}
case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
{
/* Session daemon status message are handled in the following call. */
- ret = consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
+ consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
- msg.u.relayd_sock.relayd_session_id);
+ msg.u.relayd_sock.relayd_session_id);
goto end_nosignal;
}
case LTTNG_CONSUMER_ADD_CHANNEL:
} else {
ret = consumer_add_channel(new_channel, ctx);
}
- if (CONSUMER_CHANNEL_TYPE_DATA) {
+ if (msg.u.channel.type == CONSUMER_CHANNEL_TYPE_DATA && !ret) {
+ int monitor_start_ret;
+
+ DBG("Consumer starting monitor timer");
consumer_timer_live_start(new_channel,
msg.u.channel.live_timer_interval);
+ monitor_start_ret = consumer_timer_monitor_start(
+ new_channel,
+ msg.u.channel.monitor_timer_interval);
+ if (monitor_start_ret < 0) {
+ ERR("Starting channel monitoring timer failed");
+ goto end_nosignal;
+ }
+
}
health_code_update();
consumer_stream_free(new_stream);
goto end_nosignal;
}
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
}
/* Get the right pipe where the stream will be sent. */
if (ret < 0) {
goto end_nosignal;
}
+ channel->streams_sent_to_relayd = true;
}
break;
}
}
case LTTNG_CONSUMER_DISCARDED_EVENTS:
{
- uint64_t ret;
+ ssize_t ret;
+ uint64_t count;
struct lttng_consumer_channel *channel;
uint64_t id = msg.u.discarded_events.session_id;
uint64_t key = msg.u.discarded_events.channel_key;
if (!channel) {
ERR("Kernel consumer discarded events channel %"
PRIu64 " not found", key);
- ret = 0;
+ count = 0;
} else {
- ret = channel->discarded_events;
+ count = channel->discarded_events;
}
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
if (ret < 0) {
PERROR("send discarded events");
goto error_fatal;
}
case LTTNG_CONSUMER_LOST_PACKETS:
{
- uint64_t ret;
+ ssize_t ret;
+ uint64_t count;
struct lttng_consumer_channel *channel;
uint64_t id = msg.u.lost_packets.session_id;
uint64_t key = msg.u.lost_packets.channel_key;
if (!channel) {
ERR("Kernel consumer lost packets channel %"
PRIu64 " not found", key);
- ret = 0;
+ count = 0;
} else {
- ret = channel->lost_packets;
+ count = channel->lost_packets;
}
health_code_update();
/* Send back returned value to session daemon */
- ret = lttcomm_send_unix_sock(sock, &ret, sizeof(ret));
+ ret = lttcomm_send_unix_sock(sock, &count, sizeof(count));
if (ret < 0) {
PERROR("send lost packets");
goto error_fatal;
break;
}
+ case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
+ {
+ int channel_monitor_pipe;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Successfully received the command's type. */
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+
+ ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
+ 1);
+ if (ret != sizeof(channel_monitor_pipe)) {
+ ERR("Failed to receive channel monitor pipe");
+ goto error_fatal;
+ }
+
+ DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
+ ret = consumer_timer_thread_set_channel_monitor_pipe(
+ channel_monitor_pipe);
+ if (!ret) {
+ int flags;
+
+ ret_code = LTTCOMM_CONSUMERD_SUCCESS;
+ /* Set the pipe as non-blocking. */
+ ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
+ if (ret == -1) {
+ PERROR("fcntl get flags of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ flags = ret;
+
+ ret = fcntl(channel_monitor_pipe, F_SETFL,
+ flags | O_NONBLOCK);
+ if (ret == -1) {
+ PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
+ goto error_fatal;
+ }
+ DBG("Channel monitor pipe set as non-blocking");
+ } else {
+ ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
+ }
+ ret = consumer_send_status_msg(sock, ret_code);
+ if (ret < 0) {
+ goto error_fatal;
+ }
+ break;
+ }
default:
goto end_nosignal;
}
static int get_index_values(struct ctf_packet_index *index, int infd)
{
int ret;
+ uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
+ events_discarded, stream_id, stream_instance_id,
+ packet_seq_num;
- ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+ ret = kernctl_get_timestamp_begin(infd, ×tamp_begin);
if (ret < 0) {
PERROR("kernctl_get_timestamp_begin");
goto error;
}
- index->timestamp_begin = htobe64(index->timestamp_begin);
- ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+ ret = kernctl_get_timestamp_end(infd, ×tamp_end);
if (ret < 0) {
PERROR("kernctl_get_timestamp_end");
goto error;
}
- index->timestamp_end = htobe64(index->timestamp_end);
- ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+ ret = kernctl_get_events_discarded(infd, &events_discarded);
if (ret < 0) {
PERROR("kernctl_get_events_discarded");
goto error;
}
- index->events_discarded = htobe64(index->events_discarded);
- ret = kernctl_get_content_size(infd, &index->content_size);
+ ret = kernctl_get_content_size(infd, &content_size);
if (ret < 0) {
PERROR("kernctl_get_content_size");
goto error;
}
- index->content_size = htobe64(index->content_size);
- ret = kernctl_get_packet_size(infd, &index->packet_size);
+ ret = kernctl_get_packet_size(infd, &packet_size);
if (ret < 0) {
PERROR("kernctl_get_packet_size");
goto error;
}
- index->packet_size = htobe64(index->packet_size);
- ret = kernctl_get_stream_id(infd, &index->stream_id);
+ ret = kernctl_get_stream_id(infd, &stream_id);
if (ret < 0) {
PERROR("kernctl_get_stream_id");
goto error;
}
- index->stream_id = htobe64(index->stream_id);
- ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
+ ret = kernctl_get_instance_id(infd, &stream_instance_id);
if (ret < 0) {
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
- index->stream_instance_id = -1ULL;
- ret = 0;
+ stream_instance_id = -1ULL;
} else {
PERROR("kernctl_get_instance_id");
goto error;
}
}
- index->stream_instance_id = htobe64(index->stream_instance_id);
- ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
+ ret = kernctl_get_sequence_number(infd, &packet_seq_num);
if (ret < 0) {
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
- index->packet_seq_num = -1ULL;
+ packet_seq_num = -1ULL;
ret = 0;
} else {
PERROR("kernctl_get_sequence_number");
}
index->packet_seq_num = htobe64(index->packet_seq_num);
+ *index = (typeof(*index)) {
+ .offset = index->offset,
+ .packet_size = htobe64(packet_size),
+ .content_size = htobe64(content_size),
+ .timestamp_begin = htobe64(timestamp_begin),
+ .timestamp_end = htobe64(timestamp_end),
+ .events_discarded = htobe64(events_discarded),
+ .stream_id = htobe64(stream_id),
+ .stream_instance_id = htobe64(stream_instance_id),
+ .packet_seq_num = htobe64(packet_seq_num),
+ };
+
error:
return ret;
}
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
seq = -1ULL;
- ret = 0;
} else {
PERROR("kernctl_get_sequence_number");
goto end;
}
ret = update_stream_stats(stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
} else {
write_index = 0;
ret = metadata_stream_check_version(infd, stream);
if (ret < 0) {
+ err = kernctl_put_subbuf(infd);
+ if (err != 0) {
+ if (err == -EFAULT) {
+ PERROR("Error in unreserving sub buffer\n");
+ } else if (err == -EIO) {
+ /* Should never happen with newer LTTng versions */
+ PERROR("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ ret = err;
+ goto end;
+ }
goto end;
}
}
if (!index_file) {
goto error;
}
+ assert(!stream->index_file);
stream->index_file = index_file;
}
}