Fix: Wait for in-flight data before closing a stream
[lttng-tools.git] / src / bin / lttng-relayd / stream.c
index 54b737cb1d797610e82d14a355cf776cde4d9b1c..c1968f34e76694558a6f40ff21662ea9cfdb4c42 100644 (file)
@@ -92,6 +92,7 @@ struct relay_stream *stream_create(struct ctf_trace *trace,
 
        stream->stream_handle = stream_handle;
        stream->prev_seq = -1ULL;
+       stream->last_net_seq_num = -1ULL;
        stream->ctf_stream_id = -1ULL;
        stream->tracefile_size = tracefile_size;
        stream->tracefile_count = tracefile_count;
@@ -337,14 +338,47 @@ void stream_put(struct relay_stream *stream)
        rcu_read_unlock();
 }
 
-void stream_close(struct relay_stream *stream)
+void try_stream_close(struct relay_stream *stream)
 {
-       DBG("closing stream %" PRIu64, stream->stream_handle);
+       DBG("Trying to close stream %" PRIu64, stream->stream_handle);
        pthread_mutex_lock(&stream->lock);
+       /*
+        * Can be called concurently by connection close and reception of last
+        * pending data.
+        */
+       if (stream->closed) {
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it is already marked as closed", stream->stream_handle);
+               return;
+       }
+
+       stream->close_requested = true;
+       /*
+        * We shortcut the data pending check if no bound is known for this
+        * stream. This prevents us from never closing the stream in the case
+        * where a connection would be closed before a "close" command has
+        * been received.
+        *
+        * TODO
+        * This still leaves open the question of handling missing data after
+        * a bound has been set by a stream close command. Since we have no
+        * way of pairing data and control connection, and that a data
+        * connection has no ownership of a stream, it is likely that a
+        * timeout approach would be appropriate to handle dangling streams.
+        */
+       if (stream->last_net_seq_num != -1ULL &&
+                       ((int64_t) (stream->prev_seq - stream->last_net_seq_num)) < 0) {
+               /* Don't close since we still have data pending. */
+               pthread_mutex_unlock(&stream->lock);
+               DBG("closing stream %" PRIu64 " aborted since it still has data pending", stream->stream_handle);
+               return;
+       }
        stream_unpublish(stream);
        stream->closed = true;
+       /* Relay indexes are only used by the "consumer/sessiond" end. */
        relay_index_close_all(stream);
        pthread_mutex_unlock(&stream->lock);
+       DBG("Succeeded in closing stream %" PRIu64, stream->stream_handle);
        stream_put(stream);
 }
 
This page took 0.02427 seconds and 4 git commands to generate.