19713c81291f3ae8e0ab3a5c4bfe67cbacb3c27d
[lttng-tools.git] / src / common / ust-consumer / ust-consumer.c
1 /*
2 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
3 * Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
4 * Copyright (C) 2017 - Jérémie Galarneau <jeremie.galarneau@efficios.com>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License, version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along
16 * with this program; if not, write to the Free Software Foundation, Inc.,
17 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
18 */
19
20 #include <stdint.h>
21 #define _LGPL_SOURCE
22 #include <assert.h>
23 #include <lttng/ust-ctl.h>
24 #include <poll.h>
25 #include <pthread.h>
26 #include <stdlib.h>
27 #include <string.h>
28 #include <sys/mman.h>
29 #include <sys/socket.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <inttypes.h>
33 #include <unistd.h>
34 #include <urcu/list.h>
35 #include <signal.h>
36 #include <stdbool.h>
37
38 #include <bin/lttng-consumerd/health-consumerd.h>
39 #include <common/common.h>
40 #include <common/sessiond-comm/sessiond-comm.h>
41 #include <common/relayd/relayd.h>
42 #include <common/compat/fcntl.h>
43 #include <common/compat/endian.h>
44 #include <common/consumer/consumer-metadata-cache.h>
45 #include <common/consumer/consumer-stream.h>
46 #include <common/consumer/consumer-timer.h>
47 #include <common/utils.h>
48 #include <common/index/index.h>
49
50 #include "ust-consumer.h"
51
52 #define INT_MAX_STR_LEN 12 /* includes \0 */
53
54 extern struct lttng_consumer_global_data consumer_data;
55 extern int consumer_poll_timeout;
56
57 /*
58 * Free channel object and all streams associated with it. This MUST be used
59 * only and only if the channel has _NEVER_ been added to the global channel
60 * hash table.
61 */
62 static void destroy_channel(struct lttng_consumer_channel *channel)
63 {
64 struct lttng_consumer_stream *stream, *stmp;
65
66 assert(channel);
67
68 DBG("UST consumer cleaning stream list");
69
70 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
71 send_node) {
72
73 health_code_update();
74
75 cds_list_del(&stream->send_node);
76 ustctl_destroy_stream(stream->ustream);
77 lttng_trace_chunk_put(stream->trace_chunk);
78 free(stream);
79 }
80
81 /*
82 * If a channel is available meaning that was created before the streams
83 * were, delete it.
84 */
85 if (channel->uchan) {
86 lttng_ustconsumer_del_channel(channel);
87 lttng_ustconsumer_free_channel(channel);
88 }
89 free(channel);
90 }
91
92 /*
93 * Add channel to internal consumer state.
94 *
95 * Returns 0 on success or else a negative value.
96 */
97 static int add_channel(struct lttng_consumer_channel *channel,
98 struct lttng_consumer_local_data *ctx)
99 {
100 int ret = 0;
101
102 assert(channel);
103 assert(ctx);
104
105 if (ctx->on_recv_channel != NULL) {
106 ret = ctx->on_recv_channel(channel);
107 if (ret == 0) {
108 ret = consumer_add_channel(channel, ctx);
109 } else if (ret < 0) {
110 /* Most likely an ENOMEM. */
111 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
112 goto error;
113 }
114 } else {
115 ret = consumer_add_channel(channel, ctx);
116 }
117
118 DBG("UST consumer channel added (key: %" PRIu64 ")", channel->key);
119
120 error:
121 return ret;
122 }
123
/*
 * Allocate and return a consumer channel object.
 *
 * Thin wrapper over consumer_allocate_channel(); note that the argument
 * order differs (key comes first in the callee), so this shim mainly
 * provides a UST-local signature plus sanity assertions.
 *
 * Returns NULL on allocation failure (see consumer_allocate_channel).
 */
static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
		const uint64_t *chunk_id, const char *pathname, const char *name,
		uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
		uint64_t tracefile_size, uint64_t tracefile_count,
		uint64_t session_id_per_pid, unsigned int monitor,
		unsigned int live_timer_interval,
		const char *root_shm_path, const char *shm_path)
{
	assert(pathname);
	assert(name);

	return consumer_allocate_channel(key, session_id, chunk_id, pathname,
			name, relayd_id, output, tracefile_size,
			tracefile_count, session_id_per_pid, monitor,
			live_timer_interval, root_shm_path, shm_path);
}
143
144 /*
145 * Allocate and return a consumer stream object. If _alloc_ret is not NULL, the
146 * error value if applicable is set in it else it is kept untouched.
147 *
148 * Return NULL on error else the newly allocated stream object.
149 */
150 static struct lttng_consumer_stream *allocate_stream(int cpu, int key,
151 struct lttng_consumer_channel *channel,
152 struct lttng_consumer_local_data *ctx, int *_alloc_ret)
153 {
154 int alloc_ret;
155 struct lttng_consumer_stream *stream = NULL;
156
157 assert(channel);
158 assert(ctx);
159
160 stream = consumer_allocate_stream(
161 channel,
162 channel->key,
163 key,
164 channel->name,
165 channel->relayd_id,
166 channel->session_id,
167 channel->trace_chunk,
168 cpu,
169 &alloc_ret,
170 channel->type,
171 channel->monitor);
172 if (stream == NULL) {
173 switch (alloc_ret) {
174 case -ENOENT:
175 /*
176 * We could not find the channel. Can happen if cpu hotplug
177 * happens while tearing down.
178 */
179 DBG3("Could not find channel");
180 break;
181 case -ENOMEM:
182 case -EINVAL:
183 default:
184 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_OUTFD_ERROR);
185 break;
186 }
187 goto error;
188 }
189
190 consumer_stream_update_channel_attributes(stream, channel);
191
192 error:
193 if (_alloc_ret) {
194 *_alloc_ret = alloc_ret;
195 }
196 return stream;
197 }
198
/*
 * Send the given stream pointer to the corresponding thread.
 *
 * On success the stream's ownership is transferred to the receiving
 * thread (metadata or data poll thread) and the stream is removed from
 * its channel's local list.
 *
 * Returns 0 on success else a negative value.
 */
static int send_stream_to_thread(struct lttng_consumer_stream *stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	struct lttng_pipe *stream_pipe;

	/* Get the right pipe where the stream will be sent. */
	if (stream->metadata_flag) {
		consumer_add_metadata_stream(stream);
		stream_pipe = ctx->consumer_metadata_pipe;
	} else {
		consumer_add_data_stream(stream);
		stream_pipe = ctx->consumer_data_pipe;
	}

	/*
	 * From this point on, the stream's ownership has been moved away from
	 * the channel and it becomes globally visible. Hence, remove it from
	 * the local stream list to prevent the stream from being both local and
	 * global.
	 */
	stream->globally_visible = 1;
	cds_list_del(&stream->send_node);

	ret = lttng_pipe_write(stream_pipe, &stream, sizeof(stream));
	if (ret < 0) {
		ERR("Consumer write %s stream to pipe %d",
				stream->metadata_flag ? "metadata" : "data",
				lttng_pipe_get_writefd(stream_pipe));
		/* Undo the global registration performed above. */
		if (stream->metadata_flag) {
			consumer_del_stream_for_metadata(stream);
		} else {
			consumer_del_stream_for_data(stream);
		}
		goto error;
	}

error:
	return ret;
}
244
245 static
246 int get_stream_shm_path(char *stream_shm_path, const char *shm_path, int cpu)
247 {
248 char cpu_nr[INT_MAX_STR_LEN]; /* int max len */
249 int ret;
250
251 strncpy(stream_shm_path, shm_path, PATH_MAX);
252 stream_shm_path[PATH_MAX - 1] = '\0';
253 ret = snprintf(cpu_nr, INT_MAX_STR_LEN, "%i", cpu);
254 if (ret < 0) {
255 PERROR("snprintf");
256 goto end;
257 }
258 strncat(stream_shm_path, cpu_nr,
259 PATH_MAX - strlen(stream_shm_path) - 1);
260 ret = 0;
261 end:
262 return ret;
263 }
264
/*
 * Create streams for the given channel using liblttng-ust-ctl.
 * The channel lock must be acquired by the caller.
 *
 * Return 0 on success else a negative value. On error, streams already
 * created remain on channel->streams.head; the caller must clean that
 * list.
 */
static int create_ust_streams(struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx)
{
	int ret, cpu = 0;
	struct ustctl_consumer_stream *ustream;
	struct lttng_consumer_stream *stream;
	/* Tracks the stream lock held across one iteration for error cleanup. */
	pthread_mutex_t *current_stream_lock = NULL;

	assert(channel);
	assert(ctx);

	/*
	 * While a stream is available from ustctl. When NULL is returned, we've
	 * reached the end of the possible stream for the channel.
	 */
	while ((ustream = ustctl_create_stream(channel->uchan, cpu))) {
		int wait_fd;
		int ust_metadata_pipe[2];

		health_code_update();

		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && channel->monitor) {
			/*
			 * Monitored metadata streams are woken up through a
			 * dedicated local pipe rather than the ustctl wait fd.
			 */
			ret = utils_create_pipe_cloexec_nonblock(ust_metadata_pipe);
			if (ret < 0) {
				ERR("Create ust metadata poll pipe");
				goto error;
			}
			wait_fd = ust_metadata_pipe[0];
		} else {
			wait_fd = ustctl_stream_get_wait_fd(ustream);
		}

		/* Allocate consumer stream object. */
		stream = allocate_stream(cpu, wait_fd, channel, ctx, &ret);
		if (!stream) {
			goto error_alloc;
		}
		stream->ustream = ustream;
		/*
		 * Store it so we can save multiple function calls afterwards since
		 * this value is used heavily in the stream threads. This is UST
		 * specific so this is why it's done after allocation.
		 */
		stream->wait_fd = wait_fd;

		/*
		 * Increment channel refcount since the channel reference has now been
		 * assigned in the allocation process above.
		 */
		if (stream->chan->monitor) {
			uatomic_inc(&stream->chan->refcount);
		}

		pthread_mutex_lock(&stream->lock);
		current_stream_lock = &stream->lock;
		/*
		 * Order is important this is why a list is used. On error, the caller
		 * should clean this list.
		 */
		cds_list_add_tail(&stream->send_node, &channel->streams.head);

		ret = ustctl_get_max_subbuf_size(stream->ustream,
				&stream->max_sb_size);
		if (ret < 0) {
			ERR("ustctl_get_max_subbuf_size failed for stream %s",
					stream->name);
			goto error;
		}

		/* Do actions once stream has been received. */
		if (ctx->on_recv_stream) {
			ret = ctx->on_recv_stream(stream);
			if (ret < 0) {
				goto error;
			}
		}

		DBG("UST consumer add stream %s (key: %" PRIu64 ") with relayd id %" PRIu64,
				stream->name, stream->key, stream->relayd_stream_id);

		/* Set next CPU stream. */
		channel->streams.count = ++cpu;

		/* Keep stream reference when creating metadata. */
		if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA) {
			channel->metadata_stream = stream;
			if (channel->monitor) {
				/* Set metadata poll pipe if we created one */
				memcpy(stream->ust_metadata_poll_pipe,
						ust_metadata_pipe,
						sizeof(ust_metadata_pipe));
			}
		}
		pthread_mutex_unlock(&stream->lock);
		current_stream_lock = NULL;
	}

	return 0;

error:
error_alloc:
	/* Never return while still holding a stream lock. */
	if (current_stream_lock) {
		pthread_mutex_unlock(current_stream_lock);
	}
	return ret;
}
377
378 /*
379 * create_posix_shm is never called concurrently within a process.
380 */
381 static
382 int create_posix_shm(void)
383 {
384 char tmp_name[NAME_MAX];
385 int shmfd, ret;
386
387 ret = snprintf(tmp_name, NAME_MAX, "/ust-shm-consumer-%d", getpid());
388 if (ret < 0) {
389 PERROR("snprintf");
390 return -1;
391 }
392 /*
393 * Allocate shm, and immediately unlink its shm oject, keeping
394 * only the file descriptor as a reference to the object.
395 * We specifically do _not_ use the / at the beginning of the
396 * pathname so that some OS implementations can keep it local to
397 * the process (POSIX leaves this implementation-defined).
398 */
399 shmfd = shm_open(tmp_name, O_CREAT | O_EXCL | O_RDWR, 0700);
400 if (shmfd < 0) {
401 PERROR("shm_open");
402 goto error_shm_open;
403 }
404 ret = shm_unlink(tmp_name);
405 if (ret < 0 && errno != ENOENT) {
406 PERROR("shm_unlink");
407 goto error_shm_release;
408 }
409 return shmfd;
410
411 error_shm_release:
412 ret = close(shmfd);
413 if (ret) {
414 PERROR("close");
415 }
416 error_shm_open:
417 return -1;
418 }
419
420 static int open_ust_stream_fd(struct lttng_consumer_channel *channel, int cpu,
421 const struct lttng_credentials *session_credentials)
422 {
423 char shm_path[PATH_MAX];
424 int ret;
425
426 if (!channel->shm_path[0]) {
427 return create_posix_shm();
428 }
429 ret = get_stream_shm_path(shm_path, channel->shm_path, cpu);
430 if (ret) {
431 goto error_shm_path;
432 }
433 return run_as_open(shm_path,
434 O_RDWR | O_CREAT | O_EXCL, S_IRUSR | S_IWUSR,
435 session_credentials->uid, session_credentials->gid);
436
437 error_shm_path:
438 return -1;
439 }
440
/*
 * Create an UST channel with the given attributes and send it to the session
 * daemon using the ust ctl API.
 *
 * On success, ownership of the opened stream fds is transferred to the
 * channel object. On error, all fds opened so far are closed (and their
 * backing files unlinked when on-disk shm is used).
 *
 * Return 0 on success or else a negative value.
 */
