X-Git-Url: https://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-relayd%2Fmain.c;h=ca37b8bc41ed842111ee603ff2860e90b288b758;hb=1c20f0e29cbf8627bfb1ff444572d52d6655c4e2;hp=6d800f50b3c0db40de5997ead9625fee8cf8938c;hpb=4169f5ad171ab88ac181ec4080f46af379bc7676;p=lttng-tools.git diff --git a/src/bin/lttng-relayd/main.c b/src/bin/lttng-relayd/main.c index 6d800f50b..ca37b8bc4 100644 --- a/src/bin/lttng-relayd/main.c +++ b/src/bin/lttng-relayd/main.c @@ -47,21 +47,22 @@ #include #include #include -#include #include #include #include +#include "cmd.h" +#include "index.h" +#include "utils.h" #include "lttng-relayd.h" /* command line options */ +char *opt_output_path; static int opt_daemon; -static char *opt_output_path; static struct lttng_uri *control_uri; static struct lttng_uri *data_uri; const char *progname; -static int is_root; /* Set to 1 if the daemon is running as root */ /* * Quit pipe for all threads. This permits a single cancellation point @@ -97,6 +98,13 @@ static struct relay_cmd_queue relay_cmd_queue; static char *data_buffer; static unsigned int data_buffer_size; +/* Global hash table that stores relay index object. */ +static struct lttng_ht *indexes_ht; + +/* We need those values for the file/dir creation. */ +static uid_t relayd_uid; +static gid_t relayd_gid; + /* * usage function on stderr */ @@ -104,12 +112,12 @@ static void usage(void) { fprintf(stderr, "Usage: %s OPTIONS\n\nOptions:\n", progname); - fprintf(stderr, " -h, --help Display this usage.\n"); - fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); - fprintf(stderr, " -C, --control-port Control port listening (URI)\n"); - fprintf(stderr, " -D, --data-port Data port listening (URI)\n"); - fprintf(stderr, " -o, --output Output path for traces (PATH)\n"); - fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); + fprintf(stderr, " -h, --help Display this usage.\n"); + fprintf(stderr, " -d, --daemonize Start as a daemon.\n"); + fprintf(stderr, " -C, --control-port URL Control port listening.\n"); + fprintf(stderr, " -D, --data-port URL Data port listening.\n"); + fprintf(stderr, " -o, --output PATH Output path for traces. Must use an absolute path.\n"); + fprintf(stderr, " -v, --verbose Verbose mode. Activate DBG() macro.\n"); } static @@ -686,191 +694,40 @@ error: } /* - * Return the realpath(3) of the path even if the last directory token does not - * exist. For example, with /tmp/test1/test2, if test2/ does not exist but the - * /tmp/test1 does, the real path is returned. In normal time, realpath(3) - * fails if the end point directory does not exist. + * Get stream from stream id. + * Need to be called with RCU read-side lock held. */ static -char *expand_full_path(const char *path) +struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id, + struct lttng_ht *streams_ht) { - const char *end_path = path; - char *next, *cut_path, *expanded_path, *respath; - - /* Find last token delimited by '/' */ - while ((next = strpbrk(end_path + 1, "/"))) { - end_path = next; - } - - /* Cut last token from original path */ - cut_path = strndup(path, end_path - path); + struct lttng_ht_node_ulong *node; + struct lttng_ht_iter iter; + struct relay_stream *ret; - expanded_path = malloc(PATH_MAX); - if (expanded_path == NULL) { - respath = NULL; + lttng_ht_lookup(streams_ht, + (void *)((unsigned long) stream_id), + &iter); + node = lttng_ht_iter_get_node_ulong(&iter); + if (node == NULL) { + DBG("Relay stream %" PRIu64 " not found", stream_id); + ret = NULL; goto end; } - respath = realpath(cut_path, expanded_path); - if (respath == NULL) { - switch (errno) { - case ENOENT: - ERR("%s: No such file or directory", cut_path); - break; - default: - PERROR("realpath"); - break; - } - free(expanded_path); - } else { - /* Add end part to expanded path */ - strcat(respath, end_path); - } -end: - free(cut_path); - return respath; -} - - -/* - * config_get_default_path - * - * Returns the HOME directory path. Caller MUST NOT free(3) the return pointer. - */ -static -char *config_get_default_path(void) -{ - return getenv("HOME"); -} - -/* - * Create recursively directory using the FULL path. - */ -static -int mkdir_recursive(char *path, mode_t mode) -{ - char *p, tmp[PATH_MAX]; - struct stat statbuf; - size_t len; - int ret; - - ret = snprintf(tmp, sizeof(tmp), "%s", path); - if (ret < 0) { - PERROR("snprintf mkdir"); - goto error; - } - - len = ret; - if (tmp[len - 1] == '/') { - tmp[len - 1] = 0; - } - - for (p = tmp + 1; *p; p++) { - if (*p == '/') { - *p = 0; - if (tmp[strlen(tmp) - 1] == '.' && - tmp[strlen(tmp) - 2] == '.' && - tmp[strlen(tmp) - 3] == '/') { - ERR("Using '/../' is not permitted in the trace path (%s)", - tmp); - ret = -1; - goto error; - } - ret = stat(tmp, &statbuf); - if (ret < 0) { - ret = mkdir(tmp, mode); - if (ret < 0) { - if (errno != EEXIST) { - PERROR("mkdir recursive"); - ret = -errno; - goto error; - } - } - } - *p = '/'; - } - } - - ret = mkdir(tmp, mode); - if (ret < 0) { - if (errno != EEXIST) { - PERROR("mkdir recursive last piece"); - ret = -errno; - } else { - ret = 0; - } - } + ret = caa_container_of(node, struct relay_stream, stream_n); -error: +end: return ret; } -static -char *create_output_path_auto(char *path_name) -{ - int ret; - char *traces_path = NULL; - char *alloc_path = NULL; - char *default_path; - - default_path = config_get_default_path(); - if (default_path == NULL) { - ERR("Home path not found.\n \ - Please specify an output path using -o, --output PATH"); - goto exit; - } - alloc_path = strdup(default_path); - if (alloc_path == NULL) { - PERROR("Path allocation"); - goto exit; - } - ret = asprintf(&traces_path, "%s/" DEFAULT_TRACE_DIR_NAME - "/%s", alloc_path, path_name); - if (ret < 0) { - PERROR("asprintf trace dir name"); - goto exit; - } -exit: - free(alloc_path); - return traces_path; -} - -static -char *create_output_path_noauto(char *path_name) -{ - int ret; - char *traces_path = NULL; - char *full_path; - - full_path = expand_full_path(opt_output_path); - ret = asprintf(&traces_path, "%s/%s", full_path, path_name); - if (ret < 0) { - PERROR("asprintf trace dir name"); - goto exit; - } -exit: - free(full_path); - return traces_path; -} - -/* - * create_output_path: create the output trace directory - */ -static -char *create_output_path(char *path_name) -{ - if (opt_output_path == NULL) { - return create_output_path_auto(path_name); - } else { - return create_output_path_noauto(path_name); - } -} - static void deferred_free_stream(struct rcu_head *head) { struct relay_stream *stream = caa_container_of(head, struct relay_stream, rcu_node); + free(stream->path_name); + free(stream->channel_name); free(stream); } @@ -908,6 +765,9 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht call_rcu(&stream->rcu_node, deferred_free_stream); } + /* Cleanup index of that stream. */ + relay_index_destroy_by_stream_id(stream->stream_handle, + indexes_ht); } } rcu_read_unlock(); @@ -915,6 +775,28 @@ void relay_delete_session(struct relay_command *cmd, struct lttng_ht *streams_ht free(cmd->session); } +/* + * Copy index data from the control port to a given index object. + */ +static void copy_index_control_data(struct relay_index *index, + struct lttcomm_relayd_index *data) +{ + assert(index); + assert(data); + + /* + * The index on disk is encoded in big endian, so we don't need to convert + * the data received on the network. The data_offset value is NEVER + * modified here and is updated by the data thread. + */ + index->index_data.packet_size = data->packet_size; + index->index_data.content_size = data->content_size; + index->index_data.timestamp_begin = data->timestamp_begin; + index->index_data.timestamp_end = data->timestamp_end; + index->index_data.events_discarded = data->events_discarded; + index->index_data.stream_id = data->stream_id; +} + /* * Handle the RELAYD_CREATE_SESSION command. * @@ -972,10 +854,8 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, struct relay_command *cmd, struct lttng_ht *streams_ht) { struct relay_session *session = cmd->session; - struct lttcomm_relayd_add_stream stream_info; struct relay_stream *stream = NULL; struct lttcomm_relayd_status_stream reply; - char *path = NULL, *root_path = NULL; int ret, send_ret; if (!session || cmd->version_check_done == 0) { @@ -984,18 +864,6 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } - ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info, - sizeof(struct lttcomm_relayd_add_stream), 0); - if (ret < sizeof(struct lttcomm_relayd_add_stream)) { - if (ret == 0) { - /* Orderly shutdown. Not necessary to print an error. */ - DBG("Socket %d did an orderly shutdown", cmd->sock->fd); - } else { - ERR("Relay didn't receive valid add_stream struct size : %d", ret); - } - ret = -1; - goto end_no_session; - } stream = zmalloc(sizeof(struct relay_stream)); if (stream == NULL) { PERROR("relay stream zmalloc"); @@ -1003,54 +871,67 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr, goto end_no_session; } + switch (cmd->minor) { + case 1: /* LTTng sessiond 2.1 */ + ret = cmd_recv_stream_2_1(cmd, stream); + break; + case 2: /* LTTng sessiond 2.2 */ + default: + ret = cmd_recv_stream_2_2(cmd, stream); + break; + } + if (ret < 0) { + goto err_free_stream; + } + rcu_read_lock(); stream->stream_handle = ++last_relay_stream_id; stream->prev_seq = -1ULL; stream->session = session; + stream->index_fd = -1; - root_path = create_output_path(stream_info.pathname); - if (!root_path) { - ret = -1; - goto end; - } - ret = mkdir_recursive(root_path, S_IRWXU | S_IRWXG); + ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG); if (ret < 0) { ERR("relay creating output directory"); goto end; } - ret = asprintf(&path, "%s/%s", root_path, stream_info.channel_name); - if (ret < 0) { - PERROR("asprintf stream path"); - goto end; - } - - ret = open(path, O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO); + /* + * No need to use run_as API here because whatever we receives, the relayd + * uses its own credentials for the stream files. + */ + ret = utils_create_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL); if (ret < 0) { - PERROR("Relay creating trace file"); + ERR("Create output file"); goto end; } - stream->fd = ret; - DBG("Tracefile %s created", path); + if (stream->tracefile_size) { + DBG("Tracefile %s/%s_0 created", stream->path_name, stream->channel_name); + } else { + DBG("Tracefile %s/%s created", stream->path_name, stream->channel_name); + } lttng_ht_node_init_ulong(&stream->stream_n, (unsigned long) stream->stream_handle); lttng_ht_add_unique_ulong(streams_ht, &stream->stream_n); - DBG("Relay new stream added %s", stream_info.channel_name); + DBG("Relay new stream added %s with ID %" PRIu64, stream->channel_name, + stream->stream_handle); end: - free(path); - free(root_path); + reply.handle = htobe64(stream->stream_handle); /* send the session id to the client or a negative return code on error */ if (ret < 0) { reply.ret_code = htobe32(LTTNG_ERR_UNK); + /* stream was not properly added to the ht, so free it */ + free(stream); } else { reply.ret_code = htobe32(LTTNG_OK); } - reply.handle = htobe64(stream->stream_handle); + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(struct lttcomm_relayd_status_stream), 0); if (send_ret < 0) { @@ -1061,6 +942,12 @@ end: end_no_session: return ret; + +err_free_stream: + free(stream->path_name); + free(stream->channel_name); + free(stream); + return ret; } /* @@ -1075,7 +962,6 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; int ret, send_ret; - struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; DBG("Close stream received"); @@ -1100,17 +986,8 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, } rcu_read_lock(); - lttng_ht_lookup(streams_ht, - (void *)((unsigned long) be64toh(stream_info.stream_id)), - &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - DBG("Relay stream %" PRIu64 " not found", be64toh(stream_info.stream_id)); - ret = -1; - goto end_unlock; - } - - stream = caa_container_of(node, struct relay_stream, stream_n); + stream = relay_stream_from_stream_id(be64toh(stream_info.stream_id), + streams_ht); if (!stream) { ret = -1; goto end_unlock; @@ -1126,6 +1003,14 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr, if (delret < 0) { PERROR("close stream"); } + + if (stream->index_fd >= 0) { + delret = close(stream->index_fd); + if (delret < 0) { + PERROR("close stream index_fd"); + } + } + iter.iter.node = &stream->stream_n.node; delret = lttng_ht_del(streams_ht, &iter); assert(!delret); call_rcu(&stream->rcu_node, @@ -1196,34 +1081,6 @@ int relay_start(struct lttcomm_relayd_hdr *recv_hdr, return ret; } -/* - * Get stream from stream id. - * Need to be called with RCU read-side lock held. - */ -static -struct relay_stream *relay_stream_from_stream_id(uint64_t stream_id, - struct lttng_ht *streams_ht) -{ - struct lttng_ht_node_ulong *node; - struct lttng_ht_iter iter; - struct relay_stream *ret; - - lttng_ht_lookup(streams_ht, - (void *)((unsigned long) stream_id), - &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - DBG("Relay stream %" PRIu64 " not found", stream_id); - ret = NULL; - goto end; - } - - ret = caa_container_of(node, struct relay_stream, stream_n); - -end: - return ret; -} - /* * Append padding to the file pointed by the file descriptor fd. */ @@ -1349,7 +1206,7 @@ end: */ static int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd) + struct relay_command *cmd, struct lttng_ht *streams_ht) { int ret; struct lttcomm_relayd_version reply, msg; @@ -1371,19 +1228,26 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, goto end; } - /* - * For now, we just ignore the received version but after 2.1 stable - * release, a check must be done to see if we either adapt to the other - * side version (which MUST be lower than us) or keep the latest data - * structure considering that the other side will adapt. - */ + reply.major = RELAYD_VERSION_COMM_MAJOR; + reply.minor = RELAYD_VERSION_COMM_MINOR; - ret = sscanf(VERSION, "%10u.%10u", &reply.major, &reply.minor); - if (ret < 2) { - ERR("Error in scanning version"); - ret = -1; + /* Major versions must be the same */ + if (reply.major != be32toh(msg.major)) { + DBG("Incompatible major versions (%u vs %u), deleting session", + reply.major, be32toh(msg.major)); + relay_delete_session(cmd, streams_ht); + ret = 0; goto end; } + + cmd->major = reply.major; + /* We adapt to the lowest compatible version */ + if (reply.minor <= be32toh(msg.minor)) { + cmd->minor = reply.minor; + } else { + cmd->minor = be32toh(msg.minor); + } + reply.major = htobe32(reply.major); reply.minor = htobe32(reply.minor); ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, @@ -1391,8 +1255,9 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr, if (ret < 0) { ERR("Relay sending version"); } - DBG("Version check done (%u.%u)", be32toh(reply.major), - be32toh(reply.minor)); + + DBG("Version check done using protocol %u.%u", cmd->major, + cmd->minor); end: return ret; @@ -1410,8 +1275,6 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, struct lttcomm_relayd_generic_reply reply; struct relay_stream *stream; int ret; - struct lttng_ht_node_ulong *node; - struct lttng_ht_iter iter; uint64_t last_net_seq_num, stream_id; DBG("Data pending command received"); @@ -1439,17 +1302,12 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr, last_net_seq_num = be64toh(msg.last_net_seq_num); rcu_read_lock(); - lttng_ht_lookup(streams_ht, (void *)((unsigned long) stream_id), &iter); - node = lttng_ht_iter_get_node_ulong(&iter); - if (node == NULL) { - DBG("Relay stream %" PRIu64 " not found", stream_id); + stream = relay_stream_from_stream_id(stream_id, streams_ht); + if (stream == NULL) { ret = -1; goto end_unlock; } - stream = caa_container_of(node, struct relay_stream, stream_n); - assert(stream); - DBG("Data pending for stream id %" PRIu64 " prev_seq %" PRIu64 " and last_seq %" PRIu64, stream_id, stream->prev_seq, last_net_seq_num); @@ -1686,12 +1544,132 @@ end_no_session: return ret; } +/* + * Receive an index for a specific stream. + * + * Return 0 on success else a negative value. + */ +static +int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr, + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) +{ + int ret, send_ret, index_created = 0; + struct relay_session *session = cmd->session; + struct lttcomm_relayd_index index_info; + struct relay_index *index, *wr_index = NULL; + struct lttcomm_relayd_generic_reply reply; + struct relay_stream *stream; + uint64_t net_seq_num; + + assert(cmd); + assert(streams_ht); + assert(indexes_ht); + + DBG("Relay receiving index"); + + if (!session || cmd->version_check_done == 0) { + ERR("Trying to close a stream before version check"); + ret = -1; + goto end_no_session; + } + + ret = cmd->sock->ops->recvmsg(cmd->sock, &index_info, + sizeof(index_info), 0); + if (ret < sizeof(index_info)) { + if (ret == 0) { + /* Orderly shutdown. Not necessary to print an error. */ + DBG("Socket %d did an orderly shutdown", cmd->sock->fd); + } else { + ERR("Relay didn't receive valid index struct size : %d", ret); + } + ret = -1; + goto end_no_session; + } + + net_seq_num = be64toh(index_info.net_seq_num); + + rcu_read_lock(); + stream = relay_stream_from_stream_id(be64toh(index_info.relay_stream_id), + streams_ht); + if (!stream) { + ret = -1; + goto end_rcu_unlock; + } + + index = relay_index_find(stream->stream_handle, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + copy_index_control_data(index, &index_info); + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created, set the data in this + * object and write it on disk. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + copy_index_control_data(wr_index, &index_info); + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } + } + +end_rcu_unlock: + rcu_read_unlock(); + + if (ret < 0) { + reply.ret_code = htobe32(LTTNG_ERR_UNK); + } else { + reply.ret_code = htobe32(LTTNG_OK); + } + send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply, sizeof(reply), 0); + if (send_ret < 0) { + ERR("Relay sending close index id reply"); + ret = send_ret; + } + +end_no_session: + return ret; +} + /* * relay_process_control: Process the commands received on the control socket */ static int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, - struct relay_command *cmd, struct lttng_ht *streams_ht) + struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *index_streams_ht, + struct lttng_ht *indexes_ht) { int ret = 0; @@ -1709,7 +1687,7 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, ret = relay_recv_metadata(recv_hdr, cmd, streams_ht); break; case RELAYD_VERSION: - ret = relay_send_version(recv_hdr, cmd); + ret = relay_send_version(recv_hdr, cmd, streams_ht); break; case RELAYD_CLOSE_STREAM: ret = relay_close_stream(recv_hdr, cmd, streams_ht); @@ -1726,6 +1704,9 @@ int relay_process_control(struct lttcomm_relayd_hdr *recv_hdr, case RELAYD_END_DATA_PENDING: ret = relay_end_data_pending(recv_hdr, cmd, streams_ht); break; + case RELAYD_SEND_INDEX: + ret = relay_recv_index(recv_hdr, cmd, streams_ht, indexes_ht); + break; case RELAYD_UPDATE_SYNC_INFO: default: ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd)); @@ -1742,12 +1723,14 @@ end: * relay_process_data: Process the data received on the data socket */ static -int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) +int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht, + struct lttng_ht *indexes_ht) { - int ret = 0; + int ret = 0, rotate_index = 0, index_created = 0; struct relay_stream *stream; + struct relay_index *index, *wr_index = NULL; struct lttcomm_relayd_data_hdr data_hdr; - uint64_t stream_id; + uint64_t stream_id, data_offset; uint64_t net_seq_num; uint32_t data_size; @@ -1770,7 +1753,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) stream = relay_stream_from_stream_id(stream_id, streams_ht); if (!stream) { ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_size = be32toh(data_hdr.data_size); @@ -1782,7 +1765,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ERR("Allocating data buffer"); free(data_buffer); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } data_buffer = tmp_data_ptr; data_buffer_size = data_size; @@ -1800,7 +1783,94 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Socket %d did an orderly shutdown", cmd->sock->fd); } ret = -1; - goto end_unlock; + goto end_rcu_unlock; + } + + /* Check if a rotation is needed. */ + if (stream->tracefile_size > 0 && + (stream->tracefile_size_current + data_size) > + stream->tracefile_size) { + ret = utils_rotate_stream_file(stream->path_name, stream->channel_name, + stream->tracefile_size, stream->tracefile_count, + relayd_uid, relayd_gid, stream->fd, + &(stream->tracefile_count_current), &stream->fd); + if (ret < 0) { + ERR("Rotating stream output file"); + goto end_rcu_unlock; + } + /* Reset current size because we just perform a stream rotation. */ + stream->tracefile_size_current = 0; + rotate_index = 1; + } + + /* Get data offset because we are about to update the index. */ + data_offset = htobe64(stream->tracefile_size_current); + + /* + * Lookup for an existing index for that stream id/sequence number. If on + * exists, the control thread already received the data for it thus we need + * to write it on disk. + */ + index = relay_index_find(stream_id, net_seq_num, indexes_ht); + if (!index) { + /* A successful creation will add the object to the HT. */ + index = relay_index_create(stream->stream_handle, net_seq_num); + if (!index) { + goto end_rcu_unlock; + } + index_created = 1; + } + + if (rotate_index || stream->index_fd < 0) { + index->to_close_fd = stream->index_fd; + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + /* This will close the stream's index fd if one. */ + relay_index_free_safe(index); + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + index->fd = stream->index_fd; + index->index_data.offset = data_offset; + + if (index_created) { + /* + * Try to add the relay index object to the hash table. If an object + * already exist, destroy back the index created and set the data. + */ + relay_index_add(index, indexes_ht, &wr_index); + if (wr_index) { + /* Copy back data from the created index. */ + wr_index->fd = index->fd; + wr_index->to_close_fd = index->to_close_fd; + wr_index->index_data.offset = data_offset; + free(index); + } + } else { + /* The index already exists so write it on disk. */ + wr_index = index; + } + + /* Do we have a writable ready index to write on disk. */ + if (wr_index) { + /* Starting at 2.4, create the index file if none available. */ + if (cmd->minor >= 4 && stream->index_fd < 0) { + ret = index_create_file(stream->path_name, stream->channel_name, + relayd_uid, relayd_gid, stream->tracefile_size, + stream->tracefile_count_current); + if (ret < 0) { + goto end_rcu_unlock; + } + stream->index_fd = ret; + } + + ret = relay_index_write(wr_index->fd, wr_index, indexes_ht); + if (ret < 0) { + goto end_rcu_unlock; + } } do { @@ -1809,7 +1879,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) if (ret < 0 || ret != data_size) { ERR("Relay error writing data to file"); ret = -1; - goto end_unlock; + goto end_rcu_unlock; } DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64, @@ -1817,8 +1887,9 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) ret = write_padding_to_file(stream->fd, be32toh(data_hdr.padding_size)); if (ret < 0) { - goto end_unlock; + goto end_rcu_unlock; } + stream->tracefile_size_current += data_size + be32toh(data_hdr.padding_size); stream->prev_seq = net_seq_num; @@ -1831,6 +1902,11 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) if (cret < 0) { PERROR("close stream process data"); } + + cret = close(stream->index_fd); + if (cret < 0) { + PERROR("close stream index_fd"); + } iter.iter.node = &stream->stream_n.node; ret = lttng_ht_del(streams_ht, &iter); assert(!ret); @@ -1839,7 +1915,7 @@ int relay_process_data(struct relay_command *cmd, struct lttng_ht *streams_ht) DBG("Closed tracefile %d after recv data", stream->fd); } -end_unlock: +end_rcu_unlock: rcu_read_unlock(); end: return ret; @@ -1935,6 +2011,7 @@ void *relay_thread_worker(void *data) struct lttng_ht_node_ulong *node; struct lttng_ht_iter iter; struct lttng_ht *streams_ht; + struct lttng_ht *index_streams_ht; struct lttcomm_relayd_hdr recv_hdr; DBG("[thread] Relay worker started"); @@ -1953,6 +2030,12 @@ void *relay_thread_worker(void *data) goto streams_ht_error; } + /* Tables of received indexes indexed by index handle and net_seq_num. */ + indexes_ht = lttng_ht_new(0, LTTNG_HT_TYPE_TWO_U64); + if (!indexes_ht) { + goto indexes_ht_error; + } + ret = create_thread_poll_set(&events, 2); if (ret < 0) { goto error_poll_create; @@ -2064,7 +2147,9 @@ restart: } ret = relay_process_control(&recv_hdr, relay_connection, - streams_ht); + streams_ht, + index_streams_ht, + indexes_ht); if (ret < 0) { /* Clear the session on error. */ relay_cleanup_poll_connection(&events, pollfd); @@ -2136,7 +2221,8 @@ restart: continue; } - ret = relay_process_data(relay_connection, streams_ht); + ret = relay_process_data(relay_connection, streams_ht, + indexes_ht); /* connection closed */ if (ret < 0) { relay_cleanup_poll_connection(&events, pollfd); @@ -2180,6 +2266,8 @@ error: } rcu_read_unlock(); error_poll_create: + lttng_ht_destroy(indexes_ht); +indexes_ht_error: lttng_ht_destroy(streams_ht); streams_ht_error: lttng_ht_destroy(relay_connections_ht); @@ -2232,6 +2320,20 @@ int main(int argc, char **argv) goto exit; } + /* Try to create directory if -o, --output is specified. */ + if (opt_output_path) { + if (*opt_output_path != '/') { + ERR("Please specify an absolute path for -o, --output PATH"); + goto exit; + } + + ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG); + if (ret < 0) { + ERR("Unable to create %s", opt_output_path); + goto exit; + } + } + /* Daemonize */ if (opt_daemon) { ret = daemon(0, 0); @@ -2241,10 +2343,12 @@ int main(int argc, char **argv) } } - /* Check if daemon is UID = 0 */ - is_root = !getuid(); + /* We need those values for the file/dir creation. */ + relayd_uid = getuid(); + relayd_gid = getgid(); - if (!is_root) { + /* Check if daemon is UID = 0 */ + if (relayd_uid == 0) { if (control_uri->port < 1024 || data_uri->port < 1024) { ERR("Need to be root to use ports < 1024"); ret = -1; @@ -2263,6 +2367,9 @@ int main(int argc, char **argv) /* Set up max poll set size */ lttng_poll_set_max_size(); + /* Initialize communication library */ + lttcomm_init(); + /* Setup the dispatcher thread */ ret = pthread_create(&dispatcher_thread, NULL, relay_thread_dispatcher, (void *) NULL);