X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fkernel-consumer%2Fkernel-consumer.c;h=02198bd1b9e854f26a8614c5f76fbf76a36835f1;hb=d771f8323f5f8964145e149502d6dc8d8cac8745;hp=a68438253c9d694a6d29d7267b1da50548ad4172;hpb=29decac336fd0d72ae00105b7e3d0cdece2ccd25;p=lttng-tools.git diff --git a/src/common/kernel-consumer/kernel-consumer.c b/src/common/kernel-consumer/kernel-consumer.c index a68438253..02198bd1b 100644 --- a/src/common/kernel-consumer/kernel-consumer.c +++ b/src/common/kernel-consumer/kernel-consumer.c @@ -109,41 +109,46 @@ int lttng_kconsumer_get_consumed_snapshot(struct lttng_consumer_stream *stream, * * Returns 0 on success, < 0 on error */ -static -int send_relayd_stream(struct lttng_consumer_stream *stream, char *path) +static int send_relayd_stream(struct lttng_consumer_stream *stream, + char *path) { - struct consumer_relayd_sock_pair *relayd; int ret = 0; - char *stream_path; + const char *stream_path; + struct consumer_relayd_sock_pair *relayd; + + assert(stream); + assert(stream->net_seq_idx != -1ULL); if (path != NULL) { stream_path = path; } else { stream_path = stream->chan->pathname; } + /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); if (relayd != NULL) { /* Add stream on the relayd */ pthread_mutex_lock(&relayd->ctrl_sock_mutex); - ret = relayd_add_stream(&relayd->control_sock, - stream->name, stream_path, - &stream->relayd_stream_id, - stream->chan->tracefile_size, - stream->chan->tracefile_count); + ret = relayd_add_stream(&relayd->control_sock, stream->name, + stream_path, &stream->relayd_stream_id, + stream->chan->tracefile_size, stream->chan->tracefile_count); pthread_mutex_unlock(&relayd->ctrl_sock_mutex); if (ret < 0) { goto end; } uatomic_inc(&relayd->refcount); - } else if (stream->net_seq_idx != (uint64_t) -1ULL) { - ERR("Network sequence index %" PRIu64 " unknown. Not adding stream.", - stream->net_seq_idx); + } else { + ERR("Stream %" PRIu64 " relayd ID %" PRIu64 " unknown. Can't send it.", + stream->key, stream->net_seq_idx); ret = -1; goto end; } + DBG("Stream %s with key %" PRIu64 " sent to relayd id %" PRIu64, + stream->name, stream->key, stream->net_seq_idx); + end: rcu_read_unlock(); return ret; @@ -152,15 +157,14 @@ end: /* * Find a relayd and close the stream */ -static -void close_relayd_stream(struct lttng_consumer_stream *stream) +static void close_relayd_stream(struct lttng_consumer_stream *stream) { struct consumer_relayd_sock_pair *relayd; /* The stream is not metadata. Get relayd reference if exists. */ rcu_read_lock(); relayd = consumer_find_relayd(stream->net_seq_idx); - if (relayd != NULL) { + if (relayd) { consumer_stream_relayd_close(stream, relayd); } rcu_read_unlock(); @@ -216,7 +220,6 @@ int lttng_kconsumer_snapshot_channel(uint64_t key, char *path, ERR("sending stream to relayd"); goto end_unlock; } - DBG("Stream %s sent to the relayd", stream->name); } else { ret = utils_create_stream_file(path, stream->name, stream->chan->tracefile_size, stream->tracefile_count_current, @@ -360,9 +363,12 @@ end: int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, uint64_t relayd_id, struct lttng_consumer_local_data *ctx) { + int ret, use_relayd = 0; + ssize_t ret_read; struct lttng_consumer_channel *metadata_channel; struct lttng_consumer_stream *metadata_stream; - int ret; + + assert(ctx); DBG("Kernel consumer snapshot metadata with key %" PRIu64 " at path %s", key, path); @@ -371,58 +377,66 @@ int lttng_kconsumer_snapshot_metadata(uint64_t key, char *path, metadata_channel = consumer_find_channel(key); if (!metadata_channel) { - ERR("Snapshot kernel metadata channel not found for key %lu", key); + ERR("Kernel snapshot metadata not found for key %" PRIu64, key); ret = -1; - goto end; + goto error; } metadata_stream = metadata_channel->metadata_stream; assert(metadata_stream); + /* Flag once that we have a valid relayd for the stream. */ if (relayd_id != (uint64_t) -1ULL) { + use_relayd = 1; + } + + if (use_relayd) { ret = send_relayd_stream(metadata_stream, path); if (ret < 0) { - ERR("sending stream to relayd"); + goto error; } - DBG("Stream %s sent to the relayd", metadata_stream->name); } else { ret = utils_create_stream_file(path, metadata_stream->name, metadata_stream->chan->tracefile_size, metadata_stream->tracefile_count_current, metadata_stream->uid, metadata_stream->gid); if (ret < 0) { - goto end; + goto error; } metadata_stream->out_fd = ret; } - ret = 0; - while (ret >= 0) { - ret = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); - if (ret < 0) { - if (ret != -EPERM) { - ERR("Kernel snapshot reading subbuffer"); - goto end; + do { + ret_read = lttng_kconsumer_read_subbuffer(metadata_stream, ctx); + if (ret_read < 0) { + if (ret_read != -EPERM) { + ERR("Kernel snapshot reading metadata subbuffer (ret: %ld)", + ret_read); + goto error; } - /* "ret" is negative at this point so we will exit the loop. */ + /* ret_read is negative at this point so we will exit the loop. */ continue; } - } + } while (ret_read >= 0); - if (relayd_id == (uint64_t) -1ULL) { + if (use_relayd) { + close_relayd_stream(metadata_stream); + metadata_stream->net_seq_idx = (uint64_t) -1ULL; + } else { ret = close(metadata_stream->out_fd); if (ret < 0) { - PERROR("Kernel consumer snapshot close out_fd"); - goto end; + PERROR("Kernel consumer snapshot metadata close out_fd"); + /* + * Don't go on error here since the snapshot was successful at this + * point but somehow the close failed. + */ } metadata_stream->out_fd = -1; - } else { - close_relayd_stream(metadata_stream); - metadata_stream->net_seq_idx = (uint64_t) -1ULL; } ret = 0; -end: + +error: rcu_read_unlock(); return ret; } @@ -565,20 +579,15 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, /* First send a status message before receiving the fds. */ ret = consumer_send_status_msg(sock, ret_code); if (ret < 0) { - /* - * Somehow, the session daemon is not responding - * anymore. - */ + /* Somehow, the session daemon is not responding anymore. */ goto error_fatal; } if (ret_code != LTTNG_OK) { - /* - * Channel was not found. - */ + /* Channel was not found. */ goto end_nosignal; } - /* block */ + /* Blocking call */ if (lttng_consumer_poll_socket(consumer_sockpoll) < 0) { rcu_read_unlock(); return -EINTR; @@ -624,6 +633,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, } goto end_nosignal; } + new_stream->chan = channel; new_stream->wait_fd = fd; switch (channel->output) { @@ -661,7 +671,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, if (ctx->on_recv_stream) { ret = ctx->on_recv_stream(new_stream); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } } @@ -682,7 +692,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ret = send_relayd_stream(new_stream, NULL); if (ret < 0) { - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; } @@ -698,7 +708,7 @@ int lttng_kconsumer_recv_cmd(struct lttng_consumer_local_data *ctx, ERR("Consumer write %s stream to pipe %d", new_stream->metadata_flag ? "metadata" : "data", lttng_pipe_get_writefd(stream_pipe)); - consumer_del_stream(new_stream, NULL); + consumer_stream_free(new_stream); goto end_nosignal; }