Fix: create_channel_per_pid: remove channel on error
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
ea791363 24#include <bin/lttng-sessiond/ust-ctl.h>
51a9e1c7 25#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 26#include <common/common.h>
f263b7fd 27#include <common/compat/endian.h>
d3e2ba59
JD
28#include <common/kernel-ctl/kernel-ctl.h>
29#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
30#include <common/consumer/consumer-stream.h>
31#include <common/consumer/consumer-timer.h>
32#include <common/consumer/consumer-testpoint.h>
33#include <common/ust-consumer/ust-consumer.h>
331744e3 34
e9404c27
JG
/*
 * Per-domain (kernel / UST) accessors used by the monitor timer to sample
 * a stream's buffer positions. sample_channel_positions() treats any
 * non-zero return value as a failure.
 */

/* Take a snapshot of the stream's buffer positions. */
typedef int (*sample_positions_cb)(struct lttng_consumer_stream *stream);
/* Read the consumed position (presumably from the last snapshot) into *consumed. */
typedef int (*get_consumed_cb)(struct lttng_consumer_stream *stream,
		unsigned long *consumed);
/* Read the produced position (presumably from the last snapshot) into *produced. */
typedef int (*get_produced_cb)(struct lttng_consumer_stream *stream,
		unsigned long *produced);
40
2b8f8754
MD
41static struct timer_signal_data timer_signal = {
42 .tid = 0,
43 .setup_done = 0,
44 .qs_done = 0,
45 .lock = PTHREAD_MUTEX_INITIALIZER,
46};
331744e3
JD
47
48/*
49 * Set custom signal mask to current thread.
50 */
51static void setmask(sigset_t *mask)
52{
53 int ret;
54
55 ret = sigemptyset(mask);
56 if (ret) {
57 PERROR("sigemptyset");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
60 if (ret) {
d3e2ba59 61 PERROR("sigaddset switch");
331744e3
JD
62 }
63 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
64 if (ret) {
d3e2ba59
JD
65 PERROR("sigaddset teardown");
66 }
67 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
68 if (ret) {
69 PERROR("sigaddset live");
331744e3 70 }
e9404c27
JG
71 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_MONITOR);
72 if (ret) {
73 PERROR("sigaddset monitor");
74 }
13675d0e
MD
75 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
76 if (ret) {
77 PERROR("sigaddset exit");
78 }
331744e3
JD
79}
80
e9404c27
JG
/*
 * File descriptor on which monitor_timer() writes channel monitoring
 * samples; -1 until set (once) through
 * consumer_timer_thread_set_channel_monitor_pipe(). Accessed with uatomic
 * operations.
 */
