- Enforce symmetry between allocation and teardown,
- Handle all errors,
- Return all errors as EXIT_FAILURE,
- Standardize on zero being success, nonzero being error,
(rather than < 0 being error),
- Fix pthread PERROR: we need to store ret into errno before
calling PERROR, since pthread API does not set errno,
- Join errors now fall-through, rather than rely on the OS
to teardown the rest.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
* Cleanup the daemon
*/
static
* Cleanup the daemon
*/
static
+void cleanup_relayd_live(void)
* Stop all threads by closing the thread quit pipe.
*/
static
* Stop all threads by closing the thread quit pipe.
*/
static
/* Stopping all threads */
DBG("Terminating all live threads");
ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
/* Stopping all threads */
DBG("Terminating all live threads");
ret = notify_thread_pipe(thread_quit_pipe[1]);
if (ret < 0) {
ERR("write error on thread quit pipe");
}
/* Dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
}
/* Dispatch thread */
CMM_STORE_SHARED(live_dispatch_thread_exit, 1);
futex_nto1_wake(&viewer_conn_queue.futex);
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
}
health_unregister(health_relayd);
DBG("Live viewer listener thread cleanup complete");
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
}
health_unregister(health_relayd);
DBG("Live viewer dispatch thread dying");
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
ERR("Health error occurred in %s", __func__);
}
health_unregister(health_relayd);
+ if (stop_threads()) {
+ ERR("Error stopping live threads");
+ }
rcu_unregister_thread();
return NULL;
}
rcu_unregister_thread();
return NULL;
}
*/
static int create_conn_pipe(void)
{
*/
static int create_conn_pipe(void)
{
- int ret;
-
- ret = utils_create_pipe_cloexec(live_conn_pipe);
+ return utils_create_pipe_cloexec(live_conn_pipe);
+}
+int relayd_live_stop(void)
+{
+ return stop_threads();
-void live_stop_threads(void)
+int relayd_live_join(void)
ret = pthread_join(live_listener_thread, &status);
ret = pthread_join(live_listener_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live listener");
PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
}
ret = pthread_join(live_worker_thread, &status);
}
ret = pthread_join(live_worker_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
}
ret = pthread_join(live_dispatcher_thread, &status);
}
ret = pthread_join(live_dispatcher_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
-int live_start_threads(struct lttng_uri *uri,
+int relayd_live_create(struct lttng_uri *uri,
struct relay_local_data *relay_ctx)
{
struct relay_local_data *relay_ctx)
{
+ int ret = 0, retval = 0;
void *status;
int is_root;
void *status;
int is_root;
+ if (!uri) {
+ retval = -1;
+ goto exit_init_data;
+ }
live_uri = uri;
/* Check if daemon is UID = 0 */
live_uri = uri;
/* Check if daemon is UID = 0 */
if (!is_root) {
if (live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
if (!is_root) {
if (live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
+ retval = -1;
+ goto exit_init_data;
}
}
/* Setup the thread apps communication pipe. */
}
}
/* Setup the thread apps communication pipe. */
- if ((ret = create_conn_pipe()) < 0) {
- goto exit;
+ if (create_conn_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* Init relay command queue. */
}
/* Init relay command queue. */
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
thread_dispatcher, (void *) NULL);
/* Setup the dispatcher thread */
ret = pthread_create(&live_dispatcher_thread, NULL,
thread_dispatcher, (void *) NULL);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer dispatcher");
PERROR("pthread_create viewer dispatcher");
+ retval = -1;
+ goto exit_dispatcher_thread;
}
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
thread_worker, relay_ctx);
}
/* Setup the worker thread */
ret = pthread_create(&live_worker_thread, NULL,
thread_worker, relay_ctx);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer worker");
PERROR("pthread_create viewer worker");
+ retval = -1;
+ goto exit_worker_thread;
}
/* Setup the listener thread */
ret = pthread_create(&live_listener_thread, NULL,
thread_listener, (void *) NULL);
}
/* Setup the listener thread */
ret = pthread_create(&live_listener_thread, NULL,
thread_listener, (void *) NULL);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create viewer listener");
PERROR("pthread_create viewer listener");
+ retval = -1;
+ goto exit_listener_thread;
+ /*
+ * All OK, started all threads.
+ */
+ return retval;
+
ret = pthread_join(live_listener_thread, &status);
ret = pthread_join(live_listener_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live listener");
PERROR("pthread_join live listener");
- goto error; /* join error, exit without cleanup */
ret = pthread_join(live_worker_thread, &status);
ret = pthread_join(live_worker_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live worker");
PERROR("pthread_join live worker");
- goto error; /* join error, exit without cleanup */
ret = pthread_join(live_dispatcher_thread, &status);
ret = pthread_join(live_dispatcher_thread, &status);
+ if (ret) {
+ errno = ret;
PERROR("pthread_join live dispatcher");
PERROR("pthread_join live dispatcher");
- goto error; /* join error, exit without cleanup */
+exit_init_data:
+ cleanup_relayd_live();
-end:
-error:
- return ret;
#include "lttng-relayd.h"
#include "lttng-relayd.h"
-int live_start_threads(struct lttng_uri *live_uri,
+int relayd_live_create(struct lttng_uri *live_uri,
struct relay_local_data *relay_ctx);
struct relay_local_data *relay_ctx);
-void live_stop_threads(void);
+int relayd_live_stop(void);
+int relayd_live_join(void);
struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id);
struct relay_viewer_stream *live_find_viewer_stream_by_id(uint64_t stream_id);
static
int set_options(int argc, char **argv)
{
static
int set_options(int argc, char **argv)
{
- int c, ret = 0, option_index = 0;
+ int c, ret = 0, option_index = 0, retval = 0;
int orig_optopt = optopt, orig_optind = optind;
char *default_address, *optstring;
const char *config_path = NULL;
int orig_optopt = optopt, orig_optind = optind;
char *default_address, *optstring;
const char *config_path = NULL;
optstring = utils_generate_optstring(long_options,
sizeof(long_options) / sizeof(struct option));
if (!optstring) {
optstring = utils_generate_optstring(long_options,
sizeof(long_options) / sizeof(struct option));
if (!optstring) {
while ((c = getopt_long(argc, argv, optstring, long_options,
&option_index)) != -1) {
if (c == '?') {
while ((c = getopt_long(argc, argv, optstring, long_options,
&option_index)) != -1) {
if (c == '?') {
goto exit;
} else if (c != 'f') {
continue;
goto exit;
} else if (c != 'f') {
continue;
if (ret) {
if (ret > 0) {
ERR("Invalid configuration option at line %i", ret);
if (ret) {
if (ret > 0) {
ERR("Invalid configuration option at line %i", ret);
ret = set_option(c, optarg, long_options[option_index].name);
if (ret < 0) {
ret = set_option(c, optarg, long_options[option_index].name);
if (ret < 0) {
DEFAULT_NETWORK_CONTROL_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
DEFAULT_NETWORK_CONTROL_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
free(default_address);
if (ret < 0) {
ERR("Invalid control URI specified");
free(default_address);
if (ret < 0) {
ERR("Invalid control URI specified");
DEFAULT_NETWORK_DATA_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
DEFAULT_NETWORK_DATA_PORT);
if (ret < 0) {
PERROR("asprintf default data address");
free(default_address);
if (ret < 0) {
ERR("Invalid data URI specified");
free(default_address);
if (ret < 0) {
ERR("Invalid data URI specified");
DEFAULT_NETWORK_VIEWER_PORT);
if (ret < 0) {
PERROR("asprintf default viewer control address");
DEFAULT_NETWORK_VIEWER_PORT);
if (ret < 0) {
PERROR("asprintf default viewer control address");
free(default_address);
if (ret < 0) {
ERR("Invalid viewer control URI specified");
free(default_address);
if (ret < 0) {
ERR("Invalid viewer control URI specified");
goto exit;
}
}
exit:
free(optstring);
goto exit;
}
}
exit:
free(optstring);
}
/*
* Cleanup the daemon
*/
static
}
/*
* Cleanup the daemon
*/
static
+void relayd_cleanup(struct relay_local_data *relay_ctx)
+ if (viewer_streams_ht)
+ lttng_ht_destroy(viewer_streams_ht);
+ if (relay_streams_ht)
+ lttng_ht_destroy(relay_streams_ht);
+ if (relay_ctx && relay_ctx->sessions_ht)
+ lttng_ht_destroy(relay_ctx->sessions_ht);
+ free(relay_ctx);
+
/* free the dynamically allocated opt_output_path */
free(opt_output_path);
/* free the dynamically allocated opt_output_path */
free(opt_output_path);
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_conn_queue.futex);
/* Dispatch thread */
CMM_STORE_SHARED(dispatch_thread_exit, 1);
futex_nto1_wake(&relay_conn_queue.futex);
+
+ ret = relayd_live_stop();
+ if (ret) {
+ ERR("Error stopping live threads");
+ }
*/
int main(int argc, char **argv)
{
*/
int main(int argc, char **argv)
{
+ int ret = 0, retval = 0;
- struct relay_local_data *relay_ctx;
+ struct relay_local_data *relay_ctx = NULL;
/* Parse arguments */
progname = argv[0];
/* Parse arguments */
progname = argv[0];
- if ((ret = set_options(argc, argv)) < 0) {
- goto exit;
+ if (set_options(argc, argv)) {
+ retval = -1;
+ goto exit_options;
- if ((ret = set_signal_handler()) < 0) {
- goto exit;
+ if (set_signal_handler()) {
+ retval = -1;
+ goto exit_options;
}
/* Try to create directory if -o, --output is specified. */
if (opt_output_path) {
if (*opt_output_path != '/') {
ERR("Please specify an absolute path for -o, --output PATH");
}
/* Try to create directory if -o, --output is specified. */
if (opt_output_path) {
if (*opt_output_path != '/') {
ERR("Please specify an absolute path for -o, --output PATH");
+ retval = -1;
+ goto exit_options;
}
ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
if (ret < 0) {
ERR("Unable to create %s", opt_output_path);
}
ret = utils_mkdir_recursive(opt_output_path, S_IRWXU | S_IRWXG);
if (ret < 0) {
ERR("Unable to create %s", opt_output_path);
+ retval = -1;
+ goto exit_options;
ret = lttng_daemonize(&child_ppid, &recv_child_signal,
!opt_background);
if (ret < 0) {
ret = lttng_daemonize(&child_ppid, &recv_child_signal,
!opt_background);
if (ret < 0) {
+ retval = -1;
+ goto exit_options;
+
+ /* Initialize thread health monitoring */
+ health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
+ if (!health_relayd) {
+ PERROR("health_app_create error");
+ retval = -1;
+ goto exit_health_app_create;
+ }
+
/* Create thread quit pipe */
/* Create thread quit pipe */
- if ((ret = init_thread_quit_pipe()) < 0) {
- goto error;
+ if (init_thread_quit_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* We need those values for the file/dir creation. */
}
/* We need those values for the file/dir creation. */
if (relayd_uid == 0) {
if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
if (relayd_uid == 0) {
if (control_uri->port < 1024 || data_uri->port < 1024 || live_uri->port < 1024) {
ERR("Need to be root to use ports < 1024");
+ retval = -1;
+ goto exit_init_data;
}
}
/* Setup the thread apps communication pipe. */
}
}
/* Setup the thread apps communication pipe. */
- if ((ret = create_relay_conn_pipe()) < 0) {
- goto exit;
+ if (create_relay_conn_pipe()) {
+ retval = -1;
+ goto exit_init_data;
}
/* Init relay command queue. */
}
/* Init relay command queue. */
relay_ctx = zmalloc(sizeof(struct relay_local_data));
if (!relay_ctx) {
PERROR("relay_ctx");
relay_ctx = zmalloc(sizeof(struct relay_local_data));
if (!relay_ctx) {
PERROR("relay_ctx");
+ retval = -1;
+ goto exit_init_data;
}
/* tables of sessions indexed by session ID */
relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_ctx->sessions_ht) {
}
/* tables of sessions indexed by session ID */
relay_ctx->sessions_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_ctx->sessions_ht) {
- goto exit_relay_ctx_sessions;
+ retval = -1;
+ goto exit_init_data;
}
/* tables of streams indexed by stream ID */
relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_streams_ht) {
}
/* tables of streams indexed by stream ID */
relay_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!relay_streams_ht) {
- goto exit_relay_ctx_streams;
+ retval = -1;
+ goto exit_init_data;
}
/* tables of streams indexed by stream ID */
viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!viewer_streams_ht) {
}
/* tables of streams indexed by stream ID */
viewer_streams_ht = lttng_ht_new(0, LTTNG_HT_TYPE_U64);
if (!viewer_streams_ht) {
- goto exit_relay_ctx_viewer_streams;
- }
-
- /* Initialize thread health monitoring */
- health_relayd = health_app_create(NR_HEALTH_RELAYD_TYPES);
- if (!health_relayd) {
- PERROR("health_app_create error");
- goto exit_health_app_create;
+ retval = -1;
+ goto exit_init_data;
}
ret = utils_create_pipe(health_quit_pipe);
}
ret = utils_create_pipe(health_quit_pipe);
- if (ret < 0) {
- goto error_health_pipe;
+ if (ret) {
+ retval = -1;
+ goto exit_health_quit_pipe;
}
/* Create thread to manage the client socket */
ret = pthread_create(&health_thread, NULL,
thread_manage_health, (void *) NULL);
}
/* Create thread to manage the client socket */
ret = pthread_create(&health_thread, NULL,
thread_manage_health, (void *) NULL);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create health");
PERROR("pthread_create health");
+ retval = -1;
+ goto exit_health_thread;
}
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
}
/* Setup the dispatcher thread */
ret = pthread_create(&dispatcher_thread, NULL,
relay_thread_dispatcher, (void *) NULL);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create dispatcher");
PERROR("pthread_create dispatcher");
+ retval = -1;
+ goto exit_dispatcher_thread;
}
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
relay_thread_worker, (void *) relay_ctx);
}
/* Setup the worker thread */
ret = pthread_create(&worker_thread, NULL,
relay_thread_worker, (void *) relay_ctx);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create worker");
PERROR("pthread_create worker");
+ retval = -1;
+ goto exit_worker_thread;
}
/* Setup the listener thread */
ret = pthread_create(&listener_thread, NULL,
relay_thread_listener, (void *) NULL);
}
/* Setup the listener thread */
ret = pthread_create(&listener_thread, NULL,
relay_thread_listener, (void *) NULL);
+ if (ret) {
+ errno = ret;
PERROR("pthread_create listener");
PERROR("pthread_create listener");
+ retval = -1;
+ goto exit_listener_thread;
- ret = live_start_threads(live_uri, relay_ctx);
- if (ret != 0) {
+ ret = relayd_live_create(live_uri, relay_ctx);
+ if (ret) {
ERR("Starting live viewer threads");
ERR("Starting live viewer threads");
+ /*
+ * This is where we start awaiting program completion (e.g. through
+ * signal that asks threads to teardown).
+ */
+
+ ret = relayd_live_join();
+ if (ret) {
+ retval = -1;
+ }
ret = pthread_join(listener_thread, &status);
ret = pthread_join(listener_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join listener_thread");
+ retval = -1;
ret = pthread_join(worker_thread, &status);
ret = pthread_join(worker_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join worker_thread");
+ retval = -1;
ret = pthread_join(dispatcher_thread, &status);
ret = pthread_join(dispatcher_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join dispatcher_thread");
+ retval = -1;
ret = pthread_join(health_thread, &status);
ret = pthread_join(health_thread, &status);
- if (ret != 0) {
- PERROR("pthread_join health thread");
- goto error; /* join error, exit without cleanup */
+ if (ret) {
+ errno = ret;
+ PERROR("pthread_join health_thread");
+ retval = -1;
- /*
- * Stop live threads only after joining other threads.
- */
- live_stop_threads();
-
-health_error:
utils_close_pipe(health_quit_pipe);
utils_close_pipe(health_quit_pipe);
health_app_destroy(health_relayd);
health_app_destroy(health_relayd);
- lttng_ht_destroy(viewer_streams_ht);
-
-exit_relay_ctx_viewer_streams:
- lttng_ht_destroy(relay_streams_ht);
-
-exit_relay_ctx_streams:
- lttng_ht_destroy(relay_ctx->sessions_ht);
+exit_options:
+ relayd_cleanup(relay_ctx);
-exit_relay_ctx_sessions:
- free(relay_ctx);
-
-exit:
- cleanup();
- if (!ret) {
+ } else {
+ exit(EXIT_FAILURE);
-
-error:
- exit(EXIT_FAILURE);