[lttng-tools.git] src/bin/lttng-sessiond/agent-thread.cpp (commit 9b5a5b569f5bc75245f0093b32e3cb2bb9ae2dcb)
/*
 * Copyright (C) 2013 David Goulet <dgoulet@efficios.com>
 *
 * SPDX-License-Identifier: GPL-2.0-only
 *
 */

#define _LGPL_SOURCE

#include "agent-thread.hpp"
#include "agent.hpp"
#include "fd-limit.hpp"
#include "lttng-sessiond.hpp"
#include "session.hpp"
#include "thread.hpp"
#include "utils.hpp"

#include <common/common.hpp>
#include <common/compat/endian.hpp>
#include <common/sessiond-comm/sessiond-comm.hpp>
#include <common/urcu.hpp>
#include <common/uri.hpp>
#include <common/utils.hpp>

#include <fcntl.h>

namespace {
struct thread_notifiers {
	struct lttng_pipe *quit_pipe;
	sem_t ready;
};

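/* Registration information reported by a connecting agent application. */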
struct agent_app_id {
	pid_t pid;
	enum lttng_domain_type domain;
};

struct agent_protocol_version {
	unsigned int major, minor;
};

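/*
 * Tri-state flag: -1 when not yet determined, 1 when agent tracing is
 * available, 0 when it is deactivated. Read by agent_tracing_is_enabled(),
 * which asserts that the value has been set.
 */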
int agent_tracing_enabled = -1;

/*
 * Note that there is no port here. It's set after this URI is parsed so we
 * can let the user define a custom one. However, localhost is ALWAYS the
 * default listening address.
 */
const char *default_reg_uri = "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS;
} /* namespace */

/*
 * Update an agent application using the given socket. This is done just after
 * a successful registration.
 *
 * This will acquire each session's lock, so the caller must not hold any
 * session lock. However, the caller must hold the session list lock.
 */
static void update_agent_app(const struct agent_app *app)
{
	struct ltt_session_list *list;

	list = session_get_list();
	LTTNG_ASSERT(list);

	for (auto *session :
	     lttng::urcu::list_iteration_adapter<ltt_session, &ltt_session::list>(list->head)) {
		if (!session_get(session)) {
			continue;
		}

		session_lock(session);
		if (session->ust_session) {
			const struct agent *agt;

			const lttng::urcu::read_lock_guard read_lock;
			agt = trace_ust_find_agent(session->ust_session, app->domain);
			if (agt) {
				agent_update(agt, app);
			}
		}
		session_unlock(session);
		session_put(session);
	}

	/*
	 * We are protected against the addition of new events by the session
	 * list lock being held.
	 */
	for (auto *trigger_agent :
	     lttng::urcu::lfht_iteration_adapter<agent, decltype(agent::node), &agent::node>(
		     *the_trigger_agents_ht_by_domain->ht)) {
		agent_update(trigger_agent, app);
	}
}

/*
 * Create and initialize the agent registration TCP socket from the default URI.
 */
static struct lttcomm_sock *init_tcp_socket()
{
	int ret;
	struct lttng_uri *uri = nullptr;
	struct lttcomm_sock *sock = nullptr;
	unsigned int port;
	bool bind_succeeded = false;

	/*
	 * This should never fail since the URI is hardcoded and the port is set
	 * before this thread is launched.
	 */
	ret = uri_parse(default_reg_uri, &uri);
	LTTNG_ASSERT(ret);
	LTTNG_ASSERT(the_config.agent_tcp_port.begin > 0);
	uri->port = the_config.agent_tcp_port.begin;

	sock = lttcomm_alloc_sock_from_uri(uri);
	uri_free(uri);
	if (sock == nullptr) {
		ERR("agent allocating TCP socket");
		goto error;
	}

	ret = lttcomm_create_sock(sock);
	if (ret < 0) {
		goto error;
	}

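	/*
	 * Try each port of the configured range [begin, end] until one binds
	 * successfully; give up on any error other than EADDRINUSE.
	 */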
	for (port = the_config.agent_tcp_port.begin; port <= the_config.agent_tcp_port.end;
	     port++) {
		ret = lttcomm_sock_set_port(sock, (uint16_t) port);
		if (ret) {
			ERR("Failed to set port %u on socket", port);
			goto error;
		}
		DBG3("Trying to bind on port %u", port);
		ret = sock->ops->bind(sock);
		if (!ret) {
			bind_succeeded = true;
			break;
		}

		if (errno == EADDRINUSE) {
			DBG("Failed to bind to port %u since it is already in use", port);
		} else {
			PERROR("Failed to bind to port %u", port);
			goto error;
		}
	}

	if (!bind_succeeded) {
		if (the_config.agent_tcp_port.begin == the_config.agent_tcp_port.end) {
			WARN("Another process is already using the agent port %i. "
			     "Agent support will be deactivated.",
			     the_config.agent_tcp_port.begin);
			goto error;
		} else {
			WARN("All ports in the range [%i, %i] are already in use. "
			     "Agent support will be deactivated.",
			     the_config.agent_tcp_port.begin,
			     the_config.agent_tcp_port.end);
			goto error;
		}
	}

	ret = sock->ops->listen(sock, -1);
	if (ret < 0) {
		goto error;
	}

	DBG("Listening on TCP port %u and socket %d", port, sock->fd);

	return sock;

error:
	if (sock) {
		lttcomm_destroy_sock(sock);
	}
	return nullptr;
}

/*
 * Close and destroy the given TCP socket.
 */
static void destroy_tcp_socket(struct lttcomm_sock *sock)
{
	int ret;
	uint16_t port;

	LTTNG_ASSERT(sock);

	ret = lttcomm_sock_get_port(sock, &port);
	if (ret) {
		ERR("Failed to get port of agent TCP socket");
		port = 0;
	}

	DBG3("Destroy TCP socket on port %" PRIu16, port);

	/* This will return gracefully if fd is invalid. */
	sock->ops->close(sock);
	lttcomm_destroy_sock(sock);
}

static const char *domain_type_str(enum lttng_domain_type domain_type)
{
	switch (domain_type) {
	case LTTNG_DOMAIN_NONE:
		return "none";
	case LTTNG_DOMAIN_KERNEL:
		return "kernel";
	case LTTNG_DOMAIN_UST:
		return "ust";
	case LTTNG_DOMAIN_JUL:
		return "jul";
	case LTTNG_DOMAIN_LOG4J:
		return "log4j";
	case LTTNG_DOMAIN_LOG4J2:
		return "log4j2";
	case LTTNG_DOMAIN_PYTHON:
		return "python";
	default:
		return "unknown";
	}
}

static bool is_agent_protocol_version_supported(const struct agent_protocol_version *version)
{
	const bool is_supported = version->major == AGENT_MAJOR_VERSION &&
		version->minor == AGENT_MINOR_VERSION;

	if (!is_supported) {
		WARN("Refusing agent connection: unsupported protocol version %u.%u, expected %i.%i",
		     version->major,
		     version->minor,
		     AGENT_MAJOR_VERSION,
		     AGENT_MINOR_VERSION);
	}

	return is_supported;
}

/*
 * Handle a new agent connection on the registration socket.
 *
 * Returns 0 on success, or else a negative errno value.
 * On success, the resulting socket is returned through `agent_app_socket`
 * and the application's reported id is updated through `agent_app_id`.
 */
static int accept_agent_connection(struct lttcomm_sock *reg_sock,
				   struct agent_app_id *agent_app_id,
				   struct lttcomm_sock **agent_app_socket)
{
	int ret;
	struct agent_protocol_version agent_version;
	ssize_t size;
	struct agent_register_msg msg;
	struct lttcomm_sock *new_sock;

	LTTNG_ASSERT(reg_sock);

	new_sock = reg_sock->ops->accept(reg_sock);
	if (!new_sock) {
		ret = -ENOTCONN;
		goto end;
	}

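	/*
	 * The agent sends a fixed-size registration message holding its domain,
	 * pid and protocol version, all encoded in network byte order.
	 */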
	size = new_sock->ops->recvmsg(new_sock, &msg, sizeof(msg), 0);
	if (size < (ssize_t) sizeof(msg)) {
		if (size < 0) {
			PERROR("Failed to register new agent application");
		} else if (size != 0) {
			ERR("Failed to register new agent application: invalid registration message length: expected length = %zu, message length = %zd",
			    sizeof(msg),
			    size);
		} else {
			DBG("Failed to register new agent application: connection closed");
		}
		ret = -EINVAL;
		goto error_close_socket;
	}

	agent_version = (struct agent_protocol_version){
		be32toh(msg.major_version),
		be32toh(msg.minor_version),
	};

	/* Test communication protocol version of the registering agent. */
	if (!is_agent_protocol_version_supported(&agent_version)) {
		ret = -EINVAL;
		goto error_close_socket;
	}

	*agent_app_id = (struct agent_app_id){
		.pid = (pid_t) be32toh(msg.pid),
		.domain = (lttng_domain_type) be32toh(msg.domain),
	};

	DBG2("New registration for agent application: pid = %ld, domain = %s, socket fd = %d",
	     (long) agent_app_id->pid,
	     domain_type_str(agent_app_id->domain),
	     new_sock->fd);

	*agent_app_socket = new_sock;
	new_sock = nullptr;
	ret = 0;
	goto end;

error_close_socket:
	new_sock->ops->close(new_sock);
	lttcomm_destroy_sock(new_sock);
end:
	return ret;
}

bool agent_tracing_is_enabled()
{
	int enabled;

	enabled = uatomic_read(&agent_tracing_enabled);
	LTTNG_ASSERT(enabled != -1);
	return enabled == 1;
}

/*
 * Write the agent TCP port to the agent port file in the rundir. The PID
 * file helper is reused since the file simply holds a numeric value.
 */
static int write_agent_port(uint16_t port)
{
	return utils_create_pid_file((pid_t) port, the_config.agent_port_file_path.value);
}

static void mark_thread_as_ready(struct thread_notifiers *notifiers)
{
	DBG("Marking agent management thread as ready");
	sem_post(&notifiers->ready);
}

static void wait_until_thread_is_ready(struct thread_notifiers *notifiers)
{
	DBG("Waiting for agent management thread to be ready");
	sem_wait(&notifiers->ready);
	DBG("Agent management thread is ready");
}

/*
 * This thread manages agent application registration and communication.
 */
static void *thread_agent_management(void *data)
{
	int i, ret;
	uint32_t nb_fd;
	struct lttng_poll_event events;
	struct lttcomm_sock *reg_sock;
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	const auto thread_quit_pipe_fd = lttng_pipe_get_readfd(notifiers->quit_pipe);

	DBG("Manage agent application registration.");

	rcu_register_thread();
	rcu_thread_online();

	/* Agent initialization MUST be completed before starting this thread. */
	LTTNG_ASSERT(the_agent_apps_ht_by_sock);

	/* Create a pollset of size 2: quit pipe and registration socket. */
	ret = lttng_poll_create(&events, 2, LTTNG_CLOEXEC);
	if (ret < 0) {
		goto error_poll_create;
	}

	ret = lttng_poll_add(&events, thread_quit_pipe_fd, LPOLLIN);
	if (ret < 0) {
		goto error_tcp_socket;
	}

	reg_sock = init_tcp_socket();
	if (reg_sock) {
		uint16_t port;

		ret = lttcomm_sock_get_port(reg_sock, &port);
		LTTNG_ASSERT(ret == 0);

		ret = write_agent_port(port);
		if (ret) {
			ERR("Failed to create agent port file: agent tracing will be unavailable");
			/* Don't prevent the launch of the sessiond on error. */
			mark_thread_as_ready(notifiers);
			goto error;
		}
	} else {
		/* Don't prevent the launch of the sessiond on error. */
		mark_thread_as_ready(notifiers);
		goto error_tcp_socket;
	}

	/*
	 * Signal that the agent thread is ready. The command thread
	 * may start to query whether or not agent tracing is enabled.
	 */
	uatomic_set(&agent_tracing_enabled, 1);
	mark_thread_as_ready(notifiers);

	/* Add TCP socket to the poll set. */
	ret = lttng_poll_add(&events, reg_sock->fd, LPOLLIN | LPOLLRDHUP);
	if (ret < 0) {
		goto error;
	}

	while (true) {
		DBG3("Manage agent polling");

		/* Infinite blocking call, waiting for transmission. */
	restart:
		ret = lttng_poll_wait(&events, -1);
		DBG3("Manage agent return from poll on %d fds", LTTNG_POLL_GETNB(&events));
		if (ret < 0) {
			/* Restart interrupted system call. */
			if (errno == EINTR) {
				goto restart;
			}
			goto error;
		}
		nb_fd = ret;
		DBG3("%d fd ready", nb_fd);

		for (i = 0; i < nb_fd; i++) {
			/* Fetch the poll data once. */
			const auto revents = LTTNG_POLL_GETEV(&events, i);
			const auto pollfd = LTTNG_POLL_GETFD(&events, i);

			/* Activity on thread quit pipe, exiting. */
			if (pollfd == thread_quit_pipe_fd) {
				DBG("Activity on thread quit pipe");
				goto exit;
			}

			/* Activity on the registration socket. */
			if (revents & LPOLLIN) {
				struct agent_app_id new_app_id;
				struct agent_app *new_app = nullptr;
				struct lttcomm_sock *new_app_socket;
				int new_app_socket_fd;

				LTTNG_ASSERT(pollfd == reg_sock->fd);

				ret = accept_agent_connection(
					reg_sock, &new_app_id, &new_app_socket);
				if (ret < 0) {
					/* Errors are already logged. */
					continue;
				}

				/*
				 * new_app_socket's ownership is transferred to
				 * the new agent app on success.
				 */
				new_app = agent_create_app(
					new_app_id.pid, new_app_id.domain, new_app_socket);
				if (!new_app) {
					new_app_socket->ops->close(new_app_socket);
					continue;
				}
				new_app_socket_fd = new_app_socket->fd;
				new_app_socket = nullptr;

				/*
				 * Since this is a command socket (write then
				 * read), only poll for error events to detect
				 * a shutdown of the peer.
				 */
				ret = lttng_poll_add(&events, new_app_socket_fd, LPOLLRDHUP);
				if (ret < 0) {
					agent_destroy_app(new_app);
					continue;
				}

				/*
				 * Prevent sessions from being modified while
				 * the agent application's configuration is
				 * updated.
				 */
				const auto list_lock = lttng::sessiond::lock_session_list();

				/*
				 * Update the newly registered application's
				 * configuration.
				 */
				update_agent_app(new_app);

				ret = agent_send_registration_done(new_app);
				if (ret < 0) {
					agent_destroy_app(new_app);
					/* Remove it from the poll set. */
					ret = lttng_poll_del(&events, new_app_socket_fd);
					if (ret < 0) {
						goto error;
					}
					continue;
				}

				/* Publish the new agent app. */
				agent_add_app(new_app);
			} else if (revents & (LPOLLERR | LPOLLHUP | LPOLLRDHUP)) {
				/* Remove it from the poll set. */
				ret = lttng_poll_del(&events, pollfd);
				if (ret < 0) {
					goto error;
				}
				agent_destroy_app_by_sock(pollfd);
			} else {
				ERR("Unexpected poll events %u for sock %d", revents, pollfd);
				goto error;
			}
		}
	}

exit:
	/* Whatever happens, try to remove the registration socket from the poll set and exit. */
	(void) lttng_poll_del(&events, reg_sock->fd);
error:
	destroy_tcp_socket(reg_sock);
error_tcp_socket:
	lttng_poll_clean(&events);
error_poll_create:
	uatomic_set(&agent_tracing_enabled, 0);
	DBG("Cleaning up and stopping.");
	rcu_thread_offline();
	rcu_unregister_thread();
	return nullptr;
}

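/* Ask the agent management thread to stop by writing to its quit pipe. */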
static bool shutdown_agent_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;
	const int write_fd = lttng_pipe_get_writefd(notifiers->quit_pipe);

	return notify_thread_pipe(write_fd) == 1;
}

static void cleanup_agent_management_thread(void *data)
{
	struct thread_notifiers *notifiers = (thread_notifiers *) data;

	lttng_pipe_destroy(notifiers->quit_pipe);
	sem_destroy(&notifiers->ready);
	free(notifiers);
}

bool launch_agent_management_thread()
{
	struct thread_notifiers *notifiers;
	struct lttng_thread *thread;

	notifiers = zmalloc<thread_notifiers>();
	if (!notifiers) {
		goto error_alloc;
	}

	sem_init(&notifiers->ready, 0, 0);
	notifiers->quit_pipe = lttng_pipe_open(FD_CLOEXEC);
	if (!notifiers->quit_pipe) {
		goto error;
	}
	thread = lttng_thread_create("Agent management",
				     thread_agent_management,
				     shutdown_agent_management_thread,
				     cleanup_agent_management_thread,
				     notifiers);
	if (!thread) {
		goto error;
	}
	wait_until_thread_is_ready(notifiers);
	lttng_thread_put(thread);
	return true;
error:
	cleanup_agent_management_thread(notifiers);
error_alloc:
	return false;
}