2 * Copyright (C) 2013 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
4 * Copyright (C) 2015 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
6 * SPDX-License-Identifier: GPL-2.0-only
11 #include "ctf-trace.hpp"
13 #include "lttng-relayd.hpp"
14 #include "session.hpp"
16 #include "viewer-session.hpp"
17 #include "viewer-stream.hpp"
19 #include <common/common.hpp>
20 #include <common/urcu.hpp>
22 #include <urcu/rculist.h>
24 /* Global session id used in the session creation. */
25 static uint64_t last_viewer_session_id
;
26 static pthread_mutex_t last_viewer_session_id_lock
= PTHREAD_MUTEX_INITIALIZER
;
28 struct relay_viewer_session
*viewer_session_create()
30 struct relay_viewer_session
*vsession
;
32 vsession
= zmalloc
<relay_viewer_session
>();
36 pthread_mutex_lock(&last_viewer_session_id_lock
);
37 vsession
->id
= ++last_viewer_session_id
;
38 pthread_mutex_unlock(&last_viewer_session_id_lock
);
39 CDS_INIT_LIST_HEAD(&vsession
->session_list
);
40 CDS_INIT_LIST_HEAD(&vsession
->unannounced_stream_list
);
41 pthread_mutex_init(&vsession
->unannounced_stream_list_lock
, nullptr);
42 lttng_ht_node_init_u64(&vsession
->viewer_session_n
, vsession
->id
);
43 lttng_ht_add_unique_u64(viewer_sessions_ht
, &vsession
->viewer_session_n
);
48 int viewer_session_set_trace_chunk_copy(struct relay_viewer_session
*vsession
,
49 struct lttng_trace_chunk
*relay_session_trace_chunk
)
52 struct lttng_trace_chunk
*viewer_chunk
;
54 lttng_trace_chunk_put(vsession
->current_trace_chunk
);
55 vsession
->current_trace_chunk
= nullptr;
57 DBG("Copying relay session's current trace chunk to the viewer session");
58 if (!relay_session_trace_chunk
) {
62 viewer_chunk
= lttng_trace_chunk_copy(relay_session_trace_chunk
);
64 ERR("Failed to create a viewer trace chunk from the relay session's current chunk");
69 vsession
->current_trace_chunk
= viewer_chunk
;
74 /* The existence of session must be guaranteed by the caller. */
75 enum lttng_viewer_attach_return_code
viewer_session_attach(struct relay_viewer_session
*vsession
,
76 struct relay_session
*session
)
78 enum lttng_viewer_attach_return_code viewer_attach_status
= LTTNG_VIEWER_ATTACH_OK
;
80 ASSERT_LOCKED(session
->lock
);
82 /* Will not fail, as per the ownership guarantee. */
83 if (!session_get(session
)) {
84 viewer_attach_status
= LTTNG_VIEWER_ATTACH_UNK
;
87 if (session
->viewer_attached
) {
88 viewer_attach_status
= LTTNG_VIEWER_ATTACH_ALREADY
;
92 session
->viewer_attached
= true;
94 ret
= viewer_session_set_trace_chunk_copy(vsession
, session
->current_trace_chunk
);
97 * The live protocol does not define a generic error
98 * value for the "attach" command. The "unknown"
99 * status is used so that the viewer may handle this
100 * failure as if the session didn't exist anymore.
102 DBG("Failed to create a viewer trace chunk from the current trace chunk of session \"%s\", returning LTTNG_VIEWER_ATTACH_UNK",
103 session
->session_name
);
104 viewer_attach_status
= LTTNG_VIEWER_ATTACH_UNK
;
108 if (viewer_attach_status
== LTTNG_VIEWER_ATTACH_OK
) {
109 pthread_mutex_lock(&vsession
->session_list_lock
);
110 /* Ownership is transfered to the list. */
111 cds_list_add_rcu(&session
->viewer_session_node
, &vsession
->session_list
);
112 pthread_mutex_unlock(&vsession
->session_list_lock
);
115 * Immediately create new viewer streams for the attached session
116 * so that the viewer streams hold a reference on the any relay
117 * streams that could be unpublished between now and the next
118 * GET_NEW_STREAMS command from the live viewer.
120 uint32_t created
= 0;
124 const int make_viewer_streams_ret
= make_viewer_streams(session
,
126 LTTNG_VIEWER_SEEK_BEGINNING
,
132 if (make_viewer_streams_ret
== 0) {
133 DBG("Created %d new viewer streams while attaching to relay session %" PRIu64
,
138 * Warning, since the creation of the streams will be retried when
139 * the viewer next sends the GET_NEW_STREAMS commands.
141 WARN("Failed to create new viewer streams while attaching to relay session %" PRIu64
142 ", ret=%d, total=%d, unsent=%d, created=%d, closed=%d",
144 make_viewer_streams_ret
,
151 /* Put our local ref. */
152 session_put(session
);
155 return viewer_attach_status
;
158 /* The existence of session must be guaranteed by the caller. */
159 static int viewer_session_detach(struct relay_viewer_session
*vsession
,
160 struct relay_session
*session
)
164 pthread_mutex_lock(&session
->lock
);
165 if (!session
->viewer_attached
) {
168 session
->viewer_attached
= false;
172 pthread_mutex_lock(&vsession
->session_list_lock
);
173 cds_list_del_rcu(&session
->viewer_session_node
);
174 pthread_mutex_unlock(&vsession
->session_list_lock
);
175 /* Release reference held by the list. */
176 session_put(session
);
179 /* Safe since we know the session exists. */
180 pthread_mutex_unlock(&session
->lock
);
184 void viewer_session_destroy(struct relay_viewer_session
*vsession
)
186 struct lttng_ht_iter iter
;
188 LTTNG_ASSERT(cds_list_empty(&vsession
->unannounced_stream_list
));
190 iter
.iter
.node
= &vsession
->viewer_session_n
.node
;
191 lttng_ht_del(viewer_sessions_ht
, &iter
);
192 lttng_trace_chunk_put(vsession
->current_trace_chunk
);
197 * Release ownership of all the streams of one session and detach the viewer.
199 void viewer_session_close_one_session(struct relay_viewer_session
*vsession
,
200 struct relay_session
*session
)
203 * TODO: improvement: create more efficient list of
204 * vstream per session.
207 lttng::urcu::lfht_iteration_adapter
<relay_viewer_stream
,
208 decltype(relay_viewer_stream::stream_n
),
209 &relay_viewer_stream::stream_n
>(
210 *viewer_streams_ht
->ht
)) {
211 if (!viewer_stream_get(vstream
)) {
215 if (vstream
->stream
->trace
->session
!= session
) {
216 viewer_stream_put(vstream
);
219 /* Put local reference. */
220 viewer_stream_put(vstream
);
222 * We have reached one of the viewer stream's lifetime
223 * end condition. This "put" will cause the proper
224 * teardown of the viewer stream.
226 viewer_stream_put(vstream
);
230 lttng::urcu::rcu_list_iteration_adapter
<relay_viewer_stream
,
231 &relay_viewer_stream::viewer_stream_node
>(
232 vsession
->unannounced_stream_list
)) {
233 if (!viewer_stream_get(vstream
)) {
236 if (vstream
->stream
->trace
->session
!= session
) {
237 viewer_stream_put(vstream
);
240 pthread_mutex_lock(&vsession
->unannounced_stream_list_lock
);
241 cds_list_del_rcu(&vstream
->viewer_stream_node
);
242 pthread_mutex_unlock(&vsession
->unannounced_stream_list_lock
);
243 /* Local reference */
244 viewer_stream_put(vstream
);
245 /* Reference from unannounced_stream_list */
246 viewer_stream_put(vstream
);
249 lttng_trace_chunk_put(vsession
->current_trace_chunk
);
250 vsession
->current_trace_chunk
= nullptr;
251 viewer_session_detach(vsession
, session
);
254 void viewer_session_close(struct relay_viewer_session
*vsession
)
257 lttng::urcu::rcu_list_iteration_adapter
<relay_session
,
258 &relay_session::viewer_session_node
>(
259 vsession
->session_list
)) {
260 viewer_session_close_one_session(vsession
, session
);
265 * Check if a connection is attached to a session.
266 * Return 1 if attached, 0 if not attached, a negative value on error.
268 int viewer_session_is_attached(struct relay_viewer_session
*vsession
, struct relay_session
*session
)
272 pthread_mutex_lock(&session
->lock
);
276 if (!session
->viewer_attached
) {
280 for (auto *session_it
:
281 lttng::urcu::rcu_list_iteration_adapter
<relay_session
,
282 &relay_session::viewer_session_node
>(
283 vsession
->session_list
)) {
284 if (session
== session_it
) {
291 pthread_mutex_unlock(&session
->lock
);