X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.cpp;h=ce1d3d8293463898d09b1d9f12eeab018dbbbf9e;hb=d119bd017a99d56ad36901ce8f2175a8ea3b5e5e;hp=aa443027effe021529816ad6a1ea7ffbd8dc3163;hpb=97535efaa975ca52bf02c2d5e76351bfd2e3defa;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.cpp b/src/common/kernel-consumer/kernel-consumer.cpp index aa443027e..ce1d3d829 100644 --- a/src/common/kernel-consumer/kernel-consumer.cpp +++ b/src/common/kernel-consumer/kernel-consumer.cpp @@ -1,5 +1,5 @@ /* - * Copyright (C) 2011 Julien Desfossez + * Copyright (C) 2011 EfficiOS Inc. * Copyright (C) 2011 Mathieu Desnoyers * Copyright (C) 2017 Jérémie Galarneau * @@ -20,25 +20,25 @@ #include #include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#include "kernel-consumer.h" +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "kernel-consumer.hpp" extern struct lttng_consumer_global_data the_consumer_data; extern int consumer_poll_timeout; @@ -138,21 +138,23 @@ error: /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of - * channel. The channel lock must be held by the caller. + * channel. * * Returns 0 on success, < 0 on error */ static int lttng_kconsumer_snapshot_channel( struct lttng_consumer_channel *channel, uint64_t key, char *path, uint64_t relayd_id, - uint64_t nb_packets_per_stream, - struct lttng_consumer_local_data *ctx) + uint64_t nb_packets_per_stream) { int ret; struct lttng_consumer_stream *stream; DBG("Kernel consumer snapshot channel %" PRIu64, key); + /* Prevent channel modifications while we perform the snapshot.*/ + pthread_mutex_lock(&channel->lock); + rcu_read_lock(); /* Splice is not supported yet for channel snapshot. */ @@ -196,13 +198,13 @@ static int lttng_kconsumer_snapshot_channel( ret = consumer_send_relayd_stream(stream, path); if (ret < 0) { ERR("sending stream to relayd"); - goto end_unlock; + goto error_close_stream_output; } } else { ret = consumer_stream_create_output_files(stream, false); if (ret < 0) { - goto end_unlock; + goto error_close_stream_output; } DBG("Kernel consumer snapshot stream (%" PRIu64 ")", stream->key); @@ -220,7 +222,7 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_buffer_flush(stream->wait_fd); if (ret < 0) { ERR("Failed to flush kernel stream"); - goto end_unlock; + goto error_close_stream_output; } goto end_unlock; } @@ -228,19 +230,19 @@ static int lttng_kconsumer_snapshot_channel( ret = lttng_kconsumer_take_snapshot(stream); if (ret < 0) { ERR("Taking kernel snapshot"); - goto end_unlock; + goto error_close_stream_output; } ret = lttng_kconsumer_get_produced_snapshot(stream, &produced_pos); if (ret < 0) { ERR("Produced kernel snapshot position"); - goto end_unlock; + goto error_close_stream_output; } ret = lttng_kconsumer_get_consumed_snapshot(stream, &consumed_pos); if (ret < 0) { ERR("Consumerd kernel snapshot position"); - goto end_unlock; + goto error_close_stream_output; } consumed_pos = consumer_get_consume_start_pos(consumed_pos, @@ -260,7 +262,7 @@ static int lttng_kconsumer_snapshot_channel( if (ret < 0) { if (ret != -EAGAIN) { PERROR("kernctl_get_subbuf snapshot"); - goto end_unlock; + goto error_close_stream_output; } DBG("Kernel consumer get subbuf failed. Skipping it."); consumed_pos += stream->max_sb_size; @@ -310,26 +312,12 @@ static int lttng_kconsumer_snapshot_channel( ret = kernctl_put_subbuf(stream->wait_fd); if (ret < 0) { ERR("Snapshot kernctl_put_subbuf"); - goto end_unlock; + goto error_close_stream_output; } consumed_pos += stream->max_sb_size; } - if (relayd_id == (uint64_t) -1ULL) { - if (stream->out_fd >= 0) { - ret = close(stream->out_fd); - if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end_unlock; - } - stream->out_fd = -1; - } - } else { - close_relayd_stream(stream); - stream->net_seq_idx = (uint64_t) -1ULL; - } - lttng_trace_chunk_put(stream->trace_chunk); - stream->trace_chunk = NULL; + consumer_stream_close_output(stream); pthread_mutex_unlock(&stream->lock); } @@ -342,17 +330,20 @@ error_put_subbuf: if (ret < 0) { ERR("Snapshot kernctl_put_subbuf error path"); } +error_close_stream_output: + consumer_stream_close_output(stream); end_unlock: pthread_mutex_unlock(&stream->lock); end: rcu_read_unlock(); + pthread_mutex_unlock(&channel->lock); return ret; } /* * Read the whole metadata available for a snapshot. * RCU read-side lock must be held across this function to ensure existence of - * metadata_channel. The channel lock must be held by the caller. + * metadata_channel. * * Returns 0 on success, < 0 on error */ @@ -375,7 +366,7 @@ static int lttng_kconsumer_snapshot_metadata( metadata_stream = metadata_channel->metadata_stream; LTTNG_ASSERT(metadata_stream); - pthread_mutex_lock(&metadata_stream->lock); + metadata_stream->read_subbuffer_ops.lock(metadata_stream); LTTNG_ASSERT(metadata_channel->trace_chunk); LTTNG_ASSERT(metadata_stream->trace_chunk); @@ -430,8 +421,7 @@ static int lttng_kconsumer_snapshot_metadata( ret = 0; error_snapshot: - pthread_mutex_unlock(&metadata_stream->lock); - cds_list_del(&metadata_stream->send_node); + metadata_stream->read_subbuffer_ops.unlock(metadata_stream); consumer_stream_destroy(metadata_stream, NULL); metadata_channel->metadata_stream = NULL; rcu_read_unlock(); @@ -479,11 +469,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, switch (msg.cmd_type) { case LTTNG_CONSUMER_ADD_RELAYD_SOCKET: { + uint32_t major = msg.u.relayd_sock.major; + uint32_t minor = msg.u.relayd_sock.minor; + enum lttcomm_sock_proto protocol = (enum lttcomm_sock_proto) + msg.u.relayd_sock.relayd_socket_protocol; + /* Session daemon status message are handled in the following call. */ consumer_add_relayd_socket(msg.u.relayd_sock.net_index, - msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll, - &msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id, - msg.u.relayd_sock.relayd_session_id); + msg.u.relayd_sock.type, ctx, sock, + consumer_sockpoll, msg.u.relayd_sock.session_id, + msg.u.relayd_sock.relayd_session_id, major, + minor, protocol); goto end_nosignal; } case LTTNG_CONSUMER_ADD_CHANNEL: @@ -960,7 +956,6 @@ error_streams_sent_nosignal: ERR("Channel %" PRIu64 " not found", key); ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND; } else { - pthread_mutex_lock(&channel->lock); if (msg.u.snapshot_channel.metadata == 1) { int ret_snapshot; @@ -981,14 +976,12 @@ error_streams_sent_nosignal: msg.u.snapshot_channel.pathname, msg.u.snapshot_channel.relayd_id, msg.u.snapshot_channel - .nb_packets_per_stream, - ctx); + .nb_packets_per_stream); if (ret_snapshot < 0) { ERR("Snapshot channel failed"); ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED; } } - pthread_mutex_unlock(&channel->lock); } health_code_update(); @@ -1177,8 +1170,7 @@ end_destroy_channel: ret_rotate_channel = lttng_consumer_rotate_channel( channel, key, - msg.u.rotate_channel.relayd_id, - msg.u.rotate_channel.metadata, ctx); + msg.u.rotate_channel.relayd_id); if (ret_rotate_channel < 0) { ERR("Rotate channel failed"); ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL; @@ -1197,7 +1189,7 @@ end_destroy_channel: int ret_rotate; ret_rotate = lttng_consumer_rotate_ready_streams( - channel, key, ctx); + channel, key); if (ret_rotate < 0) { ERR("Rotate ready streams failed"); } @@ -1240,9 +1232,13 @@ error_rotate_channel: case LTTNG_CONSUMER_INIT: { int ret_send_status; + lttng_uuid sessiond_uuid; + + std::copy(std::begin(msg.u.init.sessiond_uuid), std::end(msg.u.init.sessiond_uuid), + sessiond_uuid.begin()); ret_code = lttng_consumer_init_command(ctx, - msg.u.init.sessiond_uuid); + sessiond_uuid); health_code_update(); ret_send_status = consumer_send_status_msg(sock, ret_code); if (ret_send_status < 0) { @@ -1727,7 +1723,7 @@ end: static int put_next_subbuffer(struct lttng_consumer_stream *stream, - struct stream_subbuffer *subbuffer) + struct stream_subbuffer *subbuffer __attribute__((unused))) { const int ret = kernctl_put_next_subbuf(stream->wait_fd); @@ -1759,7 +1755,7 @@ bool is_get_next_check_metadata_available(int tracer_fd) static int signal_metadata(struct lttng_consumer_stream *stream, - struct lttng_consumer_local_data *ctx) + struct lttng_consumer_local_data *ctx __attribute__((unused))) { ASSERT_LOCKED(stream->metadata_rdv_lock); return pthread_cond_broadcast(&stream->metadata_rdv) ? -errno : 0;