consumerd: refactor: combine duplicated check_*_functions
[lttng-tools.git] / src / common / consumer / consumer-timer.c
1 /*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19 #define _LGPL_SOURCE
20 #include <assert.h>
21 #include <inttypes.h>
22 #include <signal.h>
23
24 #include <bin/lttng-consumerd/health-consumerd.h>
25 #include <common/common.h>
26 #include <common/compat/endian.h>
27 #include <common/kernel-ctl/kernel-ctl.h>
28 #include <common/kernel-consumer/kernel-consumer.h>
29 #include <common/consumer/consumer-stream.h>
30 #include <common/consumer/consumer-timer.h>
31 #include <common/consumer/consumer-testpoint.h>
32 #include <common/ust-consumer/ust-consumer.h>
33
/*
 * Callback signatures used by the timers below to call the stream
 * operations of the active tracing domain without knowing which one it is.
 */

/* Take a snapshot of a stream's buffer positions; non-zero means failure. */
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);

/* Read the consumed position into `consumed`; non-zero means failure. */
typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
		unsigned long *consumed);

/* Read the produced position into `produced`; non-zero means failure. */
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
		unsigned long *produced);

/*
 * Flush a stream and emit an index for it. Invoked by check_stream() with
 * the stream lock held; a negative return value means failure.
 */
typedef int (*flush_index_cb)(struct lttng_consumer_stream *stream);
40
41 static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46 };
47
48 /*
49 * Set custom signal mask to current thread.
50 */
51 static void setmask(sigset_t *mask)
52 {
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
61 PERROR("sigaddset switch");
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
70 }
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
72 if (ret) {
73 PERROR("sigaddset monitor");
74 }
75 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
76 if (ret) {
77 PERROR("sigaddset exit");
78 }
79 }
80
/*
 * File descriptor on which channel monitoring samples are written.
 * Remains -1 until consumer_timer_thread_set_channel_monitor_pipe() is
 * called; accessed with uatomic operations only.
 */
