sessiond: Split ust_registry_session into per-type classes
[lttng-tools.git] / src / common / consumer / consumer-timer.cpp
CommitLineData
331744e3 1/*
ab5be9fa
MJ
2 * Copyright (C) 2012 Julien Desfossez <julien.desfossez@efficios.com>
3 * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
331744e3 4 *
ab5be9fa 5 * SPDX-License-Identifier: GPL-2.0-only
331744e3 6 *
331744e3
JD
7 */
8
6c1c0768 9#define _LGPL_SOURCE
331744e3
JD
10#include <inttypes.h>
11#include <signal.h>
12
c9e313bc
SM
13#include <bin/lttng-consumerd/health-consumerd.hpp>
14#include <common/common.hpp>
15#include <common/compat/endian.hpp>
16#include <common/kernel-ctl/kernel-ctl.hpp>
17#include <common/kernel-consumer/kernel-consumer.hpp>
18#include <common/consumer/consumer-stream.hpp>
19#include <common/consumer/consumer-timer.hpp>
20#include <common/consumer/consumer-testpoint.hpp>
21#include <common/ust-consumer/ust-consumer.hpp>
331744e3 22
e9404c27
JG
/*
 * Per-stream callback signatures used by the timer handlers below. The
 * concrete kernel or user space implementation is picked at run time from
 * the consumer type.
 */
using sample_positions_cb = int (*)(struct lttng_consumer_stream *stream);
using get_consumed_cb = int (*)(struct lttng_consumer_stream *stream,
		unsigned long *consumed);
using get_produced_cb = int (*)(struct lttng_consumer_stream *stream,
		unsigned long *produced);
