int infd = stream->wait_fd;
ret = kernctl_snapshot(infd);
- if (ret != 0) {
+ /*
+ * -EAGAIN is not an error, it just means that there is no data to
+ * be read.
+ */
+ if (ret != 0 && ret != -EAGAIN) {
PERROR("Getting sub-buffer snapshot.");
}
}
cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
- /* Are we at a position _before_ the first available packet ? */
- bool before_first_packet = true;
unsigned long consumed_pos, produced_pos;
health_code_update();
ERR("sending streams sent to relayd");
goto end_unlock;
}
+ channel->streams_sent_to_relayd = true;
}
- ret = kernctl_buffer_flush(stream->wait_fd);
+ ret = kernctl_buffer_flush_empty(stream->wait_fd);
if (ret < 0) {
- ERR("Failed to flush kernel stream");
+ /*
+ * Doing a buffer flush which does not take into
+ * account empty packets. This is not perfect
+ * for stream intersection, but required as a
+ * fall-back when "flush_empty" is not
+ * implemented by lttng-modules.
+ */
+ ret = kernctl_buffer_flush(stream->wait_fd);
+ if (ret < 0) {
+ ERR("Failed to flush kernel stream");
+ goto end_unlock;
+ }
goto end_unlock;
}
produced_pos, nb_packets_per_stream,
stream->max_sb_size);
- while (consumed_pos < produced_pos) {
+ while ((long) (consumed_pos - produced_pos) < 0) {
ssize_t read_len;
unsigned long len, padded_len;
- int lost_packet = 0;
health_code_update();
}
DBG("Kernel consumer get subbuf failed. Skipping it.");
consumed_pos += stream->max_sb_size;
-
- /*
- * Start accounting lost packets only when we
- * already have extracted packets (to match the
- * content of the final snapshot).
- */
- if (!before_first_packet) {
- lost_packet = 1;
- }
+ stream->chan->lost_packets++;
continue;
}
goto end_unlock;
}
consumed_pos += stream->max_sb_size;
-
- /*
- * Only account lost packets located between
- * succesfully extracted packets (do not account before
- * and after since they are not visible in the
- * resulting snapshot).
- */
- stream->chan->lost_packets += lost_packet;
- lost_packet = 0;
- before_first_packet = false;
}
if (relayd_id == (uint64_t) -1ULL) {
*
* Returns 0 on success, < 0 on error
*/
-int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
+static int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path,
uint64_t relayd_id, struct lttng_consumer_local_data *ctx)
{
int ret, use_relayd = 0;
if (!metadata_channel) {
ERR("Kernel snapshot metadata not found for key %" PRIu64, key);
ret = -1;
- goto error;
+ goto error_no_channel;
}
metadata_stream = metadata_channel->metadata_stream;
assert(metadata_stream);
+ pthread_mutex_lock(&metadata_stream->lock);
/* Flag once that we have a valid relayd for the stream. */
if (relayd_id != (uint64_t) -1ULL) {
if (use_relayd) {
ret = consumer_send_relayd_stream(metadata_stream, path);
if (ret < 0) {
- goto error;
+ goto error_snapshot;
}
} else {
ret = utils_create_stream_file(path, metadata_stream->name,
metadata_stream->tracefile_count_current,
metadata_stream->uid, metadata_stream->gid, NULL);
if (ret < 0) {
- goto error;
+ goto error_snapshot;
}
metadata_stream->out_fd = ret;
}
if (ret_read != -EAGAIN) {
ERR("Kernel snapshot reading metadata subbuffer (ret: %zd)",
ret_read);
- goto error;
+ ret = ret_read;
+ goto error_snapshot;
}
/* ret_read is negative at this point so we will exit the loop. */
continue;
}
ret = 0;
-
+error_snapshot:
+ pthread_mutex_unlock(&metadata_stream->lock);
cds_list_del(&metadata_stream->send_node);
consumer_stream_destroy(metadata_stream, NULL);
metadata_channel->metadata_stream = NULL;
-error:
+error_no_channel:
rcu_read_unlock();
return ret;
}
consumer_stream_free(new_stream);
goto end_nosignal;
}
+
+ /*
+ * If adding an extra stream to an already
+ * existing channel (e.g. cpu hotplug), we need
+ * to send the "streams_sent" command to relayd.
+ */
+ if (channel->streams_sent_to_relayd) {
+ ret = consumer_send_relayd_streams_sent(
+ new_stream->net_seq_idx);
+ if (ret < 0) {
+ goto end_nosignal;
+ }
+ }
}
/* Get the right pipe where the stream will be sent. */
if (ret < 0) {
goto end_nosignal;
}
+ channel->streams_sent_to_relayd = true;
}
break;
}
static int get_index_values(struct ctf_packet_index *index, int infd)
{
int ret;
+ uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
+ events_discarded, stream_id, stream_instance_id,
+ packet_seq_num;
- ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin);
+ ret = kernctl_get_timestamp_begin(infd, ×tamp_begin);
if (ret < 0) {
PERROR("kernctl_get_timestamp_begin");
goto error;
}
- index->timestamp_begin = htobe64(index->timestamp_begin);
- ret = kernctl_get_timestamp_end(infd, &index->timestamp_end);
+ ret = kernctl_get_timestamp_end(infd, ×tamp_end);
if (ret < 0) {
PERROR("kernctl_get_timestamp_end");
goto error;
}
- index->timestamp_end = htobe64(index->timestamp_end);
- ret = kernctl_get_events_discarded(infd, &index->events_discarded);
+ ret = kernctl_get_events_discarded(infd, &events_discarded);
if (ret < 0) {
PERROR("kernctl_get_events_discarded");
goto error;
}
- index->events_discarded = htobe64(index->events_discarded);
- ret = kernctl_get_content_size(infd, &index->content_size);
+ ret = kernctl_get_content_size(infd, &content_size);
if (ret < 0) {
PERROR("kernctl_get_content_size");
goto error;
}
- index->content_size = htobe64(index->content_size);
- ret = kernctl_get_packet_size(infd, &index->packet_size);
+ ret = kernctl_get_packet_size(infd, &packet_size);
if (ret < 0) {
PERROR("kernctl_get_packet_size");
goto error;
}
- index->packet_size = htobe64(index->packet_size);
- ret = kernctl_get_stream_id(infd, &index->stream_id);
+ ret = kernctl_get_stream_id(infd, &stream_id);
if (ret < 0) {
PERROR("kernctl_get_stream_id");
goto error;
}
- index->stream_id = htobe64(index->stream_id);
- ret = kernctl_get_instance_id(infd, &index->stream_instance_id);
+ ret = kernctl_get_instance_id(infd, &stream_instance_id);
if (ret < 0) {
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
- index->stream_instance_id = -1ULL;
+ stream_instance_id = -1ULL;
ret = 0;
} else {
PERROR("kernctl_get_instance_id");
goto error;
}
}
- index->stream_instance_id = htobe64(index->stream_instance_id);
- ret = kernctl_get_sequence_number(infd, &index->packet_seq_num);
+ ret = kernctl_get_sequence_number(infd, &packet_seq_num);
if (ret < 0) {
if (ret == -ENOTTY) {
/* Command not implemented by lttng-modules. */
- index->packet_seq_num = -1ULL;
+ packet_seq_num = -1ULL;
ret = 0;
} else {
PERROR("kernctl_get_sequence_number");
}
index->packet_seq_num = htobe64(index->packet_seq_num);
+ *index = (typeof(*index)) {
+ .offset = index->offset,
+ .packet_size = htobe64(packet_size),
+ .content_size = htobe64(content_size),
+ .timestamp_begin = htobe64(timestamp_begin),
+ .timestamp_end = htobe64(timestamp_end),
+ .events_discarded = htobe64(events_discarded),
+ .stream_id = htobe64(stream_id),
+ .stream_instance_id = htobe64(stream_instance_id),
+ .packet_seq_num = htobe64(packet_seq_num),
+ };
+
error:
return ret;
}