static int create_ust_channel(struct lttng_consumer_channel *channel,
		struct ustctl_consumer_channel_attr *attr,
		struct ustctl_consumer_channel **ust_chanp)
{
	int ret, nr_stream_fds, i, j;
	int *stream_fds;
	struct ustctl_consumer_channel *ust_channel;

	assert(channel);
	assert(attr);
	assert(ust_chanp);
	assert(channel->buffer_credentials.is_set);

	DBG3("Creating channel to ustctl with attr: [overwrite: %d, "
			"subbuf_size: %" PRIu64 ", num_subbuf: %" PRIu64 ", "
			"switch_timer_interval: %u, read_timer_interval: %u, "
			"output: %d, type: %d", attr->overwrite, attr->subbuf_size,
			attr->num_subbuf, attr->switch_timer_interval,
			attr->read_timer_interval, attr->output, attr->type);

	/* A metadata channel only ever uses a single stream. */
	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA)
		nr_stream_fds = 1;
	else
		nr_stream_fds = ustctl_get_nr_stream_per_channel();
	stream_fds = zmalloc(nr_stream_fds * sizeof(*stream_fds));
	if (!stream_fds) {
		ret = -1;
		goto error_alloc;
	}
	/* Open one shm fd per stream (per-cpu for data channels). */
	for (i = 0; i < nr_stream_fds; i++) {
		stream_fds[i] = open_ust_stream_fd(channel, i,
				&channel->buffer_credentials.value);
		if (stream_fds[i] < 0) {
			ret = -1;
			goto error_open;
		}
	}
	ust_channel = ustctl_create_channel(attr, stream_fds, nr_stream_fds);
	if (!ust_channel) {
		ret = -1;
		goto error_create;
	}
	/* The channel now owns stream_fds; they are closed on teardown. */
	channel->nr_stream_fds = nr_stream_fds;
	channel->stream_fds = stream_fds;
	*ust_chanp = ust_channel;

	return 0;

error_create:
error_open:
	/* Close (and unlink, for on-disk shm) every fd opened so far. */
	for (j = i - 1; j >= 0; j--) {
		int closeret;

		closeret = close(stream_fds[j]);
		if (closeret) {
			PERROR("close");
		}
		if (channel->shm_path[0]) {
			char shm_path[PATH_MAX];

			closeret = get_stream_shm_path(shm_path,
					channel->shm_path, j);
			if (closeret) {
				ERR("Cannot get stream shm path");
			}
			closeret = run_as_unlink(shm_path,
					channel->buffer_credentials.value.uid,
					channel->buffer_credentials.value.gid);
			if (closeret) {
				PERROR("unlink %s", shm_path);
			}
		}
	}
	/* Try to rmdir all directories under shm_path root. */
	if (channel->root_shm_path[0]) {
		(void) run_as_rmdir_recursive(channel->root_shm_path,
				channel->buffer_credentials.value.uid,
				channel->buffer_credentials.value.gid,
				LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
	}
	free(stream_fds);
error_alloc:
	return ret;
}
531
532 /*
533 * Send a single given stream to the session daemon using the sock.
534 *
535 * Return 0 on success else a negative value.
536 */
537 static int send_sessiond_stream(int sock, struct lttng_consumer_stream *stream)
538 {
539 int ret;
540
541 assert(stream);
542 assert(sock >= 0);
543
544 DBG("UST consumer sending stream %" PRIu64 " to sessiond", stream->key);
545
546 /* Send stream to session daemon. */
547 ret = ustctl_send_stream_to_sessiond(sock, stream->ustream);
548 if (ret < 0) {
549 goto error;
550 }
551
552 error:
553 return ret;
554 }
555
/*
 * Send channel to sessiond and relayd if applicable.
 *
 * Return 0 on success or else a negative value.
 */
static int send_channel_to_sessiond_and_relayd(int sock,
		struct lttng_consumer_channel *channel,
		struct lttng_consumer_local_data *ctx, int *relayd_error)
{
	int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttng_consumer_stream *stream;
	uint64_t net_seq_idx = -1ULL;

	assert(channel);
	assert(ctx);
	assert(sock >= 0);

	DBG("UST consumer sending channel %s to sessiond", channel->name);

	/* First, register every stream with the relayd when one is used. */
	if (channel->relayd_id != (uint64_t) -1ULL) {
		cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

			health_code_update();

			/* Try to send the stream to the relayd if one is available. */
			DBG("Sending stream %" PRIu64 " of channel \"%s\" to relayd",
					stream->key, channel->name);
			ret = consumer_send_relayd_stream(stream, stream->chan->pathname);
			if (ret < 0) {
				/*
				 * Flag that the relayd was the problem here probably due to a
				 * communication error on the socket.
				 */
				if (relayd_error) {
					*relayd_error = 1;
				}
				ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
			}
			/* Remember the first stream's relayd index. */
			if (net_seq_idx == -1ULL) {
				net_seq_idx = stream->net_seq_idx;
			}
		}
	}

	/* Inform sessiond that we are about to send channel and streams. */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0 || ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		/*
		 * Either the session daemon is not responding or the relayd died so we
		 * stop now.
		 */
		goto error;
	}

	/* Send channel to sessiond. */
	ret = ustctl_send_channel_to_sessiond(sock, channel->uchan);
	if (ret < 0) {
		goto error;
	}

	ret = ustctl_channel_close_wakeup_fd(channel->uchan);
	if (ret < 0) {
		goto error;
	}

	/* The channel was sent successfully to the sessiond at this point. */
	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {

		health_code_update();

		/* Send stream to session daemon. */
		ret = send_sessiond_stream(sock, stream);
		if (ret < 0) {
			goto error;
		}
	}

	/* Tell sessiond there is no more stream. */
	ret = ustctl_send_stream_to_sessiond(sock, NULL);
	if (ret < 0) {
		goto error;
	}

	DBG("UST consumer NULL stream sent to sessiond");

	return 0;

error:
	/* A relayd-level failure takes precedence over the raw return code. */
	if (ret_code != LTTCOMM_CONSUMERD_SUCCESS) {
		ret = -1;
	}
	return ret;
}
649
/*
 * Creates a channel and streams and add the channel it to the channel internal
 * state. The created stream must ONLY be sent once the GET_CHANNEL command is
 * received.
 *
 * Return 0 on success or else, a negative value is returned and the channel
 * MUST be destroyed by consumer_del_channel().
 */
static int ask_channel(struct lttng_consumer_local_data *ctx,
		struct lttng_consumer_channel *channel,
		struct ustctl_consumer_channel_attr *attr)
{
	int ret;

	assert(ctx);
	assert(channel);
	assert(attr);

	/*
	 * This value is still used by the kernel consumer since for the kernel,
	 * the stream ownership is not IN the consumer so we need to have the
	 * number of left stream that needs to be initialized so we can know when
	 * to delete the channel (see consumer.c).
	 *
	 * As for the user space tracer now, the consumer creates and sends the
	 * stream to the session daemon which only sends them to the application
	 * once every stream of a channel is received making this value useless
	 * because they will be added to the poll thread before the application
	 * receives them. This ensures that a stream can not hang up during
	 * initialization of a channel.
	 */
	channel->nb_init_stream_left = 0;

	/* The reply msg status is handled in the following call. */
	ret = create_ust_channel(channel, attr, &channel->uchan);
	if (ret < 0) {
		goto end;
	}

	channel->wait_fd = ustctl_channel_get_wait_fd(channel->uchan);

	/*
	 * For the snapshots (no monitor), we create the metadata streams
	 * on demand, not during the channel creation.
	 */
	if (channel->type == CONSUMER_CHANNEL_TYPE_METADATA && !channel->monitor) {
		ret = 0;
		goto end;
	}

	/* Open all streams for this channel. */
	pthread_mutex_lock(&channel->lock);
	ret = create_ust_streams(channel, ctx);
	pthread_mutex_unlock(&channel->lock);
	if (ret < 0) {
		goto end;
	}

end:
	return ret;
}
711
712 /*
713 * Send all stream of a channel to the right thread handling it.
714 *
715 * On error, return a negative value else 0 on success.
716 */
717 static int send_streams_to_thread(struct lttng_consumer_channel *channel,
718 struct lttng_consumer_local_data *ctx)
719 {
720 int ret = 0;
721 struct lttng_consumer_stream *stream, *stmp;
722
723 assert(channel);
724 assert(ctx);
725
726 /* Send streams to the corresponding thread. */
727 cds_list_for_each_entry_safe(stream, stmp, &channel->streams.head,
728 send_node) {
729
730 health_code_update();
731
732 /* Sending the stream to the thread. */
733 ret = send_stream_to_thread(stream, ctx);
734 if (ret < 0) {
735 /*
736 * If we are unable to send the stream to the thread, there is
737 * a big problem so just stop everything.
738 */
739 goto error;
740 }
741 }
742
743 error:
744 return ret;
745 }
746
/*
 * Flush channel's streams using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int flush_channel(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	struct lttng_consumer_stream *stream;
	struct lttng_ht *ht;
	struct lttng_ht_iter iter;

	DBG("UST consumer flush channel key %" PRIu64, chan_key);

	/* RCU protects the hash table lookups and iteration below. */
	rcu_read_lock();
	channel = consumer_find_channel(chan_key);
	if (!channel) {
		ERR("UST consumer flush channel %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	ht = consumer_data.stream_per_chan_id_ht;

	/* For each stream of the channel id, flush it. */
	cds_lfht_for_each_entry_duplicate(ht->ht,
			ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
			&channel->key, &iter.iter, stream, node_channel_id.node) {

		health_code_update();

		pthread_mutex_lock(&stream->lock);

		/*
		 * Protect against concurrent teardown of a stream.
		 */
		if (cds_lfht_is_node_deleted(&stream->node.node)) {
			goto next;
		}

		/* Flush only once until the quiescent state is cleared. */
		if (!stream->quiescent) {
			ustctl_flush_buffer(stream->ustream, 0);
			stream->quiescent = true;
		}
next:
		pthread_mutex_unlock(&stream->lock);
	}
error:
	rcu_read_unlock();
	return ret;
}
799
800 /*
801 * Clear quiescent state from channel's streams using the given key to
802 * retrieve the channel.
803 *
804 * Return 0 on success else an LTTng error code.
805 */
806 static int clear_quiescent_channel(uint64_t chan_key)
807 {
808 int ret = 0;
809 struct lttng_consumer_channel *channel;
810 struct lttng_consumer_stream *stream;
811 struct lttng_ht *ht;
812 struct lttng_ht_iter iter;
813
814 DBG("UST consumer clear quiescent channel key %" PRIu64, chan_key);
815
816 rcu_read_lock();
817 channel = consumer_find_channel(chan_key);
818 if (!channel) {
819 ERR("UST consumer clear quiescent channel %" PRIu64 " not found", chan_key);
820 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
821 goto error;
822 }
823
824 ht = consumer_data.stream_per_chan_id_ht;
825
826 /* For each stream of the channel id, clear quiescent state. */
827 cds_lfht_for_each_entry_duplicate(ht->ht,
828 ht->hash_fct(&channel->key, lttng_ht_seed), ht->match_fct,
829 &channel->key, &iter.iter, stream, node_channel_id.node) {
830
831 health_code_update();
832
833 pthread_mutex_lock(&stream->lock);
834 stream->quiescent = false;
835 pthread_mutex_unlock(&stream->lock);
836 }
837 error:
838 rcu_read_unlock();
839 return ret;
840 }
841
/*
 * Close metadata stream wakeup_fd using the given key to retrieve the channel.
 *
 * Return 0 on success else an LTTng error code.
 */
static int close_metadata(uint64_t chan_key)
{
	int ret = 0;
	struct lttng_consumer_channel *channel;
	unsigned int channel_monitor;

	DBG("UST consumer close metadata key %" PRIu64, chan_key);

	channel = consumer_find_channel(chan_key);
	if (!channel) {
		/*
		 * This is possible if the metadata thread has issue a delete because
		 * the endpoint point of the stream hung up. There is no way the
		 * session daemon can know about it thus use a DBG instead of an actual
		 * error.
		 */
		DBG("UST consumer close metadata %" PRIu64 " not found", chan_key);
		ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
		goto error;
	}

	pthread_mutex_lock(&consumer_data.lock);
	pthread_mutex_lock(&channel->lock);
	/* Sample the monitor flag under lock; used after the locks are dropped. */
	channel_monitor = channel->monitor;
	if (cds_lfht_is_node_deleted(&channel->node.node)) {
		/* Channel concurrently torn down; nothing left to do. */
		goto error_unlock;
	}

	lttng_ustconsumer_close_metadata(channel);
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);

	/*
	 * The ownership of a metadata channel depends on the type of
	 * session to which it belongs. In effect, the monitor flag is checked
	 * to determine if this metadata channel is in "snapshot" mode or not.
	 *
	 * In the non-snapshot case, the metadata channel is created along with
	 * a single stream which will remain present until the metadata channel
	 * is destroyed (on the destruction of its session). In this case, the
	 * metadata stream in "monitored" by the metadata poll thread and holds
	 * the ownership of its channel.
	 *
	 * Closing the metadata will cause the metadata stream's "metadata poll
	 * pipe" to be closed. Closing this pipe will wake-up the metadata poll
	 * thread which will teardown the metadata stream which, in return,
	 * deletes the metadata channel.
	 *
	 * In the snapshot case, the metadata stream is created and destroyed
	 * on every snapshot record. Since the channel doesn't have an owner
	 * other than the session daemon, it is safe to destroy it immediately
	 * on reception of the CLOSE_METADATA command.
	 */
	if (!channel_monitor) {
		/*
		 * The channel and consumer_data locks must be
		 * released before this call since consumer_del_channel
		 * re-acquires the channel and consumer_data locks to teardown
		 * the channel and queue its reclamation by the "call_rcu"
		 * worker thread.
		 */
		consumer_del_channel(channel);
	}

	return ret;
error_unlock:
	pthread_mutex_unlock(&channel->lock);
	pthread_mutex_unlock(&consumer_data.lock);
error:
	return ret;
}
918
919 /*
920 * RCU read side lock MUST be acquired before calling this function.
921 *
922 * Return 0 on success else an LTTng error code.
923 */
924 static int setup_metadata(struct lttng_consumer_local_data *ctx, uint64_t key)
925 {
926 int ret;
927 struct lttng_consumer_channel *metadata;
928
929 DBG("UST consumer setup metadata key %" PRIu64, key);
930
931 metadata = consumer_find_channel(key);
932 if (!metadata) {
933 ERR("UST consumer push metadata %" PRIu64 " not found", key);
934 ret = LTTNG_ERR_UST_CHAN_NOT_FOUND;
935 goto end;
936 }
937
938 /*
939 * In no monitor mode, the metadata channel has no stream(s) so skip the
940 * ownership transfer to the metadata thread.
941 */
942 if (!metadata->monitor) {
943 DBG("Metadata channel in no monitor");
944 ret = 0;
945 goto end;
946 }
947
948 /*
949 * Send metadata stream to relayd if one available. Availability is
950 * known if the stream is still in the list of the channel.
951 */
952 if (cds_list_empty(&metadata->streams.head)) {
953 ERR("Metadata channel key %" PRIu64 ", no stream available.", key);
954 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
955 goto error_no_stream;
956 }
957
958 /* Send metadata stream to relayd if needed. */
959 if (metadata->metadata_stream->net_seq_idx != (uint64_t) -1ULL) {
960 ret = consumer_send_relayd_stream(metadata->metadata_stream,
961 metadata->pathname);
962 if (ret < 0) {
963 ret = LTTCOMM_CONSUMERD_ERROR_METADATA;
964 goto error;
965 }
966 ret = consumer_send_relayd_streams_sent(
967 metadata->metadata_stream->net_seq_idx);
968 if (ret < 0) {
969 ret = LTTCOMM_CONSUMERD_RELAYD_FAIL;
970 goto error;
971 }
972 }
973
974 /*
975 * Ownership of metadata stream is passed along. Freeing is handled by
976 * the callee.
977 */
978 ret = send_streams_to_thread(metadata, ctx);
979 if (ret < 0) {
980 /*
981 * If we are unable to send the stream to the thread, there is
982 * a big problem so just stop everything.
983 */
984 ret = LTTCOMM_CONSUMERD_FATAL;
985 goto send_streams_error;
986 }
987 /* List MUST be empty after or else it could be reused. */
988 assert(cds_list_empty(&metadata->streams.head));
989
990 ret = 0;
991 goto end;
992
993 error:
994 /*
995 * Delete metadata channel on error. At this point, the metadata stream can
996 * NOT be monitored by the metadata thread thus having the guarantee that
997 * the stream is still in the local stream list of the channel. This call
998 * will make sure to clean that list.
999 */
1000 consumer_stream_destroy(metadata->metadata_stream, NULL);
1001 cds_list_del(&metadata->metadata_stream->send_node);
1002 metadata->metadata_stream = NULL;
1003 send_streams_error:
1004 error_no_stream:
1005 end:
1006 return ret;
1007 }
1008
1009 /*
1010 * Snapshot the whole metadata.
1011 * RCU read-side lock must be held by the caller.
1012 *
1013 * Returns 0 on success, < 0 on error
1014 */
1015 static int snapshot_metadata(struct lttng_consumer_channel *metadata_channel,
1016 uint64_t key, char *path, uint64_t relayd_id,
1017 struct lttng_consumer_local_data *ctx)
1018 {
1019 int ret = 0;
1020 struct lttng_consumer_stream *metadata_stream;
1021
1022 assert(path);
1023 assert(ctx);
1024
1025 DBG("UST consumer snapshot metadata with key %" PRIu64 " at path %s",
1026 key, path);
1027
1028 rcu_read_lock();
1029
1030 assert(!metadata_channel->monitor);
1031
1032 health_code_update();
1033
1034 /*
1035 * Ask the sessiond if we have new metadata waiting and update the
1036 * consumer metadata cache.
1037 */
1038 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 1);
1039 if (ret < 0) {
1040 goto error;
1041 }
1042
1043 health_code_update();
1044
1045 /*
1046 * The metadata stream is NOT created in no monitor mode when the channel
1047 * is created on a sessiond ask channel command.
1048 */
1049 ret = create_ust_streams(metadata_channel, ctx);
1050 if (ret < 0) {
1051 goto error;
1052 }
1053
1054 metadata_stream = metadata_channel->metadata_stream;
1055 assert(metadata_stream);
1056
1057 pthread_mutex_lock(&metadata_stream->lock);
1058 if (relayd_id != (uint64_t) -1ULL) {
1059 metadata_stream->net_seq_idx = relayd_id;
1060 ret = consumer_send_relayd_stream(metadata_stream, path);
1061 } else {
1062 ret = consumer_stream_create_output_files(metadata_stream,
1063 false);
1064 }
1065 pthread_mutex_unlock(&metadata_stream->lock);
1066 if (ret < 0) {
1067 goto error_stream;
1068 }
1069
1070 do {
1071 health_code_update();
1072
1073 ret = lttng_consumer_read_subbuffer(metadata_stream, ctx);
1074 if (ret < 0) {
1075 goto error_stream;
1076 }
1077 } while (ret > 0);
1078
1079 error_stream:
1080 /*
1081 * Clean up the stream completly because the next snapshot will use a new
1082 * metadata stream.
1083 */
1084 consumer_stream_destroy(metadata_stream, NULL);
1085 cds_list_del(&metadata_stream->send_node);
1086 metadata_channel->metadata_stream = NULL;
1087
1088 error:
1089 rcu_read_unlock();
1090 return ret;
1091 }
1092
1093 static
1094 int get_current_subbuf_addr(struct lttng_consumer_stream *stream,
1095 const char **addr)
1096 {
1097 int ret;
1098 unsigned long mmap_offset;
1099 const char *mmap_base;
1100
1101 mmap_base = ustctl_get_mmap_base(stream->ustream);
1102 if (!mmap_base) {
1103 ERR("Failed to get mmap base for stream `%s`",
1104 stream->name);
1105 ret = -EPERM;
1106 goto error;
1107 }
1108
1109 ret = ustctl_get_mmap_read_offset(stream->ustream, &mmap_offset);
1110 if (ret != 0) {
1111 ERR("Failed to get mmap offset for stream `%s`", stream->name);
1112 ret = -EINVAL;
1113 goto error;
1114 }
1115
1116 *addr = mmap_base + mmap_offset;
1117 error:
1118 return ret;
1119
1120 }
1121
/*
 * Take a snapshot of all the streams of a channel and write them to the
 * snapshot output (relayd when relayd_id is set, local files otherwise).
 * RCU read-side lock and the channel lock must be held by the caller.
 * Only valid for non-monitored channels (asserted below).
 *
 * Returns 0 on success, < 0 on error
 */
static int snapshot_channel(struct lttng_consumer_channel *channel,
		uint64_t key, char *path, uint64_t relayd_id,
		uint64_t nb_packets_per_stream,
		struct lttng_consumer_local_data *ctx)
{
	int ret;
	unsigned use_relayd = 0;
	unsigned long consumed_pos, produced_pos;
	struct lttng_consumer_stream *stream;

	assert(path);
	assert(ctx);

	rcu_read_lock();

	/* (uint64_t) -1ULL is the sentinel for "no relayd, local output". */
	if (relayd_id != (uint64_t) -1ULL) {
		use_relayd = 1;
	}

	assert(!channel->monitor);
	DBG("UST consumer snapshot channel %" PRIu64, key);

	cds_list_for_each_entry(stream, &channel->streams.head, send_node) {
		health_code_update();

		/* Lock stream because we are about to change its state. */
		pthread_mutex_lock(&stream->lock);
		assert(channel->trace_chunk);
		if (!lttng_trace_chunk_get(channel->trace_chunk)) {
			/*
			 * Can't happen barring an internal error as the channel
			 * holds a reference to the trace chunk.
			 */
			ERR("Failed to acquire reference to channel's trace chunk");
			ret = -1;
			goto error_unlock;
		}
		assert(!stream->trace_chunk);
		/* The stream now owns the reference taken just above. */
		stream->trace_chunk = channel->trace_chunk;

		stream->net_seq_idx = relayd_id;

		if (use_relayd) {
			ret = consumer_send_relayd_stream(stream, path);
			if (ret < 0) {
				goto error_unlock;
			}
		} else {
			ret = consumer_stream_create_output_files(stream,
					false);
			if (ret < 0) {
				goto error_unlock;
			}
			DBG("UST consumer snapshot stream (%" PRIu64 ")",
					stream->key);
		}

		/*
		 * If tracing is active, we want to perform a "full" buffer flush.
		 * Else, if quiescent, it has already been done by the prior stop.
		 */
		if (!stream->quiescent) {
			ustctl_flush_buffer(stream->ustream, 0);
		}

		ret = lttng_ustconsumer_take_snapshot(stream);
		if (ret < 0) {
			ERR("Taking UST snapshot");
			goto error_unlock;
		}

		/* Sample the produced/consumed positions of the snapshot. */
		ret = lttng_ustconsumer_get_produced_snapshot(stream, &produced_pos);
		if (ret < 0) {
			ERR("Produced UST snapshot position");
			goto error_unlock;
		}

		ret = lttng_ustconsumer_get_consumed_snapshot(stream, &consumed_pos);
		if (ret < 0) {
			ERR("Consumerd UST snapshot position");
			goto error_unlock;
		}

		/*
		 * The original value is sent back if max stream size is larger than
		 * the possible size of the snapshot. Also, we assume that the session
		 * daemon should never send a maximum stream size that is lower than
		 * subbuffer size.
		 */
		consumed_pos = consumer_get_consume_start_pos(consumed_pos,
				produced_pos, nb_packets_per_stream,
				stream->max_sb_size);

		/*
		 * Signed difference handles ring-buffer position wrap-around:
		 * iterate one sub-buffer at a time until consumed catches up
		 * with produced.
		 */
		while ((long) (consumed_pos - produced_pos) < 0) {
			ssize_t read_len;
			unsigned long len, padded_len;
			const char *subbuf_addr;
			struct lttng_buffer_view subbuf_view;

			health_code_update();

			DBG("UST consumer taking snapshot at pos %lu", consumed_pos);

			ret = ustctl_get_subbuf(stream->ustream, &consumed_pos);
			if (ret < 0) {
				if (ret != -EAGAIN) {
					PERROR("ustctl_get_subbuf snapshot");
					goto error_close_stream;
				}
				/*
				 * -EAGAIN: sub-buffer not available; account it
				 * as a lost packet and move to the next one.
				 */
				DBG("UST consumer get subbuf failed. Skipping it.");
				consumed_pos += stream->max_sb_size;
				stream->chan->lost_packets++;
				continue;
			}

			ret = ustctl_get_subbuf_size(stream->ustream, &len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_subbuf_size");
				goto error_put_subbuf;
			}

			ret = ustctl_get_padded_subbuf_size(stream->ustream, &padded_len);
			if (ret < 0) {
				ERR("Snapshot ustctl_get_padded_subbuf_size");
				goto error_put_subbuf;
			}

			ret = get_current_subbuf_addr(stream, &subbuf_addr);
			if (ret) {
				goto error_put_subbuf;
			}

			subbuf_view = lttng_buffer_view_init(
					subbuf_addr, 0, padded_len);
			read_len = lttng_consumer_on_read_subbuffer_mmap(ctx,
					stream, &subbuf_view, padded_len - len,
					NULL);
			/*
			 * Relayd transfers strip the padding (expect len bytes);
			 * local file writes keep it (expect padded_len bytes).
			 */
			if (use_relayd) {
				if (read_len != len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			} else {
				if (read_len != padded_len) {
					ret = -EPERM;
					goto error_put_subbuf;
				}
			}

			ret = ustctl_put_subbuf(stream->ustream);
			if (ret < 0) {
				ERR("Snapshot ustctl_put_subbuf");
				goto error_close_stream;
			}
			consumed_pos += stream->max_sb_size;
		}

		/* Simply close the stream so we can use it on the next snapshot. */
		consumer_stream_close(stream);
		pthread_mutex_unlock(&stream->lock);
	}

	rcu_read_unlock();
	return 0;

error_put_subbuf:
	if (ustctl_put_subbuf(stream->ustream) < 0) {
		ERR("Snapshot ustctl_put_subbuf");
	}
error_close_stream:
	consumer_stream_close(stream);
error_unlock:
	pthread_mutex_unlock(&stream->lock);
	rcu_read_unlock();
	return ret;
}
1304
1305 /*
1306 * Receive the metadata updates from the sessiond. Supports receiving
1307 * overlapping metadata, but is needs to always belong to a contiguous
1308 * range starting from 0.
1309 * Be careful about the locks held when calling this function: it needs
1310 * the metadata cache flush to concurrently progress in order to
1311 * complete.
1312 */
1313 int lttng_ustconsumer_recv_metadata(int sock, uint64_t key, uint64_t offset,
1314 uint64_t len, uint64_t version,
1315 struct lttng_consumer_channel *channel, int timer, int wait)
1316 {
1317 int ret, ret_code = LTTCOMM_CONSUMERD_SUCCESS;
1318 char *metadata_str;
1319
1320 DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key, len);
1321
1322 metadata_str = zmalloc(len * sizeof(char));
1323 if (!metadata_str) {
1324 PERROR("zmalloc metadata string");
1325 ret_code = LTTCOMM_CONSUMERD_ENOMEM;
1326 goto end;
1327 }
1328
1329 health_code_update();
1330
1331 /* Receive metadata string. */
1332 ret = lttcomm_recv_unix_sock(sock, metadata_str, len);
1333 if (ret < 0) {
1334 /* Session daemon is dead so return gracefully. */
1335 ret_code = ret;
1336 goto end_free;
1337 }
1338
1339 health_code_update();
1340
1341 pthread_mutex_lock(&channel->metadata_cache->lock);
1342 ret = consumer_metadata_cache_write(channel, offset, len, version,
1343 metadata_str);
1344 if (ret < 0) {
1345 /* Unable to handle metadata. Notify session daemon. */
1346 ret_code = LTTCOMM_CONSUMERD_ERROR_METADATA;
1347 /*
1348 * Skip metadata flush on write error since the offset and len might
1349 * not have been updated which could create an infinite loop below when
1350 * waiting for the metadata cache to be flushed.
1351 */
1352 pthread_mutex_unlock(&channel->metadata_cache->lock);
1353 goto end_free;
1354 }
1355 pthread_mutex_unlock(&channel->metadata_cache->lock);
1356
1357 if (!wait) {
1358 goto end_free;
1359 }
1360 while (consumer_metadata_cache_flushed(channel, offset + len, timer)) {
1361 DBG("Waiting for metadata to be flushed");
1362
1363 health_code_update();
1364
1365 usleep(DEFAULT_METADATA_AVAILABILITY_WAIT_TIME);
1366 }
1367
1368 end_free:
1369 free(metadata_str);
1370 end:
1371 return ret_code;
1372 }
1373
/*
 * Receive a command from the session daemon socket and process it.
 *
 * Each command case below either replies with a status message
 * (end_msg_sessiond), replies with its own payload, or ends without
 * signalling (end_nosignal). A fatal socket/protocol error jumps to
 * error_fatal, which makes the caller stop the consumer.
 *
 * Return 1 on success else a negative value or 0.
 */
