/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; only version 2
 * of the License.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */

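/*
 * liblttkconsumerd: kernel consumer library.
 *
 * kconsumerd_thread_receive_fds() accepts trace channel file descriptors
 * sent by ltt-sessiond over a UNIX socket (as SCM_RIGHTS ancillary data),
 * while kconsumerd_thread_poll_fds() polls those descriptors and writes
 * each ready sub-buffer to its tracefile, either by splicing or by copying
 * from an mmap'd view of the ring buffer.
 */
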
#define _GNU_SOURCE
#include <errno.h>	/* errno, used to report kernctl/syscall failures */
#include <fcntl.h>
#include <poll.h>
#include <pthread.h>
#include <stdio.h>	/* perror */
#include <stdlib.h>
#include <string.h>
#include <sys/mman.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <unistd.h>
#include <urcu/list.h>

#include "libkernelctl.h"
#include "liblttkconsumerd.h"
#include "lttngerr.h"

static
struct kconsumerd_global_data {
	/*
	 * kconsumerd_data.lock protects kconsumerd_data.fd_list,
	 * kconsumerd_data.fds_count, and kconsumerd_data.need_update. It
	 * ensures the count matches the number of items in the fd_list.
	 * It also ensures that a list update *always* triggers an fd_array
	 * update: the list update and the kconsumerd_data.need_update flag
	 * update must be atomic, and so must the flag read, fd array update
	 * and flag clear.
	 */
	pthread_mutex_t lock;
	/*
	 * Number of elements in the list below. Protected by
	 * kconsumerd_data.lock.
	 */
	unsigned int fds_count;
	/*
	 * List of FDs. Protected by kconsumerd_data.lock.
	 */
	struct kconsumerd_fd_list fd_list;
	/*
	 * Flag telling the poll function that the local array of FDs
	 * needs to be updated. Protected by kconsumerd_data.lock.
	 */
	unsigned int need_update;
} kconsumerd_data = {
	.fd_list.head = CDS_LIST_HEAD_INIT(kconsumerd_data.fd_list.head),
};

/* pipe used to relay data between the two splice() calls */
static int kconsumerd_thread_pipe[2];

/* pipe to wake the poll thread when necessary */
static int kconsumerd_poll_pipe[2];

/* pipe that lets the signal handler wake up the fd receiver thread */
static int kconsumerd_should_quit[2];

/* timeout parameter, to control the polling thread grace period */
static int kconsumerd_poll_timeout = -1;

/* socket used to report errors to sessiond */
static int kconsumerd_error_socket;

/* path of the UNIX socket used to exchange commands with sessiond */
static char *kconsumerd_command_sock_path;

/*
 * Flag to inform the polling thread to quit when all fds have hung up.
 * Updated by kconsumerd_thread_receive_fds when it notices that all fds
 * have hung up. Also updated by the signal handler
 * (kconsumerd_should_exit()). Read by the polling threads.
 */
static volatile int kconsumerd_quit = 0;

/*
 * kconsumerd_set_error_socket
 *
 * Set the error socket
 */
void kconsumerd_set_error_socket(int sock)
{
	kconsumerd_error_socket = sock;
}

/*
 * kconsumerd_set_command_socket_path
 *
 * Set the command socket path
 */
void kconsumerd_set_command_socket_path(char *sock)
{
	kconsumerd_command_sock_path = sock;
}

/*
 * kconsumerd_find_session_fd
 *
 * Find a session fd in the global list.
 * The kconsumerd_data.lock must be held during this call.
 *
 * Return 1 if found, else 0.
 */
static int kconsumerd_find_session_fd(int fd)
{
	struct kconsumerd_fd *iter;

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == fd) {
			DBG("Duplicate session fd %d", fd);
			return 1;
		}
	}

	return 0;
}

/*
 * kconsumerd_del_fd
 *
 * Remove an fd from the global list, protected by a mutex
 */
static void kconsumerd_del_fd(struct kconsumerd_fd *lcf)
{
	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_del(&lcf->list);
	if (kconsumerd_data.fds_count > 0) {
		kconsumerd_data.fds_count--;
		if (lcf != NULL) {
			close(lcf->out_fd);
			close(lcf->consumerd_fd);
			free(lcf);
			lcf = NULL;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_add_fd
 *
 * Add an fd to the global list, protected by a mutex
 */
static int kconsumerd_add_fd(struct lttcomm_kconsumerd_msg *buf, int consumerd_fd)
{
	int ret;
	struct kconsumerd_fd *tmp_fd;

	pthread_mutex_lock(&kconsumerd_data.lock);
	/* Check if the fd already exists */
	ret = kconsumerd_find_session_fd(buf->fd);
	if (ret == 1) {
		goto end;
	}

	tmp_fd = malloc(sizeof(struct kconsumerd_fd));
	if (tmp_fd == NULL) {
		/* guard against allocation failure before dereferencing */
		perror("malloc kconsumerd_fd");
		ret = -1;
		goto end;
	}
	tmp_fd->sessiond_fd = buf->fd;
	tmp_fd->consumerd_fd = consumerd_fd;
	tmp_fd->state = buf->state;
	tmp_fd->max_sb_size = buf->max_sb_size;
	strncpy(tmp_fd->path_name, buf->path_name, PATH_MAX);

	/* Open the tracefile in write mode */
	ret = open(tmp_fd->path_name,
			O_WRONLY|O_CREAT|O_TRUNC, S_IRWXU|S_IRWXG|S_IRWXO);
	if (ret < 0) {
		ERR("Opening %s", tmp_fd->path_name);
		perror("open");
		goto end;
	}
	tmp_fd->out_fd = ret;
	tmp_fd->out_fd_offset = 0;

	DBG("Adding %s (%d, %d, %d)", tmp_fd->path_name,
			tmp_fd->sessiond_fd, tmp_fd->consumerd_fd, tmp_fd->out_fd);

	cds_list_add(&tmp_fd->list, &kconsumerd_data.fd_list.head);
	kconsumerd_data.fds_count++;
	kconsumerd_data.need_update = 1;
end:
	pthread_mutex_unlock(&kconsumerd_data.lock);
	return ret;
}

/*
 * kconsumerd_change_fd_state
 *
 * Update an fd according to what we just received
 */
static void kconsumerd_change_fd_state(int sessiond_fd,
		enum kconsumerd_fd_state state)
{
	struct kconsumerd_fd *iter;

	pthread_mutex_lock(&kconsumerd_data.lock);
	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		if (iter->sessiond_fd == sessiond_fd) {
			iter->state = state;
			break;
		}
	}
	kconsumerd_data.need_update = 1;
	pthread_mutex_unlock(&kconsumerd_data.lock);
}

/*
 * kconsumerd_update_poll_array
 *
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and to avoid concurrency
 * issues when writing is needed.
 * Returns the number of fds in the structures.
 * Called with kconsumerd_data.lock held.
 */
static int kconsumerd_update_poll_array(struct pollfd **pollfd,
		struct kconsumerd_fd **local_kconsumerd_fd)
{
	struct kconsumerd_fd *iter;
	int i = 0;

	DBG("Updating poll fd array");

	cds_list_for_each_entry(iter, &kconsumerd_data.fd_list.head, list) {
		DBG("Inside for each");
		if (iter->state == ACTIVE_FD) {
			DBG("Active FD %d", iter->consumerd_fd);
			(*pollfd)[i].fd = iter->consumerd_fd;
			(*pollfd)[i].events = POLLIN | POLLPRI;
			local_kconsumerd_fd[i] = iter;
			i++;
		}
	}

	/*
	 * Insert the kconsumerd_poll_pipe at the end of the array and don't
	 * increment i, so nb_fd is the number of real FDs.
	 */
	(*pollfd)[i].fd = kconsumerd_poll_pipe[0];
	(*pollfd)[i].events = POLLIN;
	return i;
}


/*
 * kconsumerd_on_read_subbuffer_mmap
 *
 * mmap the ring buffer, read it and write the data to the tracefile.
 * Returns the number of bytes written
 */
static int kconsumerd_on_read_subbuffer_mmap(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	unsigned long mmap_len, mmap_offset, padded_len, padding_len;
	char *mmap_base;
	char *padding = NULL;
	long ret = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

	/* get the padded subbuffer size to know the padding required */
	ret = kernctl_get_padded_subbuf_size(fd, &padded_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_padded_subbuf_size");
		goto end;
	}
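	/*
	 * Everything past the used size "len", up to the padded sub-buffer
	 * size, is written out below as zero-filled padding so that the
	 * on-disk trace keeps the sub-buffer alignment.
	 */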
	padding_len = padded_len - len;
	padding = malloc(padding_len * sizeof(char));
	memset(padding, '\0', padding_len);

	/* get the len of the mmap region */
	ret = kernctl_get_mmap_len(fd, &mmap_len);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_len");
		goto end;
	}

	/* get the offset inside the fd to mmap */
	ret = kernctl_get_mmap_read_offset(fd, &mmap_offset);
	if (ret != 0) {
		ret = errno;
		perror("kernctl_get_mmap_read_offset");
		goto end;
	}

	mmap_base = mmap(NULL, mmap_len, PROT_READ, MAP_PRIVATE, fd, mmap_offset);
	if (mmap_base == MAP_FAILED) {
		perror("Error mmapping");
		ret = -1;
		goto end;
	}

	while (len > 0) {
		ret = write(outfd, mmap_base, len);
		if (ret < 0) {
			ret = errno;
			perror("Error in file write");
			goto end;
		}
		/* on a short write, resume from the first byte not yet written */
		mmap_base += ret;
		len -= ret;
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/* once all the data is written, write the padding to disk */
	ret = write(outfd, padding, padding_len);
	if (ret < 0) {
		ret = errno;
		perror("Error writing padding to file");
		goto end;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);

		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

end:
	if (padding != NULL) {
		free(padding);
	}
	return ret;
}

/*
 * kconsumerd_on_read_subbuffer
 *
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced
 */
static int kconsumerd_on_read_subbuffer(
		struct kconsumerd_fd *kconsumerd_fd, unsigned long len)
{
	long ret = 0;
	loff_t offset = 0;
	off_t orig_offset = kconsumerd_fd->out_fd_offset;
	int fd = kconsumerd_fd->consumerd_fd;
	int outfd = kconsumerd_fd->out_fd;

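	/*
	 * splice() needs a pipe on at least one side of the transfer, so the
	 * data moves in two steps: from the channel fd into
	 * kconsumerd_thread_pipe, then from that pipe into the tracefile.
	 */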
	while (len > 0) {
		DBG("splice chan to pipe offset %lu (fd : %d)",
				(unsigned long)offset, fd);
		ret = splice(fd, &offset, kconsumerd_thread_pipe[1], NULL, len,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice chan to pipe ret %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in relay splice");
			goto splice_error;
		}

		ret = splice(kconsumerd_thread_pipe[0], NULL, outfd, NULL, ret,
				SPLICE_F_MOVE | SPLICE_F_MORE);
		DBG("splice pipe to file %ld", ret);
		if (ret < 0) {
			ret = errno;
			perror("Error in file splice");
			goto splice_error;
		}
		if (ret >= len) {
			len = 0;
		} else {
			/* on a partial splice, only account for what reached the file */
			len -= ret;
		}
		/* This won't block, but will start writeout asynchronously */
		sync_file_range(outfd, kconsumerd_fd->out_fd_offset, ret,
				SYNC_FILE_RANGE_WRITE);
		kconsumerd_fd->out_fd_offset += ret;
	}

	/*
	 * This does a blocking write-and-wait on any page that belongs to the
	 * subbuffer prior to the one we just wrote.
	 * Don't care about error values, as these are just hints and ways to
	 * limit the amount of page cache used.
	 */
	if (orig_offset >= kconsumerd_fd->max_sb_size) {
		sync_file_range(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size,
				SYNC_FILE_RANGE_WAIT_BEFORE
				| SYNC_FILE_RANGE_WRITE
				| SYNC_FILE_RANGE_WAIT_AFTER);
		/*
		 * Give hints to the kernel about how we access the file:
		 * POSIX_FADV_DONTNEED : we won't re-access data in a near future after
		 * we write it.
		 *
		 * We need to call fadvise again after the file grows because the
		 * kernel does not seem to apply fadvise to non-existing parts of the
		 * file.
		 *
		 * Call fadvise _after_ having waited for the page writeback to
		 * complete because the dirty page writeback semantic is not well
		 * defined. So it can be expected to lead to lower throughput in
		 * streaming.
		 */
		posix_fadvise(outfd, orig_offset - kconsumerd_fd->max_sb_size,
				kconsumerd_fd->max_sb_size, POSIX_FADV_DONTNEED);
	}
	goto end;

splice_error:
	/* send the appropriate error description to sessiond */
	switch (ret) {
	case EBADF:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EBADF);
		break;
	case EINVAL:
		kconsumerd_send_error(KCONSUMERD_SPLICE_EINVAL);
		break;
	case ENOMEM:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ENOMEM);
		break;
	case ESPIPE:
		kconsumerd_send_error(KCONSUMERD_SPLICE_ESPIPE);
		break;
	}

end:
	return ret;
}

/*
 * kconsumerd_read_subbuffer
 *
 * Consume data on a file descriptor and write it on a trace file
 */
static int kconsumerd_read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
{
	unsigned long len;
	int err;
	long ret = 0;
	int infd = kconsumerd_fd->consumerd_fd;

	DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
	/* Get the next subbuffer */
	err = kernctl_get_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		perror("Reserving sub buffer failed (everything is normal, "
				"it is due to concurrency)");
		goto end;
	}

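	/*
	 * The output method is fixed at build time by
	 * DEFAULT_KERNEL_CHANNEL_OUTPUT: sub-buffers are either spliced or
	 * copied from an mmap'd view into the tracefile.
	 */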
	switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
	case LTTNG_EVENT_SPLICE:
		/* read the whole subbuffer */
		err = kernctl_get_padded_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}

		/* splice the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error splicing to tracefile");
		}
		break;
	case LTTNG_EVENT_MMAP:
		/* read the used subbuffer size */
		err = kernctl_get_subbuf_size(infd, &len);
		if (err != 0) {
			ret = errno;
			perror("Getting sub-buffer len failed.");
			goto end;
		}
		/* write the subbuffer to the tracefile */
		ret = kconsumerd_on_read_subbuffer_mmap(kconsumerd_fd, len);
		if (ret < 0) {
			/*
			 * display the error but continue processing to try
			 * to release the subbuffer
			 */
			ERR("Error writing to tracefile");
		}
		break;
	default:
		ERR("Unknown output method");
		ret = -1;
	}

	err = kernctl_put_next_subbuf(infd);
	if (err != 0) {
		ret = errno;
		if (errno == EFAULT) {
			perror("Error in unreserving sub buffer\n");
		} else if (errno == EIO) {
			/* Should never happen with newer LTTng versions */
			perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
		}
		goto end;
	}

end:
	return ret;
}

/*
 * kconsumerd_poll_socket
 *
 * Poll on the should_quit pipe and the command socket.
 * Returns -1 on error (and the caller should exit), 0 if data is
 * available on the command socket.
 */
int kconsumerd_poll_socket(struct pollfd *kconsumerd_sockpoll)
{
	int num_rdy;

	num_rdy = poll(kconsumerd_sockpoll, 2, -1);
	if (num_rdy == -1) {
		perror("Poll error");
		goto exit;
	}
	if (kconsumerd_sockpoll[0].revents == POLLIN) {
		DBG("kconsumerd_should_quit wake up");
		goto exit;
	}
	return 0;

exit:
	return -1;
}

/*
 * kconsumerd_consumerd_recv_fd
 *
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of received data
 */
static int kconsumerd_consumerd_recv_fd(int sfd,
		struct pollfd *kconsumerd_sockpoll, int size,
		enum kconsumerd_command cmd_type)
{
	struct msghdr msg;
	struct iovec iov[1];
	int ret = 0, i, tmp2;
	struct cmsghdr *cmsg;
	int nb_fd;
	char recv_fd[CMSG_SPACE(sizeof(int))];
	struct lttcomm_kconsumerd_msg lkm;

	/* the number of fds we are about to receive */
	nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);

	for (i = 0; i < nb_fd; i++) {
		memset(&msg, 0, sizeof(msg));

		/* Prepare to receive the structures */
		iov[0].iov_base = &lkm;
		iov[0].iov_len = sizeof(lkm);
		msg.msg_iov = iov;
		msg.msg_iovlen = 1;

		msg.msg_control = recv_fd;
		msg.msg_controllen = sizeof(recv_fd);

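		/*
		 * Each message carries its file descriptor as SCM_RIGHTS
		 * ancillary data; the kernel installs a copy in this process
		 * and CMSG_DATA() below yields the local descriptor number.
		 */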
		DBG("Waiting to receive fd");
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}

		if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
			perror("recvmsg");
			continue;
		}

		if (ret != (size / nb_fd)) {
			ERR("Received only %d, expected %d", ret, size / nb_fd);
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}

		cmsg = CMSG_FIRSTHDR(&msg);
		if (!cmsg) {
			ERR("Invalid control message header");
			ret = -1;
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			goto end;
		}
		/* if we received fds */
		if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
			switch (cmd_type) {
			case ADD_STREAM:
				DBG("kconsumerd_add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
				ret = kconsumerd_add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
				if (ret < 0) {
					kconsumerd_send_error(KCONSUMERD_OUTFD_ERROR);
					goto end;
				}
				break;
			case UPDATE_STREAM:
				kconsumerd_change_fd_state(lkm.fd, lkm.state);
				break;
			default:
				break;
			}
			/* signal the poll thread */
			tmp2 = write(kconsumerd_poll_pipe[1], "4", 1);
			if (tmp2 < 0) {
				perror("write kconsumerd poll");
			}
		} else {
			ERR("Didn't receive any fd");
			kconsumerd_send_error(KCONSUMERD_ERROR_RECV_FD);
			ret = -1;
			goto end;
		}
	}

end:
	return ret;
}

/*
 * kconsumerd_thread_poll_fds
 *
 * This thread polls the fds in the fd list to consume the data
 * and write it to the tracefile if necessary.
 */
void *kconsumerd_thread_poll_fds(void *data)
{
	int num_rdy, num_hup, high_prio, ret, i;
	struct pollfd *pollfd = NULL;
	/* local view of the fds */
	struct kconsumerd_fd **local_kconsumerd_fd = NULL;
	/* local view of kconsumerd_data.fds_count */
	int nb_fd = 0;
	char tmp;
	int tmp2;

	ret = pipe(kconsumerd_thread_pipe);
	if (ret < 0) {
		perror("Error creating pipe");
		goto end;
	}

	local_kconsumerd_fd = malloc(sizeof(struct kconsumerd_fd));

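	/*
	 * Main loop: rebuild the local pollfd array whenever the shared fd
	 * list has changed, poll every active channel fd plus the wake-up
	 * pipe, then service urgent (POLLPRI) data before normal (POLLIN)
	 * reads.
	 */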
	while (1) {
		high_prio = 0;
		num_hup = 0;

		/*
		 * the fd list has been updated, we need to update our
		 * local array as well
		 */
		pthread_mutex_lock(&kconsumerd_data.lock);
		if (kconsumerd_data.need_update) {
			if (pollfd != NULL) {
				free(pollfd);
				pollfd = NULL;
			}
			if (local_kconsumerd_fd != NULL) {
				free(local_kconsumerd_fd);
				local_kconsumerd_fd = NULL;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			pollfd = malloc((kconsumerd_data.fds_count + 1) * sizeof(struct pollfd));
			if (pollfd == NULL) {
				perror("pollfd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}

			/* allocate for all fds + 1 for the kconsumerd_poll_pipe */
			local_kconsumerd_fd = malloc((kconsumerd_data.fds_count + 1) *
					sizeof(struct kconsumerd_fd));
			if (local_kconsumerd_fd == NULL) {
				perror("local_kconsumerd_fd malloc");
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			ret = kconsumerd_update_poll_array(&pollfd, local_kconsumerd_fd);
			if (ret < 0) {
				ERR("Error in allocating pollfd or local_outfds");
				kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
				pthread_mutex_unlock(&kconsumerd_data.lock);
				goto end;
			}
			nb_fd = ret;
			kconsumerd_data.need_update = 0;
		}
		pthread_mutex_unlock(&kconsumerd_data.lock);

		/* poll on the array of fds */
		DBG("polling on %d fd", nb_fd + 1);
		num_rdy = poll(pollfd, nb_fd + 1, kconsumerd_poll_timeout);
		DBG("poll num_rdy : %d", num_rdy);
		if (num_rdy == -1) {
			perror("Poll error");
			kconsumerd_send_error(KCONSUMERD_POLL_ERROR);
			goto end;
		} else if (num_rdy == 0) {
			DBG("Polling thread timed out");
			goto end;
		}

		/* No FDs and kconsumerd_quit, kconsumerd_cleanup the thread */
		if (nb_fd == 0 && kconsumerd_quit == 1) {
			goto end;
		}

		/*
		 * If the kconsumerd_poll_pipe triggered poll go
		 * directly to the beginning of the loop to update the
		 * array. We want to prioritize array update over
		 * low-priority reads.
		 */
		if (pollfd[nb_fd].revents == POLLIN) {
			DBG("kconsumerd_poll_pipe wake up");
			tmp2 = read(kconsumerd_poll_pipe[0], &tmp, 1);
			if (tmp2 < 0) {
				perror("read kconsumerd poll");
			}
			continue;
		}

		/* Take care of high priority channels first. */
		for (i = 0; i < nb_fd; i++) {
			switch (pollfd[i].revents) {
			case POLLERR:
				ERR("Error returned in polling fd %d.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLHUP:
				DBG("Polling fd %d tells it has hung up.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLNVAL:
				ERR("Polling fd %d tells fd is not open.", pollfd[i].fd);
				kconsumerd_del_fd(local_kconsumerd_fd[i]);
				num_hup++;
				break;
			case POLLPRI:
				DBG("Urgent read on fd %d", pollfd[i].fd);
				high_prio = 1;
				ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
				/* it's ok to have an unavailable sub-buffer */
				if (ret == EAGAIN) {
					ret = 0;
				}
				break;
			}
		}

		/* If every buffer FD has hung up, we end the read loop here */
		if (nb_fd > 0 && num_hup == nb_fd) {
			DBG("every buffer FD has hung up\n");
			if (kconsumerd_quit == 1) {
				goto end;
			}
			continue;
		}

		/* Take care of low priority channels. */
		if (high_prio == 0) {
			for (i = 0; i < nb_fd; i++) {
				if (pollfd[i].revents == POLLIN) {
					DBG("Normal read on fd %d", pollfd[i].fd);
					ret = kconsumerd_read_subbuffer(local_kconsumerd_fd[i]);
					/* it's ok to have an unavailable subbuffer */
					if (ret == EAGAIN) {
						ret = 0;
					}
				}
			}
		}
	}
end:
	DBG("polling thread exiting");
	if (pollfd != NULL) {
		free(pollfd);
		pollfd = NULL;
	}
	if (local_kconsumerd_fd != NULL) {
		free(local_kconsumerd_fd);
		local_kconsumerd_fd = NULL;
	}
	return NULL;
}

/*
 * kconsumerd_init(void)
 *
 * Initialise the necessary environment:
 * - inform the polling thread to update the polling array
 * - create the poll_pipe
 * - create the should_quit pipe (for the signal handler)
 */
int kconsumerd_init(void)
{
	int ret;

	/* need to update the polling array at init time */
	kconsumerd_data.need_update = 1;

	ret = pipe(kconsumerd_poll_pipe);
	if (ret < 0) {
		perror("Error creating poll pipe");
		goto end;
	}

	ret = pipe(kconsumerd_should_quit);
	if (ret < 0) {
		perror("Error creating recv pipe");
		goto end;
	}

end:
	return ret;
}

/*
 * kconsumerd_thread_receive_fds
 *
 * This thread listens on the consumerd socket and
 * receives the file descriptors from ltt-sessiond
 */
void *kconsumerd_thread_receive_fds(void *data)
{
	int sock, client_socket, ret;
	struct lttcomm_kconsumerd_header tmp;
	/*
	 * structure to poll for incoming data on the communication socket,
	 * which avoids making the sockets blocking
	 */
	struct pollfd kconsumerd_sockpoll[2];


	DBG("Creating command socket %s", kconsumerd_command_sock_path);
	unlink(kconsumerd_command_sock_path);
	client_socket = lttcomm_create_unix_sock(kconsumerd_command_sock_path);
	if (client_socket < 0) {
		ERR("Cannot create command socket");
		goto end;
	}

	ret = lttcomm_listen_unix_sock(client_socket);
	if (ret < 0) {
		goto end;
	}

	DBG("Sending ready command to ltt-sessiond");
	ret = kconsumerd_send_error(KCONSUMERD_COMMAND_SOCK_READY);
	if (ret < 0) {
		ERR("Error sending ready command to ltt-sessiond");
		goto end;
	}

	ret = fcntl(client_socket, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* prepare the FDs to poll : the should_quit pipe and the client socket */
	kconsumerd_sockpoll[0].fd = kconsumerd_should_quit[0];
	kconsumerd_sockpoll[0].events = POLLIN | POLLPRI;
	kconsumerd_sockpoll[1].fd = client_socket;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

	if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
		goto end;
	}
	DBG("Connection on client_socket");

	/* Blocking call, waiting for transmission */
	sock = lttcomm_accept_unix_sock(client_socket);
	if (sock <= 0) {
		WARN("On accept");
		goto end;
	}
	ret = fcntl(sock, F_SETFL, O_NONBLOCK);
	if (ret < 0) {
		perror("fcntl O_NONBLOCK");
		goto end;
	}

	/* update the polling structure to poll on the established socket */
	kconsumerd_sockpoll[1].fd = sock;
	kconsumerd_sockpoll[1].events = POLLIN | POLLPRI;

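	/*
	 * Command protocol: ltt-sessiond first sends a
	 * lttcomm_kconsumerd_header carrying the command type and payload
	 * size, then the per-fd payload, which is handled by
	 * kconsumerd_consumerd_recv_fd().
	 */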
	while (1) {
		if (kconsumerd_poll_socket(kconsumerd_sockpoll) < 0) {
			goto end;
		}
		DBG("Incoming fds on sock");

		/* We first get the number of fds we are about to receive */
		ret = lttcomm_recv_unix_sock(sock, &tmp,
				sizeof(struct lttcomm_kconsumerd_header));
		if (ret <= 0) {
			ERR("Communication interrupted on command socket");
			goto end;
		}
		if (tmp.cmd_type == STOP) {
			DBG("Received STOP command");
			goto end;
		}
		if (kconsumerd_quit) {
			DBG("kconsumerd_thread_receive_fds received quit from signal");
			goto end;
		}

		/* we received a command to add or update fds */
		ret = kconsumerd_consumerd_recv_fd(sock, kconsumerd_sockpoll,
				tmp.payload_size, tmp.cmd_type);
		if (ret <= 0) {
			ERR("Receiving the FD, exiting");
			goto end;
		}
		DBG("received fds on sock");
	}

end:
	DBG("kconsumerd_thread_receive_fds exiting");

	/*
	 * when all fds have hung up, the polling thread
	 * can exit cleanly
	 */
	kconsumerd_quit = 1;

	/*
	 * 2s of grace period, if no polling events occur during
	 * this period, the polling thread will exit even if there
	 * are still open FDs (should not happen, but safety mechanism).
	 */
	kconsumerd_poll_timeout = KCONSUMERD_POLL_GRACE_PERIOD;

	/* wake up the polling thread */
	ret = write(kconsumerd_poll_pipe[1], "4", 1);
	if (ret < 0) {
		perror("poll pipe write");
	}
	return NULL;
}

/*
 * kconsumerd_cleanup
 *
 * Cleanup the daemon's socket on exit
 */
void kconsumerd_cleanup(void)
{
	struct kconsumerd_fd *iter, *tmp;

	/* remove the socket file */
	unlink(kconsumerd_command_sock_path);

	/*
	 * Close all outfd. Called when there are no more threads
	 * running (after joining on the threads), so there is no need to
	 * protect the list iteration with a mutex. Use the _safe variant
	 * because kconsumerd_del_fd() removes and frees each entry.
	 */
	cds_list_for_each_entry_safe(iter, tmp, &kconsumerd_data.fd_list.head, list) {
		kconsumerd_del_fd(iter);
	}
}

/*
 * kconsumerd_should_exit
 *
 * Called from signal handler.
 */
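/*
 * write() to a pipe is async-signal-safe, which is why a pipe write is
 * used here to wake up the receiving thread rather than a lock or a
 * condition variable.
 */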
void kconsumerd_should_exit(void)
{
	int ret;

	kconsumerd_quit = 1;
	ret = write(kconsumerd_should_quit[1], "4", 1);
	if (ret < 0) {
		perror("write kconsumerd quit");
	}
}

/*
 * kconsumerd_send_error
 *
 * send return code to ltt-sessiond
 */
int kconsumerd_send_error(enum lttcomm_return_code cmd)
{
	if (kconsumerd_error_socket > 0) {
		return lttcomm_send_unix_sock(kconsumerd_error_socket, &cmd,
				sizeof(enum lttcomm_sessiond_command));
	}

	return 0;
}