Use compiler-agnostic defines to silence warning
[lttng-tools.git] / src / bin / lttng-sessiond / rotation-thread.cpp
CommitLineData
db66e574 1/*
ab5be9fa
MJ
2 * Copyright (C) 2017 Julien Desfossez <jdesfossez@efficios.com>
3 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
db66e574 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
db66e574 6 *
db66e574
JD
7 */
8
9#define _LGPL_SOURCE
28ab034a
JG
10#include "cmd.hpp"
11#include "health-sessiond.hpp"
12#include "lttng-sessiond.hpp"
13#include "notification-thread-commands.hpp"
28ab034a
JG
14#include "rotation-thread.hpp"
15#include "session.hpp"
16#include "thread.hpp"
17#include "timer.hpp"
18#include "utils.hpp"
19
20#include <common/align.hpp>
c9e313bc
SM
21#include <common/config/session-config.hpp>
22#include <common/defaults.hpp>
28ab034a 23#include <common/error.hpp>
0038180d
JG
24#include <common/eventfd.hpp>
25#include <common/exception.hpp>
26#include <common/file-descriptor.hpp>
27#include <common/format.hpp>
c9e313bc 28#include <common/futex.hpp>
c9e313bc 29#include <common/hashtable/utils.hpp>
c9e313bc 30#include <common/kernel-ctl/kernel-ctl.hpp>
0038180d
JG
31#include <common/locked-reference.hpp>
32#include <common/make-unique-wrapper.hpp>
33#include <common/pthread-lock.hpp>
34#include <common/scope-exit.hpp>
28ab034a 35#include <common/time.hpp>
56047f5a 36#include <common/urcu.hpp>
28ab034a
JG
37#include <common/utils.hpp>
38
0038180d 39#include <lttng/action/action-internal.hpp>
c9e313bc 40#include <lttng/condition/condition-internal.hpp>
28ab034a
JG
41#include <lttng/location-internal.hpp>
42#include <lttng/notification/channel-internal.hpp>
c08136a3 43#include <lttng/notification/notification-internal.hpp>
28ab034a
JG
44#include <lttng/rotate-internal.hpp>
45#include <lttng/trigger/trigger.h>
db66e574 46
671e39d7 47#include <fcntl.h>
28ab034a 48#include <inttypes.h>
0038180d 49#include <memory>
28ab034a 50#include <signal.h>
dc65dda3 51#include <sys/eventfd.h>
28ab034a
JG
52#include <sys/stat.h>
53#include <time.h>
db66e574
JD
54#include <urcu.h>
55#include <urcu/list.h>
db66e574 56
0038180d 57namespace ls = lttng::sessiond;
db66e574 58
92816cc3
JG
59/*
60 * The timer thread enqueues jobs and wakes up the rotation thread.
61 * When the rotation thread wakes up, it empties the queue.
62 */
0038180d 63struct ls::rotation_thread_timer_queue {
92816cc3
JG
64 struct lttng_pipe *event_pipe;
65 struct cds_list_head list;
66 pthread_mutex_t lock;
67};
68
f1494934
JG
69namespace {
70struct rotation_thread_job {
303ac4ed
JG
71 using uptr =
72 std::unique_ptr<rotation_thread_job,
73 lttng::memory::create_deleter_class<rotation_thread_job,
74 lttng::memory::free>::deleter>;
0038180d
JG
75
76 enum ls::rotation_thread_job_type type;
f1494934
JG
77 struct ltt_session *session;
78 /* List member in struct rotation_thread_timer_queue. */
79 struct cds_list_head head;
80};
f1494934 81
0038180d 82const char *get_job_type_str(enum ls::rotation_thread_job_type job_type)
db66e574 83{
92816cc3 84 switch (job_type) {
0038180d 85 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
92816cc3 86 return "CHECK_PENDING_ROTATION";
0038180d 87 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
92816cc3
JG
88 return "SCHEDULED_ROTATION";
89 default:
90 abort();
91 }
db66e574
JD
92}
93
92816cc3
JG
94/*
95 * Called with the rotation_thread_timer_queue lock held.
96 * Return true if the same timer job already exists in the queue, false if not.
97 */
0038180d
JG
98bool timer_job_exists(const ls::rotation_thread_timer_queue *queue,
99 ls::rotation_thread_job_type job_type,
100 ltt_session *session)
92816cc3
JG
101{
102 bool exists = false;
103 struct rotation_thread_job *job;
104
28ab034a 105 cds_list_for_each_entry (job, &queue->list, head) {
c7031a2c 106 if (job->session == session && job->type == job_type) {
92816cc3
JG
107 exists = true;
108 goto end;
db66e574 109 }
db66e574 110 }
92816cc3
JG
111end:
112 return exists;
113}
114
a0a4f314
JG
115void check_session_rotation_pending_on_consumers(const ltt_session::locked_ref& session,
116 bool& _rotation_completed)
92816cc3 117{
db582e11 118 int ret = 0;
d2956687
JG
119 enum consumer_trace_chunk_exists_status exists_status;
120 uint64_t relayd_id;
121 bool chunk_exists_on_peer = false;
122 enum lttng_trace_chunk_status chunk_status;
07c4863f 123 const lttng::urcu::read_lock_guard read_lock;
d2956687 124
a0a4f314 125 LTTNG_ASSERT(session->chunk_being_archived);
92816cc3
JG
126
127 /*
128 * Check for a local pending rotation on all consumers (32-bit
129 * user space, 64-bit user space, and kernel).
130 */
a0a4f314 131 if (!session->ust_session) {
92816cc3
JG
132 goto skip_ust;
133 }
56047f5a 134
8f75fc59
JG
135 for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
136 decltype(consumer_socket::node),
137 &consumer_socket::node>(
138 *session->ust_session->consumer->socks->ht)) {
a0a4f314 139 relayd_id = session->ust_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 140 -1ULL :
a0a4f314 141 session->ust_session->consumer->net_seq_index;
d2956687 142
07c4863f 143 const lttng::pthread::lock_guard socket_lock(*socket->lock);
d2956687 144 ret = consumer_trace_chunk_exists(socket,
28ab034a 145 relayd_id,
a0a4f314
JG
146 session->id,
147 session->chunk_being_archived,
28ab034a 148 &exists_status);
d2956687 149 if (ret) {
83ed9e90 150 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3 151 goto end;
db66e574 152 }
d2956687 153
16100d7a 154 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
155 chunk_exists_on_peer = true;
156 goto end;
16100d7a 157 }
16100d7a 158 }
db66e574 159
92816cc3 160skip_ust:
a0a4f314 161 if (!session->kernel_session) {
92816cc3 162 goto skip_kernel;
db66e574 163 }
0038180d 164
8f75fc59
JG
165 for (auto *socket : lttng::urcu::lfht_iteration_adapter<consumer_socket,
166 decltype(consumer_socket::node),
167 &consumer_socket::node>(
168 *session->kernel_session->consumer->socks->ht)) {
07c4863f 169 const lttng::pthread::lock_guard socket_lock(*socket->lock);
0038180d 170
a0a4f314 171 relayd_id = session->kernel_session->consumer->type == CONSUMER_DST_LOCAL ?
28ab034a 172 -1ULL :
a0a4f314 173 session->kernel_session->consumer->net_seq_index;
d2956687
JG
174
175 ret = consumer_trace_chunk_exists(socket,
28ab034a 176 relayd_id,
a0a4f314
JG
177 session->id,
178 session->chunk_being_archived,
28ab034a 179 &exists_status);
d2956687 180 if (ret) {
83ed9e90 181 ERR("Error occurred while checking rotation status on consumer daemon");
92816cc3
JG
182 goto end;
183 }
d2956687 184
16100d7a 185 if (exists_status != CONSUMER_TRACE_CHUNK_EXISTS_STATUS_UNKNOWN_CHUNK) {
d2956687
JG
186 chunk_exists_on_peer = true;
187 goto end;
16100d7a 188 }
92816cc3
JG
189 }
190skip_kernel:
191end:
db66e574 192
d2956687
JG
193 if (!chunk_exists_on_peer) {
194 uint64_t chunk_being_archived_id;
195
a0a4f314 196 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
28ab034a 197 &chunk_being_archived_id);
a0377dfe 198 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
28ab034a
JG
199 DBG("Rotation of trace archive %" PRIu64
200 " of session \"%s\" is complete on all consumers",
201 chunk_being_archived_id,
a0a4f314 202 session->name);
db66e574 203 }
0038180d
JG
204
205 _rotation_completed = !chunk_exists_on_peer;
92816cc3 206 if (ret) {
28ab034a 207 ret = session_reset_rotation_state(session, LTTNG_ROTATION_STATE_ERROR);
2961f09e 208 if (ret) {
a0a4f314 209 ERR("Failed to reset rotation state of session \"%s\"", session->name);
2961f09e 210 }
db66e574 211 }
db66e574
JD
212}
213
d88744a4 214/*
92816cc3 215 * Check if the last rotation was completed, called with session lock held.
d2956687
JG
216 * Should only return non-zero in the event of a fatal error. Doing so will
217 * shutdown the thread.
d88744a4 218 */
a0a4f314 219int check_session_rotation_pending(const ltt_session::locked_ref& session,
0038180d 220 notification_thread_handle& notification_thread_handle)
d88744a4
JD
221{
222 int ret;
92816cc3 223 struct lttng_trace_archive_location *location;
d2956687
JG
224 enum lttng_trace_chunk_status chunk_status;
225 bool rotation_completed = false;
226 const char *archived_chunk_name;
227 uint64_t chunk_being_archived_id;
228
a0a4f314 229 if (!session->chunk_being_archived) {
dc1d5967
FD
230 ret = 0;
231 goto end;
232 }
233
28ab034a 234 chunk_status =
a0a4f314 235 lttng_trace_chunk_get_id(session->chunk_being_archived, &chunk_being_archived_id);
a0377dfe 236 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d88744a4 237
bd0514a5 238 DBG("Checking for pending rotation on session \"%s\", trace archive %" PRIu64,
a0a4f314 239 session->name,
28ab034a 240 chunk_being_archived_id);
d2956687 241
faf1bdcf
JG
242 /*
243 * The rotation-pending check timer of a session is launched in
244 * one-shot mode. If the rotation is incomplete, the rotation
245 * thread will re-enable the pending-check timer.
246 *
247 * The timer thread can't stop the timer itself since it is involved
248 * in the check for the timer's quiescence.
249 */
250 ret = timer_session_rotation_pending_check_stop(session);
251 if (ret) {
6ae1bf46 252 goto check_ongoing_rotation;
faf1bdcf
JG
253 }
254
0038180d 255 check_session_rotation_pending_on_consumers(session, rotation_completed);
a0a4f314 256 if (!rotation_completed || session->rotation_state == LTTNG_ROTATION_STATE_ERROR) {
6ae1bf46 257 goto check_ongoing_rotation;
92816cc3
JG
258 }
259
92816cc3
JG
260 /*
261 * Now we can clear the "ONGOING" state in the session. New
262 * rotations can start now.
263 */
28ab034a 264 chunk_status = lttng_trace_chunk_get_name(
a0a4f314 265 session->chunk_being_archived, &archived_chunk_name, nullptr);
a0377dfe 266 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
a0a4f314
JG
267 free(session->last_archived_chunk_name);
268 session->last_archived_chunk_name = strdup(archived_chunk_name);
269 if (!session->last_archived_chunk_name) {
d2956687
JG
270 PERROR("Failed to duplicate archived chunk name");
271 }
0038180d 272
d2956687 273 session_reset_rotation_state(session, LTTNG_ROTATION_STATE_COMPLETED);
92816cc3 274
a0a4f314
JG
275 if (!session->quiet_rotation) {
276 location = session_get_trace_archive_location(session);
7fdbed1c 277 ret = notification_thread_command_session_rotation_completed(
0038180d 278 &notification_thread_handle,
a0a4f314
JG
279 session->id,
280 session->last_archived_chunk_id.value,
28ab034a 281 location);
d3740619 282 lttng_trace_archive_location_put(location);
7fdbed1c 283 if (ret != LTTNG_OK) {
bd0514a5 284 ERR("Failed to notify notification thread of completed rotation for session %s",
a0a4f314 285 session->name);
7fdbed1c 286 }
92816cc3
JG
287 }
288
289 ret = 0;
6ae1bf46 290check_ongoing_rotation:
a0a4f314
JG
291 if (session->rotation_state == LTTNG_ROTATION_STATE_ONGOING) {
292 chunk_status = lttng_trace_chunk_get_id(session->chunk_being_archived,
28ab034a 293 &chunk_being_archived_id);
a0377dfe 294 LTTNG_ASSERT(chunk_status == LTTNG_TRACE_CHUNK_STATUS_OK);
d2956687 295
bd0514a5 296 DBG("Rotation of trace archive %" PRIu64 " is still pending for session %s",
28ab034a 297 chunk_being_archived_id,
a0a4f314
JG
298 session->name);
299 ret = timer_session_rotation_pending_check_start(session,
28ab034a 300 DEFAULT_ROTATE_PENDING_TIMER);
92816cc3 301 if (ret) {
d2956687 302 ERR("Failed to re-enable rotation pending timer");
92816cc3
JG
303 ret = -1;
304 goto end;
305 }
306 }
307
6ae1bf46 308end:
d88744a4
JD
309 return ret;
310}
311
ed1e52a3 312/* Call with the session and session_list locks held. */
a0a4f314 313void launch_session_rotation(const ltt_session::locked_ref& session)
259c2674
JD
314{
315 int ret;
316
a0a4f314 317 DBG_FMT("Launching scheduled time-based rotation: session_name='{}'", session->name);
259c2674 318
0038180d 319 ASSERT_SESSION_LIST_LOCKED();
a0a4f314
JG
320
321 ret = cmd_rotate_session(
322 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
323 if (ret != LTTNG_OK) {
324 LTTNG_THROW_CTL(fmt::format("Failed to launch session rotation: session_name={}",
325 session->name),
326 static_cast<lttng_error_code>(ret));
92816cc3
JG
327 } else {
328 /* Don't consider errors as fatal. */
a0a4f314
JG
329 DBG_FMT("Scheduled time-based rotation aborted session_name=`{}`, error='{}'",
330 session->name,
331 lttng_strerror(ret));
259c2674 332 }
92816cc3 333}
259c2674 334
0038180d 335int run_job(const rotation_thread_job& job,
a0a4f314 336 const ltt_session::locked_ref& session,
0038180d 337 notification_thread_handle& notification_thread_handle)
92816cc3 338{
a0a4f314 339 int ret = 0;
259c2674 340
0038180d
JG
341 switch (job.type) {
342 case ls::rotation_thread_job_type::SCHEDULED_ROTATION:
a0a4f314
JG
343 try {
344 launch_session_rotation(session);
345 DBG("Scheduled time-based rotation successfully launched on session \"%s\"",
346 session->name);
347 } catch (const lttng::ctl::error& ctl_ex) {
348 /* Don't consider errors as fatal. */
349 DBG("Scheduled time-based rotation aborted for session %s: %s",
350 session->name,
351 lttng_strerror(ctl_ex.code()));
352 }
92816cc3 353 break;
0038180d 354 case ls::rotation_thread_job_type::CHECK_PENDING_ROTATION:
28ab034a 355 ret = check_session_rotation_pending(session, notification_thread_handle);
92816cc3
JG
356 break;
357 default:
358 abort();
259c2674 359 }
0038180d 360
259c2674
JD
361 return ret;
362}
363
0038180d 364bool shutdown_rotation_thread(void *thread_data)
d88744a4 365{
0038180d 366 auto *handle = reinterpret_cast<const ls::rotation_thread *>(thread_data);
d88744a4 367
0038180d
JG
368 return handle->shutdown();
369}
370} /* namespace */
d88744a4 371
0038180d
JG
372ls::rotation_thread_timer_queue *ls::rotation_thread_timer_queue_create()
373{
374 auto queue = zmalloc<ls::rotation_thread_timer_queue>();
375 if (!queue) {
376 PERROR("Failed to allocate timer rotate queue");
377 goto end;
378 }
379
380 queue->event_pipe = lttng_pipe_open(FD_CLOEXEC | O_NONBLOCK);
381 CDS_INIT_LIST_HEAD(&queue->list);
382 pthread_mutex_init(&queue->lock, nullptr);
383end:
384 return queue;
385}
386
387void ls::rotation_thread_timer_queue_destroy(struct rotation_thread_timer_queue *queue)
388{
389 if (!queue) {
390 return;
391 }
392
393 lttng_pipe_destroy(queue->event_pipe);
394
395 {
07c4863f 396 const lttng::pthread::lock_guard queue_lock(queue->lock);
0038180d
JG
397
398 LTTNG_ASSERT(cds_list_empty(&queue->list));
399 }
400
401 pthread_mutex_destroy(&queue->lock);
402 free(queue);
403}
404
28f23191
JG
405ls::rotation_thread::rotation_thread(rotation_thread_timer_queue& rotation_timer_queue,
406 notification_thread_handle& notification_thread_handle) :
83885b70
MJ
407 _rotation_timer_queue(rotation_timer_queue),
408 _notification_thread_handle(notification_thread_handle)
0038180d
JG
409{
410 _quit_pipe.reset([]() {
411 auto raw_pipe = lttng_pipe_open(FD_CLOEXEC);
412 if (!raw_pipe) {
413 LTTNG_THROW_POSIX("Failed to rotation thread's quit pipe", errno);
d88744a4 414 }
d88744a4 415
0038180d
JG
416 return raw_pipe;
417 }());
418
419 _notification_channel.reset([]() {
420 auto channel = lttng_notification_channel_create(
421 lttng_session_daemon_notification_endpoint);
422 if (!channel) {
423 LTTNG_THROW_ERROR(
424 "Failed to create notification channel of rotation thread");
425 }
426
427 return channel;
428 }());
429
430 lttng_poll_init(&_events);
431
432 /*
433 * Create pollset with size 4:
434 * - rotation thread quit pipe,
435 * - rotation thread timer queue pipe,
436 * - notification channel sock,
437 * - subscribtion change event fd
438 */
439 if (lttng_poll_create(&_events, 4, LTTNG_CLOEXEC) < 0) {
440 LTTNG_THROW_ERROR("Failed to create poll object for rotation thread");
441 }
442
443 if (lttng_poll_add(&_events, lttng_pipe_get_readfd(_quit_pipe.get()), LPOLLIN) < 0) {
444 LTTNG_THROW_ERROR("Failed to add quit pipe read fd to poll set");
445 }
446
447 if (lttng_poll_add(&_events,
448 lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe),
449 LPOLLIN) < 0) {
450 LTTNG_THROW_ERROR("Failed to add rotation timer queue event pipe fd to poll set");
451 }
452
453 if (lttng_poll_add(&_events,
454 _notification_channel_subscribtion_change_eventfd.fd(),
455 LPOLLIN) < 0) {
456 LTTNG_THROW_ERROR(
457 "Failed to add rotation thread notification channel subscription change eventfd to poll set");
458 }
459
460 if (lttng_poll_add(&_events, _notification_channel->socket, LPOLLIN) < 0) {
461 LTTNG_THROW_ERROR("Failed to add notification channel socket fd to pollset");
462 }
463}
464
465ls::rotation_thread::~rotation_thread()
466{
467 lttng_poll_clean(&_events);
468}
469
470void ls::rotation_thread_enqueue_job(ls::rotation_thread_timer_queue *queue,
28f23191
JG
471 ls::rotation_thread_job_type job_type,
472 ltt_session *session)
0038180d
JG
473{
474 const char dummy = '!';
475 struct rotation_thread_job *job = nullptr;
476 const char *job_type_str = get_job_type_str(job_type);
07c4863f 477 const lttng::pthread::lock_guard queue_lock(queue->lock);
0038180d
JG
478
479 if (timer_job_exists(queue, job_type, session)) {
480 /*
481 * This timer job is already pending, we don't need to add
482 * it.
483 */
484 return;
485 }
486
487 job = zmalloc<rotation_thread_job>();
488 if (!job) {
489 PERROR("Failed to allocate rotation thread job of type \"%s\" for session \"%s\"",
490 job_type_str,
491 session->name);
492 return;
493 }
494
495 /* No reason for this to fail as the caller must hold a reference. */
496 (void) session_get(session);
497
498 job->session = session;
499 job->type = job_type;
500 cds_list_add_tail(&job->head, &queue->list);
501
502 const int write_ret =
503 lttng_write(lttng_pipe_get_writefd(queue->event_pipe), &dummy, sizeof(dummy));
504 if (write_ret < 0) {
505 /*
506 * We do not want to block in the timer handler, the job has
507 * been enqueued in the list, the wakeup pipe is probably full,
508 * the job will be processed when the rotation_thread catches
509 * up.
510 */
511 DIAGNOSTIC_PUSH
512 DIAGNOSTIC_IGNORE_LOGICAL_OP
513 if (errno == EAGAIN || errno == EWOULDBLOCK) {
514 DIAGNOSTIC_POP
d88744a4 515 /*
0038180d
JG
516 * Not an error, but would be surprising and indicate
517 * that the rotation thread can't keep up with the
518 * current load.
d88744a4 519 */
0038180d
JG
520 DBG("Wake-up pipe of rotation thread job queue is full");
521 return;
d88744a4
JD
522 }
523
0038180d
JG
524 PERROR("Failed to wake-up the rotation thread after pushing a job of type \"%s\" for session \"%s\"",
525 job_type_str,
526 session->name);
527 return;
d88744a4 528 }
0038180d 529}
d88744a4 530
0038180d
JG
531void ls::rotation_thread::_handle_job_queue()
532{
533 for (;;) {
534 rotation_thread_job::uptr job;
535
536 {
537 /* Take the queue lock only to pop an element from the list. */
07c4863f 538 const lttng::pthread::lock_guard rotation_timer_queue_lock(
0038180d
JG
539 _rotation_timer_queue.lock);
540 if (cds_list_empty(&_rotation_timer_queue.list)) {
541 break;
542 }
d88744a4 543
0038180d
JG
544 job.reset(cds_list_first_entry(
545 &_rotation_timer_queue.list, typeof(rotation_thread_job), head));
546 cds_list_del(&job->head);
547 }
548
d9a970b7 549 const auto list_lock = lttng::sessiond::lock_session_list();
0038180d 550
77682be9 551 /* locked_ref will unlock the session and release the ref held by the job. */
0038180d 552 session_lock(job->session);
16d64977 553 auto session = ltt_session::make_locked_ref(*job->session);
0038180d 554
a0a4f314 555 if (run_job(*job, session, _notification_thread_handle)) {
0038180d
JG
556 return;
557 }
558 }
d88744a4
JD
559}
560
28f23191 561void ls::rotation_thread::_handle_notification(const lttng_notification& notification)
90936dcf
JD
562{
563 int ret = 0;
cd9adb8b 564 const char *condition_session_name = nullptr;
90936dcf
JD
565 enum lttng_condition_status condition_status;
566 enum lttng_evaluation_status evaluation_status;
567 uint64_t consumed;
0038180d
JG
568 auto *condition = lttng_notification_get_const_condition(&notification);
569 auto *evaluation = lttng_notification_get_const_evaluation(&notification);
570 const auto condition_type = lttng_condition_get_type(condition);
90936dcf
JD
571
572 if (condition_type != LTTNG_CONDITION_TYPE_SESSION_CONSUMED_SIZE) {
0038180d 573 LTTNG_THROW_ERROR("Unexpected condition type");
90936dcf
JD
574 }
575
0038180d 576 /* Fetch info to test. */
90936dcf 577 condition_status = lttng_condition_session_consumed_size_get_session_name(
28ab034a 578 condition, &condition_session_name);
90936dcf 579 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
0038180d 580 LTTNG_THROW_ERROR("Session name could not be fetched from notification");
90936dcf 581 }
0038180d 582
28ab034a
JG
583 evaluation_status =
584 lttng_evaluation_session_consumed_size_get_consumed_size(evaluation, &consumed);
90936dcf 585 if (evaluation_status != LTTNG_EVALUATION_STATUS_OK) {
0038180d 586 LTTNG_THROW_ERROR("Failed to get consumed size from evaluation");
90936dcf
JD
587 }
588
0038180d
JG
589 DBG_FMT("Handling session consumed size condition: session_name=`{}`, consumed_size={}",
590 condition_session_name,
591 consumed);
592
d9a970b7
JG
593 /*
594 * Mind the order of the declaration of list_lock vs session:
595 * the session list lock must always be released _after_ the release of
596 * a session's reference (the destruction of a ref/locked_ref) to ensure
597 * since the reference's release may unpublish the session from the list of
598 * sessions.
599 */
600 const auto list_lock = lttng::sessiond::lock_session_list();
d9a970b7 601 try {
a0a4f314
JG
602 const auto session = ltt_session::find_locked_session(condition_session_name);
603
604 if (!lttng_trigger_is_equal(session->rotate_trigger,
605 lttng_notification_get_const_trigger(&notification))) {
606 DBG("Notification does not originate from the internal size-based scheduled rotation trigger, skipping");
607 return;
608 }
609
610 unsubscribe_session_consumed_size_rotation(*session);
611
612 ret = cmd_rotate_session(
613 session, nullptr, false, LTTNG_TRACE_CHUNK_COMMAND_TYPE_MOVE_TO_COMPLETED);
614 if (ret != LTTNG_OK) {
615 switch (ret) {
616 case LTTNG_OK:
617 break;
618 case -LTTNG_ERR_ROTATION_PENDING:
619 DBG("Rotate already pending, subscribe to the next threshold value");
620 break;
621 case -LTTNG_ERR_ROTATION_MULTIPLE_AFTER_STOP:
622 DBG("Rotation already happened since last stop, subscribe to the next threshold value");
623 break;
624 case -LTTNG_ERR_ROTATION_AFTER_STOP_CLEAR:
625 DBG("Rotation already happened since last stop and clear, subscribe to the next threshold value");
626 break;
627 default:
628 LTTNG_THROW_CTL("Failed to rotate on consumed size notification",
629 static_cast<lttng_error_code>(-ret));
630 }
631 }
632
633 subscribe_session_consumed_size_rotation(*session, consumed + session->rotate_size);
d9a970b7 634 } catch (const lttng::sessiond::exceptions::session_not_found_error& ex) {
0038180d
JG
635 DBG_FMT("Failed to find session while handling notification: notification_type={}, session name=`{}`",
636 lttng_condition_type_str(condition_type),
637 condition_session_name);
eb2827a4
JG
638 /*
639 * Not a fatal error: a session can be destroyed before we get
640 * the chance to handle the notification.
641 */
0038180d 642 return;
90936dcf 643 }
90936dcf
JD
644}
645
0038180d 646void ls::rotation_thread::_handle_notification_channel_activity()
90936dcf 647{
dc65dda3 648 bool notification_pending = true;
90936dcf 649
dc65dda3
JG
650 /*
651 * A notification channel may have multiple notifications queued-up internally in
652 * its buffers. This is because a notification channel multiplexes command replies
653 * and notifications. The current protocol specifies that multiple notifications can be
654 * received before the reply to a command.
655 *
656 * In such cases, the notification channel client implementation internally queues them and
657 * provides them on the next calls to lttng_notification_channel_get_next_notification().
658 * This is correct with respect to the public API, which is intended to be used in "blocking
659 * mode".
660 *
661 * However, this internal user relies on poll/epoll to wake-up when data is available
662 * on the notification channel's socket. As such, it can't assume that a wake-up means only
663 * one notification is available for consumption since many of them may have been queued in
664 * the channel's internal buffers.
665 */
666 while (notification_pending) {
0038180d
JG
667 const auto pending_status = lttng_notification_channel_has_pending_notification(
668 _notification_channel.get(), &notification_pending);
669 if (pending_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
670 LTTNG_THROW_ERROR("Error occurred while checking for pending notification");
dc65dda3 671 }
d73ee93f 672
dc65dda3 673 if (!notification_pending) {
0038180d 674 return;
dc65dda3 675 }
d73ee93f 676
dc65dda3 677 /* Receive the next notification. */
0038180d
JG
678 lttng_notification::uptr notification;
679 enum lttng_notification_channel_status next_notification_status;
680
681 {
682 struct lttng_notification *raw_notification_ptr;
683
684 next_notification_status = lttng_notification_channel_get_next_notification(
685 _notification_channel.get(), &raw_notification_ptr);
686 notification.reset(raw_notification_ptr);
687 }
688
689 switch (next_notification_status) {
dc65dda3
JG
690 case LTTNG_NOTIFICATION_CHANNEL_STATUS_OK:
691 break;
692 case LTTNG_NOTIFICATION_CHANNEL_STATUS_NOTIFICATIONS_DROPPED:
693 WARN("Dropped notification detected on notification channel used by the rotation management thread.");
0038180d 694 return;
dc65dda3 695 case LTTNG_NOTIFICATION_CHANNEL_STATUS_CLOSED:
0038180d 696 LTTNG_THROW_ERROR("Notification channel was closed");
dc65dda3
JG
697 default:
698 /* Unhandled conditions / errors. */
0038180d 699 LTTNG_THROW_ERROR("Unknown notification channel status");
dc65dda3 700 }
90936dcf 701
0038180d 702 _handle_notification(*notification);
90936dcf 703 }
90936dcf
JD
704}
705
0038180d 706void ls::rotation_thread::_thread_function() noexcept
db66e574 707{
bd0514a5 708 DBG("Started rotation thread");
0038180d
JG
709
710 try {
711 _run();
712 } catch (const std::exception& e) {
713 ERR_FMT("Fatal rotation thread error: {}", e.what());
714 }
715
716 DBG("Thread exit");
717}
718
719void ls::rotation_thread::_run()
720{
f620cc28 721 rcu_register_thread();
0038180d
JG
722 const auto unregister_rcu_thread =
723 lttng::make_scope_exit([]() noexcept { rcu_unregister_thread(); });
724
f620cc28 725 rcu_thread_online();
0038180d
JG
726 const auto offline_rcu_thread =
727 lttng::make_scope_exit([]() noexcept { rcu_thread_offline(); });
728
412d7227 729 health_register(the_health_sessiond, HEALTH_SESSIOND_TYPE_ROTATION);
f620cc28 730 health_code_update();
0038180d
JG
731 const auto unregister_health =
732 lttng::make_scope_exit([]() noexcept { health_unregister(the_health_sessiond); });
db66e574 733
0038180d 734 const auto queue_pipe_fd = lttng_pipe_get_readfd(_rotation_timer_queue.event_pipe);
db66e574 735
db66e574 736 while (true) {
db66e574 737 health_poll_entry();
bd0514a5 738 DBG("Entering poll wait");
0038180d
JG
739 auto poll_wait_ret = lttng_poll_wait(&_events, -1);
740 DBG_FMT("Poll wait returned: ret={}", poll_wait_ret);
db66e574 741 health_poll_exit();
0038180d 742 if (poll_wait_ret < 0) {
db66e574
JD
743 /*
744 * Restart interrupted system call.
745 */
746 if (errno == EINTR) {
747 continue;
748 }
0038180d
JG
749
750 LTTNG_THROW_POSIX("Error encountered during lttng_poll_wait", errno);
db66e574
JD
751 }
752
0038180d
JG
753 const auto fd_count = poll_wait_ret;
754 for (int i = 0; i < fd_count; i++) {
755 const auto fd = LTTNG_POLL_GETFD(&_events, i);
756 const auto revents = LTTNG_POLL_GETEV(&_events, i);
db66e574 757
0038180d 758 DBG_FMT("Handling descriptor activity: fd={}, events={:b}", fd, revents);
db66e574 759
92816cc3 760 if (revents & LPOLLERR) {
f9a41357
JG
761 LTTNG_THROW_ERROR(lttng::format(
762 "Polling returned an error on fd: fd={}", fd));
92816cc3
JG
763 }
764
0038180d
JG
765 if (fd == _notification_channel->socket ||
766 fd == _notification_channel_subscribtion_change_eventfd.fd()) {
767 try {
768 _handle_notification_channel_activity();
769 } catch (const lttng::ctl::error& e) {
770 /*
771 * The only non-fatal error (rotation failed), others
772 * are caught at the top-level.
773 */
774 DBG_FMT("Control error occurred while handling activity on notification channel socket: {}",
775 e.what());
776 continue;
85e17b27 777 }
dc65dda3 778
0038180d 779 if (fd == _notification_channel_subscribtion_change_eventfd.fd()) {
28f23191
JG
780 _notification_channel_subscribtion_change_eventfd
781 .decrement();
dc65dda3 782 }
85e17b27
JG
783 } else {
784 /* Job queue or quit pipe activity. */
85e17b27
JG
785
786 /*
787 * The job queue is serviced if there is
788 * activity on the quit pipe to ensure it is
789 * flushed and all references held in the queue
790 * are released.
791 */
0038180d 792 _handle_job_queue();
64d9b072
JG
793 if (fd == queue_pipe_fd) {
794 char buf;
795
0038180d
JG
796 if (lttng_read(fd, &buf, 1) != 1) {
797 LTTNG_THROW_POSIX(
f9a41357 798 lttng::format(
0038180d
JG
799 "Failed to read from wakeup pipe: fd={}",
800 fd),
801 errno);
64d9b072
JG
802 }
803 } else {
bd0514a5 804 DBG("Quit pipe activity");
0038180d 805 return;
90936dcf 806 }
db66e574
JD
807 }
808 }
809 }
db66e574 810}
64d9b072 811
0038180d 812bool ls::rotation_thread::shutdown() const noexcept
64d9b072 813{
0038180d 814 const int write_fd = lttng_pipe_get_writefd(_quit_pipe.get());
64d9b072
JG
815
816 return notify_thread_pipe(write_fd) == 1;
817}
818
0038180d 819void ls::rotation_thread::launch_thread()
64d9b072 820{
0038180d
JG
821 auto thread = lttng_thread_create(
822 "Rotation",
823 [](void *ptr) {
824 auto handle = reinterpret_cast<rotation_thread *>(ptr);
825
826 handle->_thread_function();
827 return static_cast<void *>(nullptr);
828 },
829 shutdown_rotation_thread,
830 nullptr,
831 this);
64d9b072 832 if (!thread) {
0038180d 833 LTTNG_THROW_ERROR("Failed to launch rotation thread");
64d9b072 834 }
0038180d 835
64d9b072 836 lttng_thread_put(thread);
0038180d
JG
837}
838
839void ls::rotation_thread::subscribe_session_consumed_size_rotation(ltt_session& session,
28f23191 840 std::uint64_t size)
0038180d
JG
841{
842 const struct lttng_credentials session_creds = {
843 .uid = LTTNG_OPTIONAL_INIT_VALUE(session.uid),
844 .gid = LTTNG_OPTIONAL_INIT_VALUE(session.gid),
845 };
846
d9a970b7 847 ASSERT_LOCKED(session._lock);
0038180d
JG
848
849 auto rotate_condition = lttng::make_unique_wrapper<lttng_condition, lttng_condition_put>(
850 lttng_condition_session_consumed_size_create());
851 if (!rotate_condition) {
852 LTTNG_THROW_POSIX("Failed to create session consumed size condition object", errno);
853 }
854
855 auto condition_status =
856 lttng_condition_session_consumed_size_set_threshold(rotate_condition.get(), size);
857 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
f9a41357 858 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
859 "Could not set session consumed size condition threshold: size={}", size));
860 }
861
28f23191
JG
862 condition_status = lttng_condition_session_consumed_size_set_session_name(
863 rotate_condition.get(), session.name);
0038180d 864 if (condition_status != LTTNG_CONDITION_STATUS_OK) {
f9a41357 865 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
866 "Could not set session consumed size condition session name: name=`{}`",
867 session.name));
868 }
869
870 auto notify_action = lttng::make_unique_wrapper<lttng_action, lttng_action_put>(
871 lttng_action_notify_create());
872 if (!notify_action) {
873 LTTNG_THROW_POSIX("Could not create notify action", errno);
874 }
875
876 LTTNG_ASSERT(!session.rotate_trigger);
877 /* trigger acquires its own reference to condition and action on success. */
878 auto trigger = lttng::make_unique_wrapper<lttng_trigger, lttng_trigger_put>(
879 lttng_trigger_create(rotate_condition.get(), notify_action.get()));
28f23191 880 if (!trigger) {
0038180d
JG
881 LTTNG_THROW_POSIX("Could not create size-based rotation trigger", errno);
882 }
883
884 /* Ensure this trigger is not visible to external users. */
885 lttng_trigger_set_hidden(trigger.get());
886 lttng_trigger_set_credentials(trigger.get(), &session_creds);
887
28f23191
JG
888 auto nc_status = lttng_notification_channel_subscribe(_notification_channel.get(),
889 rotate_condition.get());
0038180d
JG
890 if (nc_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
891 LTTNG_THROW_ERROR("Could not subscribe to session consumed size notification");
892 }
893
894 /*
895 * Ensure any notification queued during the subscription are consumed by queueing an
896 * event.
897 */
898 _notification_channel_subscribtion_change_eventfd.increment();
899
900 const auto register_ret = notification_thread_command_register_trigger(
901 &_notification_thread_handle, trigger.get(), true);
902 if (register_ret != LTTNG_OK) {
903 LTTNG_THROW_CTL(
f9a41357 904 lttng::format(
0038180d
JG
905 "Failed to register trigger for automatic size-based rotation: session_name{}, size={}",
906 session.name,
907 size),
908 register_ret);
909 }
910
911 /* Ownership transferred to the session. */
912 session.rotate_trigger = trigger.release();
913}
914
915void ls::rotation_thread::unsubscribe_session_consumed_size_rotation(ltt_session& session)
916{
917 LTTNG_ASSERT(session.rotate_trigger);
918
919 const auto remove_session_trigger = lttng::make_scope_exit([&session]() noexcept {
920 lttng_trigger_put(session.rotate_trigger);
921 session.rotate_trigger = nullptr;
922 });
923
924 const auto unsubscribe_status = lttng_notification_channel_unsubscribe(
925 _notification_channel.get(),
926 lttng_trigger_get_const_condition(session.rotate_trigger));
927 if (unsubscribe_status != LTTNG_NOTIFICATION_CHANNEL_STATUS_OK) {
f9a41357 928 LTTNG_THROW_ERROR(lttng::format(
0038180d
JG
929 "Failed to unsubscribe from consumed size condition used to control automatic size-based rotations: session_name=`{}` return_code={}",
930 session.name,
931 static_cast<int>(unsubscribe_status)));
932 }
933
934 /*
935 * Ensure any notification queued during the un-subscription are consumed by queueing an
936 * event.
937 */
938 _notification_channel_subscribtion_change_eventfd.increment();
939
940 const auto unregister_status = notification_thread_command_unregister_trigger(
941 &_notification_thread_handle, session.rotate_trigger);
942 if (unregister_status != LTTNG_OK) {
943 LTTNG_THROW_CTL(
f9a41357 944 lttng::format(
0038180d
JG
945 "Failed to unregister trigger for automatic size-based rotation: session_name{}",
946 session.name),
947 unregister_status);
948 }
64d9b072 949}
This page took 0.146176 seconds and 4 git commands to generate.