/*
* Copyright (C) 2012 - David Goulet <dgoulet@efficios.com>
+ * 2018 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
*
* This program is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License, version 2 only, as
*
* Should *NOT* be called with RCU read-side lock held.
*/
-struct consumer_output *consumer_copy_output(struct consumer_output *obj)
+struct consumer_output *consumer_copy_output(struct consumer_output *src)
{
int ret;
struct consumer_output *output;
- assert(obj);
+ assert(src);
- output = consumer_create_output(obj->type);
+ output = consumer_create_output(src->type);
if (output == NULL) {
goto end;
}
- output->enabled = obj->enabled;
- output->net_seq_index = obj->net_seq_index;
- memcpy(output->subdir, obj->subdir, sizeof(output->subdir));
- output->snapshot = obj->snapshot;
- output->relay_major_version = obj->relay_major_version;
- output->relay_minor_version = obj->relay_minor_version;
- memcpy(&output->dst, &obj->dst, sizeof(output->dst));
- ret = consumer_copy_sockets(output, obj);
+ output->enabled = src->enabled;
+ output->net_seq_index = src->net_seq_index;
+ memcpy(output->domain_subdir, src->domain_subdir,
+ sizeof(output->domain_subdir));
+ output->snapshot = src->snapshot;
+ output->relay_major_version = src->relay_major_version;
+ output->relay_minor_version = src->relay_minor_version;
+ memcpy(&output->dst, &src->dst, sizeof(output->dst));
+ ret = consumer_copy_sockets(output, src);
if (ret < 0) {
goto error_put;
}
}
/*
- * Set network URI to the consumer output object.
+ * Set network URI to the consumer output.
*
* Return 0 on success. Return 1 if the URI were equal. Else, negative value on
* error.
*/
-int consumer_set_network_uri(struct consumer_output *obj,
+int consumer_set_network_uri(const struct ltt_session *session,
+ struct consumer_output *output,
struct lttng_uri *uri)
{
int ret;
- char tmp_path[PATH_MAX];
- char hostname[HOST_NAME_MAX];
struct lttng_uri *dst_uri = NULL;
/* Code flow error safety net. */
- assert(obj);
+ assert(output);
assert(uri);
switch (uri->stype) {
case LTTNG_STREAM_CONTROL:
- dst_uri = &obj->dst.net.control;
- obj->dst.net.control_isset = 1;
+ dst_uri = &output->dst.net.control;
+ output->dst.net.control_isset = 1;
if (uri->port == 0) {
/* Assign default port. */
uri->port = DEFAULT_NETWORK_CONTROL_PORT;
} else {
- if (obj->dst.net.data_isset && uri->port ==
- obj->dst.net.data.port) {
+ if (output->dst.net.data_isset && uri->port ==
+ output->dst.net.data.port) {
ret = -LTTNG_ERR_INVALID;
goto error;
}
DBG3("Consumer control URI set with port %d", uri->port);
break;
case LTTNG_STREAM_DATA:
- dst_uri = &obj->dst.net.data;
- obj->dst.net.data_isset = 1;
+ dst_uri = &output->dst.net.data;
+ output->dst.net.data_isset = 1;
if (uri->port == 0) {
/* Assign default port. */
uri->port = DEFAULT_NETWORK_DATA_PORT;
} else {
- if (obj->dst.net.control_isset && uri->port ==
- obj->dst.net.control.port) {
+ if (output->dst.net.control_isset && uri->port ==
+ output->dst.net.control.port) {
ret = -LTTNG_ERR_INVALID;
goto error;
}
}
/* URIs were not equal, replacing it. */
- memset(dst_uri, 0, sizeof(struct lttng_uri));
memcpy(dst_uri, uri, sizeof(struct lttng_uri));
- obj->type = CONSUMER_DST_NET;
-
- /* Handle subdir and add hostname in front. */
- if (dst_uri->stype == LTTNG_STREAM_CONTROL) {
- /* Get hostname to append it in the pathname */
- ret = gethostname(hostname, sizeof(hostname));
- if (ret < 0) {
- PERROR("gethostname. Fallback on default localhost");
- strncpy(hostname, "localhost", sizeof(hostname));
- }
- hostname[sizeof(hostname) - 1] = '\0';
+ output->type = CONSUMER_DST_NET;
+ if (dst_uri->stype != LTTNG_STREAM_CONTROL) {
+ /* Only the control uri needs to contain the path. */
+ goto end;
+ }
- /* Setup consumer subdir if none present in the control URI */
- if (strlen(dst_uri->subdir) == 0) {
- ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
- hostname, obj->subdir);
- } else {
- ret = snprintf(tmp_path, sizeof(tmp_path), "%s/%s",
- hostname, dst_uri->subdir);
- }
- if (ret < 0) {
- PERROR("snprintf set consumer uri subdir");
- ret = -LTTNG_ERR_NOMEM;
+ /*
+ * If the user has specified a subdir as part of the control
+ * URL, the session's base output directory is:
+ * /RELAYD_OUTPUT_PATH/HOSTNAME/USER_SPECIFIED_DIR
+ *
+ * Hence, the "base_dir" from which all stream files and
+ * session rotation chunks are created takes the form
+ * /HOSTNAME/USER_SPECIFIED_DIR
+ *
+ * If the user has not specified an output directory as part of
+ * the control URL, the base output directory has the form:
+ * /RELAYD_OUTPUT_PATH/HOSTNAME/SESSION_NAME-CREATION_TIME
+ *
+ * Hence, the "base_dir" from which all stream files and
+ * session rotation chunks are created takes the form
+ * /HOSTNAME/SESSION_NAME-CREATION_TIME
+ *
+ * Note that automatically generated session names already
+ * contain the session's creation time. In that case, the
+ * creation time is omitted to prevent it from being duplicated
+ * in the final directory hierarchy.
+ */
+ if (*uri->subdir) {
+ if (strstr(uri->subdir, "../")) {
+ ERR("Network URI subdirs are not allowed to walk up the path hierarchy");
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
+ ret = snprintf(output->dst.net.base_dir,
+ sizeof(output->dst.net.base_dir),
+ "/%s/%s/", session->hostname, uri->subdir);
+ } else {
+ if (session->has_auto_generated_name) {
+ ret = snprintf(output->dst.net.base_dir,
+ sizeof(output->dst.net.base_dir),
+ "/%s/%s/", session->hostname,
+ session->name);
+ } else {
+ char session_creation_datetime[16];
+ size_t strftime_ret;
+ struct tm *timeinfo;
- if (lttng_strncpy(obj->dst.net.base_dir, tmp_path,
- sizeof(obj->dst.net.base_dir))) {
- ret = -LTTNG_ERR_INVALID;
- goto error;
+ timeinfo = localtime(&session->creation_time);
+ if (!timeinfo) {
+ ret = -LTTNG_ERR_FATAL;
+ goto error;
+ }
+ strftime_ret = strftime(session_creation_datetime,
+ sizeof(session_creation_datetime),
+ "%Y%m%d-%H%M%S", timeinfo);
+ if (strftime_ret == 0) {
+ ERR("Failed to format session creation timestamp while setting network URI");
+ ret = -LTTNG_ERR_FATAL;
+ goto error;
+ }
+ ret = snprintf(output->dst.net.base_dir,
+ sizeof(output->dst.net.base_dir),
+ "/%s/%s-%s/", session->hostname,
+ session->name,
+ session_creation_datetime);
}
- DBG3("Consumer set network uri base_dir path %s", tmp_path);
}
+ if (ret >= sizeof(output->dst.net.base_dir)) {
+ ret = -LTTNG_ERR_INVALID;
+ ERR("Truncation occurred while setting network output base directory");
+ goto error;
+ } else if (ret == -1) {
+ ret = -LTTNG_ERR_INVALID;
+ PERROR("Error occurred while setting network output base directory");
+ goto error;
+ }
+
+ DBG3("Consumer set network uri base_dir path %s",
+ output->dst.net.base_dir);
+end:
return 0;
equal:
return 1;
pipe_name = "channel monitor";
command_name = "SET_CHANNEL_MONITOR_PIPE";
break;
- case LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE:
- pipe_name = "channel rotate";
- command_name = "SET_CHANNEL_ROTATE_PIPE";
- break;
default:
ERR("Unexpected command received in %s (cmd = %d)", __func__,
(int) cmd);
LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE, pipe);
}
-int consumer_send_channel_rotate_pipe(struct consumer_socket *consumer_sock,
- int pipe)
-{
- return consumer_send_pipe(consumer_sock,
- LTTNG_CONSUMER_SET_CHANNEL_ROTATE_PIPE, pipe);
-}
-
-/*
- * Set consumer subdirectory using the session name and a generated datetime if
- * needed. This is appended to the current subdirectory.
- */
-int consumer_set_subdir(struct consumer_output *consumer,
- const char *session_name)
-{
- int ret = 0;
- unsigned int have_default_name = 0;
- char datetime[16], tmp_path[PATH_MAX];
- time_t rawtime;
- struct tm *timeinfo;
-
- assert(consumer);
- assert(session_name);
-
- memset(tmp_path, 0, sizeof(tmp_path));
-
- /* Flag if we have a default session. */
- if (strncmp(session_name, DEFAULT_SESSION_NAME "-",
- strlen(DEFAULT_SESSION_NAME) + 1) == 0) {
- have_default_name = 1;
- } else {
- /* Get date and time for session path */
- time(&rawtime);
- timeinfo = localtime(&rawtime);
- strftime(datetime, sizeof(datetime), "%Y%m%d-%H%M%S", timeinfo);
- }
-
- if (have_default_name) {
- ret = snprintf(tmp_path, sizeof(tmp_path),
- "%s/%s", consumer->subdir, session_name);
- } else {
- ret = snprintf(tmp_path, sizeof(tmp_path),
- "%s/%s-%s/", consumer->subdir, session_name, datetime);
- }
- if (ret < 0) {
- PERROR("snprintf session name date");
- goto error;
- }
-
- if (lttng_strncpy(consumer->subdir, tmp_path,
- sizeof(consumer->subdir))) {
- ret = -EINVAL;
- goto error;
- }
- DBG2("Consumer subdir set to %s", consumer->subdir);
-
-error:
- return ret;
-}
-
/*
* Ask the consumer if the data is pending for the specific session id.
* Returns 1 if data is pending, 0 otherwise, or < 0 on error.
/*
* Ask the consumer to snapshot a specific channel using the key.
*
- * Return 0 on success or else a negative error.
+ * Returns LTTNG_OK on success or else an LTTng error code.
*/
-int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key,
- struct snapshot_output *output, int metadata, uid_t uid, gid_t gid,
- const char *session_path, int wait, uint64_t nb_packets_per_stream,
- uint64_t trace_archive_id)
+enum lttng_error_code consumer_snapshot_channel(struct consumer_socket *socket,
+ uint64_t key, struct snapshot_output *output, int metadata,
+ uid_t uid, gid_t gid, const char *session_path, int wait,
+ uint64_t nb_packets_per_stream, uint64_t trace_archive_id)
{
int ret;
+ enum lttng_error_code status = LTTNG_OK;
struct lttcomm_consumer_msg msg;
assert(socket);
sizeof(msg.u.snapshot_channel.pathname),
"%s/%s/%s-%s-%" PRIu64 "%s",
output->consumer->dst.net.base_dir,
- output->consumer->subdir,
+ output->consumer->domain_subdir,
output->name, output->datetime,
output->nb_snapshot,
session_path);
if (ret < 0) {
- ret = -LTTNG_ERR_NOMEM;
+ status = LTTNG_ERR_INVALID;
goto error;
} else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) {
ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s/%s-%s-%" PRIu64 "%s\"",
sizeof(msg.u.snapshot_channel.pathname),
ret, output->consumer->dst.net.base_dir,
- output->consumer->subdir,
+ output->consumer->domain_subdir,
output->name, output->datetime,
output->nb_snapshot,
session_path);
- ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+ status = LTTNG_ERR_SNAPSHOT_FAIL;
goto error;
}
} else {
output->nb_snapshot,
session_path);
if (ret < 0) {
- ret = -LTTNG_ERR_NOMEM;
+ status = LTTNG_ERR_NOMEM;
goto error;
} else if (ret >= sizeof(msg.u.snapshot_channel.pathname)) {
ERR("Snapshot path exceeds the maximal allowed length of %zu bytes (%i bytes required) with path \"%s/%s-%s-%" PRIu64 "%s\"",
ret, output->consumer->dst.session_root_path,
output->name, output->datetime, output->nb_snapshot,
session_path);
- ret = -LTTNG_ERR_SNAPSHOT_FAIL;
+ status = LTTNG_ERR_SNAPSHOT_FAIL;
goto error;
}
S_IRWXU | S_IRWXG, uid, gid);
if (ret < 0) {
if (errno != EEXIST) {
- ERR("Trace directory creation error");
+ status = LTTNG_ERR_CREATE_DIR_FAIL;
+ PERROR("Trace directory creation error");
goto error;
}
}
ret = consumer_send_msg(socket, &msg);
pthread_mutex_unlock(socket->lock);
if (ret < 0) {
+ switch (-ret) {
+ case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
+ status = LTTNG_ERR_CHAN_NOT_FOUND;
+ break;
+ default:
+ status = LTTNG_ERR_SNAPSHOT_FAIL;
+ break;
+ }
goto error;
}
error:
health_code_update();
- return ret;
+ return status;
}
/*
*/
int consumer_rotate_channel(struct consumer_socket *socket, uint64_t key,
uid_t uid, gid_t gid, struct consumer_output *output,
- char *domain_path, bool is_metadata_channel,
- uint64_t new_chunk_id,
- bool *rotate_pending_relay)
+ const char *domain_path, bool is_metadata_channel,
+ uint64_t new_chunk_id)
{
int ret;
struct lttcomm_consumer_msg msg;
sizeof(msg.u.rotate_channel.pathname), "%s%s%s",
output->dst.net.base_dir,
output->chunk_path, domain_path);
- if (ret < 0 || ret == sizeof(msg.u.rotate_channel.pathname)) {
+ if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) {
ERR("Failed to format channel path name when asking consumer to rotate channel");
- ret = -1;
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
- *rotate_pending_relay = true;
} else {
msg.u.rotate_channel.relayd_id = (uint64_t) -1ULL;
ret = snprintf(msg.u.rotate_channel.pathname,
- sizeof(msg.u.rotate_channel.pathname), "%s%s%s",
+ sizeof(msg.u.rotate_channel.pathname), "%s/%s%s",
output->dst.session_root_path,
output->chunk_path, domain_path);
- if (ret < 0 || ret == sizeof(msg.u.rotate_channel.pathname)) {
+ if (ret < 0 || ret >= sizeof(msg.u.rotate_channel.pathname)) {
ERR("Failed to format channel path name when asking consumer to rotate channel");
- ret = -1;
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
}
health_code_update();
ret = consumer_send_msg(socket, &msg);
if (ret < 0) {
+ switch (-ret) {
+ case LTTCOMM_CONSUMERD_CHAN_NOT_FOUND:
+ ret = -LTTNG_ERR_CHAN_NOT_FOUND;
+ break;
+ default:
+ ret = -LTTNG_ERR_ROTATION_FAIL_CONSUMER;
+ break;
+ }
goto error;
}
-
error:
pthread_mutex_unlock(socket->lock);
health_code_update();
if (old_path_length >= sizeof(msg.u.rotate_rename.old_path)) {
ERR("consumer_rotate_rename: old path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)",
old_path_length + 1, sizeof(msg.u.rotate_rename.old_path));
- ret = -1;
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
if (new_path_length >= sizeof(msg.u.rotate_rename.new_path)) {
ERR("consumer_rotate_rename: new path length (%zu bytes) exceeds the maximal length allowed by the consumer protocol (%zu bytes)",
new_path_length + 1, sizeof(msg.u.rotate_rename.new_path));
- ret = -1;
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
health_code_update();
ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_ROTATE_RENAME_FAIL_CONSUMER;
+ goto error;
+ }
+
+error:
+ health_code_update();
+ return ret;
+}
+
+/*
+ * Ask the consumer if a rotation is locally pending. Must be called with the
+ * socket lock held.
+ *
+ * Return 1 if the rotation is still pending, 0 if finished, a negative value
+ * on error.
+ */
+int consumer_check_rotation_pending_local(struct consumer_socket *socket,
+ uint64_t session_id, uint64_t chunk_id)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg;
+ uint32_t pending = 0;
+
+ assert(socket);
+
+ DBG("Asking consumer to locally check for pending rotation for session %" PRIu64 ", chunk id %" PRIu64,
+ session_id, chunk_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_LOCAL;
+ msg.u.check_rotation_pending_local.session_id = session_id;
+ msg.u.check_rotation_pending_local.chunk_id = chunk_id;
+
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_ROTATION_PENDING_LOCAL_FAIL_CONSUMER;
+ goto error;
+ }
+
+ ret = consumer_socket_recv(socket, &pending, sizeof(pending));
if (ret < 0) {
goto error;
}
+ ret = pending;
+
error:
health_code_update();
return ret;
}
/*
- * Ask the relay if a rotation is still pending. Must be called with the socket
- * lock held.
+ * Ask the consumer if a rotation is pending on the relayd. Must be called with
+ * the socket lock held.
*
* Return 1 if the rotation is still pending, 0 if finished, a negative value
* on error.
*/
-int consumer_rotate_pending_relay(struct consumer_socket *socket,
- struct consumer_output *output, uint64_t session_id,
+int consumer_check_rotation_pending_relay(struct consumer_socket *socket,
+ const struct consumer_output *output, uint64_t session_id,
uint64_t chunk_id)
{
int ret;
assert(socket);
- DBG("Consumer rotate pending on relay for session %" PRIu64 ", chunk id %" PRIu64,
+ DBG("Asking consumer to check for pending rotation on relay for session %" PRIu64 ", chunk id %" PRIu64,
session_id, chunk_id);
assert(output->type == CONSUMER_DST_NET);
memset(&msg, 0, sizeof(msg));
- msg.cmd_type = LTTNG_CONSUMER_ROTATE_PENDING_RELAY;
- msg.u.rotate_pending_relay.session_id = session_id;
- msg.u.rotate_pending_relay.relayd_id = output->net_seq_index;
- msg.u.rotate_pending_relay.chunk_id = chunk_id;
+ msg.cmd_type = LTTNG_CONSUMER_CHECK_ROTATION_PENDING_RELAY;
+ msg.u.check_rotation_pending_relay.session_id = session_id;
+ msg.u.check_rotation_pending_relay.relayd_id = output->net_seq_index;
+ msg.u.check_rotation_pending_relay.chunk_id = chunk_id;
health_code_update();
ret = consumer_send_msg(socket, &msg);
if (ret < 0) {
+ ret = -LTTNG_ERR_ROTATION_PENDING_RELAY_FAIL_CONSUMER;
goto error;
}
ret = snprintf(msg.u.mkdir.path, sizeof(msg.u.mkdir.path), "%s", path);
if (ret < 0 || ret >= sizeof(msg.u.mkdir.path)) {
ERR("Format path");
- ret = -1;
+ ret = -LTTNG_ERR_INVALID;
goto error;
}
msg.u.mkdir.relayd_id = -1ULL;
}
+ health_code_update();
+ ret = consumer_send_msg(socket, &msg);
+ if (ret < 0) {
+ ret = -LTTNG_ERR_MKDIR_FAIL_CONSUMER;
+ goto error;
+ }
+
+error:
+ health_code_update();
+ return ret;
+}
+
+int consumer_init(struct consumer_socket *socket,
+ const lttng_uuid sessiond_uuid)
+{
+ int ret;
+ struct lttcomm_consumer_msg msg = {
+ .cmd_type = LTTNG_CONSUMER_INIT,
+ };
+
+ assert(socket);
+
+ DBG("Sending consumer initialization command");
+ lttng_uuid_copy(msg.u.init.sessiond_uuid, sessiond_uuid);
+
health_code_update();
ret = consumer_send_msg(socket, &msg);
if (ret < 0) {