(void) relayd_close(&relayd->control_sock);
(void) relayd_close(&relayd->data_sock);
+ pthread_mutex_destroy(&relayd->ctrl_sock_mutex);
free(relayd);
}
* If a local data context is available, notify the threads that the streams'
* state have changed.
*/
-static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
- struct lttng_consumer_local_data *ctx)
+void lttng_consumer_cleanup_relayd(struct consumer_relayd_sock_pair *relayd)
{
uint64_t netidx;
assert(relayd);
- DBG("Cleaning up relayd sockets");
+ DBG("Cleaning up relayd object ID %"PRIu64, relayd->net_seq_idx);
/* Save the net sequence index before destroying the object */
netidx = relayd->net_seq_idx;
* memory barrier ordering the updates of the end point status from the
* read of this status which happens AFTER receiving this notify.
*/
- if (ctx) {
- notify_thread_lttng_pipe(ctx->consumer_data_pipe);
- notify_thread_lttng_pipe(ctx->consumer_metadata_pipe);
- }
+ notify_thread_lttng_pipe(relayd->ctx->consumer_data_pipe);
+ notify_thread_lttng_pipe(relayd->ctx->consumer_metadata_pipe);
}
/*
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_add_stream(&relayd->control_sock, stream->name,
path, &stream->relayd_stream_id,
- stream->chan->tracefile_size, stream->chan->tracefile_count);
+ stream->chan->tracefile_size, stream->chan->tracefile_count,
+ stream->trace_archive_id);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd add stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
ret = relayd_streams_sent(&relayd->control_sock);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd streams sent failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto end;
}
} else {
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
}
end:
* cleanup the relayd object and all associated streams.
*/
if (relayd && relayd_hang_up) {
- cleanup_relayd(relayd, ctx);
+ ERR("Relayd hangup. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
/* Skip splice error so the consumer does not fail */
goto end;
}
stream->chan->name);
ret = rotate_notify_sessiond(ctx, stream->chan->key);
}
- assert(stream->chan->nr_stream_rotate_pending >= 0);
pthread_mutex_unlock(&stream->chan->lock);
return ret;
/* local view of the streams */
struct lttng_consumer_stream **local_stream = NULL, *new_stream = NULL;
/* local view of consumer_data.fds_count */
- int nb_fd = 0, nb_pipes_fd;
+ int nb_fd = 0;
+ /* 2 for the consumer_data_pipe and wake up pipe */
+ const int nb_pipes_fd = 2;
/* Number of FDs with CONSUMER_ENDPOINT_INACTIVE but still open. */
int nb_inactive_fd = 0;
struct lttng_consumer_local_data *ctx = data;
free(local_stream);
local_stream = NULL;
- /*
- * Allocate for all fds + 2:
- * +1 for the consumer_data_pipe
- * +1 for wake up pipe
- */
- nb_pipes_fd = 2;
+ /* Allocate for all fds */
pollfd = zmalloc((consumer_data.stream_count + nb_pipes_fd) * sizeof(struct pollfd));
if (pollfd == NULL) {
PERROR("pollfd malloc");
* Add relayd socket pair to consumer data hashtable. If object already
* exists or on error, the function gracefully returns.
*/
+ relayd->ctx = ctx;
add_relayd(relayd);
/* All good! */
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
/* Communication error thus the relayd so no data pending. */
+ ERR("Relayd begin data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
}
stream->relayd_stream_id,
stream->next_net_seq_num - 1);
}
+ if (ret < 0) {
+ ERR("Relayd data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ pthread_mutex_unlock(&stream->lock);
+ goto data_not_pending;
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret == 1) {
pthread_mutex_unlock(&stream->lock);
relayd->relayd_session_id, &is_data_inflight);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
if (ret < 0) {
+ ERR("Relayd end data pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
goto data_not_pending;
}
if (is_data_inflight) {
stream->chan->current_chunk_id,
stream->last_sequence_number);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
if (ret) {
ERR("Rotate relay stream");
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+ if (ret < 0) {
+ ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end:
return ret;
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+ if (ret < 0) {
+ ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end:
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_mkdir(&relayd->control_sock, path);
+ if (ret < 0) {
+ ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end: