Fix: ustctl_get_stream_id without UST support
[lttng-tools.git] / src / common / consumer-timer.c
CommitLineData
331744e3
JD
1/*
2 * Copyright (C) 2012 - Julien Desfossez <julien.desfossez@efficios.com>
3 * David Goulet <dgoulet@efficios.com>
4 *
5 * This program is free software; you can redistribute it and/or modify it
6 * under the terms of the GNU General Public License, version 2 only, as
7 * published by the Free Software Foundation.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
12 * more details.
13 *
14 * You should have received a copy of the GNU General Public License along with
15 * this program; if not, write to the Free Software Foundation, Inc., 51
16 * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
17 */
18
19#define _GNU_SOURCE
20#include <assert.h>
21#include <inttypes.h>
22#include <signal.h>
23
51a9e1c7 24#include <bin/lttng-consumerd/health-consumerd.h>
331744e3 25#include <common/common.h>
d3e2ba59
JD
26#include <common/kernel-ctl/kernel-ctl.h>
27#include <common/kernel-consumer/kernel-consumer.h>
28#include <common/consumer-stream.h>
331744e3
JD
29
30#include "consumer-timer.h"
ede905e6 31#include "consumer-testpoint.h"
331744e3
JD
32#include "ust-consumer/ust-consumer.h"
33
2b8f8754
MD
34static struct timer_signal_data timer_signal = {
35 .tid = 0,
36 .setup_done = 0,
37 .qs_done = 0,
38 .lock = PTHREAD_MUTEX_INITIALIZER,
39};
331744e3
JD
40
41/*
42 * Set custom signal mask to current thread.
43 */
44static void setmask(sigset_t *mask)
45{
46 int ret;
47
48 ret = sigemptyset(mask);
49 if (ret) {
50 PERROR("sigemptyset");
51 }
52 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_SWITCH);
53 if (ret) {
d3e2ba59 54 PERROR("sigaddset switch");
331744e3
JD
55 }
56 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_TEARDOWN);
57 if (ret) {
d3e2ba59
JD
58 PERROR("sigaddset teardown");
59 }
60 ret = sigaddset(mask, LTTNG_CONSUMER_SIG_LIVE);
61 if (ret) {
62 PERROR("sigaddset live");
331744e3
JD
63 }
64}
65
66/*
67 * Execute action on a timer switch.
d98a47c7
MD
68 *
69 * Beware: metadata_switch_timer() should *never* take a mutex also held
70 * while consumer_timer_switch_stop() is called. It would result in
71 * deadlocks.
331744e3
JD
72 */
73static void metadata_switch_timer(struct lttng_consumer_local_data *ctx,
74 int sig, siginfo_t *si, void *uc)
75{
76 int ret;
77 struct lttng_consumer_channel *channel;
78
79 channel = si->si_value.sival_ptr;
80 assert(channel);
81
4419b4fb
MD
82 if (channel->switch_timer_error) {
83 return;
84 }
85
331744e3
JD
86 DBG("Switch timer for channel %" PRIu64, channel->key);
87 switch (ctx->type) {
88 case LTTNG_CONSUMER32_UST:
89 case LTTNG_CONSUMER64_UST:
4fa3dc0e
MD
90 /*
91 * Locks taken by lttng_ustconsumer_request_metadata():
92 * - metadata_socket_lock
93 * - Calling lttng_ustconsumer_recv_metadata():
f82d9449 94 * - channel->metadata_cache->lock
4fa3dc0e 95 * - Calling consumer_metadata_cache_flushed():
5e41ebe1
MD
96 * - channel->timer_lock
97 * - channel->metadata_cache->lock
4fa3dc0e 98 *
5e41ebe1
MD
99 * Ensure that neither consumer_data.lock nor
100 * channel->lock are taken within this function, since
101 * they are held while consumer_timer_switch_stop() is
102 * called.
4fa3dc0e 103 */
94d49140 104 ret = lttng_ustconsumer_request_metadata(ctx, channel, 1, 1);
331744e3 105 if (ret < 0) {
4419b4fb 106 channel->switch_timer_error = 1;
331744e3
JD
107 }
108 break;
109 case LTTNG_CONSUMER_KERNEL:
110 case LTTNG_CONSUMER_UNKNOWN:
111 assert(0);
112 break;
113 }
114}
115
5e372a51
JD
116static int send_empty_index(struct lttng_consumer_stream *stream, uint64_t ts,
117 uint64_t stream_id)
d3e2ba59
JD
118{
119 int ret;
50adc264 120 struct ctf_packet_index index;
d3e2ba59
JD
121
122 memset(&index, 0, sizeof(index));
5e372a51 123 index.stream_id = htobe64(stream_id);
d3e2ba59
JD
124 index.timestamp_end = htobe64(ts);
125 ret = consumer_stream_write_index(stream, &index);
126 if (ret < 0) {
127 goto error;
128 }
129
130error:
131 return ret;
132}
133
134static int check_kernel_stream(struct lttng_consumer_stream *stream)
135{
5e372a51 136 uint64_t ts, stream_id;
d3e2ba59
JD
137 int ret;
138
139 /*
140 * While holding the stream mutex, try to take a snapshot, if it
141 * succeeds, it means that data is ready to be sent, just let the data
142 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
143 * means that there is no data to read after the flush, so we can
144 * safely send the empty index.
145 */
146 pthread_mutex_lock(&stream->lock);
147 ret = kernctl_get_current_timestamp(stream->wait_fd, &ts);
148 if (ret < 0) {
149 ERR("Failed to get the current timestamp");
150 goto error_unlock;
151 }
152 ret = kernctl_buffer_flush(stream->wait_fd);
153 if (ret < 0) {
154 ERR("Failed to flush kernel stream");
155 goto error_unlock;
156 }
157 ret = kernctl_snapshot(stream->wait_fd);
158 if (ret < 0) {
12c6de07
DG
159 if (errno != EAGAIN && errno != ENODATA) {
160 PERROR("live timer kernel snapshot");
d3e2ba59
JD
161 ret = -1;
162 goto error_unlock;
163 }
5e372a51
JD
164 ret = kernctl_get_stream_id(stream->wait_fd, &stream_id);
165 if (ret < 0) {
166 PERROR("kernctl_get_stream_id");
167 goto error_unlock;
168 }
d3e2ba59 169 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
5e372a51 170 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
171 if (ret < 0) {
172 goto error_unlock;
173 }
174 }
175 ret = 0;
176
177error_unlock:
178 pthread_mutex_unlock(&stream->lock);
179 return ret;
180}
181
182static int check_ust_stream(struct lttng_consumer_stream *stream)
183{
5e372a51 184 uint64_t ts, stream_id;
d3e2ba59
JD
185 int ret;
186
187 assert(stream);
188 assert(stream->ustream);
189 /*
190 * While holding the stream mutex, try to take a snapshot, if it
191 * succeeds, it means that data is ready to be sent, just let the data
192 * thread handle that. Otherwise, if the snapshot returns EAGAIN, it
193 * means that there is no data to read after the flush, so we can
194 * safely send the empty index.
195 */
196 pthread_mutex_lock(&stream->lock);
94d49140
JD
197 ret = cds_lfht_is_node_deleted(&stream->node.node);
198 if (ret) {
199 goto error_unlock;
200 }
201
84a182ce 202 ret = lttng_ustconsumer_get_current_timestamp(stream, &ts);
d3e2ba59
JD
203 if (ret < 0) {
204 ERR("Failed to get the current timestamp");
205 goto error_unlock;
206 }
84a182ce
DG
207 lttng_ustconsumer_flush_buffer(stream, 1);
208 ret = lttng_ustconsumer_take_snapshot(stream);
d3e2ba59 209 if (ret < 0) {
94d49140 210 if (ret != -EAGAIN) {
d3e2ba59
JD
211 ERR("Taking UST snapshot");
212 ret = -1;
213 goto error_unlock;
214 }
abffd1e0 215 ret = lttng_ustconsumer_get_stream_id(stream, &stream_id);
5e372a51
JD
216 if (ret < 0) {
217 PERROR("ustctl_get_stream_id");
218 goto error_unlock;
219 }
d3e2ba59 220 DBG("Stream %" PRIu64 " empty, sending beacon", stream->key);
5e372a51 221 ret = send_empty_index(stream, ts, stream_id);
d3e2ba59
JD
222 if (ret < 0) {
223 goto error_unlock;
224 }
225 }
226 ret = 0;
227
228error_unlock:
229 pthread_mutex_unlock(&stream->lock);
230 return ret;
231}
232
233/*
234 * Execute action on a live timer
235 */
236static void live_timer(struct lttng_consumer_local_data *ctx,
237 int sig, siginfo_t *si, void *uc)
238{
239 int ret;
240 struct lttng_consumer_channel *channel;
241 struct lttng_consumer_stream *stream;
242 struct lttng_ht *ht;
243 struct lttng_ht_iter iter;
244
245 channel = si->si_value.sival_ptr;
246 assert(channel);
247
248 if (channel->switch_timer_error) {
249 goto error;
250 }
251 ht = consumer_data.stream_per_chan_id_ht;
252
253 DBG("Live timer for channel %" PRIu64, channel->key);
254
255 rcu_read_lock();
256 switch (ctx->type) {
257 case LTTNG_CONSUMER32_UST:
258 case LTTNG_CONSUMER64_UST:
259 cds_lfht_for_each_entry_duplicate(ht->ht,
260 ht->hash_fct(&channel->key, lttng_ht_seed),
261 ht->match_fct, &channel->key, &iter.iter,
262 stream, node_channel_id.node) {
263 ret = check_ust_stream(stream);
264 if (ret < 0) {
265 goto error_unlock;
266 }
267 }
268 break;
269 case LTTNG_CONSUMER_KERNEL:
270 cds_lfht_for_each_entry_duplicate(ht->ht,
271 ht->hash_fct(&channel->key, lttng_ht_seed),
272 ht->match_fct, &channel->key, &iter.iter,
273 stream, node_channel_id.node) {
274 ret = check_kernel_stream(stream);
275 if (ret < 0) {
276 goto error_unlock;
277 }
278 }
279 break;
280 case LTTNG_CONSUMER_UNKNOWN:
281 assert(0);
282 break;
283 }
284
285error_unlock:
286 rcu_read_unlock();
287
288error:
289 return;
290}
291
2b8f8754
MD
292static
293void consumer_timer_signal_thread_qs(unsigned int signr)
294{
295 sigset_t pending_set;
296 int ret;
297
298 /*
299 * We need to be the only thread interacting with the thread
300 * that manages signals for teardown synchronization.
301 */
302 pthread_mutex_lock(&timer_signal.lock);
303
304 /* Ensure we don't have any signal queued for this channel. */
305 for (;;) {
306 ret = sigemptyset(&pending_set);
307 if (ret == -1) {
308 PERROR("sigemptyset");
309 }
310 ret = sigpending(&pending_set);
311 if (ret == -1) {
312 PERROR("sigpending");
313 }
314 if (!sigismember(&pending_set, LTTNG_CONSUMER_SIG_SWITCH)) {
315 break;
316 }
317 caa_cpu_relax();
318 }
319
320 /*
321 * From this point, no new signal handler will be fired that would try to
322 * access "chan". However, we still need to wait for any currently
323 * executing handler to complete.
324 */
325 cmm_smp_mb();
326 CMM_STORE_SHARED(timer_signal.qs_done, 0);
327 cmm_smp_mb();
328
329 /*
330 * Kill with LTTNG_CONSUMER_SIG_TEARDOWN, so signal management thread wakes
331 * up.
332 */
333 kill(getpid(), LTTNG_CONSUMER_SIG_TEARDOWN);
334
335 while (!CMM_LOAD_SHARED(timer_signal.qs_done)) {
336 caa_cpu_relax();
337 }
338 cmm_smp_mb();
339
340 pthread_mutex_unlock(&timer_signal.lock);
341}
342
331744e3
JD
343/*
344 * Set the timer for periodical metadata flush.
345 */
346void consumer_timer_switch_start(struct lttng_consumer_channel *channel,
347 unsigned int switch_timer_interval)
348{
349 int ret;
350 struct sigevent sev;
351 struct itimerspec its;
352
353 assert(channel);
354 assert(channel->key);
355
356 if (switch_timer_interval == 0) {
357 return;
358 }
359
360 sev.sigev_notify = SIGEV_SIGNAL;
361 sev.sigev_signo = LTTNG_CONSUMER_SIG_SWITCH;
362 sev.sigev_value.sival_ptr = channel;
363 ret = timer_create(CLOCKID, &sev, &channel->switch_timer);
364 if (ret == -1) {
365 PERROR("timer_create");
366 }
367 channel->switch_timer_enabled = 1;
368
369 its.it_value.tv_sec = switch_timer_interval / 1000000;
370 its.it_value.tv_nsec = switch_timer_interval % 1000000;
371 its.it_interval.tv_sec = its.it_value.tv_sec;
372 its.it_interval.tv_nsec = its.it_value.tv_nsec;
373
374 ret = timer_settime(channel->switch_timer, 0, &its, NULL);
375 if (ret == -1) {
376 PERROR("timer_settime");
377 }
378}
379
380/*
381 * Stop and delete timer.
382 */
383void consumer_timer_switch_stop(struct lttng_consumer_channel *channel)
384{
385 int ret;
331744e3
JD
386
387 assert(channel);
388
389 ret = timer_delete(channel->switch_timer);
390 if (ret == -1) {
391 PERROR("timer_delete");
392 }
393
2b8f8754 394 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_SWITCH);
331744e3 395
2b8f8754
MD
396 channel->switch_timer = 0;
397 channel->switch_timer_enabled = 0;
331744e3
JD
398}
399
d3e2ba59
JD
400/*
401 * Set the timer for the live mode.
402 */
403void consumer_timer_live_start(struct lttng_consumer_channel *channel,
404 int live_timer_interval)
405{
406 int ret;
407 struct sigevent sev;
408 struct itimerspec its;
409
410 assert(channel);
411 assert(channel->key);
412
fac41e72 413 if (live_timer_interval <= 0) {
d3e2ba59
JD
414 return;
415 }
416
417 sev.sigev_notify = SIGEV_SIGNAL;
418 sev.sigev_signo = LTTNG_CONSUMER_SIG_LIVE;
419 sev.sigev_value.sival_ptr = channel;
420 ret = timer_create(CLOCKID, &sev, &channel->live_timer);
421 if (ret == -1) {
422 PERROR("timer_create");
423 }
424 channel->live_timer_enabled = 1;
425
426 its.it_value.tv_sec = live_timer_interval / 1000000;
427 its.it_value.tv_nsec = live_timer_interval % 1000000;
428 its.it_interval.tv_sec = its.it_value.tv_sec;
429 its.it_interval.tv_nsec = its.it_value.tv_nsec;
430
431 ret = timer_settime(channel->live_timer, 0, &its, NULL);
432 if (ret == -1) {
433 PERROR("timer_settime");
434 }
435}
436
437/*
438 * Stop and delete timer.
439 */
440void consumer_timer_live_stop(struct lttng_consumer_channel *channel)
441{
442 int ret;
443
444 assert(channel);
445
446 ret = timer_delete(channel->live_timer);
447 if (ret == -1) {
448 PERROR("timer_delete");
449 }
450
451 consumer_timer_signal_thread_qs(LTTNG_CONSUMER_SIG_LIVE);
452
453 channel->live_timer = 0;
454 channel->live_timer_enabled = 0;
455}
456
331744e3
JD
457/*
458 * Block the RT signals for the entire process. It must be called from the
459 * consumer main before creating the threads
460 */
461void consumer_signal_init(void)
462{
463 int ret;
464 sigset_t mask;
465
466 /* Block signal for entire process, so only our thread processes it. */
467 setmask(&mask);
468 ret = pthread_sigmask(SIG_BLOCK, &mask, NULL);
469 if (ret) {
470 errno = ret;
471 PERROR("pthread_sigmask");
472 }
473}
474
475/*
d3e2ba59
JD
476 * This thread is the sighandler for signals LTTNG_CONSUMER_SIG_SWITCH,
477 * LTTNG_CONSUMER_SIG_TEARDOWN and LTTNG_CONSUMER_SIG_LIVE.
331744e3 478 */
d3e2ba59 479void *consumer_timer_thread(void *data)
331744e3
JD
480{
481 int signr;
482 sigset_t mask;
483 siginfo_t info;
484 struct lttng_consumer_local_data *ctx = data;
485
1fc79fb4
MD
486 health_register(health_consumerd, HEALTH_CONSUMERD_TYPE_METADATA_TIMER);
487
ede905e6
MD
488 if (testpoint(consumerd_thread_metadata_timer)) {
489 goto error_testpoint;
490 }
491
9ce5646a
MD
492 health_code_update();
493
331744e3
JD
494 /* Only self thread will receive signal mask. */
495 setmask(&mask);
496 CMM_STORE_SHARED(timer_signal.tid, pthread_self());
497
498 while (1) {
9ce5646a
MD
499 health_code_update();
500
501 health_poll_entry();
331744e3 502 signr = sigwaitinfo(&mask, &info);
9ce5646a 503 health_poll_exit();
331744e3
JD
504 if (signr == -1) {
505 if (errno != EINTR) {
506 PERROR("sigwaitinfo");
507 }
508 continue;
509 } else if (signr == LTTNG_CONSUMER_SIG_SWITCH) {
510 metadata_switch_timer(ctx, info.si_signo, &info, NULL);
511 } else if (signr == LTTNG_CONSUMER_SIG_TEARDOWN) {
512 cmm_smp_mb();
513 CMM_STORE_SHARED(timer_signal.qs_done, 1);
514 cmm_smp_mb();
515 DBG("Signal timer metadata thread teardown");
d3e2ba59
JD
516 } else if (signr == LTTNG_CONSUMER_SIG_LIVE) {
517 live_timer(ctx, info.si_signo, &info, NULL);
331744e3
JD
518 } else {
519 ERR("Unexpected signal %d\n", info.si_signo);
520 }
521 }
522
ede905e6
MD
523error_testpoint:
524 /* Only reached in testpoint error */
525 health_error();
1fc79fb4
MD
526 health_unregister(health_consumerd);
527
528 /* Never return */
331744e3
JD
529 return NULL;
530}
This page took 0.049951 seconds and 4 git commands to generate.