using flush_index_cb = int (*)(struct lttng_consumer_stream *stream);
e9404c27 29
2b8f8754
MD
30static struct timer_signal_data timer_signal = {
31 .tid = 0,
32 .setup_done = 0,
33 .qs_done = 0,
34 .lock = PTHREAD_MUTEX_INITIALIZER,
35};
331744e3
JD
36
37/*
38 * Set custom signal mask to current thread.
39 */
40static void setmask(sigset_t *mask)
41{
42 int ret;
43
44 ret = sigemptyset(mask);
45 if (ret) {
46 PERROR("sigemptyset");
47 }
48 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
49 if (ret) {
d3e2ba59 50 PERROR("sigaddset switch");
331744e3
JD
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
53 if (ret) {
d3e2ba59
JD
54 PERROR("sigaddset teardown");
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
57 if (ret) {
58 PERROR("sigaddset live");
331744e3 59 }
e9404c27
JG
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
61 if (ret) {
62 PERROR("sigaddset monitor");
63 }
13675d0e
MD
64 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
65 if (ret) {
66 PERROR("sigaddset exit");
67 }
331744e3
JD
68}
69
/*
 * File descriptor of the channel monitor pipe; -1 until it is set. Accessed
 * through uatomic_read()/uatomic_cmpxchg() by the accessors in this file.
 */
static int the_channel_monitor_pipe = -1;
e9404c27 71
331744e3
JD
72/*
73 * Execute action on a timer switch.
d98a47c7
MD
74 *
75 * Beware: metadata_switch_timer() should *never* take a mutex also held
76 * while consumer_timer_switch_stop() is called. It would result in
77 * deadlocks.
331744e3
JD
78 */
79static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 80 siginfo_t *si)
331744e3
JD
81{
82 int ret;
83 struct lttng_consumer_channel *channel;
84
97535efa 85 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 86 LTTNG_ASSERT(channel);
331744e3 87
4419b4fb
MD
88 if (channel->switch_timer_error) {
89 return;
90 }
91
331744e3
JD
92 DBG("Switch timer for channel %" PRIu64, channel->key);
93 switch (ctx->type) {
94 case LTTNG_CONSUMER32_UST:
95 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
96 /*
97 * Locks taken by lttng_ustconsumer_request_metadata():
98 * - metadata_socket_lock
99 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 100 * - channel->metadata_cache->lock
4fa3dc0e 101 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
102 * - channel->timer_lock
103 * - channel->metadata_cache->lock
4fa3dc0e 104 *
5e41ebe1
MD
105 * Ensure that neither consumer_data.lock nor
106 * channel->lock are taken within this function, since
107 * they are held while consumer_timer_switch_stop() is
108 * called.
4fa3dc0e 109 */
94d49140 110 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 111 if (ret < 0) {
4419b4fb 112 channel->switch_timer_error = 1;
331744e3
JD
113 }
114 break;
115 case LTTNG_CONSUMER_KERNEL:
116 case LTTNG_CONSUMER_UNKNOWN:
a0377dfe 117 abort();
331744e3
JD
118 break;
119 }
120}
121
528f2ffa
JD
122static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
123 uint64_t stream_id)
d3e2ba59
JD
124{
125 int ret;
50adc264 126 struct ctf_packet_index index;
d3e2ba59
JD
127
128 memset(&index, 0, sizeof(index));
528f2ffa 129 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
130 index.timestamp_end = htobe64(ts);
131 ret = consumer_stream_write_index(stream, &index);
132 if (ret < 0) {
133 goto error;
134 }
135
136error:
137 return ret;
138}
139
c585821b 140int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 141{
528f2ffa 142 uint64_t ts, stream_id;
d3e2ba59
JD
143 int ret;
144
d3e2ba59
JD
145 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
146 if (ret < 0) {
147 ERR("Failed to get the current timestamp");
c585821b 148 goto end;
d3e2ba59
JD
149 }
150 ret = kernctl_buffer_flush(stream->wait_fd);
151 if (ret < 0) {
152 ERR("Failed to flush kernel stream");
c585821b 153 goto end;
d3e2ba59
JD
154 }
155 ret = kernctl_snapshot(stream->wait_fd);
156 if (ret < 0) {
32af2c95 157 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 158 PERROR("live timer kernel snapshot");
d3e2ba59 159 ret = -1;
c585821b 160 goto end;
d3e2ba59 161 }
528f2ffa
JD
162 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
163 if (ret < 0) {
164 PERROR("kernctl_get_stream_id");
c585821b 165 goto end;
528f2ffa 166 }
d3e2ba59 167 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 168 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 169 if (ret < 0) {
c585821b 170 goto end;
d3e2ba59
JD
171 }
172 }
173 ret = 0;
c585821b 174end:
d3e2ba59
JD
175 return ret;
176}
177
fad4b619
JG
178static int check_stream(struct lttng_consumer_stream *stream,
179 flush_index_cb flush_index)
d3e2ba59 180{
d3e2ba59
JD
181 int ret;
182
d3e2ba59
JD
183 /*
184 * While holding the stream mutex, try to take a snapshot, if it
185 * succeeds, it means that data is ready to be sent, just let the data
186 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
187 * means that there is no data to read after the flush, so we can
188 * safely send the empty index.
c585821b
MD
189 *
190 * Doing a trylock and checking if waiting on metadata if
191 * trylock fails. Bail out of the stream is indeed waiting for
192 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 193 */
c585821b
MD
194 for (;;) {
195 ret = pthread_mutex_trylock(&stream->lock);
196 switch (ret) {
197 case 0:
198 break; /* We have the lock. */
199 case EBUSY:
200 pthread_mutex_lock(&stream->metadata_timer_lock);
201 if (stream->waiting_on_metadata) {
202 ret = 0;
203 stream->missed_metadata_flush = true;
204 pthread_mutex_unlock(&stream->metadata_timer_lock);
205 goto end; /* Bail out. */
206 }
207 pthread_mutex_unlock(&stream->metadata_timer_lock);
208 /* Try again. */
209 caa_cpu_relax();
210 continue;
211 default:
212 ERR("Unexpected pthread_mutex_trylock error %d", ret);
213 ret = -1;
214 goto end;
215 }
216 break;
217 }
fad4b619 218 ret = flush_index(stream);
c585821b
MD
219 pthread_mutex_unlock(&stream->lock);
220end:
221 return ret;
222}
223
224int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
225{
226 uint64_t ts, stream_id;
227 int ret;
228
94d49140
JD
229 ret = cds_lfht_is_node_deleted(&stream->node.node);
230 if (ret) {
c585821b 231 goto end;
94d49140
JD
232 }
233
84a182ce 234 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
235 if (ret < 0) {
236 ERR("Failed to get the current timestamp");
c585821b 237 goto end;
d3e2ba59 238 }
881fc67f
MD
239 ret = lttng_ustconsumer_flush_buffer(stream, 1);
240 if (ret < 0) {
241 ERR("Failed to flush buffer while flushing index");
242 goto end;
243 }
84a182ce 244 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 245 if (ret < 0) {
94d49140 246 if (ret != -EAGAIN) {
d3e2ba59
JD
247 ERR("Taking UST snapshot");
248 ret = -1;
c585821b 249 goto end;
d3e2ba59 250 }
70190e1c 251 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa 252 if (ret < 0) {
b623cb6a 253 PERROR("lttng_ust_ctl_get_stream_id");
c585821b 254 goto end;
528f2ffa 255 }
d3e2ba59 256 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 257 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 258 if (ret < 0) {
c585821b 259 goto end;
d3e2ba59
JD
260 }
261 }
262 ret = 0;
c585821b
MD
263end:
264 return ret;
265}
d3e2ba59 266
d3e2ba59
JD
267/*
268 * Execute action on a live timer
269 */
270static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 271 siginfo_t *si)
d3e2ba59
JD
272{
273 int ret;
274 struct lttng_consumer_channel *channel;
275 struct lttng_consumer_stream *stream;
d3e2ba59 276 struct lttng_ht_iter iter;
fa29bfbf 277 const struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
fad4b619
JG
278 const flush_index_cb flush_index =
279 ctx->type == LTTNG_CONSUMER_KERNEL ?
280 consumer_flush_kernel_index :
281 consumer_flush_ust_index;
d3e2ba59 282
97535efa 283 channel = (lttng_consumer_channel *) si->si_value.sival_ptr;
a0377dfe 284 LTTNG_ASSERT(channel);
d3e2ba59
JD
285
286 if (channel->switch_timer_error) {
287 goto error;
288 }
d3e2ba59
JD
289
290 DBG("Live timer for channel %" PRIu64, channel->key);
291
292 rcu_read_lock();
fad4b619
JG
293 cds_lfht_for_each_entry_duplicate(ht->ht,
294 ht->hash_fct(&channel->key, lttng_ht_seed),
295 ht->match_fct, &channel->key, &iter.iter,
296 stream, node_channel_id.node) {
297 ret = check_stream(stream, flush_index);
298 if (ret < 0) {
299 goto error_unlock;
d3e2ba59 300 }
d3e2ba59
JD
301 }
302
303error_unlock:
304 rcu_read_unlock();
305
306error:
307 return;
308}
309
2b8f8754
MD
310static
311void consumer_timer_signal_thread_qs(unsigned int signr)
312{
313 sigset_t pending_set;
314 int ret;
315
316 /*
317 * We need to be the only thread interacting with the thread
318 * that manages signals for teardown synchronization.
319 */
320 pthread_mutex_lock(&timer_signal.lock);
321
322 /* Ensure we don't have any signal queued for this channel. */
323 for (;;) {
324 ret = sigemptyset(&pending_set);
325 if (ret == -1) {
326 PERROR("sigemptyset");
327 }
328 ret = sigpending(&pending_set);
329 if (ret == -1) {
330 PERROR("sigpending");
331 }
f05a6b6d 332 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
333 break;
334 }
335 caa_cpu_relax();
336 }
337
338 /*
339 * From this point, no new signal handler will be fired that would try to
340 * access "chan". However, we still need to wait for any currently
341 * executing handler to complete.
342 */
343 cmm_smp_mb();
344 CMM_STORE_SHARED(timer_signal.qs_done, 0);
345 cmm_smp_mb();
346
347 /*
348 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
349 * up.
350 */
351 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
352
353 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
354 caa_cpu_relax();
355 }
356 cmm_smp_mb();
357
358 pthread_mutex_unlock(&timer_signal.lock);
359}
360
331744e3 361/*
e9404c27
JG
362 * Start a timer channel timer which will fire at a given interval
363 * (timer_interval_us)and fire a given signal (signal).
364 *
365 * Returns a negative value on error, 0 if a timer was created, and
366 * a positive value if no timer was created (not an error).
331744e3 367 */
e9404c27
JG
368static
369int consumer_channel_timer_start(timer_t *timer_id,
370 struct lttng_consumer_channel *channel,
371 unsigned int timer_interval_us, int signal)
331744e3 372{
e9404c27 373 int ret = 0, delete_ret;
389b8e8f 374 struct sigevent sev = {};
331744e3
JD
375 struct itimerspec its;
376
a0377dfe
FD
377 LTTNG_ASSERT(channel);
378 LTTNG_ASSERT(channel->key);
331744e3 379
e9404c27
JG
380 if (timer_interval_us == 0) {
381 /* No creation needed; not an error. */
382 ret = 1;
383 goto end;
331744e3
JD
384 }
385
386 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 387 sev.sigev_signo = signal;
331744e3 388 sev.sigev_value.sival_ptr = channel;
e9404c27 389 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
390 if (ret == -1) {
391 PERROR("timer_create");
e9404c27 392 goto end;
331744e3 393 }
331744e3 394
e9404c27
JG
395 its.it_value.tv_sec = timer_interval_us / 1000000;
396 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
397 its.it_interval.tv_sec = its.it_value.tv_sec;
398 its.it_interval.tv_nsec = its.it_value.tv_nsec;
399
e9404c27 400 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
401 if (ret == -1) {
402 PERROR("timer_settime");
e9404c27
JG
403 goto error_destroy_timer;
404 }
405end:
406 return ret;
407error_destroy_timer:
408 delete_ret = timer_delete(*timer_id);
409 if (delete_ret == -1) {
410 PERROR("timer_delete");
411 }
412 goto end;
413}
414
415static
416int consumer_channel_timer_stop(timer_t *timer_id, int signal)
417{
418 int ret = 0;
419
420 ret = timer_delete(*timer_id);
421 if (ret == -1) {
422 PERROR("timer_delete");
423 goto end;
331744e3 424 }
e9404c27
JG
425
426 consumer_timer_signal_thread_qs(signal);
427 *timer_id = 0;
428end:
429 return ret;
430}
431
432/*
433 * Set the channel's switch timer.
434 */
435void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
436 unsigned int switch_timer_interval_us)
437{
438 int ret;
439
a0377dfe
FD
440 LTTNG_ASSERT(channel);
441 LTTNG_ASSERT(channel->key);
e9404c27
JG
442
443 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
444 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
445
446 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
447}
448
449/*
e9404c27 450 * Stop and delete the channel's switch timer.
331744e3
JD
451 */
452void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
453{
454 int ret;
331744e3 455
a0377dfe 456 LTTNG_ASSERT(channel);
331744e3 457
e9404c27
JG
458 ret = consumer_channel_timer_stop(&channel->switch_timer,
459 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 460 if (ret == -1) {
e9404c27 461 ERR("Failed to stop switch timer");
331744e3
JD
462 }
463
2b8f8754 464 channel->switch_timer_enabled = 0;
331744e3
JD
465}
466
d3e2ba59 467/*
e9404c27 468 * Set the channel's live timer.
d3e2ba59
JD
469 */
470void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 471 unsigned int live_timer_interval_us)
d3e2ba59
JD
472{
473 int ret;
d3e2ba59 474
a0377dfe
FD
475 LTTNG_ASSERT(channel);
476 LTTNG_ASSERT(channel->key);
d3e2ba59 477
e9404c27
JG
478 ret = consumer_channel_timer_start(&channel->live_timer, channel,
479 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 480
e9404c27
JG
481 channel->live_timer_enabled = !!(ret == 0);
482}
d3e2ba59 483
e9404c27
JG
484/*
485 * Stop and delete the channel's live timer.
486 */
487void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
488{
489 int ret;
490
a0377dfe 491 LTTNG_ASSERT(channel);
d3e2ba59 492
e9404c27
JG
493 ret = consumer_channel_timer_stop(&channel->live_timer,
494 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 495 if (ret == -1) {
e9404c27 496 ERR("Failed to stop live timer");
d3e2ba59 497 }
e9404c27
JG
498
499 channel->live_timer_enabled = 0;
d3e2ba59
JD
500}
501
502/*
e9404c27
JG
503 * Set the channel's monitoring timer.
504 *
505 * Returns a negative value on error, 0 if a timer was created, and
506 * a positive value if no timer was created (not an error).
d3e2ba59 507 */
e9404c27
JG
508int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
509 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
510{
511 int ret;
512
a0377dfe
FD
513 LTTNG_ASSERT(channel);
514 LTTNG_ASSERT(channel->key);
515 LTTNG_ASSERT(!channel->monitor_timer_enabled);
d3e2ba59 516
e9404c27
JG
517 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
518 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
519 channel->monitor_timer_enabled = !!(ret == 0);
520 return ret;
521}
522
523/*
524 * Stop and delete the channel's monitoring timer.
525 */
526int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
527{
528 int ret;
529
a0377dfe
FD
530 LTTNG_ASSERT(channel);
531 LTTNG_ASSERT(channel->monitor_timer_enabled);
e9404c27
JG
532
533 ret = consumer_channel_timer_stop(&channel->monitor_timer,
534 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 535 if (ret == -1) {
e9404c27
JG
536 ERR("Failed to stop live timer");
537 goto end;
d3e2ba59
JD
538 }
539
e9404c27
JG
540 channel->monitor_timer_enabled = 0;
541end:
542 return ret;
d3e2ba59
JD
543}
544
331744e3
JD
545/*
546 * Block the RT signals for the entire process. It must be called from the
547 * consumer main before creating the threads
548 */
73664f81 549int consumer_signal_init(void)
331744e3
JD
550{
551 int ret;
552 sigset_t mask;
553
554 /* Block signal for entire process, so only our thread processes it. */
555 setmask(&mask);
556 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
557 if (ret) {
558 errno = ret;
559 PERROR("pthread_sigmask");
73664f81 560 return -1;
331744e3 561 }
73664f81 562 return 0;
331744e3
JD
563}
564
e9404c27
JG
565static
566int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 567 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
568 sample_positions_cb sample, get_consumed_cb get_consumed,
569 get_produced_cb get_produced)
570{
23bc9bb5 571 int ret = 0;
e9404c27
JG
572 struct lttng_ht_iter iter;
573 struct lttng_consumer_stream *stream;
574 bool empty_channel = true;
575 uint64_t high = 0, low = UINT64_MAX;
fa29bfbf 576 struct lttng_ht *ht = the_consumer_data.stream_per_chan_id_ht;
e9404c27 577
e8360425
JD
578 *_total_consumed = 0;
579
e9404c27
JG
580 rcu_read_lock();
581
582 cds_lfht_for_each_entry_duplicate(ht->ht,
583 ht->hash_fct(&channel->key, lttng_ht_seed),
584 ht->match_fct, &channel->key,
585 &iter.iter, stream, node_channel_id.node) {
586 unsigned long produced, consumed, usage;
587
588 empty_channel = false;
589
590 pthread_mutex_lock(&stream->lock);
591 if (cds_lfht_is_node_deleted(&stream->node.node)) {
592 goto next;
593 }
594
595 ret = sample(stream);
596 if (ret) {
597 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
598 pthread_mutex_unlock(&stream->lock);
599 goto end;
600 }
601 ret = get_consumed(stream, &consumed);
602 if (ret) {
603 ERR("Failed to get buffer consumed position in monitor timer");
604 pthread_mutex_unlock(&stream->lock);
605 goto end;
606 }
607 ret = get_produced(stream, &produced);
608 if (ret) {
609 ERR("Failed to get buffer produced position in monitor timer");
610 pthread_mutex_unlock(&stream->lock);
611 goto end;
612 }
613
614 usage = produced - consumed;
615 high = (usage > high) ? usage : high;
616 low = (usage < low) ? usage : low;
e8360425
JD
617
618 /*
619 * We don't use consumed here for 2 reasons:
620 * - output_written takes into account the padding written in the
621 * tracefiles when we stop the session;
622 * - the consumed position is not the accurate representation of what
623 * was extracted from a buffer in overwrite mode.
624 */
625 *_total_consumed += stream->output_written;
e9404c27
JG
626 next:
627 pthread_mutex_unlock(&stream->lock);
628 }
629
630 *_highest_use = high;
631 *_lowest_use = low;
632end:
633 rcu_read_unlock();
634 if (empty_channel) {
635 ret = -1;
636 }
637 return ret;
638}
639
640/*
641 * Execute action on a monitor timer.
642 */
643static
5704917f 644void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
645{
646 int ret;
647 int channel_monitor_pipe =
648 consumer_timer_thread_get_channel_monitor_pipe();
649 struct lttcomm_consumer_channel_monitor_msg msg = {
650 .key = channel->key,
1c9a0b0e
MJ
651 .lowest = 0,
652 .highest = 0,
653 .total_consumed = 0,
e9404c27
JG
654 };
655 sample_positions_cb sample;
656 get_consumed_cb get_consumed;
657 get_produced_cb get_produced;
ef4b086e 658 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27 659
a0377dfe 660 LTTNG_ASSERT(channel);
e9404c27
JG
661
662 if (channel_monitor_pipe < 0) {
873dda4e 663 return;
e9404c27
JG
664 }
665
fa29bfbf 666 switch (the_consumer_data.type) {
e9404c27
JG
667 case LTTNG_CONSUMER_KERNEL:
668 sample = lttng_kconsumer_sample_snapshot_positions;
669 get_consumed = lttng_kconsumer_get_consumed_snapshot;
670 get_produced = lttng_kconsumer_get_produced_snapshot;
671 break;
672 case LTTNG_CONSUMER32_UST:
673 case LTTNG_CONSUMER64_UST:
674 sample = lttng_ustconsumer_sample_snapshot_positions;
675 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
676 get_produced = lttng_ustconsumer_get_produced_snapshot;
677 break;
678 default:
679 abort();
680 }
681
ef4b086e
JG
682 ret = sample_channel_positions(channel, &highest, &lowest,
683 &total_consumed, sample, get_consumed, get_produced);
e9404c27 684 if (ret) {
873dda4e 685 return;
e9404c27 686 }
ef4b086e
JG
687 msg.highest = highest;
688 msg.lowest = lowest;
689 msg.total_consumed = total_consumed;
e9404c27
JG
690
691 /*
692 * Writes performed here are assumed to be atomic which is only
693 * guaranteed for sizes < than PIPE_BUF.
694 */
a0377dfe 695 LTTNG_ASSERT(sizeof(msg) <= PIPE_BUF);
e9404c27
JG
696
697 do {
698 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
699 } while (ret == -1 && errno == EINTR);
700 if (ret == -1) {
701 if (errno == EAGAIN) {
702 /* Not an error, the sample is merely dropped. */
97535efa 703 DBG("Channel monitor pipe is full; dropping sample for channel key = %" PRIu64,
e9404c27
JG
704 channel->key);
705 } else {
706 PERROR("write to the channel monitor pipe");
707 }
708 } else {
709 DBG("Sent channel monitoring sample for channel key %" PRIu64
97535efa 710 ", (highest = %" PRIu64 ", lowest = %" PRIu64 ")",
e9404c27
JG
711 channel->key, msg.highest, msg.lowest);
712 }
e9404c27
JG
713}
714
715int consumer_timer_thread_get_channel_monitor_pipe(void)
716{
fa29bfbf 717 return uatomic_read(&the_channel_monitor_pipe);
e9404c27
JG
718}
719
720int consumer_timer_thread_set_channel_monitor_pipe(int fd)
721{
722 int ret;
723
fa29bfbf 724 ret = uatomic_cmpxchg(&the_channel_monitor_pipe, -1, fd);
e9404c27
JG
725 if (ret != -1) {
726 ret = -1;
727 goto end;
728 }
729 ret = 0;
730end:
731 return ret;
732}
733
331744e3 734/*
d3e2ba59 735 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 736 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 737 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 738 */
d3e2ba59 739void *consumer_timer_thread(void *data)
331744e3
JD
740{
741 int signr;
742 sigset_t mask;
743 siginfo_t info;
97535efa 744 struct lttng_consumer_local_data *ctx = (lttng_consumer_local_data *) data;
331744e3 745
8a9acb74
MD
746 rcu_register_thread();
747
1fc79fb4
MD
748 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
749
2d57de81
MD
750 if (testpoint(consumerd_thread_metadata_timer)) {
751 goto error_testpoint;
752 }
753
9ce5646a
MD
754 health_code_update();
755
331744e3
JD
756 /* Only self thread will receive signal mask. */
757 setmask(&mask);
758 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
759
760 while (1) {
9ce5646a
MD
761 health_code_update();
762
763 health_poll_entry();
331744e3 764 signr = sigwaitinfo(&mask, &info);
9ce5646a 765 health_poll_exit();
e9404c27
JG
766
767 /*
768 * NOTE: cascading conditions are used instead of a switch case
769 * since the use of SIGRTMIN in the definition of the signals'
770 * values prevents the reduction to an integer constant.
771 */
331744e3
JD
772 if (signr == -1) {
773 if (errno != EINTR) {
774 PERROR("sigwaitinfo");
775 }
776 continue;
777 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 778 metadata_switch_timer(ctx, &info);
331744e3
JD
779 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
780 cmm_smp_mb();
781 CMM_STORE_SHARED(timer_signal.qs_done, 1);
782 cmm_smp_mb();
783 DBG("Signal timer metadata thread teardown");
d3e2ba59 784 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 785 live_timer(ctx, &info);
e9404c27
JG
786 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
787 struct lttng_consumer_channel *channel;
788
97535efa 789 channel = (lttng_consumer_channel *) info.si_value.sival_ptr;
5704917f 790 monitor_timer(channel);
13675d0e 791 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
a0377dfe 792 LTTNG_ASSERT(CMM_LOAD_SHARED(consumer_quit));
13675d0e 793 goto end;
331744e3
JD
794 } else {
795 ERR("Unexpected signal %d\n", info.si_signo);
796 }
797 }
798
2d57de81
MD
799error_testpoint:
800 /* Only reached in testpoint error */
801 health_error();
13675d0e 802end:
1fc79fb4 803 health_unregister(health_consumerd);
8a9acb74 804 rcu_unregister_thread();
331744e3
JD
805 return NULL;
806}
This page took 0.098118 seconds and 4 git commands to generate.