static int channel_monitor_pipe = -1;
82
331744e3
JD
83/*
84 * Execute action on a timer switch.
d98a47c7
MD
85 *
86 * Beware: metadata_switch_timer() should *never* take a mutex also held
87 * while consumer_timer_switch_stop() is called. It would result in
88 * deadlocks.
331744e3
JD
89 */
90static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 91 siginfo_t *si)
331744e3
JD
92{
93 int ret;
94 struct lttng_consumer_channel *channel;
95
96 channel = si->si_value.sival_ptr;
97 assert(channel);
98
4419b4fb
MD
99 if (channel->switch_timer_error) {
100 return;
101 }
102
331744e3
JD
103 DBG("Switch timer for channel %" PRIu64, channel->key);
104 switch (ctx->type) {
105 case LTTNG_CONSUMER32_UST:
106 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
107 /*
108 * Locks taken by lttng_ustconsumer_request_metadata():
109 * - metadata_socket_lock
110 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 111 * - channel->metadata_cache->lock
4fa3dc0e 112 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
113 * - channel->timer_lock
114 * - channel->metadata_cache->lock
4fa3dc0e 115 *
5e41ebe1
MD
116 * Ensure that neither consumer_data.lock nor
117 * channel->lock are taken within this function, since
118 * they are held while consumer_timer_switch_stop() is
119 * called.
4fa3dc0e 120 */
94d49140 121 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 122 if (ret < 0) {
4419b4fb 123 channel->switch_timer_error = 1;
331744e3
JD
124 }
125 break;
126 case LTTNG_CONSUMER_KERNEL:
127 case LTTNG_CONSUMER_UNKNOWN:
128 assert(0);
129 break;
130 }
131}
132
528f2ffa
JD
133static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
134 uint64_t stream_id)
d3e2ba59
JD
135{
136 int ret;
50adc264 137 struct ctf_packet_index index;
d3e2ba59
JD
138
139 memset(&index, 0, sizeof(index));
528f2ffa 140 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
141 index.timestamp_end = htobe64(ts);
142 ret = consumer_stream_write_index(stream, &index);
143 if (ret < 0) {
144 goto error;
145 }
146
147error:
148 return ret;
149}
150
c585821b 151int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 152{
528f2ffa 153 uint64_t ts, stream_id;
d3e2ba59
JD
154 int ret;
155
d3e2ba59
JD
156 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
157 if (ret < 0) {
158 ERR("Failed to get the current timestamp");
c585821b 159 goto end;
d3e2ba59
JD
160 }
161 ret = kernctl_buffer_flush(stream->wait_fd);
162 if (ret < 0) {
163 ERR("Failed to flush kernel stream");
c585821b 164 goto end;
d3e2ba59
JD
165 }
166 ret = kernctl_snapshot(stream->wait_fd);
167 if (ret < 0) {
32af2c95 168 if (ret != -EAGAIN && ret != -ENODATA) {
08b1dcd3 169 PERROR("live timer kernel snapshot");
d3e2ba59 170 ret = -1;
c585821b 171 goto end;
d3e2ba59 172 }
528f2ffa
JD
173 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
174 if (ret < 0) {
175 PERROR("kernctl_get_stream_id");
c585821b 176 goto end;
528f2ffa 177 }
d3e2ba59 178 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 179 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 180 if (ret < 0) {
c585821b 181 goto end;
d3e2ba59
JD
182 }
183 }
184 ret = 0;
c585821b 185end:
d3e2ba59
JD
186 return ret;
187}
188
c585821b 189static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 190{
d3e2ba59
JD
191 int ret;
192
d3e2ba59
JD
193 /*
194 * While holding the stream mutex, try to take a snapshot, if it
195 * succeeds, it means that data is ready to be sent, just let the data
196 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
197 * means that there is no data to read after the flush, so we can
198 * safely send the empty index.
c585821b
MD
199 *
200 * Doing a trylock and checking if waiting on metadata if
201 * trylock fails. Bail out of the stream is indeed waiting for
202 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 203 */
c585821b
MD
204 for (;;) {
205 ret = pthread_mutex_trylock(&stream->lock);
206 switch (ret) {
207 case 0:
208 break; /* We have the lock. */
209 case EBUSY:
210 pthread_mutex_lock(&stream->metadata_timer_lock);
211 if (stream->waiting_on_metadata) {
212 ret = 0;
213 stream->missed_metadata_flush = true;
214 pthread_mutex_unlock(&stream->metadata_timer_lock);
215 goto end; /* Bail out. */
216 }
217 pthread_mutex_unlock(&stream->metadata_timer_lock);
218 /* Try again. */
219 caa_cpu_relax();
220 continue;
221 default:
222 ERR("Unexpected pthread_mutex_trylock error %d", ret);
223 ret = -1;
224 goto end;
225 }
226 break;
227 }
228 ret = consumer_flush_kernel_index(stream);
229 pthread_mutex_unlock(&stream->lock);
230end:
231 return ret;
232}
233
234int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
235{
236 uint64_t ts, stream_id;
237 int ret;
238
94d49140
JD
239 ret = cds_lfht_is_node_deleted(&stream->node.node);
240 if (ret) {
c585821b 241 goto end;
94d49140
JD
242 }
243
84a182ce 244 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
245 if (ret < 0) {
246 ERR("Failed to get the current timestamp");
c585821b 247 goto end;
d3e2ba59 248 }
84a182ce
DG
249 lttng_ustconsumer_flush_buffer(stream, 1);
250 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 251 if (ret < 0) {
94d49140 252 if (ret != -EAGAIN) {
d3e2ba59
JD
253 ERR("Taking UST snapshot");
254 ret = -1;
c585821b 255 goto end;
d3e2ba59 256 }
70190e1c 257 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
258 if (ret < 0) {
259 PERROR("ustctl_get_stream_id");
c585821b 260 goto end;
528f2ffa 261 }
d3e2ba59 262 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 263 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 264 if (ret < 0) {
c585821b 265 goto end;
d3e2ba59
JD
266 }
267 }
268 ret = 0;
c585821b
MD
269end:
270 return ret;
271}
d3e2ba59 272
c585821b
MD
273static int check_ust_stream(struct lttng_consumer_stream *stream)
274{
275 int ret;
276
277 assert(stream);
278 assert(stream->ustream);
279 /*
280 * While holding the stream mutex, try to take a snapshot, if it
281 * succeeds, it means that data is ready to be sent, just let the data
282 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
283 * means that there is no data to read after the flush, so we can
284 * safely send the empty index.
285 *
286 * Doing a trylock and checking if waiting on metadata if
287 * trylock fails. Bail out of the stream is indeed waiting for
288 * metadata to be pushed. Busy wait on trylock otherwise.
289 */
290 for (;;) {
291 ret = pthread_mutex_trylock(&stream->lock);
292 switch (ret) {
293 case 0:
294 break; /* We have the lock. */
295 case EBUSY:
296 pthread_mutex_lock(&stream->metadata_timer_lock);
297 if (stream->waiting_on_metadata) {
298 ret = 0;
299 stream->missed_metadata_flush = true;
300 pthread_mutex_unlock(&stream->metadata_timer_lock);
301 goto end; /* Bail out. */
302 }
303 pthread_mutex_unlock(&stream->metadata_timer_lock);
304 /* Try again. */
305 caa_cpu_relax();
306 continue;
307 default:
308 ERR("Unexpected pthread_mutex_trylock error %d", ret);
309 ret = -1;
310 goto end;
311 }
312 break;
313 }
314 ret = consumer_flush_ust_index(stream);
d3e2ba59 315 pthread_mutex_unlock(&stream->lock);
c585821b 316end:
d3e2ba59
JD
317 return ret;
318}
319
320/*
321 * Execute action on a live timer
322 */
323static void live_timer(struct lttng_consumer_local_data *ctx,
d7e2822f 324 siginfo_t *si)
d3e2ba59
JD
325{
326 int ret;
327 struct lttng_consumer_channel *channel;
328 struct lttng_consumer_stream *stream;
329 struct lttng_ht *ht;
330 struct lttng_ht_iter iter;
331
332 channel = si->si_value.sival_ptr;
333 assert(channel);
334
335 if (channel->switch_timer_error) {
336 goto error;
337 }
338 ht = consumer_data.stream_per_chan_id_ht;
339
340 DBG("Live timer for channel %" PRIu64, channel->key);
341
342 rcu_read_lock();
343 switch (ctx->type) {
344 case LTTNG_CONSUMER32_UST:
345 case LTTNG_CONSUMER64_UST:
346 cds_lfht_for_each_entry_duplicate(ht->ht,
347 ht->hash_fct(&channel->key, lttng_ht_seed),
348 ht->match_fct, &channel->key, &iter.iter,
349 stream, node_channel_id.node) {
350 ret = check_ust_stream(stream);
351 if (ret < 0) {
352 goto error_unlock;
353 }
354 }
355 break;
356 case LTTNG_CONSUMER_KERNEL:
357 cds_lfht_for_each_entry_duplicate(ht->ht,
358 ht->hash_fct(&channel->key, lttng_ht_seed),
359 ht->match_fct, &channel->key, &iter.iter,
360 stream, node_channel_id.node) {
361 ret = check_kernel_stream(stream);
362 if (ret < 0) {
363 goto error_unlock;
364 }
365 }
366 break;
367 case LTTNG_CONSUMER_UNKNOWN:
368 assert(0);
369 break;
370 }
371
372error_unlock:
373 rcu_read_unlock();
374
375error:
376 return;
377}
378
2b8f8754
MD
379static
380void consumer_timer_signal_thread_qs(unsigned int signr)
381{
382 sigset_t pending_set;
383 int ret;
384
385 /*
386 * We need to be the only thread interacting with the thread
387 * that manages signals for teardown synchronization.
388 */
389 pthread_mutex_lock(&timer_signal.lock);
390
391 /* Ensure we don't have any signal queued for this channel. */
392 for (;;) {
393 ret = sigemptyset(&pending_set);
394 if (ret == -1) {
395 PERROR("sigemptyset");
396 }
397 ret = sigpending(&pending_set);
398 if (ret == -1) {
399 PERROR("sigpending");
400 }
f05a6b6d 401 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
402 break;
403 }
404 caa_cpu_relax();
405 }
406
407 /*
408 * From this point, no new signal handler will be fired that would try to
409 * access "chan". However, we still need to wait for any currently
410 * executing handler to complete.
411 */
412 cmm_smp_mb();
413 CMM_STORE_SHARED(timer_signal.qs_done, 0);
414 cmm_smp_mb();
415
416 /*
417 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
418 * up.
419 */
420 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
421
422 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
423 caa_cpu_relax();
424 }
425 cmm_smp_mb();
426
427 pthread_mutex_unlock(&timer_signal.lock);
428}
429
331744e3 430/*
e9404c27
JG
431 * Start a timer channel timer which will fire at a given interval
432 * (timer_interval_us)and fire a given signal (signal).
433 *
434 * Returns a negative value on error, 0 if a timer was created, and
435 * a positive value if no timer was created (not an error).
331744e3 436 */
e9404c27
JG
437static
438int consumer_channel_timer_start(timer_t *timer_id,
439 struct lttng_consumer_channel *channel,
440 unsigned int timer_interval_us, int signal)
331744e3 441{
e9404c27 442 int ret = 0, delete_ret;
331744e3
JD
443 struct sigevent sev;
444 struct itimerspec its;
445
446 assert(channel);
447 assert(channel->key);
448
e9404c27
JG
449 if (timer_interval_us == 0) {
450 /* No creation needed; not an error. */
451 ret = 1;
452 goto end;
331744e3
JD
453 }
454
455 sev.sigev_notify = SIGEV_SIGNAL;
e9404c27 456 sev.sigev_signo = signal;
331744e3 457 sev.sigev_value.sival_ptr = channel;
e9404c27 458 ret = timer_create(CLOCKID, &sev, timer_id);
331744e3
JD
459 if (ret == -1) {
460 PERROR("timer_create");
e9404c27 461 goto end;
331744e3 462 }
331744e3 463
e9404c27
JG
464 its.it_value.tv_sec = timer_interval_us / 1000000;
465 its.it_value.tv_nsec = (timer_interval_us % 1000000) * 1000;
331744e3
JD
466 its.it_interval.tv_sec = its.it_value.tv_sec;
467 its.it_interval.tv_nsec = its.it_value.tv_nsec;
468
e9404c27 469 ret = timer_settime(*timer_id, 0, &its, NULL);
331744e3
JD
470 if (ret == -1) {
471 PERROR("timer_settime");
e9404c27
JG
472 goto error_destroy_timer;
473 }
474end:
475 return ret;
476error_destroy_timer:
477 delete_ret = timer_delete(*timer_id);
478 if (delete_ret == -1) {
479 PERROR("timer_delete");
480 }
481 goto end;
482}
483
484static
485int consumer_channel_timer_stop(timer_t *timer_id, int signal)
486{
487 int ret = 0;
488
489 ret = timer_delete(*timer_id);
490 if (ret == -1) {
491 PERROR("timer_delete");
492 goto end;
331744e3 493 }
e9404c27
JG
494
495 consumer_timer_signal_thread_qs(signal);
496 *timer_id = 0;
497end:
498 return ret;
499}
500
501/*
502 * Set the channel's switch timer.
503 */
504void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
505 unsigned int switch_timer_interval_us)
506{
507 int ret;
508
509 assert(channel);
510 assert(channel->key);
511
512 ret = consumer_channel_timer_start(&channel->switch_timer, channel,
513 switch_timer_interval_us, LTTNG_CONSUMER_SIG_SWITCH);
514
515 channel->switch_timer_enabled = !!(ret == 0);
331744e3
JD
516}
517
518/*
e9404c27 519 * Stop and delete the channel's switch timer.
331744e3
JD
520 */
521void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
522{
523 int ret;
331744e3
JD
524
525 assert(channel);
526
e9404c27
JG
527 ret = consumer_channel_timer_stop(&channel->switch_timer,
528 LTTNG_CONSUMER_SIG_SWITCH);
331744e3 529 if (ret == -1) {
e9404c27 530 ERR("Failed to stop switch timer");
331744e3
JD
531 }
532
2b8f8754 533 channel->switch_timer_enabled = 0;
331744e3
JD
534}
535
d3e2ba59 536/*
e9404c27 537 * Set the channel's live timer.
d3e2ba59
JD
538 */
539void consumer_timer_live_start(struct lttng_consumer_channel *channel,
e9404c27 540 unsigned int live_timer_interval_us)
d3e2ba59
JD
541{
542 int ret;
d3e2ba59
JD
543
544 assert(channel);
545 assert(channel->key);
546
e9404c27
JG
547 ret = consumer_channel_timer_start(&channel->live_timer, channel,
548 live_timer_interval_us, LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 549
e9404c27
JG
550 channel->live_timer_enabled = !!(ret == 0);
551}
d3e2ba59 552
e9404c27
JG
553/*
554 * Stop and delete the channel's live timer.
555 */
556void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
557{
558 int ret;
559
560 assert(channel);
d3e2ba59 561
e9404c27
JG
562 ret = consumer_channel_timer_stop(&channel->live_timer,
563 LTTNG_CONSUMER_SIG_LIVE);
d3e2ba59 564 if (ret == -1) {
e9404c27 565 ERR("Failed to stop live timer");
d3e2ba59 566 }
e9404c27
JG
567
568 channel->live_timer_enabled = 0;
d3e2ba59
JD
569}
570
571/*
e9404c27
JG
572 * Set the channel's monitoring timer.
573 *
574 * Returns a negative value on error, 0 if a timer was created, and
575 * a positive value if no timer was created (not an error).
d3e2ba59 576 */
e9404c27
JG
577int consumer_timer_monitor_start(struct lttng_consumer_channel *channel,
578 unsigned int monitor_timer_interval_us)
d3e2ba59
JD
579{
580 int ret;
581
582 assert(channel);
e9404c27
JG
583 assert(channel->key);
584 assert(!channel->monitor_timer_enabled);
d3e2ba59 585
e9404c27
JG
586 ret = consumer_channel_timer_start(&channel->monitor_timer, channel,
587 monitor_timer_interval_us, LTTNG_CONSUMER_SIG_MONITOR);
588 channel->monitor_timer_enabled = !!(ret == 0);
589 return ret;
590}
591
592/*
593 * Stop and delete the channel's monitoring timer.
594 */
595int consumer_timer_monitor_stop(struct lttng_consumer_channel *channel)
596{
597 int ret;
598
599 assert(channel);
600 assert(channel->monitor_timer_enabled);
601
602 ret = consumer_channel_timer_stop(&channel->monitor_timer,
603 LTTNG_CONSUMER_SIG_MONITOR);
d3e2ba59 604 if (ret == -1) {
e9404c27
JG
605 ERR("Failed to stop live timer");
606 goto end;
d3e2ba59
JD
607 }
608
e9404c27
JG
609 channel->monitor_timer_enabled = 0;
610end:
611 return ret;
d3e2ba59
JD
612}
613
331744e3
JD
614/*
615 * Block the RT signals for the entire process. It must be called from the
616 * consumer main before creating the threads
617 */
73664f81 618int consumer_signal_init(void)
331744e3
JD
619{
620 int ret;
621 sigset_t mask;
622
623 /* Block signal for entire process, so only our thread processes it. */
624 setmask(&mask);
625 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
626 if (ret) {
627 errno = ret;
628 PERROR("pthread_sigmask");
73664f81 629 return -1;
331744e3 630 }
73664f81 631 return 0;
331744e3
JD
632}
633
e9404c27
JG
634static
635int sample_channel_positions(struct lttng_consumer_channel *channel,
e8360425 636 uint64_t *_highest_use, uint64_t *_lowest_use, uint64_t *_total_consumed,
e9404c27
JG
637 sample_positions_cb sample, get_consumed_cb get_consumed,
638 get_produced_cb get_produced)
639{
23bc9bb5 640 int ret = 0;
e9404c27
JG
641 struct lttng_ht_iter iter;
642 struct lttng_consumer_stream *stream;
643 bool empty_channel = true;
644 uint64_t high = 0, low = UINT64_MAX;
645 struct lttng_ht *ht = consumer_data.stream_per_chan_id_ht;
646
e8360425
JD
647 *_total_consumed = 0;
648
e9404c27
JG
649 rcu_read_lock();
650
651 cds_lfht_for_each_entry_duplicate(ht->ht,
652 ht->hash_fct(&channel->key, lttng_ht_seed),
653 ht->match_fct, &channel->key,
654 &iter.iter, stream, node_channel_id.node) {
655 unsigned long produced, consumed, usage;
656
657 empty_channel = false;
658
659 pthread_mutex_lock(&stream->lock);
660 if (cds_lfht_is_node_deleted(&stream->node.node)) {
661 goto next;
662 }
663
664 ret = sample(stream);
665 if (ret) {
666 ERR("Failed to take buffer position snapshot in monitor timer (ret = %d)", ret);
667 pthread_mutex_unlock(&stream->lock);
668 goto end;
669 }
670 ret = get_consumed(stream, &consumed);
671 if (ret) {
672 ERR("Failed to get buffer consumed position in monitor timer");
673 pthread_mutex_unlock(&stream->lock);
674 goto end;
675 }
676 ret = get_produced(stream, &produced);
677 if (ret) {
678 ERR("Failed to get buffer produced position in monitor timer");
679 pthread_mutex_unlock(&stream->lock);
680 goto end;
681 }
682
683 usage = produced - consumed;
684 high = (usage > high) ? usage : high;
685 low = (usage < low) ? usage : low;
e8360425
JD
686
687 /*
688 * We don't use consumed here for 2 reasons:
689 * - output_written takes into account the padding written in the
690 * tracefiles when we stop the session;
691 * - the consumed position is not the accurate representation of what
692 * was extracted from a buffer in overwrite mode.
693 */
694 *_total_consumed += stream->output_written;
e9404c27
JG
695 next:
696 pthread_mutex_unlock(&stream->lock);
697 }
698
699 *_highest_use = high;
700 *_lowest_use = low;
701end:
702 rcu_read_unlock();
703 if (empty_channel) {
704 ret = -1;
705 }
706 return ret;
707}
708
709/*
710 * Execute action on a monitor timer.
711 */
712static
5704917f 713void monitor_timer(struct lttng_consumer_channel *channel)
e9404c27
JG
714{
715 int ret;
716 int channel_monitor_pipe =
717 consumer_timer_thread_get_channel_monitor_pipe();
718 struct lttcomm_consumer_channel_monitor_msg msg = {
719 .key = channel->key,
720 };
721 sample_positions_cb sample;
722 get_consumed_cb get_consumed;
723 get_produced_cb get_produced;
724
725 assert(channel);
e9404c27
JG
726
727 if (channel_monitor_pipe < 0) {
873dda4e 728 return;
e9404c27
JG
729 }
730
731 switch (consumer_data.type) {
732 case LTTNG_CONSUMER_KERNEL:
733 sample = lttng_kconsumer_sample_snapshot_positions;
734 get_consumed = lttng_kconsumer_get_consumed_snapshot;
735 get_produced = lttng_kconsumer_get_produced_snapshot;
736 break;
737 case LTTNG_CONSUMER32_UST:
738 case LTTNG_CONSUMER64_UST:
739 sample = lttng_ustconsumer_sample_snapshot_positions;
740 get_consumed = lttng_ustconsumer_get_consumed_snapshot;
741 get_produced = lttng_ustconsumer_get_produced_snapshot;
742 break;
743 default:
744 abort();
745 }
746
747 ret = sample_channel_positions(channel, &msg.highest, &msg.lowest,
e8360425 748 &msg.total_consumed, sample, get_consumed, get_produced);
e9404c27 749 if (ret) {
873dda4e 750 return;
e9404c27
JG
751 }
752
753 /*
754 * Writes performed here are assumed to be atomic which is only
755 * guaranteed for sizes < than PIPE_BUF.
756 */
757 assert(sizeof(msg) <= PIPE_BUF);
758
759 do {
760 ret = write(channel_monitor_pipe, &msg, sizeof(msg));
761 } while (ret == -1 && errno == EINTR);
762 if (ret == -1) {
763 if (errno == EAGAIN) {
764 /* Not an error, the sample is merely dropped. */
765 DBG("Channel monitor pipe is full; dropping sample for channel key = %"PRIu64,
766 channel->key);
767 } else {
768 PERROR("write to the channel monitor pipe");
769 }
770 } else {
771 DBG("Sent channel monitoring sample for channel key %" PRIu64
772 ", (highest = %" PRIu64 ", lowest = %"PRIu64")",
773 channel->key, msg.highest, msg.lowest);
774 }
e9404c27
JG
775}
776
777int consumer_timer_thread_get_channel_monitor_pipe(void)
778{
779 return uatomic_read(&channel_monitor_pipe);
780}
781
782int consumer_timer_thread_set_channel_monitor_pipe(int fd)
783{
784 int ret;
785
786 ret = uatomic_cmpxchg(&channel_monitor_pipe, -1, fd);
787 if (ret != -1) {
788 ret = -1;
789 goto end;
790 }
791 ret = 0;
792end:
793 return ret;
794}
795
331744e3 796/*
d3e2ba59 797 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
e9404c27 798 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
13675d0e 799 * LTTNG_CONSUMER_SIG_MONITOR, LTTNG_CONSUMER_SIG_EXIT.
331744e3 800 */
d3e2ba59 801void *consumer_timer_thread(void *data)
331744e3
JD
802{
803 int signr;
804 sigset_t mask;
805 siginfo_t info;
806 struct lttng_consumer_local_data *ctx = data;
807
8a9acb74
MD
808 rcu_register_thread();
809
1fc79fb4
MD
810 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
811
2d57de81
MD
812 if (testpoint(consumerd_thread_metadata_timer)) {
813 goto error_testpoint;
814 }
815
9ce5646a
MD
816 health_code_update();
817
331744e3
JD
818 /* Only self thread will receive signal mask. */
819 setmask(&mask);
820 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
821
822 while (1) {
9ce5646a
MD
823 health_code_update();
824
825 health_poll_entry();
331744e3 826 signr = sigwaitinfo(&mask, &info);
9ce5646a 827 health_poll_exit();
e9404c27
JG
828
829 /*
830 * NOTE: cascading conditions are used instead of a switch case
831 * since the use of SIGRTMIN in the definition of the signals'
832 * values prevents the reduction to an integer constant.
833 */
331744e3
JD
834 if (signr == -1) {
835 if (errno != EINTR) {
836 PERROR("sigwaitinfo");
837 }
838 continue;
839 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
d7e2822f 840 metadata_switch_timer(ctx, &info);
331744e3
JD
841 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
842 cmm_smp_mb();
843 CMM_STORE_SHARED(timer_signal.qs_done, 1);
844 cmm_smp_mb();
845 DBG("Signal timer metadata thread teardown");
d3e2ba59 846 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
d7e2822f 847 live_timer(ctx, &info);
e9404c27
JG
848 } else if (signr == LTTNG_CONSUMER_SIG_MONITOR) {
849 struct lttng_consumer_channel *channel;
850
851 channel = info.si_value.sival_ptr;
5704917f 852 monitor_timer(channel);
13675d0e
MD
853 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
854 assert(CMM_LOAD_SHARED(consumer_quit));
855 goto end;
331744e3
JD
856 } else {
857 ERR("Unexpected signal %d\n", info.si_signo);
858 }
859 }
860
2d57de81
MD
861error_testpoint:
862 /* Only reached in testpoint error */
863 health_error();
13675d0e 864end:
1fc79fb4 865 health_unregister(health_consumerd);
8a9acb74 866 rcu_unregister_thread();
331744e3
JD
867 return NULL;
868}
This page took 0.08234 seconds and 4 git commands to generate.