Implement the relayd live features
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
/*
 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
 *                      David Goulet <dgoulet@efficios.com>
 *
 * This program is free software; you can redistribute it and/or modify it
 * under the terms of the GNU General Public License, version 2 only, as
 * published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
 * more details.
 *
 * You should have received a copy of the GNU General Public License along with
 * this program; if not, write to the Free Software Foundation, Inc., 51
 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
d3e2ba59 24#include <lttng/ust-ctl.h>
331744e3 25#include <common/common.h>
d3e2ba59
JD
26#include <common/kernel-ctl/kernel-ctl.h>
27#include <common/kernel-consumer/kernel-consumer.h>
28#include <common/consumer-stream.h>
331744e3
JD
29
30#include "consumer-timer.h"
31#include "ust-consumer/ust-consumer.h"
32
2b8f8754
MD
33static struct timer_signal_data timer_signal = {
34 .tid = 0,
35 .setup_done = 0,
36 .qs_done = 0,
37 .lock = PTHREAD_MUTEX_INITIALIZER,
38};
331744e3
JD
39
40/*
41 * Set custom signal mask to current thread.
42 */
43static void setmask(sigset_t *mask)
44{
45 int ret;
46
47 ret = sigemptyset(mask);
48 if (ret) {
49 PERROR("sigemptyset");
50 }
51 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
52 if (ret) {
d3e2ba59 53 PERROR("sigaddset switch");
331744e3
JD
54 }
55 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
56 if (ret) {
d3e2ba59
JD
57 PERROR("sigaddset teardown");
58 }
59 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
60 if (ret) {
61 PERROR("sigaddset live");
331744e3
JD
62 }
63}
64
65/*
66 * Execute action on a timer switch.
d98a47c7
MD
67 *
68 * Beware: metadata_switch_timer() should *never* take a mutex also held
69 * while consumer_timer_switch_stop() is called. It would result in
70 * deadlocks.
331744e3
JD
71 */
72static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
73 int sig, siginfo_t *si, void *uc)
74{
75 int ret;
76 struct lttng_consumer_channel *channel;
77
78 channel = si->si_value.sival_ptr;
79 assert(channel);
80
4419b4fb
MD
81 if (channel->switch_timer_error) {
82 return;
83 }
84
331744e3
JD
85 DBG("Switch timer for channel %" PRIu64, channel->key);
86 switch (ctx->type) {
87 case LTTNG_CONSUMER32_UST:
88 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
89 /*
90 * Locks taken by lttng_ustconsumer_request_metadata():
91 * - metadata_socket_lock
92 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 93 * - channel->metadata_cache->lock
4fa3dc0e 94 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
95 * - channel->timer_lock
96 * - channel->metadata_cache->lock
4fa3dc0e 97 *
5e41ebe1
MD
98 * Ensure that neither consumer_data.lock nor
99 * channel->lock are taken within this function, since
100 * they are held while consumer_timer_switch_stop() is
101 * called.
4fa3dc0e 102 */
5e41ebe1 103 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1);
331744e3 104 if (ret < 0) {
4419b4fb 105 channel->switch_timer_error = 1;
331744e3
JD
106 }
107 break;
108 case LTTNG_CONSUMER_KERNEL:
109 case LTTNG_CONSUMER_UNKNOWN:
110 assert(0);
111 break;
112 }
113}
114
d3e2ba59
JD
115static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts)
116{
117 int ret;
118 struct lttng_packet_index index;
119
120 memset(&index, 0, sizeof(index));
121 index.timestamp_end = htobe64(ts);
122 ret = consumer_stream_write_index(stream, &index);
123 if (ret < 0) {
124 goto error;
125 }
126
127error:
128 return ret;
129}
130
131static int check_kernel_stream(struct lttng_consumer_stream *stream)
132{
133 uint64_t ts;
134 int ret;
135
136 /*
137 * While holding the stream mutex, try to take a snapshot, if it
138 * succeeds, it means that data is ready to be sent, just let the data
139 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
140 * means that there is no data to read after the flush, so we can
141 * safely send the empty index.
142 */
143 pthread_mutex_lock(&stream->lock);
144 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
145 if (ret < 0) {
146 ERR("Failed to get the current timestamp");
147 goto error_unlock;
148 }
149 ret = kernctl_buffer_flush(stream->wait_fd);
150 if (ret < 0) {
151 ERR("Failed to flush kernel stream");
152 goto error_unlock;
153 }
154 ret = kernctl_snapshot(stream->wait_fd);
155 if (ret < 0) {
156 if (errno != EAGAIN) {
157 ERR("Taking kernel snapshot");
158 ret = -1;
159 goto error_unlock;
160 }
161 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
162 ret = send_empty_index(stream, ts);
163 if (ret < 0) {
164 goto error_unlock;
165 }
166 }
167 ret = 0;
168
169error_unlock:
170 pthread_mutex_unlock(&stream->lock);
171 return ret;
172}
173
174static int check_ust_stream(struct lttng_consumer_stream *stream)
175{
176 uint64_t ts;
177 int ret;
178
179 assert(stream);
180 assert(stream->ustream);
181 /*
182 * While holding the stream mutex, try to take a snapshot, if it
183 * succeeds, it means that data is ready to be sent, just let the data
184 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
185 * means that there is no data to read after the flush, so we can
186 * safely send the empty index.
187 */
188 pthread_mutex_lock(&stream->lock);
189 ret = ustctl_get_current_timestamp(stream->ustream, &ts);
190 if (ret < 0) {
191 ERR("Failed to get the current timestamp");
192 goto error_unlock;
193 }
194 ustctl_flush_buffer(stream->ustream, 1);
195 ret = ustctl_snapshot(stream->ustream);
196 if (ret < 0) {
197 if (errno != EAGAIN) {
198 ERR("Taking UST snapshot");
199 ret = -1;
200 goto error_unlock;
201 }
202 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
203 ret = send_empty_index(stream, ts);
204 if (ret < 0) {
205 goto error_unlock;
206 }
207 }
208 ret = 0;
209
210error_unlock:
211 pthread_mutex_unlock(&stream->lock);
212 return ret;
213}
214
215/*
216 * Execute action on a live timer
217 */
218static void live_timer(struct lttng_consumer_local_data *ctx,
219 int sig, siginfo_t *si, void *uc)
220{
221 int ret;
222 struct lttng_consumer_channel *channel;
223 struct lttng_consumer_stream *stream;
224 struct lttng_ht *ht;
225 struct lttng_ht_iter iter;
226
227 channel = si->si_value.sival_ptr;
228 assert(channel);
229
230 if (channel->switch_timer_error) {
231 goto error;
232 }
233 ht = consumer_data.stream_per_chan_id_ht;
234
235 DBG("Live timer for channel %" PRIu64, channel->key);
236
237 rcu_read_lock();
238 switch (ctx->type) {
239 case LTTNG_CONSUMER32_UST:
240 case LTTNG_CONSUMER64_UST:
241 cds_lfht_for_each_entry_duplicate(ht->ht,
242 ht->hash_fct(&channel->key, lttng_ht_seed),
243 ht->match_fct, &channel->key, &iter.iter,
244 stream, node_channel_id.node) {
245 ret = check_ust_stream(stream);
246 if (ret < 0) {
247 goto error_unlock;
248 }
249 }
250 break;
251 case LTTNG_CONSUMER_KERNEL:
252 cds_lfht_for_each_entry_duplicate(ht->ht,
253 ht->hash_fct(&channel->key, lttng_ht_seed),
254 ht->match_fct, &channel->key, &iter.iter,
255 stream, node_channel_id.node) {
256 ret = check_kernel_stream(stream);
257 if (ret < 0) {
258 goto error_unlock;
259 }
260 }
261 break;
262 case LTTNG_CONSUMER_UNKNOWN:
263 assert(0);
264 break;
265 }
266
267error_unlock:
268 rcu_read_unlock();
269
270error:
271 return;
272}
273
2b8f8754
MD
274static
275void consumer_timer_signal_thread_qs(unsigned int signr)
276{
277 sigset_t pending_set;
278 int ret;
279
280 /*
281 * We need to be the only thread interacting with the thread
282 * that manages signals for teardown synchronization.
283 */
284 pthread_mutex_lock(&timer_signal.lock);
285
286 /* Ensure we don't have any signal queued for this channel. */
287 for (;;) {
288 ret = sigemptyset(&pending_set);
289 if (ret == -1) {
290 PERROR("sigemptyset");
291 }
292 ret = sigpending(&pending_set);
293 if (ret == -1) {
294 PERROR("sigpending");
295 }
296 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
297 break;
298 }
299 caa_cpu_relax();
300 }
301
302 /*
303 * From this point, no new signal handler will be fired that would try to
304 * access "chan". However, we still need to wait for any currently
305 * executing handler to complete.
306 */
307 cmm_smp_mb();
308 CMM_STORE_SHARED(timer_signal.qs_done, 0);
309 cmm_smp_mb();
310
311 /*
312 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
313 * up.
314 */
315 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
316
317 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
318 caa_cpu_relax();
319 }
320 cmm_smp_mb();
321
322 pthread_mutex_unlock(&timer_signal.lock);
323}
324
331744e3
JD
325/*
326 * Set the timer for periodical metadata flush.
327 */
328void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
329 unsigned int switch_timer_interval)
330{
331 int ret;
332 struct sigevent sev;
333 struct itimerspec its;
334
335 assert(channel);
336 assert(channel->key);
337
338 if (switch_timer_interval == 0) {
339 return;
340 }
341
342 sev.sigev_notify = SIGEV_SIGNAL;
343 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
344 sev.sigev_value.sival_ptr = channel;
345 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
346 if (ret == -1) {
347 PERROR("timer_create");
348 }
349 channel->switch_timer_enabled = 1;
350
351 its.it_value.tv_sec = switch_timer_interval / 1000000;
352 its.it_value.tv_nsec = switch_timer_interval % 1000000;
353 its.it_interval.tv_sec = its.it_value.tv_sec;
354 its.it_interval.tv_nsec = its.it_value.tv_nsec;
355
356 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
357 if (ret == -1) {
358 PERROR("timer_settime");
359 }
360}
361
362/*
363 * Stop and delete timer.
364 */
365void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
366{
367 int ret;
331744e3
JD
368
369 assert(channel);
370
371 ret = timer_delete(channel->switch_timer);
372 if (ret == -1) {
373 PERROR("timer_delete");
374 }
375
2b8f8754 376 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 377
2b8f8754
MD
378 channel->switch_timer = 0;
379 channel->switch_timer_enabled = 0;
331744e3
JD
380}
381
d3e2ba59
JD
382/*
383 * Set the timer for the live mode.
384 */
385void consumer_timer_live_start(struct lttng_consumer_channel *channel,
386 int live_timer_interval)
387{
388 int ret;
389 struct sigevent sev;
390 struct itimerspec its;
391
392 assert(channel);
393 assert(channel->key);
394
395 if (live_timer_interval == 0) {
396 return;
397 }
398
399 sev.sigev_notify = SIGEV_SIGNAL;
400 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
401 sev.sigev_value.sival_ptr = channel;
402 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
403 if (ret == -1) {
404 PERROR("timer_create");
405 }
406 channel->live_timer_enabled = 1;
407
408 its.it_value.tv_sec = live_timer_interval / 1000000;
409 its.it_value.tv_nsec = live_timer_interval % 1000000;
410 its.it_interval.tv_sec = its.it_value.tv_sec;
411 its.it_interval.tv_nsec = its.it_value.tv_nsec;
412
413 ret = timer_settime(channel->live_timer, 0, &its, NULL);
414 if (ret == -1) {
415 PERROR("timer_settime");
416 }
417}
418
419/*
420 * Stop and delete timer.
421 */
422void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
423{
424 int ret;
425
426 assert(channel);
427
428 ret = timer_delete(channel->live_timer);
429 if (ret == -1) {
430 PERROR("timer_delete");
431 }
432
433 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
434
435 channel->live_timer = 0;
436 channel->live_timer_enabled = 0;
437}
438
331744e3
JD
439/*
440 * Block the RT signals for the entire process. It must be called from the
441 * consumer main before creating the threads
442 */
443void consumer_signal_init(void)
444{
445 int ret;
446 sigset_t mask;
447
448 /* Block signal for entire process, so only our thread processes it. */
449 setmask(&mask);
450 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
451 if (ret) {
452 errno = ret;
453 PERROR("pthread_sigmask");
454 }
455}
456
457/*
d3e2ba59
JD
458 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
459 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 460 */
d3e2ba59 461void *consumer_timer_thread(void *data)
331744e3
JD
462{
463 int signr;
464 sigset_t mask;
465 siginfo_t info;
466 struct lttng_consumer_local_data *ctx = data;
467
468 /* Only self thread will receive signal mask. */
469 setmask(&mask);
470 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
471
472 while (1) {
473 signr = sigwaitinfo(&mask, &info);
474 if (signr == -1) {
475 if (errno != EINTR) {
476 PERROR("sigwaitinfo");
477 }
478 continue;
479 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
480 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
481 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
482 cmm_smp_mb();
483 CMM_STORE_SHARED(timer_signal.qs_done, 1);
484 cmm_smp_mb();
485 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
486 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
487 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
488 } else {
489 ERR("Unexpected signal %d\n", info.si_signo);
490 }
491 }
492
493 return NULL;
494}
This page took 0.045636 seconds and 4 git commands to generate.