consumerd: refactor: combine duplicated check_*_functions
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
e9404c27
JG
34typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
35typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
36 unsigned long *consumed);
37typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
38 unsigned long *produced);
bb09693b 39typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
e9404c27 40
2b8f8754
MD
41static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46};
331744e3
JD
47
48/*
49 * Set custom signal mask to current thread.
50 */
51static void setmask(sigset_t *mask)
52{
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
d3e2ba59 61 PERROR("sigaddset switch");
331744e3
JD
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
d3e2ba59
JD
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
331744e3 70 }
e9404c27
JG
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
72 if (ret) {
73 PERROR("sigaddset monitor");
74 }
13675d0e
MD
75 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
76 if (ret) {
77 PERROR("sigaddset exit");
78 }
331744e3
JD
79}
80
e9404c27
JG
81static int channel_monitor_pipe = -1;
82
331744e3
JD
83/*
84 * Execute action on a timer switch.
d98a47c7
MD
85 *
86 * Beware: metadata_switch_timer() should *never* take a mutex also held
87 * while consumer_timer_switch_stop() is called. It would result in
88 * deadlocks.
331744e3
JD
89 */
90static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 91 siginfo_t *si)
331744e3
JD
92{
93 int ret;
94 struct lttng_consumer_channel *channel;
95
96 channel = si->si_value.sival_ptr;
97 assert(channel);
98
4419b4fb
MD
99 if (channel->switch_timer_error) {
100 return;
101 }
102
331744e3
JD
103 DBG("Switch timer for channel %" PRIu64, channel->key);
104 switch (ctx->type) {
105 case LTTNG_CONSUMER32_UST:
106 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
107 /*
108 * Locks taken by lttng_ustconsumer_request_metadata():
109 * - metadata_socket_lock
110 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 111 * - channel->metadata_cache->lock
4fa3dc0e 112 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
113 * - channel->timer_lock
114 * - channel->metadata_cache->lock
4fa3dc0e 115 *
5e41ebe1
MD
116 * Ensure that neither consumer_data.lock nor
117 * channel->lock are taken within this function, since
118 * they are held while consumer_timer_switch_stop() is
119 * called.
4fa3dc0e 120 */
94d49140 121 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 122 if (ret < 0) {
4419b4fb 123 channel->switch_timer_error = 1;
331744e3
JD
124 }
125 break;
126 case LTTNG_CONSUMER_KERNEL:
127 case LTTNG_CONSUMER_UNKNOWN:
128 assert(0);
129 break;
130 }
131}
132
528f2ffa
JD
133static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
134 uint64_t stream_id)
d3e2ba59
JD
135{
136 int ret;
50adc264 137 struct ctf_packet_index index;
d3e2ba59
JD
138
139 memset(&index, 0, sizeof(index));
528f2ffa 140 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
141 index.timestamp_end = htobe64(ts);
142 ret = consumer_stream_write_index(stream, &index);
143 if (ret < 0) {
144 goto error;
145 }
146
147error:
148 return ret;
149}
150
c585821b 151int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 152{
528f2ffa 153 uint64_t ts, stream_id;
d3e2ba59
JD
154 int ret;
155
d3e2ba59
JD
156 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
157 if (ret < 0) {
158 ERR("Failed to get the current timestamp");
c585821b 159 goto end;
d3e2ba59
JD
160 }
161 ret = kernctl_buffer_flush(stream->wait_fd);
162 if (ret < 0) {
163 ERR("Failed to flush kernel stream");
c585821b 164 goto end;
d3e2ba59
JD
165 }
166 ret = kernctl_snapshot(stream->wait_fd);
167 if (ret < 0) {
32af2c95 168 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 169 PERROR("live timer kernel snapshot");
d3e2ba59 170 ret = -1;
c585821b 171 goto end;
d3e2ba59 172 }
528f2ffa
JD
173 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
174 if (ret < 0) {
175 PERROR("kernctl_get_stream_id");
c585821b 176 goto end;
528f2ffa 177 }
d3e2ba59 178 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 179 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 180 if (ret < 0) {
c585821b 181 goto end;
d3e2ba59
JD
182 }
183 }
184 ret = 0;
c585821b 185end:
d3e2ba59
JD
186 return ret;
187}
188
bb09693b
JG
189static int check_stream(struct lttng_consumer_stream *stream,
190 flush_index_cb flush_index)
d3e2ba59 191{
d3e2ba59
JD
192 int ret;
193
d3e2ba59
JD
194 /*
195 * While holding the stream mutex, try to take a snapshot, if it
196 * succeeds, it means that data is ready to be sent, just let the data
197 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
198 * means that there is no data to read after the flush, so we can
199 * safely send the empty index.
c585821b
MD
200 *
201 * Doing a trylock and checking if waiting on metadata if
202 * trylock fails. Bail out of the stream is indeed waiting for
203 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 204 */
c585821b
MD
205 for (;;) {
206 ret = pthread_mutex_trylock(&stream->lock);
207 switch (ret) {
208 case 0:
209 break; /* We have the lock. */
210 case EBUSY:
211 pthread_mutex_lock(&stream->metadata_timer_lock);
212 if (stream->waiting_on_metadata) {
213 ret = 0;
214 stream->missed_metadata_flush = true;
215 pthread_mutex_unlock(&stream->metadata_timer_lock);
216 goto end; /* Bail out. */
217 }
218 pthread_mutex_unlock(&stream->metadata_timer_lock);
219 /* Try again. */
220 caa_cpu_relax();
221 continue;
222 default:
223 ERR("Unexpected pthread_mutex_trylock error %d", ret);
224 ret = -1;
225 goto end;
226 }
227 break;
228 }
bb09693b 229 ret = flush_index(stream);
c585821b
MD
230 pthread_mutex_unlock(&stream->lock);
231end:
232 return ret;
233}
234
235int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
236{
237 uint64_t ts, stream_id;
238 int ret;
239
94d49140
JD
240 ret = cds_lfht_is_node_deleted(&stream->node.node);
241 if (ret) {
c585821b 242 goto end;
94d49140
JD
243 }
244
84a182ce 245 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
246 if (ret < 0) {
247 ERR("Failed to get the current timestamp");
c585821b 248 goto end;
d3e2ba59 249 }
84a182ce
DG
250 lttng_ustconsumer_flush_buffer(stream, 1);
251 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 252 if (ret < 0) {
94d49140 253 if (ret != -EAGAIN) {
d3e2ba59
JD
254 ERR("Taking UST snapshot");
255 ret = -1;
c585821b 256 goto end;
d3e2ba59 257 }
70190e1c 258 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
259 if (ret < 0) {
260 PERROR("ustctl_get_stream_id");
c585821b 261 goto end;
528f2ffa 262 }
d3e2ba59 263 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 264 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 265 if (ret < 0) {
c585821b 266 goto end;
d3e2ba59
JD
267 }
268 }
269 ret = 0;
c585821b
MD
270end:
271 return ret;
272}
d3e2ba59 273
d3e2ba59
JD
274/*
275 * Execute action on a live timer
276 */
277static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 278 siginfo_t *si)
d3e2ba59
JD
279{
280 int ret;
281 struct lttng_consumer_channel *channel;
282 struct lttng_consumer_stream *stream;
d3e2ba59 283 struct lttng_ht_iter iter;
bb09693b
JG
284 const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
285 const flush_index_cb flush_index =
286 ctx->type == LTTNG_CONSUMER_KERNEL ?
287 consumer_flush_kernel_index :
288 consumer_flush_ust_index;
d3e2ba59
JD
289
290 channel = si->si_value.sival_ptr;
291 assert(channel);
292
293 if (channel->switch_timer_error) {
294 goto error;
295 }
d3e2ba59
JD
296
297 DBG("Live timer for channel %" PRIu64, channel->key);
298
299 rcu_read_lock();
bb09693b
JG
300 cds_lfht_for_each_entry_duplicate(ht->ht,
301 ht->hash_fct(&channel->key, lttng_ht_seed),
302 ht->match_fct, &channel->key, &iter.iter,
303 stream, node_channel_id.node) {
304 ret = check_stream(stream, flush_index);
305 if (ret < 0) {
306 goto error_unlock;
d3e2ba59 307 }
d3e2ba59
JD
308 }
309
310error_unlock:
311 rcu_read_unlock();
312
313error:
314 return;
315}
316
2b8f8754
MD
317static
318void consumer_timer_signal_thread_qs(unsigned int signr)
319{
320 sigset_t pending_set;
321 int ret;
322
323 /*
324 * We need to be the only thread interacting with the thread
325 * that manages signals for teardown synchronization.
326 */
327 pthread_mutex_lock(&timer_signal.lock);
328
329 /* Ensure we don't have any signal queued for this channel. */
330 for (;;) {
331 ret = sigemptyset(&pending_set);
332 if (ret == -1) {
333 PERROR("sigemptyset");
334 }
335 ret = sigpending(&pending_set);
336 if (ret == -1) {
337 PERROR("sigpending");
338 }
f05a6b6d 339 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
340 break;
341 }
342 caa_cpu_relax();
343 }
344
345 /*
346 * From this point, no new signal handler will be fired that would try to
347 * access "chan". However, we still need to wait for any currently
348 * executing handler to complete.
349 */
350 cmm_smp_mb();
351 CMM_STORE_SHARED(timer_signal.qs_done, 0);
352 cmm_smp_mb();
353
354 /*
355 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
356 * up.
357 */
358 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
359
360 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
361 caa_cpu_relax();
362 }
363 cmm_smp_mb();
364
365 pthread_mutex_unlock(&timer_signal.lock);
366}
367
331744e3 368/*
e9404c27
JG
369 * Start a timer channel timer which will fire at a given interval
370 * (timer_interval_us)and fire a given signal (signal).
371 *
372 * Returns a negative value on error, 0 if a timer was created, and
373 * a positive value if no timer was created (not an error).
331744e3 374 */
e9404c27
JG
375static
376int consumer_channel_timer_start(timer_t *timer_id,
377 struct lttng_consumer_channel *channel,
378 unsigned int timer_interval_us, int signal)
331744e3 379{
e9404c27 380 int ret = 0, delete_ret;
331744e3
JD
381 struct sigevent sev;
382 struct itimerspec its;
383
384 assert(channel);
385 assert(channel->key);
386
e9404c27
JG
387 if (timer_interval_us == 0) {
388 /* No creation needed; not an error. */
389 ret = 1;
390 goto end;
331744e3
JD
391 }
392
393 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 394 sev.sigev_signo = signal;
331744e3 395 sev.sigev_value.sival_ptr = channel;
e9404c27 396 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
397 if (ret == -1) {
398 PERROR("timer_create");
e9404c27 399 goto end;
331744e3 400 }
331744e3 401
e9404c27
JG
402 its.it_value.tv_sec = timer_interval_us / 1000000;
403 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
404 its.it_interval.tv_sec = its.it_value.tv_sec;
405 its.it_interval.tv_nsec = its.it_value.tv_nsec;
406
e9404c27 407 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
408 if (ret == -1) {
409 PERROR("timer_settime");
e9404c27
JG
410 goto error_destroy_timer;
411 }
412end:
413 return ret;
414error_destroy_timer:
415 delete_ret = timer_delete(*timer_id);
416 if (delete_ret == -1) {
417 PERROR("timer_delete");
418 }
419 goto end;
420}
421
422static
423int consumer_channel_timer_stop(timer_t *timer_id, int signal)
424{
425 int ret = 0;
426
427 ret = timer_delete(*timer_id);
428 if (ret == -1) {
429 PERROR("timer_delete");
430 goto end;
331744e3 431 }
e9404c27
JG
432
433 consumer_timer_signal_thread_qs(signal);
434 *timer_id = 0;
435end:
436 return ret;
437}
438
439/*
440 * Set the channel's switch timer.
441 */
442void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
443 unsigned int switch_timer_interval_us)
444{
445 int ret;
446
447 assert(channel);
448 assert(channel->key);
449
450 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
451 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
452
453 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
454}
455
456/*
e9404c27 457 * Stop and delete the channel's switch timer.
331744e3
JD
458 */
459void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
460{
461 int ret;
331744e3
JD
462
463 assert(channel);
464
e9404c27
JG
465 ret = consumer_channel_timer_stop(&channel->switch_timer,
466 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 467 if (ret == -1) {
e9404c27 468 ERR("Failed to stop switch timer");
331744e3
JD
469 }
470
2b8f8754 471 channel->switch_timer_enabled = 0;
331744e3
JD
472}
473
d3e2ba59 474/*
e9404c27 475 * Set the channel's live timer.
d3e2ba59
JD
476 */
477void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 478 unsigned int live_timer_interval_us)
d3e2ba59
JD
479{
480 int ret;
d3e2ba59
JD
481
482 assert(channel);
483 assert(channel->key);
484
e9404c27
JG
485 ret = consumer_channel_timer_start(&channel->live_timer, channel,
486 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 487
e9404c27
JG
488 channel->live_timer_enabled = !!(ret == 0);
489}
d3e2ba59 490
e9404c27
JG
491/*
492 * Stop and delete the channel's live timer.
493 */
494void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
495{
496 int ret;
497
498 assert(channel);
d3e2ba59 499
e9404c27
JG
500 ret = consumer_channel_timer_stop(&channel->live_timer,
501 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 502 if (ret == -1) {
e9404c27 503 ERR("Failed to stop live timer");
d3e2ba59 504 }
e9404c27
JG
505
506 channel->live_timer_enabled = 0;
d3e2ba59
JD
507}
508
509/*
e9404c27
JG
510 * Set the channel's monitoring timer.
511 *
512 * Returns a negative value on error, 0 if a timer was created, and
513 * a positive value if no timer was created (not an error).
d3e2ba59 514 */
e9404c27
JG
515int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
516 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
517{
518 int ret;
519
520 assert(channel);
e9404c27
JG
521 assert(channel->key);
522 assert(!channel->monitor_timer_enabled);
d3e2ba59 523
e9404c27
JG
524 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
525 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
526 channel->monitor_timer_enabled = !!(ret == 0);
527 return ret;
528}
529
530/*
531 * Stop and delete the channel's monitoring timer.
532 */
533int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
534{
535 int ret;
536
537 assert(channel);
538 assert(channel->monitor_timer_enabled);
539
540 ret = consumer_channel_timer_stop(&channel->monitor_timer,
541 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 542 if (ret == -1) {
e9404c27
JG
543 ERR("Failed to stop live timer");
544 goto end;
d3e2ba59
JD
545 }
546
e9404c27
JG
547 channel->monitor_timer_enabled = 0;
548end:
549 return ret;
d3e2ba59
JD
550}
551
331744e3
JD
552/*
553 * Block the RT signals for the entire process. It must be called from the
554 * consumer main before creating the threads
555 */
73664f81 556int consumer_signal_init(void)
331744e3
JD
557{
558 int ret;
559 sigset_t mask;
560
561 /* Block signal for entire process, so only our thread processes it. */
562 setmask(&mask);
563 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
564 if (ret) {
565 errno = ret;
566 PERROR("pthread_sigmask");
73664f81 567 return -1;
331744e3 568 }
73664f81 569 return 0;
331744e3
JD
570}
571
e9404c27
JG
572static
573int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 574 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
575 sample_positions_cb sample, get_consumed_cb get_consumed,
576 get_produced_cb get_produced)
577{
23bc9bb5 578 int ret = 0;
e9404c27
JG
579 struct lttng_ht_iter iter;
580 struct lttng_consumer_stream *stream;
581 bool empty_channel = true;
582 uint64_t high = 0, low = UINT64_MAX;
583 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
584
e8360425
JD
585 *_total_consumed = 0;
586
e9404c27
JG
587 rcu_read_lock();
588
589 cds_lfht_for_each_entry_duplicate(ht->ht,
590 ht->hash_fct(&channel->key, lttng_ht_seed),
591 ht->match_fct, &channel->key,
592 &iter.iter, stream, node_channel_id.node) {
593 unsigned long produced, consumed, usage;
594
595 empty_channel = false;
596
597 pthread_mutex_lock(&stream->lock);
598 if (cds_lfht_is_node_deleted(&stream->node.node)) {
599 goto next;
600 }
601
602 ret = sample(stream);
603 if (ret) {
604 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
605 pthread_mutex_unlock(&stream->lock);
606 goto end;
607 }
608 ret = get_consumed(stream, &consumed);
609 if (ret) {
610 ERR("Failed to get buffer consumed position in monitor timer");
611 pthread_mutex_unlock(&stream->lock);
612 goto end;
613 }
614 ret = get_produced(stream, &produced);
615 if (ret) {
616 ERR("Failed to get buffer produced position in monitor timer");
617 pthread_mutex_unlock(&stream->lock);
618 goto end;
619 }
620
621 usage = produced - consumed;
622 high = (usage > high) ? usage : high;
623 low = (usage < low) ? usage : low;
e8360425
JD
624
625 /*
626 * We don't use consumed here for 2 reasons:
627 * - output_written takes into account the padding written in the
628 * tracefiles when we stop the session;
629 * - the consumed position is not the accurate representation of what
630 * was extracted from a buffer in overwrite mode.
631 */
632 *_total_consumed += stream->output_written;
e9404c27
JG
633 next:
634 pthread_mutex_unlock(&stream->lock);
635 }
636
637 *_highest_use = high;
638 *_lowest_use = low;
639end:
640 rcu_read_unlock();
641 if (empty_channel) {
642 ret = -1;
643 }
644 return ret;
645}
646
647/*
648 * Execute action on a monitor timer.
649 */
650static
5704917f 651void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
652{
653 int ret;
654 int channel_monitor_pipe =
655 consumer_timer_thread_get_channel_monitor_pipe();
656 struct lttcomm_consumer_channel_monitor_msg msg = {
657 .key = channel->key,
658 };
659 sample_positions_cb sample;
660 get_consumed_cb get_consumed;
661 get_produced_cb get_produced;
e890d98a 662 uint64_t lowest = 0, highest = 0, total_consumed = 0;
e9404c27
JG
663
664 assert(channel);
e9404c27
JG
665
666 if (channel_monitor_pipe < 0) {
873dda4e 667 return;
e9404c27
JG
668 }
669
670 switch (consumer_data.type) {
671 case LTTNG_CONSUMER_KERNEL:
672 sample = lttng_kconsumer_sample_snapshot_positions;
673 get_consumed = lttng_kconsumer_get_consumed_snapshot;
674 get_produced = lttng_kconsumer_get_produced_snapshot;
675 break;
676 case LTTNG_CONSUMER32_UST:
677 case LTTNG_CONSUMER64_UST:
678 sample = lttng_ustconsumer_sample_snapshot_positions;
679 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
680 get_produced = lttng_ustconsumer_get_produced_snapshot;
681 break;
682 default:
683 abort();
684 }
685
e890d98a
JG
686 ret = sample_channel_positions(channel, &highest, &lowest,
687 &total_consumed, sample, get_consumed, get_produced);
e9404c27 688 if (ret) {
873dda4e 689 return;
e9404c27 690 }
e890d98a
JG
691 msg.highest = highest;
692 msg.lowest = lowest;
693 msg.total_consumed = total_consumed;
e9404c27
JG
694
695 /*
696 * Writes performed here are assumed to be atomic which is only
697 * guaranteed for sizes < than PIPE_BUF.
698 */
699 assert(sizeof(msg) <= PIPE_BUF);
700
701 do {
702 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
703 } while (ret == -1 && errno == EINTR);
704 if (ret == -1) {
705 if (errno == EAGAIN) {
706 /* Not an error, the sample is merely dropped. */
707 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
708 channel->key);
709 } else {
710 PERROR("write to the channel monitor pipe");
711 }
712 } else {
713 DBG("Sent channel monitoring sample for channel key %" PRIu64
714 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
715 channel->key, msg.highest, msg.lowest);
716 }
e9404c27
JG
717}
718
719int consumer_timer_thread_get_channel_monitor_pipe(void)
720{
721 return uatomic_read(&channel_monitor_pipe);
722}
723
724int consumer_timer_thread_set_channel_monitor_pipe(int fd)
725{
726 int ret;
727
728 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
729 if (ret != -1) {
730 ret = -1;
731 goto end;
732 }
733 ret = 0;
734end:
735 return ret;
736}
737
331744e3 738/*
d3e2ba59 739 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 740 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 741 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 742 */
d3e2ba59 743void *consumer_timer_thread(void *data)
331744e3
JD
744{
745 int signr;
746 sigset_t mask;
747 siginfo_t info;
748 struct lttng_consumer_local_data *ctx = data;
749
8a9acb74
MD
750 rcu_register_thread();
751
1fc79fb4
MD
752 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
753
2d57de81
MD
754 if (testpoint(consumerd_thread_metadata_timer)) {
755 goto error_testpoint;
756 }
757
9ce5646a
MD
758 health_code_update();
759
331744e3
JD
760 /* Only self thread will receive signal mask. */
761 setmask(&mask);
762 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
763
764 while (1) {
9ce5646a
MD
765 health_code_update();
766
767 health_poll_entry();
331744e3 768 signr = sigwaitinfo(&mask, &info);
9ce5646a 769 health_poll_exit();
e9404c27
JG
770
771 /*
772 * NOTE: cascading conditions are used instead of a switch case
773 * since the use of SIGRTMIN in the definition of the signals'
774 * values prevents the reduction to an integer constant.
775 */
331744e3
JD
776 if (signr == -1) {
777 if (errno != EINTR) {
778 PERROR("sigwaitinfo");
779 }
780 continue;
781 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 782 metadata_switch_timer(ctx, &info);
331744e3
JD
783 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
784 cmm_smp_mb();
785 CMM_STORE_SHARED(timer_signal.qs_done, 1);
786 cmm_smp_mb();
787 DBG("Signal timer metadata thread teardown");
d3e2ba59 788 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 789 live_timer(ctx, &info);
e9404c27
JG
790 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
791 struct lttng_consumer_channel *channel;
792
793 channel = info.si_value.sival_ptr;
5704917f 794 monitor_timer(channel);
13675d0e
MD
795 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
796 assert(CMM_LOAD_SHARED(consumer_quit));
797 goto end;
331744e3
JD
798 } else {
799 ERR("Unexpected signal %d\n", info.si_signo);
800 }
801 }
802
2d57de81
MD
803error_testpoint:
804 /* Only reached in testpoint error */
805 health_error();
13675d0e 806end:
1fc79fb4 807 health_unregister(health_consumerd);
8a9acb74 808 rcu_unregister_thread();
331744e3
JD
809 return NULL;
810}
This page took 0.084127 seconds and 4 git commands to generate.