X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fcommon%2Fconsumer.c;h=a9070b1c9bc3f976ef26a2fbbc183c7fd4b75343;hb=6151a90fe7fa3dea52c57771df9083e56de7a60b;hp=5f87f4b5018fbec1a2de542797f46b4f6d17475f;hpb=331744e34f56a5aec69b05d356d6901e67926acc;p=lttng-tools.git diff --git a/src/common/consumer.c b/src/common/consumer.c index 5f87f4b50..a9070b1c9 100644 --- a/src/common/consumer.c +++ b/src/common/consumer.c @@ -783,7 +783,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, } /* Metadata are always sent on the control socket. */ - outfd = relayd->control_sock.fd; + outfd = relayd->control_sock.sock.fd; } else { /* Set header with stream information */ data_hdr.stream_id = htobe64(stream->relayd_stream_id); @@ -808,7 +808,7 @@ static int write_relayd_stream_header(struct lttng_consumer_stream *stream, ++stream->next_net_seq_num; /* Set to go on data socket */ - outfd = relayd->data_sock.fd; + outfd = relayd->data_sock.sock.fd; } error: @@ -828,7 +828,9 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, uid_t uid, gid_t gid, int relayd_id, - enum lttng_event_output output) + enum lttng_event_output output, + uint64_t tracefile_size, + uint64_t tracefile_count) { struct lttng_consumer_channel *channel; @@ -845,6 +847,8 @@ struct lttng_consumer_channel *consumer_allocate_channel(uint64_t key, channel->gid = gid; channel->relayd_id = relayd_id; channel->output = output; + channel->tracefile_size = tracefile_size; + channel->tracefile_count = tracefile_count; strncpy(channel->pathname, pathname, sizeof(channel->pathname)); channel->pathname[sizeof(channel->pathname) - 1] = '\0'; @@ -1283,6 +1287,99 @@ end: return ret; } +/* + * Create the tracefile on disk. + * + * Return 0 on success or else a negative value. + */ +int lttng_create_output_file(struct lttng_consumer_stream *stream) +{ + int ret; + char full_path[PATH_MAX]; + char *path_name_id = NULL; + char *path; + + assert(stream); + + /* Don't create anything if this is set for streaming. */ + if (stream->net_seq_idx != (uint64_t) -1ULL) { + ret = 0; + goto end; + } + + ret = snprintf(full_path, sizeof(full_path), "%s/%s", + stream->chan->pathname, stream->name); + if (ret < 0) { + PERROR("snprintf create output file"); + goto error; + } + + /* + * If we split the trace in multiple files, we have to add the tracefile + * current count at the end of the tracefile name + */ + if (stream->chan->tracefile_size > 0) { + ret = asprintf(&path_name_id, "%s_%" PRIu64, full_path, + stream->tracefile_count_current); + if (ret < 0) { + PERROR("Allocating path name ID"); + goto error; + } + path = path_name_id; + } else { + path = full_path; + } + + ret = run_as_open(path, O_WRONLY | O_CREAT | O_TRUNC, + S_IRWXU | S_IRWXG | S_IRWXO, stream->uid, stream->gid); + if (ret < 0) { + PERROR("open stream path %s", path); + goto error_open; + } + stream->out_fd = ret; + stream->tracefile_size_current = 0; + +error_open: + free(path_name_id); +error: +end: + return ret; +} + +/* + * Change the output tracefile according to the tracefile_size and + * tracefile_count parameters. The stream lock MUST be held before calling this + * function because we are modifying the stream status. + * + * Return 0 on success or else a negative value. + */ +static int rotate_output_file(struct lttng_consumer_stream *stream) +{ + int ret; + + assert(stream); + assert(stream->tracefile_size_current); + + ret = close(stream->out_fd); + if (ret < 0) { + PERROR("Closing tracefile"); + goto end; + } + + if (stream->chan->tracefile_count > 0) { + stream->tracefile_count_current = + (stream->tracefile_count_current + 1) % + stream->chan->tracefile_count; + } else { + stream->tracefile_count_current++; + } + + return lttng_create_output_file(stream); + +end: + return ret; +} + /* * Mmap the ring buffer, read it and write the data to the tracefile. This is a * core function for writing trace buffers to either the local filesystem or @@ -1390,6 +1487,21 @@ ssize_t lttng_consumer_on_read_subbuffer_mmap( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = rotate_output_file(stream); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -1552,6 +1664,21 @@ ssize_t lttng_consumer_on_read_subbuffer_splice( } else { /* No streaming, we have to set the len with the full padding */ len += padding; + + /* + * Check if we need to change the tracefile before writing the packet. + */ + if (stream->chan->tracefile_size > 0 && + (stream->tracefile_size_current + len) > + stream->chan->tracefile_size) { + ret = rotate_output_file(stream); + if (ret < 0) { + ERR("Rotating output file"); + goto end; + } + outfd = stream->out_fd; + } + stream->tracefile_size_current += len; } while (len > 0) { @@ -2954,13 +3081,16 @@ void lttng_consumer_init(void) */ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, struct lttng_consumer_local_data *ctx, int sock, - struct pollfd *consumer_sockpoll, struct lttcomm_sock *relayd_sock, - unsigned int sessiond_id) + struct pollfd *consumer_sockpoll, + struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id) { int fd = -1, ret = -1, relayd_created = 0; enum lttng_error_code ret_code = LTTNG_OK; struct consumer_relayd_sock_pair *relayd; + assert(ctx); + assert(relayd_sock); + DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx); /* First send a status message before receiving the fds. */ @@ -3010,11 +3140,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, switch (sock_type) { case LTTNG_STREAM_CONTROL: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->control_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->control_sock); + lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->control_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->control_sock.fd >= 0) { - if (close(relayd->control_sock.fd)) { + if (relayd->control_sock.sock.fd >= 0) { + if (close(relayd->control_sock.sock.fd)) { PERROR("close relayd control socket"); } } @@ -3024,7 +3154,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->control_sock.fd = fd; + relayd->control_sock.sock.fd = fd; + /* Assign version values. */ + relayd->control_sock.major = relayd_sock->major; + relayd->control_sock.minor = relayd_sock->minor; /* * Create a session on the relayd and store the returned id. Lock the @@ -3052,11 +3185,11 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, break; case LTTNG_STREAM_DATA: /* Copy received lttcomm socket */ - lttcomm_copy_sock(&relayd->data_sock, relayd_sock); - ret = lttcomm_create_sock(&relayd->data_sock); + lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock); + ret = lttcomm_create_sock(&relayd->data_sock.sock); /* Immediately try to close the created socket if valid. */ - if (relayd->data_sock.fd >= 0) { - if (close(relayd->data_sock.fd)) { + if (relayd->data_sock.sock.fd >= 0) { + if (close(relayd->data_sock.sock.fd)) { PERROR("close relayd data socket"); } } @@ -3066,7 +3199,10 @@ int consumer_add_relayd_socket(int net_seq_idx, int sock_type, } /* Assign new file descriptor */ - relayd->data_sock.fd = fd; + relayd->data_sock.sock.fd = fd; + /* Assign version values. */ + relayd->data_sock.major = relayd_sock->major; + relayd->data_sock.minor = relayd_sock->minor; break; default: ERR("Unknown relayd socket type (%d)", sock_type);