static int channel_monitor_pipe = -1;
82
83 /*
84 * Execute action on a timer switch.
85 *
86 * Beware: metadata_switch_timer() should *never* take a mutex also held
87 * while consumer_timer_switch_stop() is called. It would result in
88 * deadlocks.
89 */
90 static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
91 siginfo_t *si)
92 {
93 int ret;
94 struct lttng_consumer_channel *channel;
95
96 channel = si->si_value.sival_ptr;
97 assert(channel);
98
99 if (channel->switch_timer_error) {
100 return;
101 }
102
103 DBG("Switch timer for channel %" PRIu64, channel->key);
104 switch (ctx->type) {
105 case LTTNG_CONSUMER32_UST:
106 case LTTNG_CONSUMER64_UST:
107 /*
108 * Locks taken by lttng_ustconsumer_request_metadata():
109 * - metadata_socket_lock
110 * - Calling lttng_ustconsumer_recv_metadata():
111 * - channel->metadata_cache->lock
112 * - Calling consumer_metadata_cache_flushed():
113 * - channel->timer_lock
114 * - channel->metadata_cache->lock
115 *
116 * Ensure that neither consumer_data.lock nor
117 * channel->lock are taken within this function, since
118 * they are held while consumer_timer_switch_stop() is
119 * called.
120 */
121 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
122 if (ret < 0) {
123 channel->switch_timer_error = 1;
124 }
125 break;
126 case LTTNG_CONSUMER_KERNEL:
127 case LTTNG_CONSUMER_UNKNOWN:
128 assert(0);
129 break;
130 }
131 }
132
133 static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
134 uint64_t stream_id)
135 {
136 int ret;
137 struct ctf_packet_index index;
138
139 memset(&index, 0, sizeof(index));
140 index.stream_id = htobe64(stream_id);
141 index.timestamp_end = htobe64(ts);
142 ret = consumer_stream_write_index(stream, &index);
143 if (ret < 0) {
144 goto error;
145 }
146
147 error:
148 return ret;
149 }
150
151 int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
152 {
153 uint64_t ts, stream_id;
154 int ret;
155
156 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
157 if (ret < 0) {
158 ERR("Failed to get the current timestamp");
159 goto end;
160 }
161 ret = kernctl_buffer_flush(stream->wait_fd);
162 if (ret < 0) {
163 ERR("Failed to flush kernel stream");
164 goto end;
165 }
166 ret = kernctl_snapshot(stream->wait_fd);
167 if (ret < 0) {
168 if (ret != -EAGAIN && ret != -ENODATA) {
169 PERROR("live timer kernel snapshot");
170 ret = -1;
171 goto end;
172 }
173 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
174 if (ret < 0) {
175 PERROR("kernctl_get_stream_id");
176 goto end;
177 }
178 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
179 ret = send_empty_index(stream, ts, stream_id);
180 if (ret < 0) {
181 goto end;
182 }
183 }
184 ret = 0;
185 end:
186 return ret;
187 }
188
189 static int check_stream(struct lttng_consumer_stream *stream,
190 flush_index_cb flush_index)
191 {
192 int ret;
193
194 /*
195 * While holding the stream mutex, try to take a snapshot, if it
196 * succeeds, it means that data is ready to be sent, just let the data
197 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
198 * means that there is no data to read after the flush, so we can
199 * safely send the empty index.
200 *
201 * Doing a trylock and checking if waiting on metadata if
202 * trylock fails. Bail out of the stream is indeed waiting for
203 * metadata to be pushed. Busy wait on trylock otherwise.
204 */
205 for (;;) {
206 ret = pthread_mutex_trylock(&stream->lock);
207 switch (ret) {
208 case 0:
209 break; /* We have the lock. */
210 case EBUSY:
211 pthread_mutex_lock(&stream->metadata_timer_lock);
212 if (stream->waiting_on_metadata) {
213 ret = 0;
214 stream->missed_metadata_flush = true;
215 pthread_mutex_unlock(&stream->metadata_timer_lock);
216 goto end; /* Bail out. */
217 }
218 pthread_mutex_unlock(&stream->metadata_timer_lock);
219 /* Try again. */
220 caa_cpu_relax();
221 continue;
222 default:
223 ERR("Unexpected pthread_mutex_trylock error %d", ret);
224 ret = -1;
225 goto end;
226 }
227 break;
228 }
229 ret = flush_index(stream);
230 pthread_mutex_unlock(&stream->lock);
231 end:
232 return ret;
233 }
234
235 int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
236 {
237 uint64_t ts, stream_id;
238 int ret;
239
240 ret = cds_lfht_is_node_deleted(&stream->node.node);
241 if (ret) {
242 goto end;
243 }
244
245 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
246 if (ret < 0) {
247 ERR("Failed to get the current timestamp");
248 goto end;
249 }
250 lttng_ustconsumer_flush_buffer(stream, 1);
251 ret = lttng_ustconsumer_take_snapshot(stream);
252 if (ret < 0) {
253 if (ret != -EAGAIN) {
254 ERR("Taking UST snapshot");
255 ret = -1;
256 goto end;
257 }
258 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
259 if (ret < 0) {
260 PERROR("ustctl_get_stream_id");
261 goto end;
262 }
263 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
264 ret = send_empty_index(stream, ts, stream_id);
265 if (ret < 0) {
266 goto end;
267 }
268 }
269 ret = 0;
270 end:
271 return ret;
272 }
273
274 /*
275 * Execute action on a live timer
276 */
277 static void live_timer(struct lttng_consumer_local_data *ctx,
278 siginfo_t *si)
279 {
280 int ret;
281 struct lttng_consumer_channel *channel;
282 struct lttng_consumer_stream *stream;
283 struct lttng_ht_iter iter;
284 const struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
285 const flush_index_cb flush_index =
286 ctx->type == LTTNG_CONSUMER_KERNEL ?
287 consumer_flush_kernel_index :
288 consumer_flush_ust_index;
289
290 channel = si->si_value.sival_ptr;
291 assert(channel);
292
293 if (channel->switch_timer_error) {
294 goto error;
295 }
296
297 DBG("Live timer for channel %" PRIu64, channel->key);
298
299 rcu_read_lock();
300 cds_lfht_for_each_entry_duplicate(ht->ht,
301 ht->hash_fct(&channel->key, lttng_ht_seed),
302 ht->match_fct, &channel->key, &iter.iter,
303 stream, node_channel_id.node) {
304 ret = check_stream(stream, flush_index);
305 if (ret < 0) {
306 goto error_unlock;
307 }
308 }
309
310 error_unlock:
311 rcu_read_unlock();
312
313 error:
314 return;
315 }
316
317 static
318 void consumer_timer_signal_thread_qs(unsigned int signr)
319 {
320 sigset_t pending_set;
321 int ret;
322
323 /*
324 * We need to be the only thread interacting with the thread
325 * that manages signals for teardown synchronization.
326 */
327 pthread_mutex_lock(&timer_signal.lock);
328
329 /* Ensure we don't have any signal queued for this channel. */
330 for (;;) {
331 ret = sigemptyset(&pending_set);
332 if (ret == -1) {
333 PERROR("sigemptyset");
334 }
335 ret = sigpending(&pending_set);
336 if (ret == -1) {
337 PERROR("sigpending");
338 }
339 if (!sigismember(&pending_set, signr)) {
340 break;
341 }
342 caa_cpu_relax();
343 }
344
345 /*
346 * From this point, no new signal handler will be fired that would try to
347 * access "chan". However, we still need to wait for any currently
348 * executing handler to complete.
349 */
350 cmm_smp_mb();
351 CMM_STORE_SHARED(timer_signal.qs_done, 0);
352 cmm_smp_mb();
353
354 /*
355 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
356 * up.
357 */
358 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
359
360 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
361 caa_cpu_relax();
362 }
363 cmm_smp_mb();
364
365 pthread_mutex_unlock(&timer_signal.lock);
366 }
367
368 /*
369 * Start a timer channel timer which will fire at a given interval
370 * (timer_interval_us)and fire a given signal (signal).
371 *
372 * Returns a negative value on error, 0 if a timer was created, and
373 * a positive value if no timer was created (not an error).
374 */
375 static
376 int consumer_channel_timer_start(timer_t *timer_id,
377 struct lttng_consumer_channel *channel,
378 unsigned int timer_interval_us, int signal)
379 {
380 int ret = 0, delete_ret;
381 struct sigevent sev;
382 struct itimerspec its;
383
384 assert(channel);
385 assert(channel->key);
386
387 if (timer_interval_us == 0) {
388 /* No creation needed; not an error. */
389 ret = 1;
390 goto end;
391 }
392
393 sev.sigev_notify = SIGEV_SIGNAL;
394 sev.sigev_signo = signal;
395 sev.sigev_value.sival_ptr = channel;
396 ret = timer_create(CLOCKID, &sev, timer_id);
397 if (ret == -1) {
398 PERROR("timer_create");
399 goto end;
400 }
401
402 its.it_value.tv_sec = timer_interval_us / 1000000;
403 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
404 its.it_interval.tv_sec = its.it_value.tv_sec;
405 its.it_interval.tv_nsec = its.it_value.tv_nsec;
406
407 ret = timer_settime(*timer_id, 0, &its, NULL);
408 if (ret == -1) {
409 PERROR("timer_settime");
410 goto error_destroy_timer;
411 }
412 end:
413 return ret;
414 error_destroy_timer:
415 delete_ret = timer_delete(*timer_id);
416 if (delete_ret == -1) {
417 PERROR("timer_delete");
418 }
419 goto end;
420 }
421
422 static
423 int consumer_channel_timer_stop(timer_t *timer_id, int signal)
424 {
425 int ret = 0;
426
427 ret = timer_delete(*timer_id);
428 if (ret == -1) {
429 PERROR("timer_delete");
430 goto end;
431 }
432
433 consumer_timer_signal_thread_qs(signal);
434 *timer_id = 0;
435 end:
436 return ret;
437 }
438
439 /*
440 * Set the channel's switch timer.
441 */
442 void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
443 unsigned int switch_timer_interval_us)
444 {
445 int ret;
446
447 assert(channel);
448 assert(channel->key);
449
450 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
451 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
452
453 channel->switch_timer_enabled = !!(ret == 0);
454 }
455
456 /*
457 * Stop and delete the channel's switch timer.
458 */
459 void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
460 {
461 int ret;
462
463 assert(channel);
464
465 ret = consumer_channel_timer_stop(&channel->switch_timer,
466 LTTNG_CONSUMER_SIG_SWITCH);
467 if (ret == -1) {
468 ERR("Failed to stop switch timer");
469 }
470
471 channel->switch_timer_enabled = 0;
472 }
473
474 /*
475 * Set the channel's live timer.
476 */
477 void consumer_timer_live_start(struct lttng_consumer_channel *channel,
478 unsigned int live_timer_interval_us)
479 {
480 int ret;
481
482 assert(channel);
483 assert(channel->key);
484
485 ret = consumer_channel_timer_start(&channel->live_timer, channel,
486 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
487
488 channel->live_timer_enabled = !!(ret == 0);
489 }
490
491 /*
492 * Stop and delete the channel's live timer.
493 */
494 void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
495 {
496 int ret;
497
498 assert(channel);
499
500 ret = consumer_channel_timer_stop(&channel->live_timer,
501 LTTNG_CONSUMER_SIG_LIVE);
502 if (ret == -1) {
503 ERR("Failed to stop live timer");
504 }
505
506 channel->live_timer_enabled = 0;
507 }
508
509 /*
510 * Set the channel's monitoring timer.
511 *
512 * Returns a negative value on error, 0 if a timer was created, and
513 * a positive value if no timer was created (not an error).
514 */
515 int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
516 unsigned int monitor_timer_interval_us)
517 {
518 int ret;
519
520 assert(channel);
521 assert(channel->key);
522 assert(!channel->monitor_timer_enabled);
523
524 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
525 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
526 channel->monitor_timer_enabled = !!(ret == 0);
527 return ret;
528 }
529
530 /*
531 * Stop and delete the channel's monitoring timer.
532 */
533 int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
534 {
535 int ret;
536
537 assert(channel);
538 assert(channel->monitor_timer_enabled);
539
540 ret = consumer_channel_timer_stop(&channel->monitor_timer,
541 LTTNG_CONSUMER_SIG_MONITOR);
542 if (ret == -1) {
543 ERR("Failed to stop live timer");
544 goto end;
545 }
546
547 channel->monitor_timer_enabled = 0;
548 end:
549 return ret;
550 }
551
552 /*
553 * Block the RT signals for the entire process. It must be called from the
554 * consumer main before creating the threads
555 */
556 int consumer_signal_init(void)
557 {
558 int ret;
559 sigset_t mask;
560
561 /* Block signal for entire process, so only our thread processes it. */
562 setmask(&mask);
563 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
564 if (ret) {
565 errno = ret;
566 PERROR("pthread_sigmask");
567 return -1;
568 }
569 return 0;
570 }
571
572 static
573 int sample_channel_positions(struct lttng_consumer_channel *channel,
574 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
575 sample_positions_cb sample, get_consumed_cb get_consumed,
576 get_produced_cb get_produced)
577 {
578 int ret = 0;
579 struct lttng_ht_iter iter;
580 struct lttng_consumer_stream *stream;
581 bool empty_channel = true;
582 uint64_t high = 0, low = UINT64_MAX;
583 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
584
585 *_total_consumed = 0;
586
587 rcu_read_lock();
588
589 cds_lfht_for_each_entry_duplicate(ht->ht,
590 ht->hash_fct(&channel->key, lttng_ht_seed),
591 ht->match_fct, &channel->key,
592 &iter.iter, stream, node_channel_id.node) {
593 unsigned long produced, consumed, usage;
594
595 empty_channel = false;
596
597 pthread_mutex_lock(&stream->lock);
598 if (cds_lfht_is_node_deleted(&stream->node.node)) {
599 goto next;
600 }
601
602 ret = sample(stream);
603 if (ret) {
604 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
605 pthread_mutex_unlock(&stream->lock);
606 goto end;
607 }
608 ret = get_consumed(stream, &consumed);
609 if (ret) {
610 ERR("Failed to get buffer consumed position in monitor timer");
611 pthread_mutex_unlock(&stream->lock);
612 goto end;
613 }
614 ret = get_produced(stream, &produced);
615 if (ret) {
616 ERR("Failed to get buffer produced position in monitor timer");
617 pthread_mutex_unlock(&stream->lock);
618 goto end;
619 }
620
621 usage = produced - consumed;
622 high = (usage > high) ? usage : high;
623 low = (usage < low) ? usage : low;
624
625 /*
626 * We don't use consumed here for 2 reasons:
627 * - output_written takes into account the padding written in the
628 * tracefiles when we stop the session;
629 * - the consumed position is not the accurate representation of what
630 * was extracted from a buffer in overwrite mode.
631 */
632 *_total_consumed += stream->output_written;
633 next:
634 pthread_mutex_unlock(&stream->lock);
635 }
636
637 *_highest_use = high;
638 *_lowest_use = low;
639 end:
640 rcu_read_unlock();
641 if (empty_channel) {
642 ret = -1;
643 }
644 return ret;
645 }
646
647 /*
648 * Execute action on a monitor timer.
649 */
650 static
651 void monitor_timer(struct lttng_consumer_channel *channel)
652 {
653 int ret;
654 int channel_monitor_pipe =
655 consumer_timer_thread_get_channel_monitor_pipe();
656 struct lttcomm_consumer_channel_monitor_msg msg = {
657 .key = channel->key,
658 };
659 sample_positions_cb sample;
660 get_consumed_cb get_consumed;
661 get_produced_cb get_produced;
662 uint64_t lowest = 0, highest = 0, total_consumed = 0;
663
664 assert(channel);
665
666 if (channel_monitor_pipe < 0) {
667 return;
668 }
669
670 switch (consumer_data.type) {
671 case LTTNG_CONSUMER_KERNEL:
672 sample = lttng_kconsumer_sample_snapshot_positions;
673 get_consumed = lttng_kconsumer_get_consumed_snapshot;
674 get_produced = lttng_kconsumer_get_produced_snapshot;
675 break;
676 case LTTNG_CONSUMER32_UST:
677 case LTTNG_CONSUMER64_UST:
678 sample = lttng_ustconsumer_sample_snapshot_positions;
679 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
680 get_produced = lttng_ustconsumer_get_produced_snapshot;
681 break;
682 default:
683 abort();
684 }
685
686 ret = sample_channel_positions(channel, &highest, &lowest,
687 &total_consumed, sample, get_consumed, get_produced);
688 if (ret) {
689 return;
690 }
691 msg.highest = highest;
692 msg.lowest = lowest;
693 msg.total_consumed = total_consumed;
694
695 /*
696 * Writes performed here are assumed to be atomic which is only
697 * guaranteed for sizes < than PIPE_BUF.
698 */
699 assert(sizeof(msg) <= PIPE_BUF);
700
701 do {
702 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
703 } while (ret == -1 && errno == EINTR);
704 if (ret == -1) {
705 if (errno == EAGAIN) {
706 /* Not an error, the sample is merely dropped. */
707 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
708 channel->key);
709 } else {
710 PERROR("write to the channel monitor pipe");
711 }
712 } else {
713 DBG("Sent channel monitoring sample for channel key %" PRIu64
714 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
715 channel->key, msg.highest, msg.lowest);
716 }
717 }
718
719 int consumer_timer_thread_get_channel_monitor_pipe(void)
720 {
721 return uatomic_read(&channel_monitor_pipe);
722 }
723
724 int consumer_timer_thread_set_channel_monitor_pipe(int fd)
725 {
726 int ret;
727
728 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
729 if (ret != -1) {
730 ret = -1;
731 goto end;
732 }
733 ret = 0;
734 end:
735 return ret;
736 }
737
738 /*
739 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
740 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
741 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
742 */
743 void *consumer_timer_thread(void *data)
744 {
745 int signr;
746 sigset_t mask;
747 siginfo_t info;
748 struct lttng_consumer_local_data *ctx = data;
749
750 rcu_register_thread();
751
752 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
753
754 if (testpoint(consumerd_thread_metadata_timer)) {
755 goto error_testpoint;
756 }
757
758 health_code_update();
759
760 /* Only self thread will receive signal mask. */
761 setmask(&mask);
762 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
763
764 while (1) {
765 health_code_update();
766
767 health_poll_entry();
768 signr = sigwaitinfo(&mask, &info);
769 health_poll_exit();
770
771 /*
772 * NOTE: cascading conditions are used instead of a switch case
773 * since the use of SIGRTMIN in the definition of the signals'
774 * values prevents the reduction to an integer constant.
775 */
776 if (signr == -1) {
777 if (errno != EINTR) {
778 PERROR("sigwaitinfo");
779 }
780 continue;
781 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
782 metadata_switch_timer(ctx, &info);
783 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
784 cmm_smp_mb();
785 CMM_STORE_SHARED(timer_signal.qs_done, 1);
786 cmm_smp_mb();
787 DBG("Signal timer metadata thread teardown");
788 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
789 live_timer(ctx, &info);
790 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
791 struct lttng_consumer_channel *channel;
792
793 channel = info.si_value.sival_ptr;
794 monitor_timer(channel);
795 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
796 assert(CMM_LOAD_SHARED(consumer_quit));
797 goto end;
798 } else {
799 ERR("Unexpected signal %d\n", info.si_signo);
800 }
801 }
802
803 error_testpoint:
804 /* Only reached in testpoint error */
805 health_error();
806 end:
807 health_unregister(health_consumerd);
808 rcu_unregister_thread();
809 return NULL;
810 }
This page took 0.048659 seconds and 4 git commands to generate.