int lttng_ustconsumer_recv_cmd(struct lttng_consumer_local_data *ctx,
		int sock, struct pollfd *consumer_sockpoll)
{
	ssize_t ret;
	enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
	struct lttcomm_consumer_msg msg;
	struct lttng_consumer_channel *channel = NULL;

	health_code_update();

	ret = lttcomm_recv_unix_sock(sock, &msg, sizeof(msg));
	if (ret != sizeof(msg)) {
		DBG("Consumer received unexpected message size %zd (expects %zu)",
			ret, sizeof(msg));
		/*
		 * The ret value might 0 meaning an orderly shutdown but this is ok
		 * since the caller handles this.
		 */
		if (ret > 0) {
			lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
			ret = -1;
		}
		return ret;
	}

	health_code_update();

	/* deprecated */
	assert(msg.cmd_type != LTTNG_CONSUMER_STOP);

	health_code_update();

	/* relayd needs RCU read-side lock */
	rcu_read_lock();

	switch (msg.cmd_type) {
	case LTTNG_CONSUMER_ADD_RELAYD_SOCKET:
	{
		/* Session daemon status message are handled in the following call. */
		consumer_add_relayd_socket(msg.u.relayd_sock.net_index,
				msg.u.relayd_sock.type, ctx, sock, consumer_sockpoll,
				&msg.u.relayd_sock.sock, msg.u.relayd_sock.session_id,
				msg.u.relayd_sock.relayd_session_id);
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_RELAYD:
	{
		uint64_t index = msg.u.destroy_relayd.net_seq_idx;
		struct consumer_relayd_sock_pair *relayd;

		DBG("UST consumer destroying relayd %" PRIu64, index);

		/* Get relayd reference if exists. */
		relayd = consumer_find_relayd(index);
		if (relayd == NULL) {
			DBG("Unable to find relayd %" PRIu64, index);
			ret_code = LTTCOMM_CONSUMERD_RELAYD_FAIL;
		}

		/*
		 * Each relayd socket pair has a refcount of stream attached to it
		 * which tells if the relayd is still active or not depending on the
		 * refcount value.
		 *
		 * This will set the destroy flag of the relayd object and destroy it
		 * if the refcount reaches zero when called.
		 *
		 * The destroy can happen either here or when a stream fd hangs up.
		 */
		if (relayd) {
			consumer_flag_relayd_for_destroy(relayd);
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_UPDATE_STREAM:
	{
		/* Not implemented for the UST consumer. */
		rcu_read_unlock();
		return -ENOSYS;
	}
	case LTTNG_CONSUMER_DATA_PENDING:
	{
		int ret, is_data_pending;
		uint64_t id = msg.u.data_pending.session_id;

		DBG("UST consumer data pending command for id %" PRIu64, id);

		is_data_pending = consumer_data_pending(id);

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &is_data_pending,
				sizeof(is_data_pending));
		if (ret < 0) {
			DBG("Error when sending the data pending ret code: %d", ret);
			goto error_fatal;
		}

		/*
		 * No need to send back a status message since the data pending
		 * returned value is the response.
		 */
		break;
	}
	case LTTNG_CONSUMER_ASK_CHANNEL_CREATION:
	{
		int ret;
		struct ustctl_consumer_channel_attr attr;
		const uint64_t chunk_id = msg.u.ask_channel.chunk_id.value;
		const struct lttng_credentials buffer_credentials = {
			.uid = msg.u.ask_channel.buffer_credentials.uid,
			.gid = msg.u.ask_channel.buffer_credentials.gid,
		};

		/* Create a plain object and reserve a channel key. */
		channel = allocate_channel(msg.u.ask_channel.session_id,
				msg.u.ask_channel.chunk_id.is_set ?
						&chunk_id : NULL,
				msg.u.ask_channel.pathname,
				msg.u.ask_channel.name,
				msg.u.ask_channel.relayd_id,
				msg.u.ask_channel.key,
				(enum lttng_event_output) msg.u.ask_channel.output,
				msg.u.ask_channel.tracefile_size,
				msg.u.ask_channel.tracefile_count,
				msg.u.ask_channel.session_id_per_pid,
				msg.u.ask_channel.monitor,
				msg.u.ask_channel.live_timer_interval,
				msg.u.ask_channel.root_shm_path,
				msg.u.ask_channel.shm_path);
		if (!channel) {
			goto end_channel_error;
		}

		LTTNG_OPTIONAL_SET(&channel->buffer_credentials,
				buffer_credentials);

		/*
		 * Assign UST application UID to the channel. This value is ignored for
		 * per PID buffers. This is specific to UST thus setting this after the
		 * allocation.
		 */
		channel->ust_app_uid = msg.u.ask_channel.ust_app_uid;

		/* Build channel attributes from received message. */
		attr.subbuf_size = msg.u.ask_channel.subbuf_size;
		attr.num_subbuf = msg.u.ask_channel.num_subbuf;
		attr.overwrite = msg.u.ask_channel.overwrite;
		attr.switch_timer_interval = msg.u.ask_channel.switch_timer_interval;
		attr.read_timer_interval = msg.u.ask_channel.read_timer_interval;
		attr.chan_id = msg.u.ask_channel.chan_id;
		memcpy(attr.uuid, msg.u.ask_channel.uuid, sizeof(attr.uuid));
		attr.blocking_timeout= msg.u.ask_channel.blocking_timeout;

		/* Match channel buffer type to the UST abi. */
		switch (msg.u.ask_channel.output) {
		case LTTNG_EVENT_MMAP:
		default:
			attr.output = LTTNG_UST_MMAP;
			break;
		}

		/* Translate and save channel type. */
		switch (msg.u.ask_channel.type) {
		case LTTNG_UST_CHAN_PER_CPU:
			channel->type = CONSUMER_CHANNEL_TYPE_DATA;
			attr.type = LTTNG_UST_CHAN_PER_CPU;
			/*
			 * Set refcount to 1 for owner. Below, we will
			 * pass ownership to the
			 * consumer_thread_channel_poll() thread.
			 */
			channel->refcount = 1;
			break;
		case LTTNG_UST_CHAN_METADATA:
			channel->type = CONSUMER_CHANNEL_TYPE_METADATA;
			attr.type = LTTNG_UST_CHAN_METADATA;
			break;
		default:
			assert(0);
			goto error_fatal;
		};

		health_code_update();

		/* Create the UST channel and its streams. */
		ret = ask_channel(ctx, channel, &attr);
		if (ret < 0) {
			goto end_channel_error;
		}

		if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
			/*
			 * Metadata channels use a cache plus a switch timer;
			 * disable the timer at the UST level since the
			 * consumer-side timer drives the switches.
			 */
			ret = consumer_metadata_cache_allocate(channel);
			if (ret < 0) {
				ERR("Allocating metadata cache");
				goto end_channel_error;
			}
			consumer_timer_switch_start(channel, attr.switch_timer_interval);
			attr.switch_timer_interval = 0;
		} else {
			int monitor_start_ret;

			consumer_timer_live_start(channel,
					msg.u.ask_channel.live_timer_interval);
			monitor_start_ret = consumer_timer_monitor_start(
					channel,
					msg.u.ask_channel.monitor_timer_interval);
			if (monitor_start_ret < 0) {
				ERR("Starting channel monitoring timer failed");
				goto end_channel_error;
			}
		}

		health_code_update();

		/*
		 * Add the channel to the internal state AFTER all streams were created
		 * and successfully sent to session daemon. This way, all streams must
		 * be ready before this channel is visible to the threads.
		 * If add_channel succeeds, ownership of the channel is
		 * passed to consumer_thread_channel_poll().
		 */
		ret = add_channel(channel, ctx);
		if (ret < 0) {
			/* Undo the timers/cache set up above before tearing down. */
			if (msg.u.ask_channel.type == LTTNG_UST_CHAN_METADATA) {
				if (channel->switch_timer_enabled == 1) {
					consumer_timer_switch_stop(channel);
				}
				consumer_metadata_cache_destroy(channel);
			}
			if (channel->live_timer_enabled == 1) {
				consumer_timer_live_stop(channel);
			}
			if (channel->monitor_timer_enabled == 1) {
				consumer_timer_monitor_stop(channel);
			}
			goto end_channel_error;
		}

		health_code_update();

		/*
		 * Channel and streams are now created. Inform the session daemon that
		 * everything went well and should wait to receive the channel and
		 * streams with ustctl API.
		 */
		ret = consumer_send_status_channel(sock, channel);
		if (ret < 0) {
			/*
			 * There is probably a problem on the socket.
			 */
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_GET_CHANNEL:
	{
		int ret, relayd_err = 0;
		uint64_t key = msg.u.get_channel.key;
		struct lttng_consumer_channel *channel;

		channel = consumer_find_channel(key);
		if (!channel) {
			ERR("UST consumer get channel key %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
			goto end_get_channel;
		}

		health_code_update();

		/* Send the channel to sessiond (and relayd, if applicable). */
		ret = send_channel_to_sessiond_and_relayd(sock, channel, ctx,
				&relayd_err);
		if (ret < 0) {
			if (relayd_err) {
				/*
				 * We were unable to send to the relayd the stream so avoid
				 * sending back a fatal error to the thread since this is OK
				 * and the consumer can continue its work. The above call
				 * has sent the error status message to the sessiond.
				 */
				goto end_get_channel_nosignal;
			}
			/*
			 * The communicaton was broken hence there is a bad state between
			 * the consumer and sessiond so stop everything.
			 */
			goto error_get_channel_fatal;
		}

		health_code_update();

		/*
		 * In no monitor mode, the streams ownership is kept inside the channel
		 * so don't send them to the data thread.
		 */
		if (!channel->monitor) {
			goto end_get_channel;
		}

		ret = send_streams_to_thread(channel, ctx);
		if (ret < 0) {
			/*
			 * If we are unable to send the stream to the thread, there is
			 * a big problem so just stop everything.
			 */
			goto error_get_channel_fatal;
		}
		/* List MUST be empty after or else it could be reused. */
		assert(cds_list_empty(&channel->streams.head));
end_get_channel:
		goto end_msg_sessiond;
error_get_channel_fatal:
		goto error_fatal;
end_get_channel_nosignal:
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_DESTROY_CHANNEL:
	{
		uint64_t key = msg.u.destroy_channel.key;

		/*
		 * Only called if streams have not been sent to stream
		 * manager thread. However, channel has been sent to
		 * channel manager thread.
		 */
		notify_thread_del_channel(ctx, key);
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLOSE_METADATA:
	{
		int ret;

		ret = close_metadata(msg.u.close_metadata.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_FLUSH_CHANNEL:
	{
		int ret;

		ret = flush_channel(msg.u.flush_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL:
	{
		int ret;

		ret = clear_quiescent_channel(
				msg.u.clear_quiescent_channel.key);
		if (ret != 0) {
			ret_code = ret;
		}

		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_PUSH_METADATA:
	{
		int ret;
		uint64_t len = msg.u.push_metadata.len;
		uint64_t key = msg.u.push_metadata.key;
		uint64_t offset = msg.u.push_metadata.target_offset;
		uint64_t version = msg.u.push_metadata.version;
		struct lttng_consumer_channel *channel;

		DBG("UST consumer push metadata key %" PRIu64 " of len %" PRIu64, key,
				len);

		channel = consumer_find_channel(key);
		if (!channel) {
			/*
			 * This is possible if the metadata creation on the consumer side
			 * is in flight vis-a-vis a concurrent push metadata from the
			 * session daemon. Simply return that the channel failed and the
			 * session daemon will handle that message correctly considering
			 * that this race is acceptable thus the DBG() statement here.
			 */
			DBG("UST consumer push metadata %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHANNEL_FAIL;
			goto end_push_metadata_msg_sessiond;
		}

		health_code_update();

		if (!len) {
			/*
			 * There is nothing to receive. We have simply
			 * checked whether the channel can be found.
			 */
			ret_code = LTTCOMM_CONSUMERD_SUCCESS;
			goto end_push_metadata_msg_sessiond;
		}

		/* Tell session daemon we are ready to receive the metadata. */
		ret = consumer_send_status_msg(sock, LTTCOMM_CONSUMERD_SUCCESS);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto error_push_metadata_fatal;
		}

		health_code_update();

		/* Wait for more data. */
		health_poll_entry();
		ret = lttng_consumer_poll_socket(consumer_sockpoll);
		health_poll_exit();
		if (ret) {
			goto error_push_metadata_fatal;
		}

		health_code_update();

		ret = lttng_ustconsumer_recv_metadata(sock, key, offset,
				len, version, channel, 0, 1);
		if (ret < 0) {
			/* error receiving from sessiond */
			goto error_push_metadata_fatal;
		} else {
			ret_code = ret;
			goto end_push_metadata_msg_sessiond;
		}
end_push_metadata_msg_sessiond:
		goto end_msg_sessiond;
error_push_metadata_fatal:
		goto error_fatal;
	}
	case LTTNG_CONSUMER_SETUP_METADATA:
	{
		int ret;

		ret = setup_metadata(ctx, msg.u.setup_metadata.key);
		if (ret) {
			ret_code = ret;
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_SNAPSHOT_CHANNEL:
	{
		struct lttng_consumer_channel *channel;
		uint64_t key = msg.u.snapshot_channel.key;

		channel = consumer_find_channel(key);
		if (!channel) {
			DBG("UST snapshot channel not found for key %" PRIu64, key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
		} else {
			/* Metadata and data channels take different snapshot paths. */
			if (msg.u.snapshot_channel.metadata) {
				ret = snapshot_metadata(channel, key,
						msg.u.snapshot_channel.pathname,
						msg.u.snapshot_channel.relayd_id,
						ctx);
				if (ret < 0) {
					ERR("Snapshot metadata failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			} else {
				ret = snapshot_channel(channel, key,
						msg.u.snapshot_channel.pathname,
						msg.u.snapshot_channel.relayd_id,
						msg.u.snapshot_channel.nb_packets_per_stream,
						ctx);
				if (ret < 0) {
					ERR("Snapshot channel failed");
					ret_code = LTTCOMM_CONSUMERD_SNAPSHOT_FAILED;
				}
			}
		}
		health_code_update();
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_nosignal;
		}
		health_code_update();
		break;
	}
	case LTTNG_CONSUMER_DISCARDED_EVENTS:
	{
		int ret = 0;
		uint64_t discarded_events;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.discarded_events.session_id;
		uint64_t key = msg.u.discarded_events.channel_key;

		DBG("UST consumer discarded events command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but they are not
		 * directly indexed, so we just use the first matching stream
		 * to extract the information we need, we default to 0 if not
		 * found (no events are dropped if the channel is not yet in
		 * use).
		 */
		discarded_events = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				discarded_events = stream->chan->discarded_events;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer discarded events command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &discarded_events, sizeof(discarded_events));
		if (ret < 0) {
			PERROR("send discarded events");
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_LOST_PACKETS:
	{
		int ret;
		uint64_t lost_packets;
		struct lttng_ht_iter iter;
		struct lttng_ht *ht;
		struct lttng_consumer_stream *stream;
		uint64_t id = msg.u.lost_packets.session_id;
		uint64_t key = msg.u.lost_packets.channel_key;

		DBG("UST consumer lost packets command for session id %"
				PRIu64, id);
		rcu_read_lock();
		pthread_mutex_lock(&consumer_data.lock);

		ht = consumer_data.stream_list_ht;

		/*
		 * We only need a reference to the channel, but they are not
		 * directly indexed, so we just use the first matching stream
		 * to extract the information we need, we default to 0 if not
		 * found (no packets lost if the channel is not yet in use).
		 */
		lost_packets = 0;
		cds_lfht_for_each_entry_duplicate(ht->ht,
				ht->hash_fct(&id, lttng_ht_seed),
				ht->match_fct, &id,
				&iter.iter, stream, node_session_id.node) {
			if (stream->chan->key == key) {
				lost_packets = stream->chan->lost_packets;
				break;
			}
		}
		pthread_mutex_unlock(&consumer_data.lock);
		rcu_read_unlock();

		DBG("UST consumer lost packets command for session id %"
				PRIu64 ", channel key %" PRIu64, id, key);

		health_code_update();

		/* Send back returned value to session daemon */
		ret = lttcomm_send_unix_sock(sock, &lost_packets,
				sizeof(lost_packets));
		if (ret < 0) {
			PERROR("send lost packets");
			goto error_fatal;
		}

		break;
	}
	case LTTNG_CONSUMER_SET_CHANNEL_MONITOR_PIPE:
	{
		int channel_monitor_pipe;

		ret_code = LTTCOMM_CONSUMERD_SUCCESS;
		/* Successfully received the command's type. */
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			goto error_fatal;
		}

		/* The pipe's write end is passed as an ancillary fd. */
		ret = lttcomm_recv_fds_unix_sock(sock, &channel_monitor_pipe,
				1);
		if (ret != sizeof(channel_monitor_pipe)) {
			ERR("Failed to receive channel monitor pipe");
			goto error_fatal;
		}

		DBG("Received channel monitor pipe (%d)", channel_monitor_pipe);
		ret = consumer_timer_thread_set_channel_monitor_pipe(
				channel_monitor_pipe);
		if (!ret) {
			int flags;

			ret_code = LTTCOMM_CONSUMERD_SUCCESS;
			/* Set the pipe as non-blocking. */
			ret = fcntl(channel_monitor_pipe, F_GETFL, 0);
			if (ret == -1) {
				PERROR("fcntl get flags of the channel monitoring pipe");
				goto error_fatal;
			}
			flags = ret;

			ret = fcntl(channel_monitor_pipe, F_SETFL,
					flags | O_NONBLOCK);
			if (ret == -1) {
				PERROR("fcntl set O_NONBLOCK flag of the channel monitoring pipe");
				goto error_fatal;
			}
			DBG("Channel monitor pipe set as non-blocking");
		} else {
			ret_code = LTTCOMM_CONSUMERD_ALREADY_SET;
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_ROTATE_CHANNEL:
	{
		struct lttng_consumer_channel *channel;
		uint64_t key = msg.u.rotate_channel.key;

		channel = consumer_find_channel(key);
		if (!channel) {
			DBG("Channel %" PRIu64 " not found", key);
			ret_code = LTTCOMM_CONSUMERD_CHAN_NOT_FOUND;
		} else {
			/*
			 * Sample the rotate position of all the streams in
			 * this channel.
			 */
			ret = lttng_consumer_rotate_channel(channel, key,
					msg.u.rotate_channel.relayd_id,
					msg.u.rotate_channel.metadata,
					ctx);
			if (ret < 0) {
				ERR("Rotate channel failed");
				ret_code = LTTCOMM_CONSUMERD_ROTATION_FAIL;
			}

			health_code_update();
		}
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_rotate_channel_nosignal;
		}

		/*
		 * Rotate the streams that are ready right now.
		 * FIXME: this is a second consecutive iteration over the
		 * streams in a channel, there is probably a better way to
		 * handle this, but it needs to be after the
		 * consumer_send_status_msg() call.
		 */
		if (channel) {
			ret = lttng_consumer_rotate_ready_streams(
					channel, key, ctx);
			if (ret < 0) {
				ERR("Rotate channel failed");
			}
		}
		break;
end_rotate_channel_nosignal:
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_INIT:
	{
		ret_code = lttng_consumer_init_command(ctx,
				msg.u.init.sessiond_uuid);
		health_code_update();
		ret = consumer_send_status_msg(sock, ret_code);
		if (ret < 0) {
			/* Somehow, the session daemon is not responding anymore. */
			goto end_nosignal;
		}
		break;
	}
	case LTTNG_CONSUMER_CREATE_TRACE_CHUNK:
	{
		const struct lttng_credentials credentials = {
			.uid = msg.u.create_trace_chunk.credentials.value.uid,
			.gid = msg.u.create_trace_chunk.credentials.value.gid,
		};
		const bool is_local_trace =
				!msg.u.create_trace_chunk.relayd_id.is_set;
		const uint64_t relayd_id =
				msg.u.create_trace_chunk.relayd_id.value;
		const char *chunk_override_name =
				*msg.u.create_trace_chunk.override_name ?
					msg.u.create_trace_chunk.override_name :
					NULL;
		LTTNG_OPTIONAL(struct lttng_directory_handle) chunk_directory_handle =
				LTTNG_OPTIONAL_INIT;

		/*
		 * The session daemon will only provide a chunk directory file
		 * descriptor for local traces.
		 */
		if (is_local_trace) {
			int chunk_dirfd;

			/* Acnowledge the reception of the command. */
			ret = consumer_send_status_msg(sock,
					LTTCOMM_CONSUMERD_SUCCESS);
			if (ret < 0) {
				/* Somehow, the session daemon is not responding anymore. */
				goto end_nosignal;
			}

			ret = lttcomm_recv_fds_unix_sock(sock, &chunk_dirfd, 1);
			if (ret != sizeof(chunk_dirfd)) {
				ERR("Failed to receive trace chunk directory file descriptor");
				goto error_fatal;
			}

			DBG("Received trace chunk directory fd (%d)",
					chunk_dirfd);
			ret = lttng_directory_handle_init_from_dirfd(
					&chunk_directory_handle.value,
					chunk_dirfd);
			if (ret) {
				ERR("Failed to initialize chunk directory handle from directory file descriptor");
				if (close(chunk_dirfd)) {
					PERROR("Failed to close chunk directory file descriptor");
				}
				goto error_fatal;
			}
			chunk_directory_handle.is_set = true;
		}

		ret_code = lttng_consumer_create_trace_chunk(
				!is_local_trace ? &relayd_id : NULL,
				msg.u.create_trace_chunk.session_id,
				msg.u.create_trace_chunk.chunk_id,
				(time_t) msg.u.create_trace_chunk
						.creation_timestamp,
				chunk_override_name,
				msg.u.create_trace_chunk.credentials.is_set ?
						&credentials :
						NULL,
				chunk_directory_handle.is_set ?
						&chunk_directory_handle.value :
						NULL);

		if (chunk_directory_handle.is_set) {
			lttng_directory_handle_fini(
					&chunk_directory_handle.value);
		}
		goto end_msg_sessiond;
	}
	case LTTNG_CONSUMER_CLOSE_TRACE_CHUNK:
	{
		enum lttng_trace_chunk_command_type close_command =
				msg.u.close_trace_chunk.close_command.value;
		const uint64_t relayd_id =
				msg.u.close_trace_chunk.relayd_id.value;
		struct lttcomm_consumer_close_trace_chunk_reply reply;
		char closed_trace_chunk_path[LTTNG_PATH_MAX];
		int ret;

		ret_code = lttng_consumer_close_trace_chunk(
				msg.u.close_trace_chunk.relayd_id.is_set ?
						&relayd_id :
						NULL,
				msg.u.close_trace_chunk.session_id,
				msg.u.close_trace_chunk.chunk_id,
				(time_t) msg.u.close_trace_chunk.close_timestamp,
				msg.u.close_trace_chunk.close_command.is_set ?
						&close_command :
						NULL, closed_trace_chunk_path);
		/* Reply carries the status code plus the closed chunk's path. */
		reply.ret_code = ret_code;
		reply.path_length = strlen(closed_trace_chunk_path) + 1;
		ret = lttcomm_send_unix_sock(sock, &reply, sizeof(reply));
		if (ret != sizeof(reply)) {
			goto error_fatal;
		}
		ret = lttcomm_send_unix_sock(sock, closed_trace_chunk_path,
				reply.path_length);
		if (ret != reply.path_length) {
			goto error_fatal;
		}
		goto end_nosignal;
	}
	case LTTNG_CONSUMER_TRACE_CHUNK_EXISTS:
	{
		const uint64_t relayd_id =
				msg.u.trace_chunk_exists.relayd_id.value;

		ret_code = lttng_consumer_trace_chunk_exists(
				msg.u.trace_chunk_exists.relayd_id.is_set ?
						&relayd_id : NULL,
				msg.u.trace_chunk_exists.session_id,
				msg.u.trace_chunk_exists.chunk_id);
		goto end_msg_sessiond;
	}
	default:
		break;
	}

end_nosignal:
	/*
	 * Return 1 to indicate success since the 0 value can be a socket
	 * shutdown during the recv() or send() call.
	 */
	ret = 1;
	goto end;

end_msg_sessiond:
	/*
	 * The returned value here is not useful since either way we'll return 1 to
	 * the caller because the session daemon socket management is done
	 * elsewhere. Returning a negative code or 0 will shutdown the consumer.
	 */
	ret = consumer_send_status_msg(sock, ret_code);
	if (ret < 0) {
		goto error_fatal;
	}
	ret = 1;
	goto end;

end_channel_error:
	if (channel) {
		/*
		 * Free channel here since no one has a reference to it. We don't
		 * free after that because a stream can store this pointer.
		 */
		destroy_channel(channel);
	}
	/* We have to send a status channel message indicating an error. */
	ret = consumer_send_status_channel(sock, NULL);
	if (ret < 0) {
		/* Stop everything if session daemon can not be notified. */
		goto error_fatal;
	}
	ret = 1;
	goto end;

error_fatal:
	/* This will issue a consumer stop. */
	ret = -1;
	goto end;

end:
	rcu_read_unlock();
	health_code_update();
	return ret;
}
2239
2240 void lttng_ustctl_flush_buffer(struct lttng_consumer_stream *stream,
2241 int producer_active)
2242 {
2243 assert(stream);
2244 assert(stream->ustream);
2245
2246 ustctl_flush_buffer(stream->ustream, producer_active);
2247 }
2248
2249 /*
2250 * Take a snapshot for a specific stream.
2251 *
2252 * Returns 0 on success, < 0 on error
2253 */
2254 int lttng_ustconsumer_take_snapshot(struct lttng_consumer_stream *stream)
2255 {
2256 assert(stream);
2257 assert(stream->ustream);
2258
2259 return ustctl_snapshot(stream->ustream);
2260 }
2261
2262 /*
2263 * Sample consumed and produced positions for a specific stream.
2264 *
2265 * Returns 0 on success, < 0 on error.
2266 */
2267 int lttng_ustconsumer_sample_snapshot_positions(
2268 struct lttng_consumer_stream *stream)
2269 {
2270 assert(stream);
2271 assert(stream->ustream);
2272
2273 return ustctl_snapshot_sample_positions(stream->ustream);
2274 }
2275
2276 /*
2277 * Get the produced position
2278 *
2279 * Returns 0 on success, < 0 on error
2280 */
2281 int lttng_ustconsumer_get_produced_snapshot(
2282 struct lttng_consumer_stream *stream, unsigned long *pos)
2283 {
2284 assert(stream);
2285 assert(stream->ustream);
2286 assert(pos);
2287
2288 return ustctl_snapshot_get_produced(stream->ustream, pos);
2289 }
2290
2291 /*
2292 * Get the consumed position
2293 *
2294 * Returns 0 on success, < 0 on error
2295 */
2296 int lttng_ustconsumer_get_consumed_snapshot(
2297 struct lttng_consumer_stream *stream, unsigned long *pos)
2298 {
2299 assert(stream);
2300 assert(stream->ustream);
2301 assert(pos);
2302
2303 return ustctl_snapshot_get_consumed(stream->ustream, pos);
2304 }
2305
2306 void lttng_ustconsumer_flush_buffer(struct lttng_consumer_stream *stream,
2307 int producer)
2308 {
2309 assert(stream);
2310 assert(stream->ustream);
2311
2312 ustctl_flush_buffer(stream->ustream, producer);
2313 }
2314
2315 int lttng_ustconsumer_get_current_timestamp(
2316 struct lttng_consumer_stream *stream, uint64_t *ts)
2317 {
2318 assert(stream);
2319 assert(stream->ustream);
2320 assert(ts);
2321
2322 return ustctl_get_current_timestamp(stream->ustream, ts);
2323 }
2324
2325 int lttng_ustconsumer_get_sequence_number(
2326 struct lttng_consumer_stream *stream, uint64_t *seq)
2327 {
2328 assert(stream);
2329 assert(stream->ustream);
2330 assert(seq);
2331
2332 return ustctl_get_sequence_number(stream->ustream, seq);
2333 }
2334
2335 /*
2336 * Called when the stream signals the consumer that it has hung up.
2337 */
2338 void lttng_ustconsumer_on_stream_hangup(struct lttng_consumer_stream *stream)
2339 {
2340 assert(stream);
2341 assert(stream->ustream);
2342
2343 pthread_mutex_lock(&stream->lock);
2344 if (!stream->quiescent) {
2345 ustctl_flush_buffer(stream->ustream, 0);
2346 stream->quiescent = true;
2347 }
2348 pthread_mutex_unlock(&stream->lock);
2349 stream->hangup_flush_done = 1;
2350 }
2351
2352 void lttng_ustconsumer_del_channel(struct lttng_consumer_channel *chan)
2353 {
2354 int i;
2355
2356 assert(chan);
2357 assert(chan->uchan);
2358 assert(chan->buffer_credentials.is_set);
2359
2360 if (chan->switch_timer_enabled == 1) {
2361 consumer_timer_switch_stop(chan);
2362 }
2363 for (i = 0; i < chan->nr_stream_fds; i++) {
2364 int ret;
2365
2366 ret = close(chan->stream_fds[i]);
2367 if (ret) {
2368 PERROR("close");
2369 }
2370 if (chan->shm_path[0]) {
2371 char shm_path[PATH_MAX];
2372
2373 ret = get_stream_shm_path(shm_path, chan->shm_path, i);
2374 if (ret) {
2375 ERR("Cannot get stream shm path");
2376 }
2377 ret = run_as_unlink(shm_path,
2378 chan->buffer_credentials.value.uid,
2379 chan->buffer_credentials.value.gid);
2380 if (ret) {
2381 PERROR("unlink %s", shm_path);
2382 }
2383 }
2384 }
2385 }
2386
/*
 * Release the resources owned by a channel: its metadata cache, the
 * underlying ustctl channel object, the shm directory tree (best effort)
 * and the stream fd array.
 *
 * Order matters: the metadata cache is destroyed before the ustctl
 * channel object that backs it.
 */
void lttng_ustconsumer_free_channel(struct lttng_consumer_channel *chan)
{
	assert(chan);
	assert(chan->uchan);
	assert(chan->buffer_credentials.is_set);

	consumer_metadata_cache_destroy(chan);
	ustctl_destroy_channel(chan->uchan);
	/* Try to rmdir all directories under shm_path root. */
	if (chan->root_shm_path[0]) {
		/* Best effort: non-empty directories are skipped, not errors. */
		(void) run_as_rmdir_recursive(chan->root_shm_path,
				chan->buffer_credentials.value.uid,
				chan->buffer_credentials.value.gid,
				LTTNG_DIRECTORY_HANDLE_SKIP_NON_EMPTY_FLAG);
	}
	free(chan->stream_fds);
}
2404
2405 void lttng_ustconsumer_del_stream(struct lttng_consumer_stream *stream)
2406 {
2407 assert(stream);
2408 assert(stream->ustream);
2409
2410 if (stream->chan->switch_timer_enabled == 1) {
2411 consumer_timer_switch_stop(stream->chan);
2412 }
2413 ustctl_destroy_stream(stream->ustream);
2414 }
2415
2416 int lttng_ustconsumer_get_wakeup_fd(struct lttng_consumer_stream *stream)
2417 {
2418 assert(stream);
2419 assert(stream->ustream);
2420
2421 return ustctl_stream_get_wakeup_fd(stream->ustream);
2422 }
2423
2424 int lttng_ustconsumer_close_wakeup_fd(struct lttng_consumer_stream *stream)
2425 {
2426 assert(stream);
2427 assert(stream->ustream);
2428
2429 return ustctl_stream_close_wakeup_fd(stream->ustream);
2430 }
2431
2432 /*
2433 * Populate index values of a UST stream. Values are set in big endian order.
2434 *
2435 * Return 0 on success or else a negative value.
2436 */
2437 static int get_index_values(struct ctf_packet_index *index,
2438 struct ustctl_consumer_stream *ustream)
2439 {
2440 int ret;
2441 uint64_t packet_size, content_size, timestamp_begin, timestamp_end,
2442 events_discarded, stream_id, stream_instance_id,
2443 packet_seq_num;
2444
2445 ret = ustctl_get_timestamp_begin(ustream, &timestamp_begin);
2446 if (ret < 0) {
2447 PERROR("ustctl_get_timestamp_begin");
2448 goto error;
2449 }
2450
2451 ret = ustctl_get_timestamp_end(ustream, &timestamp_end);
2452 if (ret < 0) {
2453 PERROR("ustctl_get_timestamp_end");
2454 goto error;
2455 }
2456
2457 ret = ustctl_get_events_discarded(ustream, &events_discarded);
2458 if (ret < 0) {
2459 PERROR("ustctl_get_events_discarded");
2460 goto error;
2461 }
2462
2463 ret = ustctl_get_content_size(ustream, &content_size);
2464 if (ret < 0) {
2465 PERROR("ustctl_get_content_size");
2466 goto error;
2467 }
2468
2469 ret = ustctl_get_packet_size(ustream, &packet_size);
2470 if (ret < 0) {
2471 PERROR("ustctl_get_packet_size");
2472 goto error;
2473 }
2474
2475 ret = ustctl_get_stream_id(ustream, &stream_id);
2476 if (ret < 0) {
2477 PERROR("ustctl_get_stream_id");
2478 goto error;
2479 }
2480
2481 ret = ustctl_get_instance_id(ustream, &stream_instance_id);
2482 if (ret < 0) {
2483 PERROR("ustctl_get_instance_id");
2484 goto error;
2485 }
2486
2487 ret = ustctl_get_sequence_number(ustream, &packet_seq_num);
2488 if (ret < 0) {
2489 PERROR("ustctl_get_sequence_number");
2490 goto error;
2491 }
2492
2493 *index = (typeof(*index)) {
2494 .offset = index->offset,
2495 .packet_size = htobe64(packet_size),
2496 .content_size = htobe64(content_size),
2497 .timestamp_begin = htobe64(timestamp_begin),
2498 .timestamp_end = htobe64(timestamp_end),
2499 .events_discarded = htobe64(events_discarded),
2500 .stream_id = htobe64(stream_id),
2501 .stream_instance_id = htobe64(stream_instance_id),
2502 .packet_seq_num = htobe64(packet_seq_num),
2503 };
2504
2505 error:
2506 return ret;
2507 }
2508
2509 static
2510 void metadata_stream_reset_cache(struct lttng_consumer_stream *stream,
2511 struct consumer_metadata_cache *cache)
2512 {
2513 DBG("Metadata stream update to version %" PRIu64,
2514 cache->version);
2515 stream->ust_metadata_pushed = 0;
2516 stream->metadata_version = cache->version;
2517 stream->reset_metadata_flag = 1;
2518 }
2519
2520 /*
2521 * Check if the version of the metadata stream and metadata cache match.
2522 * If the cache got updated, reset the metadata stream.
2523 * The stream lock and metadata cache lock MUST be held.
2524 * Return 0 on success, a negative value on error.
2525 */
2526 static
2527 int metadata_stream_check_version(struct lttng_consumer_stream *stream)
2528 {
2529 int ret = 0;
2530 struct consumer_metadata_cache *cache = stream->chan->metadata_cache;
2531
2532 if (cache->version == stream->metadata_version) {
2533 goto end;
2534 }
2535 metadata_stream_reset_cache(stream, cache);
2536
2537 end:
2538 return ret;
2539 }
2540
2541 /*
2542 * Write up to one packet from the metadata cache to the channel.
2543 *
2544 * Returns the number of bytes pushed in the cache, or a negative value
2545 * on error.
2546 */
2547 static
2548 int commit_one_metadata_packet(struct lttng_consumer_stream *stream)
2549 {
2550 ssize_t write_len;
2551 int ret;
2552
2553 pthread_mutex_lock(&stream->chan->metadata_cache->lock);
2554 ret = metadata_stream_check_version(stream);
2555 if (ret < 0) {
2556 goto end;
2557 }
2558 if (stream->chan->metadata_cache->max_offset
2559 == stream->ust_metadata_pushed) {
2560 ret = 0;
2561 goto end;
2562 }
2563
2564 write_len = ustctl_write_one_packet_to_channel(stream->chan->uchan,
2565 &stream->chan->metadata_cache->data[stream->ust_metadata_pushed],
2566 stream->chan->metadata_cache->max_offset
2567 - stream->ust_metadata_pushed);
2568 assert(write_len != 0);
2569 if (write_len < 0) {
2570 ERR("Writing one metadata packet");
2571 ret = -1;
2572 goto end;
2573 }
2574 stream->ust_metadata_pushed += write_len;
2575
2576 assert(stream->chan->metadata_cache->max_offset >=
2577 stream->ust_metadata_pushed);
2578 ret = write_len;
2579
2580 /*
2581 * Switch packet (but don't open the next one) on every commit of
2582 * a metadata packet. Since the subbuffer is fully filled (with padding,
2583 * if needed), the stream is "quiescent" after this commit.
2584 */
2585 ustctl_flush_buffer(stream->ustream, 1);
2586 stream->quiescent = true;
2587 end:
2588 pthread_mutex_unlock(&stream->chan->metadata_cache->lock);
2589 return ret;
2590 }
2591
2592
2593 /*
2594 * Sync metadata meaning request them to the session daemon and snapshot to the
2595 * metadata thread can consumer them.
2596 *
2597 * Metadata stream lock is held here, but we need to release it when
2598 * interacting with sessiond, else we cause a deadlock with live
2599 * awaiting on metadata to be pushed out.
2600 *
2601 * The RCU read side lock must be held by the caller.
2602 *
2603 * Return 0 if new metadatda is available, EAGAIN if the metadata stream
2604 * is empty or a negative value on error.
2605 */
2606 int lttng_ustconsumer_sync_metadata(struct lttng_consumer_local_data *ctx,
2607 struct lttng_consumer_stream *metadata_stream)
2608 {
2609 int ret;
2610 int retry = 0;
2611 struct lttng_consumer_channel *metadata_channel;
2612
2613 assert(ctx);
2614 assert(metadata_stream);
2615
2616 metadata_channel = metadata_stream->chan;
2617 pthread_mutex_unlock(&metadata_stream->lock);
2618 /*
2619 * Request metadata from the sessiond, but don't wait for the flush
2620 * because we locked the metadata thread.
2621 */
2622 ret = lttng_ustconsumer_request_metadata(ctx, metadata_channel, 0, 0);
2623 pthread_mutex_lock(&metadata_stream->lock);
2624 if (ret < 0) {
2625 goto end;
2626 }
2627
2628 /*
2629 * The metadata stream and channel can be deleted while the
2630 * metadata stream lock was released. The streamed is checked
2631 * for deletion before we use it further.
2632 *
2633 * Note that it is safe to access a logically-deleted stream since its
2634 * existence is still guaranteed by the RCU read side lock. However,
2635 * it should no longer be used. The close/deletion of the metadata
2636 * channel and stream already guarantees that all metadata has been
2637 * consumed. Therefore, there is nothing left to do in this function.
2638 */
2639 if (consumer_stream_is_deleted(metadata_stream)) {
2640 DBG("Metadata stream %" PRIu64 " was deleted during the metadata synchronization",
2641 metadata_stream->key);
2642 ret = 0;
2643 goto end;
2644 }
2645
2646 ret = commit_one_metadata_packet(metadata_stream);
2647 if (ret <= 0) {
2648 goto end;
2649 } else if (ret > 0) {
2650 retry = 1;
2651 }
2652
2653 ret = ustctl_snapshot(metadata_stream->ustream);
2654 if (ret < 0) {
2655 if (errno != EAGAIN) {
2656 ERR("Sync metadata, taking UST snapshot");
2657 goto end;
2658 }
2659 DBG("No new metadata when syncing them.");
2660 /* No new metadata, exit. */
2661 ret = ENODATA;
2662 goto end;
2663 }
2664
2665 /*
2666 * After this flush, we still need to extract metadata.
2667 */
2668 if (retry) {
2669 ret = EAGAIN;
2670 }
2671
2672 end:
2673 return ret;
2674 }
2675
2676 /*
2677 * Return 0 on success else a negative value.
2678 */
2679 static int notify_if_more_data(struct lttng_consumer_stream *stream,
2680 struct lttng_consumer_local_data *ctx)
2681 {
2682 int ret;
2683 struct ustctl_consumer_stream *ustream;
2684
2685 assert(stream);
2686 assert(ctx);
2687
2688 ustream = stream->ustream;
2689
2690 /*
2691 * First, we are going to check if there is a new subbuffer available
2692 * before reading the stream wait_fd.
2693 */
2694 /* Get the next subbuffer */
2695 ret = ustctl_get_next_subbuf(ustream);
2696 if (ret) {
2697 /* No more data found, flag the stream. */
2698 stream->has_data = 0;
2699 ret = 0;
2700 goto end;
2701 }
2702
2703 ret = ustctl_put_subbuf(ustream);
2704 assert(!ret);
2705
2706 /* This stream still has data. Flag it and wake up the data thread. */
2707 stream->has_data = 1;
2708
2709 if (stream->monitor && !stream->hangup_flush_done && !ctx->has_wakeup) {
2710 ssize_t writelen;
2711
2712 writelen = lttng_pipe_write(ctx->consumer_wakeup_pipe, "!", 1);
2713 if (writelen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2714 ret = writelen;
2715 goto end;
2716 }
2717
2718 /* The wake up pipe has been notified. */
2719 ctx->has_wakeup = 1;
2720 }
2721 ret = 0;
2722
2723 end:
2724 return ret;
2725 }
2726
2727 static
2728 int update_stream_stats(struct lttng_consumer_stream *stream)
2729 {
2730 int ret;
2731 uint64_t seq, discarded;
2732
2733 ret = ustctl_get_sequence_number(stream->ustream, &seq);
2734 if (ret < 0) {
2735 PERROR("ustctl_get_sequence_number");
2736 goto end;
2737 }
2738 /*
2739 * Start the sequence when we extract the first packet in case we don't
2740 * start at 0 (for example if a consumer is not connected to the
2741 * session immediately after the beginning).
2742 */
2743 if (stream->last_sequence_number == -1ULL) {
2744 stream->last_sequence_number = seq;
2745 } else if (seq > stream->last_sequence_number) {
2746 stream->chan->lost_packets += seq -
2747 stream->last_sequence_number - 1;
2748 } else {
2749 /* seq <= last_sequence_number */
2750 ERR("Sequence number inconsistent : prev = %" PRIu64
2751 ", current = %" PRIu64,
2752 stream->last_sequence_number, seq);
2753 ret = -1;
2754 goto end;
2755 }
2756 stream->last_sequence_number = seq;
2757
2758 ret = ustctl_get_events_discarded(stream->ustream, &discarded);
2759 if (ret < 0) {
2760 PERROR("kernctl_get_events_discarded");
2761 goto end;
2762 }
2763 if (discarded < stream->last_discarded_events) {
2764 /*
2765 * Overflow has occurred. We assume only one wrap-around
2766 * has occurred.
2767 */
2768 stream->chan->discarded_events +=
2769 (1ULL << (CAA_BITS_PER_LONG - 1)) -
2770 stream->last_discarded_events + discarded;
2771 } else {
2772 stream->chan->discarded_events += discarded -
2773 stream->last_discarded_events;
2774 }
2775 stream->last_discarded_events = discarded;
2776 ret = 0;
2777
2778 end:
2779 return ret;
2780 }
2781
2782 /*
2783 * Read subbuffer from the given stream.
2784 *
2785 * Stream and channel locks MUST be acquired by the caller.
2786 *
2787 * Return 0 on success else a negative value.
2788 */
2789 int lttng_ustconsumer_read_subbuffer(struct lttng_consumer_stream *stream,
2790 struct lttng_consumer_local_data *ctx)
2791 {
2792 unsigned long len, subbuf_size, padding;
2793 int err, write_index = 1, rotation_ret;
2794 long ret = 0;
2795 struct ustctl_consumer_stream *ustream;
2796 struct ctf_packet_index index;
2797 const char *subbuf_addr;
2798 struct lttng_buffer_view subbuf_view;
2799
2800 assert(stream);
2801 assert(stream->ustream);
2802 assert(ctx);
2803
2804 DBG("In UST read_subbuffer (wait_fd: %d, name: %s)", stream->wait_fd,
2805 stream->name);
2806
2807 /* Ease our life for what's next. */
2808 ustream = stream->ustream;
2809
2810 /*
2811 * We can consume the 1 byte written into the wait_fd by UST. Don't trigger
2812 * error if we cannot read this one byte (read returns 0), or if the error
2813 * is EAGAIN or EWOULDBLOCK.
2814 *
2815 * This is only done when the stream is monitored by a thread, before the
2816 * flush is done after a hangup and if the stream is not flagged with data
2817 * since there might be nothing to consume in the wait fd but still have
2818 * data available flagged by the consumer wake up pipe.
2819 */
2820 if (stream->monitor && !stream->hangup_flush_done && !stream->has_data) {
2821 char dummy;
2822 ssize_t readlen;
2823
2824 readlen = lttng_read(stream->wait_fd, &dummy, 1);
2825 if (readlen < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
2826 ret = readlen;
2827 goto error;
2828 }
2829 }
2830
2831 /*
2832 * If the stream was flagged to be ready for rotation before we extract the
2833 * next packet, rotate it now.
2834 */
2835 if (stream->rotate_ready) {
2836 DBG("Rotate stream before extracting data");
2837 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
2838 if (rotation_ret < 0) {
2839 ERR("Stream rotation error");
2840 ret = -1;
2841 goto error;
2842 }
2843 }
2844
2845 retry:
2846 /* Get the next subbuffer */
2847 err = ustctl_get_next_subbuf(ustream);
2848 if (err != 0) {
2849 /*
2850 * Populate metadata info if the existing info has
2851 * already been read.
2852 */
2853 if (stream->metadata_flag) {
2854 ret = commit_one_metadata_packet(stream);
2855 if (ret <= 0) {
2856 goto error;
2857 }
2858 goto retry;
2859 }
2860
2861 ret = err; /* ustctl_get_next_subbuf returns negative, caller expect positive. */
2862 /*
2863 * This is a debug message even for single-threaded consumer,
2864 * because poll() have more relaxed criterions than get subbuf,
2865 * so get_subbuf may fail for short race windows where poll()
2866 * would issue wakeups.
2867 */
2868 DBG("Reserving sub buffer failed (everything is normal, "
2869 "it is due to concurrency) [ret: %d]", err);
2870 goto error;
2871 }
2872 assert(stream->chan->output == CONSUMER_CHANNEL_MMAP);
2873
2874 if (!stream->metadata_flag) {
2875 index.offset = htobe64(stream->out_fd_offset);
2876 ret = get_index_values(&index, ustream);
2877 if (ret < 0) {
2878 err = ustctl_put_subbuf(ustream);
2879 assert(err == 0);
2880 goto error;
2881 }
2882
2883 /* Update the stream's sequence and discarded events count. */
2884 ret = update_stream_stats(stream);
2885 if (ret < 0) {
2886 PERROR("kernctl_get_events_discarded");
2887 err = ustctl_put_subbuf(ustream);
2888 assert(err == 0);
2889 goto error;
2890 }
2891 } else {
2892 write_index = 0;
2893 }
2894
2895 /* Get the full padded subbuffer size */
2896 err = ustctl_get_padded_subbuf_size(ustream, &len);
2897 assert(err == 0);
2898
2899 /* Get subbuffer data size (without padding) */
2900 err = ustctl_get_subbuf_size(ustream, &subbuf_size);
2901 assert(err == 0);
2902
2903 /* Make sure we don't get a subbuffer size bigger than the padded */
2904 assert(len >= subbuf_size);
2905
2906 padding = len - subbuf_size;
2907
2908 ret = get_current_subbuf_addr(stream, &subbuf_addr);
2909 if (ret) {
2910 write_index = 0;
2911 goto error_put_subbuf;
2912 }
2913
2914 subbuf_view = lttng_buffer_view_init(subbuf_addr, 0, len);
2915
2916 /* write the subbuffer to the tracefile */
2917 ret = lttng_consumer_on_read_subbuffer_mmap(
2918 ctx, stream, &subbuf_view, padding, &index);
2919 /*
2920 * The mmap operation should write subbuf_size amount of data when
2921 * network streaming or the full padding (len) size when we are _not_
2922 * streaming.
2923 */
2924 if ((ret != subbuf_size && stream->net_seq_idx != (uint64_t) -1ULL) ||
2925 (ret != len && stream->net_seq_idx == (uint64_t) -1ULL)) {
2926 /*
2927 * Display the error but continue processing to try to release the
2928 * subbuffer. This is a DBG statement since any unexpected kill or
2929 * signal, the application gets unregistered, relayd gets closed or
2930 * anything that affects the buffer lifetime will trigger this error.
2931 * So, for the sake of the user, don't print this error since it can
2932 * happen and it is OK with the code flow.
2933 */
2934 DBG("Error writing to tracefile "
2935 "(ret: %ld != len: %lu != subbuf_size: %lu)",
2936 ret, len, subbuf_size);
2937 write_index = 0;
2938 }
2939 error_put_subbuf:
2940 err = ustctl_put_next_subbuf(ustream);
2941 assert(err == 0);
2942
2943 /*
2944 * This will consumer the byte on the wait_fd if and only if there is not
2945 * next subbuffer to be acquired.
2946 */
2947 if (!stream->metadata_flag) {
2948 ret = notify_if_more_data(stream, ctx);
2949 if (ret < 0) {
2950 goto error;
2951 }
2952 }
2953
2954 /* Write index if needed. */
2955 if (!write_index) {
2956 goto rotate;
2957 }
2958
2959 if (stream->chan->live_timer_interval && !stream->metadata_flag) {
2960 /*
2961 * In live, block until all the metadata is sent.
2962 */
2963 pthread_mutex_lock(&stream->metadata_timer_lock);
2964 assert(!stream->missed_metadata_flush);
2965 stream->waiting_on_metadata = true;
2966 pthread_mutex_unlock(&stream->metadata_timer_lock);
2967
2968 err = consumer_stream_sync_metadata(ctx, stream->session_id);
2969
2970 pthread_mutex_lock(&stream->metadata_timer_lock);
2971 stream->waiting_on_metadata = false;
2972 if (stream->missed_metadata_flush) {
2973 stream->missed_metadata_flush = false;
2974 pthread_mutex_unlock(&stream->metadata_timer_lock);
2975 (void) consumer_flush_ust_index(stream);
2976 } else {
2977 pthread_mutex_unlock(&stream->metadata_timer_lock);
2978 }
2979
2980 if (err < 0) {
2981 goto error;
2982 }
2983 }
2984
2985 assert(!stream->metadata_flag);
2986 err = consumer_stream_write_index(stream, &index);
2987 if (err < 0) {
2988 goto error;
2989 }
2990
2991 rotate:
2992 /*
2993 * After extracting the packet, we check if the stream is now ready to be
2994 * rotated and perform the action immediately.
2995 */
2996 rotation_ret = lttng_consumer_stream_is_rotate_ready(stream);
2997 if (rotation_ret == 1) {
2998 rotation_ret = lttng_consumer_rotate_stream(ctx, stream);
2999 if (rotation_ret < 0) {
3000 ERR("Stream rotation error");
3001 ret = -1;
3002 goto error;
3003 }
3004 } else if (rotation_ret < 0) {
3005 ERR("Checking if stream is ready to rotate");
3006 ret = -1;
3007 goto error;
3008 }
3009 error:
3010 return ret;
3011 }
3012
3013 /*
3014 * Called when a stream is created.
3015 *
3016 * Return 0 on success or else a negative value.
3017 */
3018 int lttng_ustconsumer_on_recv_stream(struct lttng_consumer_stream *stream)
3019 {
3020 int ret;
3021
3022 assert(stream);
3023
3024 /*
3025 * Don't create anything if this is set for streaming or if there is
3026 * no current trace chunk on the parent channel.
3027 */
3028 if (stream->net_seq_idx == (uint64_t) -1ULL && stream->chan->monitor &&
3029 stream->chan->trace_chunk) {
3030 ret = consumer_stream_create_output_files(stream, true);
3031 if (ret) {
3032 goto error;
3033 }
3034 }
3035 ret = 0;
3036
3037 error:
3038 return ret;
3039 }
3040
3041 /*
3042 * Check if data is still being extracted from the buffers for a specific
3043 * stream. Consumer data lock MUST be acquired before calling this function
3044 * and the stream lock.
3045 *
3046 * Return 1 if the traced data are still getting read else 0 meaning that the
3047 * data is available for trace viewer reading.
3048 */
3049 int lttng_ustconsumer_data_pending(struct lttng_consumer_stream *stream)
3050 {
3051 int ret;
3052
3053 assert(stream);
3054 assert(stream->ustream);
3055
3056 DBG("UST consumer checking data pending");
3057
3058 if (stream->endpoint_status != CONSUMER_ENDPOINT_ACTIVE) {
3059 ret = 0;
3060 goto end;
3061 }
3062
3063 if (stream->chan->type == CONSUMER_CHANNEL_TYPE_METADATA) {
3064 uint64_t contiguous, pushed;
3065
3066 /* Ease our life a bit. */
3067 contiguous = stream->chan->metadata_cache->max_offset;
3068 pushed = stream->ust_metadata_pushed;
3069
3070 /*
3071 * We can simply check whether all contiguously available data
3072 * has been pushed to the ring buffer, since the push operation
3073 * is performed within get_next_subbuf(), and because both
3074 * get_next_subbuf() and put_next_subbuf() are issued atomically
3075 * thanks to the stream lock within
3076 * lttng_ustconsumer_read_subbuffer(). This basically means that
3077 * whetnever ust_metadata_pushed is incremented, the associated
3078 * metadata has been consumed from the metadata stream.
3079 */
3080 DBG("UST consumer metadata pending check: contiguous %" PRIu64 " vs pushed %" PRIu64,
3081 contiguous, pushed);
3082 assert(((int64_t) (contiguous - pushed)) >= 0);
3083 if ((contiguous != pushed) ||
3084 (((int64_t) contiguous - pushed) > 0 || contiguous == 0)) {
3085 ret = 1; /* Data is pending */
3086 goto end;
3087 }
3088 } else {
3089 ret = ustctl_get_next_subbuf(stream->ustream);
3090 if (ret == 0) {
3091 /*
3092 * There is still data so let's put back this
3093 * subbuffer.
3094 */
3095 ret = ustctl_put_subbuf(stream->ustream);
3096 assert(ret == 0);
3097 ret = 1; /* Data is pending */
3098 goto end;
3099 }
3100 }
3101
3102 /* Data is NOT pending so ready to be read. */
3103 ret = 0;
3104
3105 end:
3106 return ret;
3107 }
3108
3109 /*
3110 * Stop a given metadata channel timer if enabled and close the wait fd which
3111 * is the poll pipe of the metadata stream.
3112 *
3113 * This MUST be called with the metadata channel lock acquired.
3114 */
3115 void lttng_ustconsumer_close_metadata(struct lttng_consumer_channel *metadata)
3116 {
3117 int ret;
3118
3119 assert(metadata);
3120 assert(metadata->type == CONSUMER_CHANNEL_TYPE_METADATA);
3121
3122 DBG("Closing metadata channel key %" PRIu64, metadata->key);
3123
3124 if (metadata->switch_timer_enabled == 1) {
3125 consumer_timer_switch_stop(metadata);
3126 }
3127
3128 if (!metadata->metadata_stream) {
3129 goto end;
3130 }
3131
3132 /*
3133 * Closing write side so the thread monitoring the stream wakes up if any
3134 * and clean the metadata stream.
3135 */
3136 if (metadata->metadata_stream->ust_metadata_poll_pipe[1] >= 0) {
3137 ret = close(metadata->metadata_stream->ust_metadata_poll_pipe[1]);
3138 if (ret < 0) {
3139 PERROR("closing metadata pipe write side");
3140 }
3141 metadata->metadata_stream->ust_metadata_poll_pipe[1] = -1;
3142 }
3143
3144 end:
3145 return;
3146 }
3147
3148 /*
3149 * Close every metadata stream wait fd of the metadata hash table. This
3150 * function MUST be used very carefully so not to run into a race between the
3151 * metadata thread handling streams and this function closing their wait fd.
3152 *
3153 * For UST, this is used when the session daemon hangs up. Its the metadata
3154 * producer so calling this is safe because we are assured that no state change
3155 * can occur in the metadata thread for the streams in the hash table.
3156 */
3157 void lttng_ustconsumer_close_all_metadata(struct lttng_ht *metadata_ht)
3158 {
3159 struct lttng_ht_iter iter;
3160 struct lttng_consumer_stream *stream;
3161
3162 assert(metadata_ht);
3163 assert(metadata_ht->ht);
3164
3165 DBG("UST consumer closing all metadata streams");
3166
3167 rcu_read_lock();
3168 cds_lfht_for_each_entry(metadata_ht->ht, &iter.iter, stream,
3169 node.node) {
3170
3171 health_code_update();
3172
3173 pthread_mutex_lock(&stream->chan->lock);
3174 lttng_ustconsumer_close_metadata(stream->chan);
3175 pthread_mutex_unlock(&stream->chan->lock);
3176
3177 }
3178 rcu_read_unlock();
3179 }
3180
3181 void lttng_ustconsumer_close_stream_wakeup(struct lttng_consumer_stream *stream)
3182 {
3183 int ret;
3184
3185 ret = ustctl_stream_close_wakeup_fd(stream->ustream);
3186 if (ret < 0) {
3187 ERR("Unable to close wakeup fd");
3188 }
3189 }
3190
3191 /*
3192 * Please refer to consumer-timer.c before adding any lock within this
3193 * function or any of its callees. Timers have a very strict locking
3194 * semantic with respect to teardown. Failure to respect this semantic
3195 * introduces deadlocks.
3196 *
3197 * DON'T hold the metadata lock when calling this function, else this
3198 * can cause deadlock involving consumer awaiting for metadata to be
3199 * pushed out due to concurrent interaction with the session daemon.
3200 */
3201 int lttng_ustconsumer_request_metadata(struct lttng_consumer_local_data *ctx,
3202 struct lttng_consumer_channel *channel, int timer, int wait)
3203 {
3204 struct lttcomm_metadata_request_msg request;
3205 struct lttcomm_consumer_msg msg;
3206 enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
3207 uint64_t len, key, offset, version;
3208 int ret;
3209
3210 assert(channel);
3211 assert(channel->metadata_cache);
3212
3213 memset(&request, 0, sizeof(request));
3214
3215 /* send the metadata request to sessiond */
3216 switch (consumer_data.type) {
3217 case LTTNG_CONSUMER64_UST:
3218 request.bits_per_long = 64;
3219 break;
3220 case LTTNG_CONSUMER32_UST:
3221 request.bits_per_long = 32;
3222 break;
3223 default:
3224 request.bits_per_long = 0;
3225 break;
3226 }
3227
3228 request.session_id = channel->session_id;
3229 request.session_id_per_pid = channel->session_id_per_pid;
3230 /*
3231 * Request the application UID here so the metadata of that application can
3232 * be sent back. The channel UID corresponds to the user UID of the session
3233 * used for the rights on the stream file(s).
3234 */
3235 request.uid = channel->ust_app_uid;
3236 request.key = channel->key;
3237
3238 DBG("Sending metadata request to sessiond, session id %" PRIu64
3239 ", per-pid %" PRIu64 ", app UID %u and channel key %" PRIu64,
3240 request.session_id, request.session_id_per_pid, request.uid,
3241 request.key);
3242
3243 pthread_mutex_lock(&ctx->metadata_socket_lock);
3244
3245 health_code_update();
3246
3247 ret = lttcomm_send_unix_sock(ctx->consumer_metadata_socket, &request,
3248 sizeof(request));
3249 if (ret < 0) {
3250 ERR("Asking metadata to sessiond");
3251 goto end;
3252 }
3253
3254 health_code_update();
3255
3256 /* Receive the metadata from sessiond */
3257 ret = lttcomm_recv_unix_sock(ctx->consumer_metadata_socket, &msg,
3258 sizeof(msg));
3259 if (ret != sizeof(msg)) {
3260 DBG("Consumer received unexpected message size %d (expects %zu)",
3261 ret, sizeof(msg));
3262 lttng_consumer_send_error(ctx, LTTCOMM_CONSUMERD_ERROR_RECV_CMD);
3263 /*
3264 * The ret value might 0 meaning an orderly shutdown but this is ok
3265 * since the caller handles this.
3266 */
3267 goto end;
3268 }
3269
3270 health_code_update();
3271
3272 if (msg.cmd_type == LTTNG_ERR_UND) {
3273 /* No registry found */
3274 (void) consumer_send_status_msg(ctx->consumer_metadata_socket,
3275 ret_code);
3276 ret = 0;
3277 goto end;
3278 } else if (msg.cmd_type != LTTNG_CONSUMER_PUSH_METADATA) {
3279 ERR("Unexpected cmd_type received %d", msg.cmd_type);
3280 ret = -1;
3281 goto end;
3282 }
3283
3284 len = msg.u.push_metadata.len;
3285 key = msg.u.push_metadata.key;
3286 offset = msg.u.push_metadata.target_offset;
3287 version = msg.u.push_metadata.version;
3288
3289 assert(key == channel->key);
3290 if (len == 0) {
3291 DBG("No new metadata to receive for key %" PRIu64, key);
3292 }
3293
3294 health_code_update();
3295
3296 /* Tell session daemon we are ready to receive the metadata. */
3297 ret = consumer_send_status_msg(ctx->consumer_metadata_socket,
3298 LTTCOMM_CONSUMERD_SUCCESS);
3299 if (ret < 0 || len == 0) {
3300 /*
3301 * Somehow, the session daemon is not responding anymore or there is
3302 * nothing to receive.
3303 */
3304 goto end;
3305 }
3306
3307 health_code_update();
3308
3309 ret = lttng_ustconsumer_recv_metadata(ctx->consumer_metadata_socket,
3310 key, offset, len, version, channel, timer, wait);
3311 if (ret >= 0) {
3312 /*
3313 * Only send the status msg if the sessiond is alive meaning a positive
3314 * ret code.
3315 */
3316 (void) consumer_send_status_msg(ctx->consumer_metadata_socket, ret);
3317 }
3318 ret = 0;
3319
3320 end:
3321 health_code_update();
3322
3323 pthread_mutex_unlock(&ctx->metadata_socket_lock);
3324 return ret;
3325 }
3326
3327 /*
3328 * Return the ustctl call for the get stream id.
3329 */
3330 int lttng_ustconsumer_get_stream_id(struct lttng_consumer_stream *stream,
3331 uint64_t *stream_id)
3332 {
3333 assert(stream);
3334 assert(stream_id);
3335
3336 return ustctl_get_stream_id(stream->ustream, stream_id);
3337 }
This page took 0.14094 seconds and 3 git commands to generate.