Sample the "closed" flag with session lock held.
Also, if the session connection is closed while we attach to it, reply
with HUP, because there are risks of attaching to an incomplete session
(e.g. the metadata stream could be already closed).
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: Jérémie Galarneau <jeremie.galarneau@efficios.com>
static
int make_viewer_streams(struct relay_session *session,
enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
static
int make_viewer_streams(struct relay_session *session,
enum lttng_viewer_seek seek_t, uint32_t *nb_total, uint32_t *nb_unsent,
+ uint32_t *nb_created, bool *closed)
{
int ret;
struct lttng_ht_iter iter;
{
int ret;
struct lttng_ht_iter iter;
*/
pthread_mutex_lock(&session->lock);
*/
pthread_mutex_lock(&session->lock);
+ if (session->connection_closed) {
+ *closed = true;
+ }
+
/*
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
/*
* Create viewer streams for relay streams that are ready to be
* used for a the given session id only.
struct lttng_viewer_new_streams_response response;
struct relay_session *session;
uint64_t session_id;
struct lttng_viewer_new_streams_response response;
struct relay_session *session;
uint64_t session_id;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_OK);
ret = make_viewer_streams(session, LTTNG_VIEWER_SEEK_LAST, &nb_total, &nb_unsent,
if (ret < 0) {
goto end_put_session;
}
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
/*
response.streams_count = htobe32(nb_streams);
/*
- * If the session is closed and we have no new streams to send,
- * it means that the viewer has already received the whole trace
- * for this session and should now close it.
+ * If the session is closed, HUP when there are no more streams.
- if (nb_total == 0 && session->connection_closed) {
+ if (closed && nb_total == 0) {
+ response.streams_count = 0;
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
goto send_reply;
}
response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
goto send_reply;
}
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
struct lttng_viewer_attach_session_request request;
struct lttng_viewer_attach_session_response response;
struct relay_session *session = NULL;
- ret = make_viewer_streams(session, seek_type, &nb_streams, NULL, NULL);
+ ret = make_viewer_streams(session, seek_type, &nb_streams, NULL,
+ NULL, &closed);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
if (ret < 0) {
goto end_put_session;
}
response.streams_count = htobe32(nb_streams);
+ /*
+ * If the session is closed when the viewer is attaching, it
+ * means some of the streams may have been concurrently removed,
+ * so we don't allow the viewer to attach, even if there are
+ * streams available.
+ */
+ if (closed) {
+ send_streams = 0;
+ response.streams_count = 0;
+ response.status = htobe32(LTTNG_VIEWER_NEW_STREAMS_HUP);
+ goto send_reply;
+ }
+
send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));
send_reply:
health_code_update();
ret = send_response(conn->sock, &response, sizeof(response));