X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=2878355a563623eefad9dc0fb56af89d86cbebc7;hb=7775df526447e504735983ee359dcb902fa6dd1e;hp=edf8acb8286cedc3c8649b6c64313ccb0ff385fc;hpb=0ebdafe0e5c235d2e4ce6d4dcff48bfd9a77a479;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index edf8acb82..2878355a5 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -17,6 +17,7 @@ * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ +#include #define _LGPL_SOURCE #include #include @@ -123,6 +124,25 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, return ret; } +static +int get_current_subbuf_addr(struct lttng_consumer_stream *stream, + const char **addr) +{ + int ret; + unsigned long mmap_offset; + const char *mmap_base = stream->mmap_base; + + ret = kernctl_get_mmap_read_offset(stream->wait_fd, &mmap_offset); + if (ret < 0) { + PERROR("Failed to get mmap read offset"); + goto error; + } + + *addr = mmap_base + mmap_offset; +error: + return ret; +} + /* * Take a snapshot of all the stream of a channel * RCU read-side lock must be held across this function to ensure existence of @@ -231,15 +251,6 @@ static int lttng_kconsumer_snapshot_channel( goto end_unlock; } - if (stream->max_sb_size == 0) { - ret = kernctl_get_max_subbuf_size(stream->wait_fd, - &stream->max_sb_size); - if (ret < 0) { - ERR("Getting kernel max_sb_size"); - goto end_unlock; - } - } - consumed_pos = consumer_get_consume_start_pos(consumed_pos, produced_pos, nb_packets_per_stream, stream->max_sb_size); @@ -247,6 +258,7 @@ static int lttng_kconsumer_snapshot_channel( while ((long) (consumed_pos - produced_pos) < 0) { ssize_t read_len; unsigned long len, padded_len; + const char *subbuf_addr; health_code_update(); @@ -276,7 +288,13 @@ static int lttng_kconsumer_snapshot_channel( goto error_put_subbuf; } - read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, len, + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + + read_len = lttng_consumer_on_read_subbuffer_mmap(ctx, + stream, subbuf_addr, len, padded_len - len, NULL); /* * We write the padded len in local tracefiles but the data len @@ -600,14 +618,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto error_fatal; + goto error_add_stream_fatal; } health_code_update(); if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Channel was not found. */ - goto end_nosignal; + goto error_add_stream_nosignal; } /* Blocking call */ @@ -615,7 +633,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttng_consumer_poll_socket(consumer_sockpoll); health_poll_exit(); if (ret) { - goto error_fatal; + goto error_add_stream_fatal; } health_code_update(); @@ -624,8 +642,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = lttcomm_recv_fds_unix_sock(sock, &fd, 1); if (ret != sizeof(fd)) { lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_FD); - rcu_read_unlock(); - return ret; + goto end; } health_code_update(); @@ -638,7 +655,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_add_stream_nosignal; } health_code_update(); @@ -663,11 +680,19 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, break; } pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } new_stream->chan = channel; new_stream->wait_fd = fd; + ret = kernctl_get_max_subbuf_size(new_stream->wait_fd, + &new_stream->max_sb_size); + if (ret < 0) { + pthread_mutex_unlock(&channel->lock); + ERR("Failed to get kernel maximal subbuffer size"); + goto error_add_stream_nosignal; + } + consumer_stream_update_channel_attributes(new_stream, channel); switch (channel->output) { @@ -676,7 +701,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = utils_create_pipe(new_stream->splice_pipe); if (ret < 0) { pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } break; case CONSUMER_CHANNEL_MMAP: @@ -685,7 +710,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, default: ERR("Stream output unknown %d", channel->output); pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } /* @@ -717,7 +742,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); - goto end_nosignal; + goto error_add_stream_nosignal; } } health_code_update(); @@ -734,7 +759,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, cds_list_add(&new_stream->send_node, &channel->streams.head); pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); - break; + goto end_add_stream; } /* Send stream to relayd if the stream has an ID. */ @@ -745,7 +770,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); consumer_stream_free(new_stream); - goto end_nosignal; + goto error_add_stream_nosignal; } /* @@ -759,7 +784,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ret < 0) { pthread_mutex_unlock(&new_stream->lock); pthread_mutex_unlock(&channel->lock); - goto end_nosignal; + goto error_add_stream_nosignal; } } } @@ -790,12 +815,17 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } else { consumer_del_stream_for_data(new_stream); } - goto end_nosignal; + goto error_add_stream_nosignal; } DBG("Kernel consumer ADD_STREAM %s (fd: %d) %s with relayd id %" PRIu64, new_stream->name, fd, new_stream->chan->pathname, new_stream->relayd_stream_id); +end_add_stream: break; +error_add_stream_nosignal: + goto end_nosignal; +error_add_stream_fatal: + goto error_fatal; } case LTTNG_CONSUMER_STREAMS_SENT: { @@ -824,7 +854,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_streams_sent_nosignal; } health_code_update(); @@ -834,7 +864,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, * streams in this channel. */ if (!channel->monitor) { - break; + goto end_error_streams_sent; } health_code_update(); @@ -843,11 +873,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_relayd_streams_sent( msg.u.sent_streams.net_seq_idx); if (ret < 0) { - goto end_nosignal; + goto error_streams_sent_nosignal; } channel->streams_sent_to_relayd = true; } +end_error_streams_sent: break; +error_streams_sent_nosignal: + goto end_nosignal; } case LTTNG_CONSUMER_UPDATE_STREAM: { @@ -973,14 +1006,14 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto end_destroy_channel; } health_code_update(); /* Stop right now if no channel was found. */ if (!channel) { - goto end_nosignal; + goto end_destroy_channel; } /* @@ -996,7 +1029,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, assert(!uatomic_sub_return(&channel->refcount, 1)); consumer_del_channel(channel); - +end_destroy_channel: goto end_nosignal; } case LTTNG_CONSUMER_DISCARDED_EVENTS: @@ -1139,7 +1172,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { /* Somehow, the session daemon is not responding anymore. */ - goto end_nosignal; + goto error_rotate_channel; } if (channel) { /* Rotate the streams that are ready right now. */ @@ -1149,8 +1182,9 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Rotate ready streams failed"); } } - break; +error_rotate_channel: + goto end_nosignal; } case LTTNG_CONSUMER_INIT: { @@ -1239,16 +1273,34 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK: { + enum lttng_trace_chunk_command_type close_command = + msg.u.close_trace_chunk.close_command.value; const uint64_t relayd_id = msg.u.close_trace_chunk.relayd_id.value; + struct lttcomm_consumer_close_trace_chunk_reply reply; + char path[LTTNG_PATH_MAX]; ret_code = lttng_consumer_close_trace_chunk( msg.u.close_trace_chunk.relayd_id.is_set ? - &relayd_id : NULL, + &relayd_id : + NULL, msg.u.close_trace_chunk.session_id, msg.u.close_trace_chunk.chunk_id, - (time_t) msg.u.close_trace_chunk.close_timestamp); - goto end_msg_sessiond; + (time_t) msg.u.close_trace_chunk.close_timestamp, + msg.u.close_trace_chunk.close_command.is_set ? + &close_command : + NULL, path); + reply.ret_code = ret_code; + reply.path_length = strlen(path) + 1; + ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply)); + if (ret != sizeof(reply)) { + goto error_fatal; + } + ret = lttcomm_send_unix_sock(sock, path, reply.path_length); + if (ret != reply.path_length) { + goto error_fatal; + } + goto end_nosignal; } case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS: { @@ -1267,15 +1319,16 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } end_nosignal: - rcu_read_unlock(); - /* * Return 1 to indicate success since the 0 value can be a socket * shutdown during the recv() or send() call. */ - health_code_update(); - return 1; - + ret = 1; + goto end; +error_fatal: + /* This will issue a consumer stop. */ + ret = -1; + goto end; end_msg_sessiond: /* * The returned value here is not useful since either way we'll return 1 to @@ -1286,16 +1339,11 @@ end_msg_sessiond: if (ret < 0) { goto error_fatal; } - rcu_read_unlock(); - + ret = 1; +end: health_code_update(); - - return 1; - -error_fatal: rcu_read_unlock(); - /* This will issue a consumer stop. */ - return -1; + return ret; } /* @@ -1306,66 +1354,62 @@ error_fatal: static int get_index_values(struct ctf_packet_index *index, int infd) { int ret; + uint64_t packet_size, content_size, timestamp_begin, timestamp_end, + events_discarded, stream_id, stream_instance_id, + packet_seq_num; - ret = kernctl_get_timestamp_begin(infd, &index->timestamp_begin); + ret = kernctl_get_timestamp_begin(infd, ×tamp_begin); if (ret < 0) { PERROR("kernctl_get_timestamp_begin"); goto error; } - index->timestamp_begin = htobe64(index->timestamp_begin); - ret = kernctl_get_timestamp_end(infd, &index->timestamp_end); + ret = kernctl_get_timestamp_end(infd, ×tamp_end); if (ret < 0) { PERROR("kernctl_get_timestamp_end"); goto error; } - index->timestamp_end = htobe64(index->timestamp_end); - ret = kernctl_get_events_discarded(infd, &index->events_discarded); + ret = kernctl_get_events_discarded(infd, &events_discarded); if (ret < 0) { PERROR("kernctl_get_events_discarded"); goto error; } - index->events_discarded = htobe64(index->events_discarded); - ret = kernctl_get_content_size(infd, &index->content_size); + ret = kernctl_get_content_size(infd, &content_size); if (ret < 0) { PERROR("kernctl_get_content_size"); goto error; } - index->content_size = htobe64(index->content_size); - ret = kernctl_get_packet_size(infd, &index->packet_size); + ret = kernctl_get_packet_size(infd, &packet_size); if (ret < 0) { PERROR("kernctl_get_packet_size"); goto error; } - index->packet_size = htobe64(index->packet_size); - ret = kernctl_get_stream_id(infd, &index->stream_id); + ret = kernctl_get_stream_id(infd, &stream_id); if (ret < 0) { PERROR("kernctl_get_stream_id"); goto error; } - index->stream_id = htobe64(index->stream_id); - ret = kernctl_get_instance_id(infd, &index->stream_instance_id); + ret = kernctl_get_instance_id(infd, &stream_instance_id); if (ret < 0) { if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ - index->stream_instance_id = -1ULL; + stream_instance_id = -1ULL; } else { PERROR("kernctl_get_instance_id"); goto error; } } - index->stream_instance_id = htobe64(index->stream_instance_id); - ret = kernctl_get_sequence_number(infd, &index->packet_seq_num); + ret = kernctl_get_sequence_number(infd, &packet_seq_num); if (ret < 0) { if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ - index->packet_seq_num = -1ULL; + packet_seq_num = -1ULL; ret = 0; } else { PERROR("kernctl_get_sequence_number"); @@ -1374,6 +1418,18 @@ static int get_index_values(struct ctf_packet_index *index, int infd) } index->packet_seq_num = htobe64(index->packet_seq_num); + *index = (typeof(*index)) { + .offset = index->offset, + .packet_size = htobe64(packet_size), + .content_size = htobe64(content_size), + .timestamp_begin = htobe64(timestamp_begin), + .timestamp_end = htobe64(timestamp_end), + .events_discarded = htobe64(events_discarded), + .stream_id = htobe64(stream_id), + .stream_instance_id = htobe64(stream_instance_id), + .packet_seq_num = htobe64(packet_seq_num), + }; + error: return ret; } @@ -1425,6 +1481,7 @@ int update_stream_stats(struct lttng_consumer_stream *stream) if (ret == -ENOTTY) { /* Command not implemented by lttng-modules. */ seq = -1ULL; + stream->sequence_number_unavailable = true; } else { PERROR("kernctl_get_sequence_number"); goto end; @@ -1524,7 +1581,7 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, int err, write_index = 1, rotation_ret; ssize_t ret = 0; int infd = stream->wait_fd; - struct ctf_packet_index index; + struct ctf_packet_index index = {}; DBG("In read_subbuffer (infd : %d)", infd); @@ -1654,6 +1711,9 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, } break; case CONSUMER_CHANNEL_MMAP: + { + const char *subbuf_addr; + /* Get subbuffer size without padding */ err = kernctl_get_subbuf_size(infd, &subbuf_size); if (err != 0) { @@ -1673,13 +1733,20 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, goto error; } + ret = get_current_subbuf_addr(stream, &subbuf_addr); + if (ret) { + goto error_put_subbuf; + } + /* Make sure the tracer is not gone mad on us! */ assert(len >= subbuf_size); padding = len - subbuf_size; /* write the subbuffer to the tracefile */ - ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, subbuf_size, + ret = lttng_consumer_on_read_subbuffer_mmap(ctx, stream, + subbuf_addr, + subbuf_size, padding, &index); /* * The mmap operation should write subbuf_size amount of data when @@ -1699,11 +1766,12 @@ ssize_t lttng_kconsumer_read_subbuffer(struct lttng_consumer_stream *stream, write_index = 0; } break; + } default: ERR("Unknown output method"); ret = -EPERM; } - +error_put_subbuf: err = kernctl_put_next_subbuf(infd); if (err != 0) { if (err == -EFAULT) {