Fix: join consumer timer thread
[lttng-tools.git] / src / common / consumer / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
6c1c0768 19#define _LGPL_SOURCE
331744e3
JD
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
f263b7fd 26#include <common/compat/endian.h>
d3e2ba59
JD
27#include <common/kernel-ctl/kernel-ctl.h>
28#include <common/kernel-consumer/kernel-consumer.h>
c8fea79c
JR
29#include <common/consumer/consumer-stream.h>
30#include <common/consumer/consumer-timer.h>
31#include <common/consumer/consumer-testpoint.h>
32#include <common/ust-consumer/ust-consumer.h>
331744e3 33
2b8f8754
MD
34static struct timer_signal_data timer_signal = {
35 .tid = 0,
36 .setup_done = 0,
37 .qs_done = 0,
38 .lock = PTHREAD_MUTEX_INITIALIZER,
39};
331744e3
JD
40
41/*
42 * Set custom signal mask to current thread.
43 */
44static void setmask(sigset_t *mask)
45{
46 int ret;
47
48 ret = sigemptyset(mask);
49 if (ret) {
50 PERROR("sigemptyset");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
53 if (ret) {
d3e2ba59 54 PERROR("sigaddset switch");
331744e3
JD
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
57 if (ret) {
d3e2ba59
JD
58 PERROR("sigaddset teardown");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
61 if (ret) {
62 PERROR("sigaddset live");
331744e3 63 }
bec11ce9
MD
64 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_EXIT);
65 if (ret) {
66 PERROR("sigaddset exit");
67 }
331744e3
JD
68}
69
70/*
71 * Execute action on a timer switch.
d98a47c7
MD
72 *
73 * Beware: metadata_switch_timer() should *never* take a mutex also held
74 * while consumer_timer_switch_stop() is called. It would result in
75 * deadlocks.
331744e3
JD
76 */
77static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
78 int sig, siginfo_t *si, void *uc)
79{
80 int ret;
81 struct lttng_consumer_channel *channel;
82
83 channel = si->si_value.sival_ptr;
84 assert(channel);
85
4419b4fb
MD
86 if (channel->switch_timer_error) {
87 return;
88 }
89
331744e3
JD
90 DBG("Switch timer for channel %" PRIu64, channel->key);
91 switch (ctx->type) {
92 case LTTNG_CONSUMER32_UST:
93 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
94 /*
95 * Locks taken by lttng_ustconsumer_request_metadata():
96 * - metadata_socket_lock
97 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 98 * - channel->metadata_cache->lock
4fa3dc0e 99 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
100 * - channel->timer_lock
101 * - channel->metadata_cache->lock
4fa3dc0e 102 *
5e41ebe1
MD
103 * Ensure that neither consumer_data.lock nor
104 * channel->lock are taken within this function, since
105 * they are held while consumer_timer_switch_stop() is
106 * called.
4fa3dc0e 107 */
94d49140 108 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 109 if (ret < 0) {
4419b4fb 110 channel->switch_timer_error = 1;
331744e3
JD
111 }
112 break;
113 case LTTNG_CONSUMER_KERNEL:
114 case LTTNG_CONSUMER_UNKNOWN:
115 assert(0);
116 break;
117 }
118}
119
528f2ffa
JD
120static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
121 uint64_t stream_id)
d3e2ba59
JD
122{
123 int ret;
50adc264 124 struct ctf_packet_index index;
d3e2ba59
JD
125
126 memset(&index, 0, sizeof(index));
528f2ffa 127 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
128 index.timestamp_end = htobe64(ts);
129 ret = consumer_stream_write_index(stream, &index);
130 if (ret < 0) {
131 goto error;
132 }
133
134error:
135 return ret;
136}
137
c585821b 138int consumer_flush_kernel_index(struct lttng_consumer_stream *stream)
d3e2ba59 139{
528f2ffa 140 uint64_t ts, stream_id;
d3e2ba59
JD
141 int ret;
142
d3e2ba59
JD
143 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
144 if (ret < 0) {
145 ERR("Failed to get the current timestamp");
c585821b 146 goto end;
d3e2ba59
JD
147 }
148 ret = kernctl_buffer_flush(stream->wait_fd);
149 if (ret < 0) {
150 ERR("Failed to flush kernel stream");
c585821b 151 goto end;
d3e2ba59
JD
152 }
153 ret = kernctl_snapshot(stream->wait_fd);
154 if (ret < 0) {
08b1dcd3
DG
155 if (errno != EAGAIN && errno != ENODATA) {
156 PERROR("live timer kernel snapshot");
d3e2ba59 157 ret = -1;
c585821b 158 goto end;
d3e2ba59 159 }
528f2ffa
JD
160 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
161 if (ret < 0) {
162 PERROR("kernctl_get_stream_id");
c585821b 163 goto end;
528f2ffa 164 }
d3e2ba59 165 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 166 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 167 if (ret < 0) {
c585821b 168 goto end;
d3e2ba59
JD
169 }
170 }
171 ret = 0;
c585821b 172end:
d3e2ba59
JD
173 return ret;
174}
175
c585821b 176static int check_kernel_stream(struct lttng_consumer_stream *stream)
d3e2ba59 177{
d3e2ba59
JD
178 int ret;
179
d3e2ba59
JD
180 /*
181 * While holding the stream mutex, try to take a snapshot, if it
182 * succeeds, it means that data is ready to be sent, just let the data
183 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
184 * means that there is no data to read after the flush, so we can
185 * safely send the empty index.
c585821b
MD
186 *
187 * Doing a trylock and checking if waiting on metadata if
188 * trylock fails. Bail out of the stream is indeed waiting for
189 * metadata to be pushed. Busy wait on trylock otherwise.
d3e2ba59 190 */
c585821b
MD
191 for (;;) {
192 ret = pthread_mutex_trylock(&stream->lock);
193 switch (ret) {
194 case 0:
195 break; /* We have the lock. */
196 case EBUSY:
197 pthread_mutex_lock(&stream->metadata_timer_lock);
198 if (stream->waiting_on_metadata) {
199 ret = 0;
200 stream->missed_metadata_flush = true;
201 pthread_mutex_unlock(&stream->metadata_timer_lock);
202 goto end; /* Bail out. */
203 }
204 pthread_mutex_unlock(&stream->metadata_timer_lock);
205 /* Try again. */
206 caa_cpu_relax();
207 continue;
208 default:
209 ERR("Unexpected pthread_mutex_trylock error %d", ret);
210 ret = -1;
211 goto end;
212 }
213 break;
214 }
215 ret = consumer_flush_kernel_index(stream);
216 pthread_mutex_unlock(&stream->lock);
217end:
218 return ret;
219}
220
221int consumer_flush_ust_index(struct lttng_consumer_stream *stream)
222{
223 uint64_t ts, stream_id;
224 int ret;
225
94d49140
JD
226 ret = cds_lfht_is_node_deleted(&stream->node.node);
227 if (ret) {
c585821b 228 goto end;
94d49140
JD
229 }
230
84a182ce 231 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
232 if (ret < 0) {
233 ERR("Failed to get the current timestamp");
c585821b 234 goto end;
d3e2ba59 235 }
84a182ce
DG
236 lttng_ustconsumer_flush_buffer(stream, 1);
237 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 238 if (ret < 0) {
94d49140 239 if (ret != -EAGAIN) {
d3e2ba59
JD
240 ERR("Taking UST snapshot");
241 ret = -1;
c585821b 242 goto end;
d3e2ba59 243 }
70190e1c 244 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
528f2ffa
JD
245 if (ret < 0) {
246 PERROR("ustctl_get_stream_id");
c585821b 247 goto end;
528f2ffa 248 }
d3e2ba59 249 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
528f2ffa 250 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59 251 if (ret < 0) {
c585821b 252 goto end;
d3e2ba59
JD
253 }
254 }
255 ret = 0;
c585821b
MD
256end:
257 return ret;
258}
d3e2ba59 259
c585821b
MD
260static int check_ust_stream(struct lttng_consumer_stream *stream)
261{
262 int ret;
263
264 assert(stream);
265 assert(stream->ustream);
266 /*
267 * While holding the stream mutex, try to take a snapshot, if it
268 * succeeds, it means that data is ready to be sent, just let the data
269 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
270 * means that there is no data to read after the flush, so we can
271 * safely send the empty index.
272 *
273 * Doing a trylock and checking if waiting on metadata if
274 * trylock fails. Bail out of the stream is indeed waiting for
275 * metadata to be pushed. Busy wait on trylock otherwise.
276 */
277 for (;;) {
278 ret = pthread_mutex_trylock(&stream->lock);
279 switch (ret) {
280 case 0:
281 break; /* We have the lock. */
282 case EBUSY:
283 pthread_mutex_lock(&stream->metadata_timer_lock);
284 if (stream->waiting_on_metadata) {
285 ret = 0;
286 stream->missed_metadata_flush = true;
287 pthread_mutex_unlock(&stream->metadata_timer_lock);
288 goto end; /* Bail out. */
289 }
290 pthread_mutex_unlock(&stream->metadata_timer_lock);
291 /* Try again. */
292 caa_cpu_relax();
293 continue;
294 default:
295 ERR("Unexpected pthread_mutex_trylock error %d", ret);
296 ret = -1;
297 goto end;
298 }
299 break;
300 }
301 ret = consumer_flush_ust_index(stream);
d3e2ba59 302 pthread_mutex_unlock(&stream->lock);
c585821b 303end:
d3e2ba59
JD
304 return ret;
305}
306
307/*
308 * Execute action on a live timer
309 */
310static void live_timer(struct lttng_consumer_local_data *ctx,
311 int sig, siginfo_t *si, void *uc)
312{
313 int ret;
314 struct lttng_consumer_channel *channel;
315 struct lttng_consumer_stream *stream;
316 struct lttng_ht *ht;
317 struct lttng_ht_iter iter;
318
319 channel = si->si_value.sival_ptr;
320 assert(channel);
321
322 if (channel->switch_timer_error) {
323 goto error;
324 }
325 ht = consumer_data.stream_per_chan_id_ht;
326
327 DBG("Live timer for channel %" PRIu64, channel->key);
328
329 rcu_read_lock();
330 switch (ctx->type) {
331 case LTTNG_CONSUMER32_UST:
332 case LTTNG_CONSUMER64_UST:
333 cds_lfht_for_each_entry_duplicate(ht->ht,
334 ht->hash_fct(&channel->key, lttng_ht_seed),
335 ht->match_fct, &channel->key, &iter.iter,
336 stream, node_channel_id.node) {
337 ret = check_ust_stream(stream);
338 if (ret < 0) {
339 goto error_unlock;
340 }
341 }
342 break;
343 case LTTNG_CONSUMER_KERNEL:
344 cds_lfht_for_each_entry_duplicate(ht->ht,
345 ht->hash_fct(&channel->key, lttng_ht_seed),
346 ht->match_fct, &channel->key, &iter.iter,
347 stream, node_channel_id.node) {
348 ret = check_kernel_stream(stream);
349 if (ret < 0) {
350 goto error_unlock;
351 }
352 }
353 break;
354 case LTTNG_CONSUMER_UNKNOWN:
355 assert(0);
356 break;
357 }
358
359error_unlock:
360 rcu_read_unlock();
361
362error:
363 return;
364}
365
2b8f8754
MD
366static
367void consumer_timer_signal_thread_qs(unsigned int signr)
368{
369 sigset_t pending_set;
370 int ret;
371
372 /*
373 * We need to be the only thread interacting with the thread
374 * that manages signals for teardown synchronization.
375 */
376 pthread_mutex_lock(&timer_signal.lock);
377
378 /* Ensure we don't have any signal queued for this channel. */
379 for (;;) {
380 ret = sigemptyset(&pending_set);
381 if (ret == -1) {
382 PERROR("sigemptyset");
383 }
384 ret = sigpending(&pending_set);
385 if (ret == -1) {
386 PERROR("sigpending");
387 }
90aa9dae 388 if (!sigismember(&pending_set, signr)) {
2b8f8754
MD
389 break;
390 }
391 caa_cpu_relax();
392 }
393
394 /*
395 * From this point, no new signal handler will be fired that would try to
396 * access "chan". However, we still need to wait for any currently
397 * executing handler to complete.
398 */
399 cmm_smp_mb();
400 CMM_STORE_SHARED(timer_signal.qs_done, 0);
401 cmm_smp_mb();
402
403 /*
404 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
405 * up.
406 */
407 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
408
409 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
410 caa_cpu_relax();
411 }
412 cmm_smp_mb();
413
414 pthread_mutex_unlock(&timer_signal.lock);
415}
416
331744e3
JD
417/*
418 * Set the timer for periodical metadata flush.
419 */
420void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
421 unsigned int switch_timer_interval)
422{
423 int ret;
424 struct sigevent sev;
425 struct itimerspec its;
426
427 assert(channel);
428 assert(channel->key);
429
430 if (switch_timer_interval == 0) {
431 return;
432 }
433
434 sev.sigev_notify = SIGEV_SIGNAL;
435 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
436 sev.sigev_value.sival_ptr = channel;
437 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
438 if (ret == -1) {
439 PERROR("timer_create");
440 }
441 channel->switch_timer_enabled = 1;
442
443 its.it_value.tv_sec = switch_timer_interval / 1000000;
69f60d21 444 its.it_value.tv_nsec = (switch_timer_interval % 1000000) * 1000;
331744e3
JD
445 its.it_interval.tv_sec = its.it_value.tv_sec;
446 its.it_interval.tv_nsec = its.it_value.tv_nsec;
447
448 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
449 if (ret == -1) {
450 PERROR("timer_settime");
451 }
452}
453
454/*
455 * Stop and delete timer.
456 */
457void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
458{
459 int ret;
331744e3
JD
460
461 assert(channel);
462
463 ret = timer_delete(channel->switch_timer);
464 if (ret == -1) {
465 PERROR("timer_delete");
466 }
467
2b8f8754 468 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 469
2b8f8754
MD
470 channel->switch_timer = 0;
471 channel->switch_timer_enabled = 0;
331744e3
JD
472}
473
d3e2ba59
JD
474/*
475 * Set the timer for the live mode.
476 */
477void consumer_timer_live_start(struct lttng_consumer_channel *channel,
478 int live_timer_interval)
479{
480 int ret;
481 struct sigevent sev;
482 struct itimerspec its;
483
484 assert(channel);
485 assert(channel->key);
486
fac41e72 487 if (live_timer_interval <= 0) {
d3e2ba59
JD
488 return;
489 }
490
491 sev.sigev_notify = SIGEV_SIGNAL;
492 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
493 sev.sigev_value.sival_ptr = channel;
494 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
495 if (ret == -1) {
496 PERROR("timer_create");
497 }
498 channel->live_timer_enabled = 1;
499
500 its.it_value.tv_sec = live_timer_interval / 1000000;
69f60d21 501 its.it_value.tv_nsec = (live_timer_interval % 1000000) * 1000;
d3e2ba59
JD
502 its.it_interval.tv_sec = its.it_value.tv_sec;
503 its.it_interval.tv_nsec = its.it_value.tv_nsec;
504
505 ret = timer_settime(channel->live_timer, 0, &its, NULL);
506 if (ret == -1) {
507 PERROR("timer_settime");
508 }
509}
510
511/*
512 * Stop and delete timer.
513 */
514void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
515{
516 int ret;
517
518 assert(channel);
519
520 ret = timer_delete(channel->live_timer);
521 if (ret == -1) {
522 PERROR("timer_delete");
523 }
524
525 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
526
527 channel->live_timer = 0;
528 channel->live_timer_enabled = 0;
529}
530
331744e3
JD
531/*
532 * Block the RT signals for the entire process. It must be called from the
533 * consumer main before creating the threads
534 */
73664f81 535int consumer_signal_init(void)
331744e3
JD
536{
537 int ret;
538 sigset_t mask;
539
540 /* Block signal for entire process, so only our thread processes it. */
541 setmask(&mask);
542 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
543 if (ret) {
544 errno = ret;
545 PERROR("pthread_sigmask");
73664f81 546 return -1;
331744e3 547 }
73664f81 548 return 0;
331744e3
JD
549}
550
551/*
d3e2ba59 552 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
bec11ce9
MD
553 * LTTNG_CONSUMER_SIG_TEARDOWN, LTTNG_CONSUMER_SIG_LIVE, and
554 * LTTNG_CONSUMER_SIG_EXIT.
331744e3 555 */
d3e2ba59 556void *consumer_timer_thread(void *data)
331744e3
JD
557{
558 int signr;
559 sigset_t mask;
560 siginfo_t info;
561 struct lttng_consumer_local_data *ctx = data;
562
8a9acb74
MD
563 rcu_register_thread();
564
1fc79fb4
MD
565 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
566
2d57de81
MD
567 if (testpoint(consumerd_thread_metadata_timer)) {
568 goto error_testpoint;
569 }
570
9ce5646a
MD
571 health_code_update();
572
331744e3
JD
573 /* Only self thread will receive signal mask. */
574 setmask(&mask);
575 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
576
577 while (1) {
9ce5646a
MD
578 health_code_update();
579
580 health_poll_entry();
331744e3 581 signr = sigwaitinfo(&mask, &info);
9ce5646a 582 health_poll_exit();
331744e3
JD
583 if (signr == -1) {
584 if (errno != EINTR) {
585 PERROR("sigwaitinfo");
586 }
587 continue;
588 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
589 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
590 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
591 cmm_smp_mb();
592 CMM_STORE_SHARED(timer_signal.qs_done, 1);
593 cmm_smp_mb();
594 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
595 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
596 live_timer(ctx, info.si_signo, &info, NULL);
bec11ce9
MD
597 } else if (signr == LTTNG_CONSUMER_SIG_EXIT) {
598 assert(consumer_quit);
599 goto end;
331744e3
JD
600 } else {
601 ERR("Unexpected signal %d\n", info.si_signo);
602 }
603 }
604
2d57de81
MD
605error_testpoint:
606 /* Only reached in testpoint error */
607 health_error();
bec11ce9 608end:
1fc79fb4 609 health_unregister(health_consumerd);
8a9acb74 610 rcu_unregister_thread();
331744e3
JD
611 return NULL;
612}
This page took 0.079279 seconds and 4 git commands to generate.