}
}
-/*
- * Try to lock the stream mutex.
- *
- * On success, 1 is returned else 0 indicating that the mutex is NOT lock.
- */
-static int stream_try_lock(struct lttng_consumer_stream *stream)
-{
- int ret;
-
- assert(stream);
-
- /*
- * Try to lock the stream mutex. On failure, we know that the stream is
- * being used else where hence there is data still being extracted.
- */
- ret = pthread_mutex_trylock(&stream->lock);
- if (ret) {
- /* For both EBUSY and EINVAL error, the mutex is NOT locked. */
- ret = 0;
- goto end;
- }
-
- ret = 1;
-
-end:
- return ret;
-}
-
/*
* Search for a relayd associated to the session id and return the reference.
*
ht->hash_fct(&id, lttng_ht_seed),
ht->match_fct, &id,
&iter.iter, stream, node_session_id.node) {
- /* If this call fails, the stream is being used hence data pending. */
- ret = stream_try_lock(stream);
- if (!ret) {
- goto data_pending;
- }
+ pthread_mutex_lock(&stream->lock);
/*
* A removed node from the hash table indicates that the stream has
stream->chan->current_chunk_id,
stream->last_sequence_number);
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
+ if (ret < 0) {
+ ERR("Relayd rotate stream failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
if (ret) {
ERR("Rotate relay stream");
}
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_rotate_rename(&relayd->control_sock, old_path, new_path);
+ if (ret < 0) {
+ ERR("Relayd rotate rename failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end:
return ret;
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_rotate_pending(&relayd->control_sock, chunk_id);
+ if (ret < 0) {
+ ERR("Relayd rotate pending failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end:
pthread_mutex_lock(&relayd->ctrl_sock_mutex);
ret = relayd_mkdir(&relayd->control_sock, path);
+ if (ret < 0) {
+ ERR("Relayd mkdir failed. Cleaning up relayd %" PRIu64".", relayd->net_seq_idx);
+ lttng_consumer_cleanup_relayd(relayd);
+ }
pthread_mutex_unlock(&relayd->ctrl_sock_mutex);
end: