/*
 * Copyright (C) 2011 - Julien Desfossez <julien.desfossez@polymtl.ca>
 *                      Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License
 * as published by the Free Software Foundation; either version 2
 * of the License, or (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
 */
32 #include <sys/socket.h>
34 #include <sys/types.h>
35 #include <urcu/list.h>
40 #include "libkernelctl.h"
41 #include "liblttsessiondcomm.h"
42 #include "kconsumerd.h"
44 /* Init the list of FDs */
45 static struct ltt_kconsumerd_fd_list kconsumerd_fd_list
= {
46 .head
= CDS_LIST_HEAD_INIT(kconsumerd_fd_list
.head
),
49 /* Number of element for the list below. */
50 static unsigned int fds_count
;
52 /* If the local array of FDs needs update in the poll function */
53 static unsigned int update_fd_array
= 1;
55 /* lock the fd array and structures */
56 static pthread_mutex_t kconsumerd_lock_fds
;
58 /* the two threads (receive fd and poll) */
59 static pthread_t threads
[2];
61 /* communication with splice */
62 static int thread_pipe
[2];
64 /* pipe to wake the poll thread when necessary */
65 static int poll_pipe
[2];
67 /* socket to communicate errors with sessiond */
68 static int error_socket
= -1;
70 /* Argument variables */
73 static int opt_daemon
;
74 static const char *progname
;
75 static char command_sock_path
[PATH_MAX
]; /* Global command socket path */
76 static char error_sock_path
[PATH_MAX
]; /* Global error path */
/* Remove a fd from the global list, protected by a mutex. */
83 static void del_fd(struct ltt_kconsumerd_fd
*lcf
)
85 DBG("Removing %d", lcf
->consumerd_fd
);
86 pthread_mutex_lock(&kconsumerd_lock_fds
);
87 cds_list_del(&lcf
->list
);
90 DBG("Removed ltt_kconsumerd_fd");
93 close(lcf
->consumerd_fd
);
98 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/* Clean up the daemon's socket on exit. */
106 static void cleanup()
108 struct ltt_kconsumerd_fd
*iter
;
110 /* remove the socket file */
111 unlink(command_sock_path
);
113 /* unblock the threads */
114 WARN("Terminating the threads before exiting");
115 pthread_cancel(threads
[0]);
116 pthread_cancel(threads
[1]);
118 /* close all outfd */
119 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
127 * send return code to ltt-sessiond
129 static int send_error(enum lttcomm_return_code cmd
)
131 if (error_socket
> 0) {
132 return lttcomm_send_unix_sock(error_socket
, &cmd
,
133 sizeof(enum lttcomm_sessiond_command
));
142 * Add a fd to the global list protected by a mutex
144 static int add_fd(struct lttcomm_kconsumerd_msg
*buf
, int consumerd_fd
)
146 struct ltt_kconsumerd_fd
*tmp_fd
;
149 tmp_fd
= malloc(sizeof(struct ltt_kconsumerd_fd
));
150 tmp_fd
->sessiond_fd
= buf
->fd
;
151 tmp_fd
->consumerd_fd
= consumerd_fd
;
152 tmp_fd
->state
= buf
->state
;
153 tmp_fd
->max_sb_size
= buf
->max_sb_size
;
154 strncpy(tmp_fd
->path_name
, buf
->path_name
, PATH_MAX
);
156 /* Opening the tracefile in write mode */
157 DBG("Opening %s for writing", tmp_fd
->path_name
);
158 ret
= open(tmp_fd
->path_name
,
159 O_WRONLY
|O_CREAT
, S_IRWXU
|S_IRWXG
|S_IRWXO
);
161 ERR("Opening %s", tmp_fd
->path_name
);
165 tmp_fd
->out_fd
= ret
;
166 tmp_fd
->out_fd_offset
= 0;
168 DBG("Adding %s (%d, %d, %d)", tmp_fd
->path_name
,
169 tmp_fd
->sessiond_fd
, tmp_fd
->consumerd_fd
, tmp_fd
->out_fd
);
171 pthread_mutex_lock(&kconsumerd_lock_fds
);
172 cds_list_add(&tmp_fd
->list
, &kconsumerd_fd_list
.head
);
174 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/* Signal handler for the daemon. */
186 static void sighandler(int sig
)
/*
 * Set up the signal handler for SIGINT, SIGTERM and SIGPIPE.
 */
199 static int set_signal_handler(void)
205 if ((ret
= sigemptyset(&sigset
)) < 0) {
206 perror("sigemptyset");
210 sa
.sa_handler
= sighandler
;
213 if ((ret
= sigaction(SIGTERM
, &sa
, NULL
)) < 0) {
218 if ((ret
= sigaction(SIGINT
, &sa
, NULL
)) < 0) {
223 if ((ret
= sigaction(SIGPIPE
, &sa
, NULL
)) < 0) {
/*
 * Splice the data from the ring buffer to the tracefile.
 * Returns the number of bytes spliced.
 */
237 static int on_read_subbuffer(struct ltt_kconsumerd_fd
*kconsumerd_fd
,
242 off_t orig_offset
= kconsumerd_fd
->out_fd_offset
;
243 int fd
= kconsumerd_fd
->consumerd_fd
;
244 int outfd
= kconsumerd_fd
->out_fd
;
247 DBG("splice chan to pipe offset %lu (fd : %d)",
248 (unsigned long)offset
, fd
);
249 ret
= splice(fd
, &offset
, thread_pipe
[1], NULL
, len
,
250 SPLICE_F_MOVE
| SPLICE_F_MORE
);
251 DBG("splice chan to pipe ret %ld", ret
);
254 perror("Error in relay splice");
258 ret
= splice(thread_pipe
[0], NULL
, outfd
, NULL
, ret
,
259 SPLICE_F_MOVE
| SPLICE_F_MORE
);
260 DBG("splice pipe to file %ld", ret
);
263 perror("Error in file splice");
269 /* This won't block, but will start writeout asynchronously */
270 sync_file_range(outfd
, kconsumerd_fd
->out_fd_offset
, ret
,
271 SYNC_FILE_RANGE_WRITE
);
272 kconsumerd_fd
->out_fd_offset
+= ret
;
276 * This does a blocking write-and-wait on any page that belongs to the
277 * subbuffer prior to the one we just wrote.
278 * Don't care about error values, as these are just hints and ways to
279 * limit the amount of page cache used.
281 if (orig_offset
>= kconsumerd_fd
->max_sb_size
) {
282 sync_file_range(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
283 kconsumerd_fd
->max_sb_size
,
284 SYNC_FILE_RANGE_WAIT_BEFORE
285 | SYNC_FILE_RANGE_WRITE
286 | SYNC_FILE_RANGE_WAIT_AFTER
);
288 * Give hints to the kernel about how we access the file:
289 * POSIX_FADV_DONTNEED : we won't re-access data in a near
290 * future after we write it.
291 * We need to call fadvise again after the file grows because
292 * the kernel does not seem to apply fadvise to non-existing
294 * Call fadvise _after_ having waited for the page writeback to
295 * complete because the dirty page writeback semantic is not
296 * well defined. So it can be expected to lead to lower
297 * throughput in streaming.
299 posix_fadvise(outfd
, orig_offset
- kconsumerd_fd
->max_sb_size
,
300 kconsumerd_fd
->max_sb_size
, POSIX_FADV_DONTNEED
);
305 /* send the appropriate error description to sessiond */
308 send_error(KCONSUMERD_SPLICE_EBADF
);
311 send_error(KCONSUMERD_SPLICE_EINVAL
);
314 send_error(KCONSUMERD_SPLICE_ENOMEM
);
317 send_error(KCONSUMERD_SPLICE_ESPIPE
);
/* Consume data on a file descriptor and write it to a trace file. */
330 static int read_subbuffer(struct ltt_kconsumerd_fd
*kconsumerd_fd
)
335 int infd
= kconsumerd_fd
->consumerd_fd
;
337 DBG("In read_subbuffer (infd : %d)", infd
);
338 /* Get the next subbuffer */
339 err
= kernctl_get_next_subbuf(infd
);
342 perror("Reserving sub buffer failed (everything is normal, "
343 "it is due to concurrency)");
347 /* read the whole subbuffer */
348 err
= kernctl_get_padded_subbuf_size(infd
, &len
);
351 perror("Getting sub-buffer len failed.");
355 /* splice the subbuffer to the tracefile */
356 ret
= on_read_subbuffer(kconsumerd_fd
, len
);
359 * display the error but continue processing to try
360 * to release the subbuffer
362 ERR("Error splicing to tracefile");
365 err
= kernctl_put_next_subbuf(infd
);
368 if (errno
== EFAULT
) {
369 perror("Error in unreserving sub buffer\n");
370 } else if (errno
== EIO
) {
371 /* Should never happen with newer LTTng versions */
372 perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
/* Update a fd according to what we just received. */
386 static void change_fd_state(int sessiond_fd
,
387 enum lttcomm_kconsumerd_fd_state state
)
389 struct ltt_kconsumerd_fd
*iter
;
390 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
391 if (iter
->sessiond_fd
== sessiond_fd
) {
/*
 * Receives an array of file descriptors and the associated
 * structures describing each fd (path name).
 * Returns the size of the received data.
 */
405 static int consumerd_recv_fd(int sfd
, int size
,
406 enum lttcomm_consumerd_command cmd_type
)
411 struct cmsghdr
*cmsg
;
413 char tmp
[CMSG_SPACE(size
)];
414 struct lttcomm_kconsumerd_msg
*buf
;
415 /* the number of fds we are about to receive */
416 nb_fd
= size
/sizeof(struct lttcomm_kconsumerd_msg
);
420 memset(&msg
, 0, sizeof(msg
));
422 /* Prepare to receive the structures */
423 iov
[0].iov_base
= buf
;
424 iov
[0].iov_len
= size
;
428 msg
.msg_control
= tmp
;
429 msg
.msg_controllen
= sizeof(tmp
);
431 DBG("Waiting to receive fds");
432 if ((ret
= recvmsg(sfd
, &msg
, 0)) < 0) {
436 ERR("Received only %d, expected %d", ret
, size
);
437 send_error(KCONSUMERD_ERROR_RECV_FD
);
441 cmsg
= CMSG_FIRSTHDR(&msg
);
443 ERR("Invalid control message header");
445 send_error(KCONSUMERD_ERROR_RECV_FD
);
449 /* if we received fds */
450 if (cmsg
->cmsg_level
== SOL_SOCKET
&& cmsg
->cmsg_type
== SCM_RIGHTS
) {
451 DBG("Receive : expecting %d fds", nb_fd
);
452 for (i
= 0; i
< nb_fd
; i
++) {
454 case LTTCOMM_ADD_STREAM
:
455 DBG("add_fd %s (%d)", buf
[i
].path_name
, ((int *)CMSG_DATA(cmsg
))[i
]);
456 ret
= add_fd(&buf
[i
], ((int *)CMSG_DATA(cmsg
))[i
]);
458 send_error(KCONSUMERD_OUTFD_ERROR
);
462 case LTTCOMM_UPDATE_STREAM
:
463 change_fd_state(buf
[i
].fd
, buf
[i
].state
);
469 /* flag to tell the polling thread to update its fd array */
471 /* signal the poll thread */
472 tmp2
= write(poll_pipe
[1], "4", 1);
474 ERR("Didn't received any fd");
475 send_error(KCONSUMERD_ERROR_RECV_FD
);
/*
 * This thread listens on the consumerd socket and
 * receives the file descriptors from ltt-sessiond.
 */
494 static void *thread_receive_fds(void *data
)
496 int sock
, client_socket
, ret
;
497 struct lttcomm_kconsumerd_header tmp
;
499 DBG("Creating command socket %s", command_sock_path
);
500 unlink(command_sock_path
);
501 client_socket
= lttcomm_create_unix_sock(command_sock_path
);
502 if (client_socket
< 0) {
503 ERR("Cannot create command socket");
507 ret
= lttcomm_listen_unix_sock(client_socket
);
512 DBG("Sending ready command to ltt-sessiond");
513 ret
= send_error(KCONSUMERD_COMMAND_SOCK_READY
);
515 ERR("Error sending ready command to ltt-sessiond");
519 /* Blocking call, waiting for transmission */
520 sock
= lttcomm_accept_unix_sock(client_socket
);
522 WARN("On accept, retrying");
526 /* We first get the number of fd we are about to receive */
527 ret
= lttcomm_recv_unix_sock(sock
, &tmp
,
528 sizeof(struct lttcomm_kconsumerd_header
));
530 ERR("Receiving the lttcomm_kconsumerd_header, exiting");
533 ret
= consumerd_recv_fd(sock
, tmp
.payload_size
, tmp
.cmd_type
);
535 ERR("Receiving the FD, exiting");
/*
 * Allocate the pollfd structure and the local view of the out fds
 * to avoid doing a lookup in the linked list and concurrency issues
 * when writing is needed.
 * Returns the number of fds in the structures.
 */
552 static int update_poll_array(struct pollfd
**pollfd
,
553 struct ltt_kconsumerd_fd
**local_kconsumerd_fd
)
555 struct ltt_kconsumerd_fd
*iter
;
558 if (*pollfd
!= NULL
) {
563 if (*local_kconsumerd_fd
!= NULL
) {
564 free(*local_kconsumerd_fd
);
565 *local_kconsumerd_fd
= NULL
;
568 /* allocate for all fds + 1 for the poll_pipe */
569 *pollfd
= malloc((fds_count
+ 1) * sizeof(struct pollfd
));
570 if (*pollfd
== NULL
) {
571 perror("pollfd malloc");
575 /* allocate for all fds + 1 for the poll_pipe */
576 *local_kconsumerd_fd
= malloc((fds_count
+ 1) * sizeof(struct ltt_kconsumerd_fd
));
577 if (*local_kconsumerd_fd
== NULL
) {
578 perror("local_kconsumerd_fd malloc");
582 DBG("Updating poll fd array");
583 pthread_mutex_lock(&kconsumerd_lock_fds
);
585 cds_list_for_each_entry(iter
, &kconsumerd_fd_list
.head
, list
) {
586 DBG("Inside for each");
587 if (iter
->state
== ACTIVE_FD
) {
588 DBG("Active FD %d", iter
->consumerd_fd
);
589 (*pollfd
)[i
].fd
= iter
->consumerd_fd
;
590 (*pollfd
)[i
].events
= POLLIN
| POLLPRI
;
591 local_kconsumerd_fd
[i
] = iter
;
593 } else if (iter
->state
== DELETE_FD
) {
598 * insert the poll_pipe at the end of the array and don't increment i
599 * so nb_fd is the number of real FD
601 (*pollfd
)[i
].fd
= poll_pipe
[0];
602 (*pollfd
)[i
].events
= POLLIN
;
605 pthread_mutex_unlock(&kconsumerd_lock_fds
);
/*
 * This thread polls the fds in the ltt_fd_list to consume the data
 * and write it to the tracefile if necessary.
 */
618 static void *thread_poll_fds(void *data
)
620 int num_rdy
, num_hup
, high_prio
, ret
, i
;
621 struct pollfd
*pollfd
= NULL
;
622 /* local view of the fds */
623 struct ltt_kconsumerd_fd
**local_kconsumerd_fd
= NULL
;
624 /* local view of fds_count */
629 ret
= pipe(thread_pipe
);
631 perror("Error creating pipe");
635 local_kconsumerd_fd
= malloc(sizeof(struct ltt_kconsumerd_fd
));
642 * the ltt_fd_list has been updated, we need to update our
643 * local array as well
645 if (update_fd_array
) {
646 ret
= update_poll_array(&pollfd
, local_kconsumerd_fd
);
648 ERR("Error in allocating pollfd or local_outfds");
649 send_error(KCONSUMERD_POLL_ERROR
);
655 /* poll on the array of fds */
656 DBG("polling on %d fd", nb_fd
+ 1);
657 num_rdy
= poll(pollfd
, nb_fd
+ 1, -1);
658 DBG("poll num_rdy : %d", num_rdy
);
660 perror("Poll error");
661 send_error(KCONSUMERD_POLL_ERROR
);
666 * if only the poll_pipe triggered poll to return just return to the
667 * beginning of the loop to update the array
669 if (num_rdy
== 1 && pollfd
[nb_fd
].revents
== POLLIN
) {
670 DBG("poll_pipe wake up");
671 tmp2
= read(poll_pipe
[0], &tmp
, 1);
675 /* Take care of high priority channels first. */
676 for (i
= 0; i
< nb_fd
; i
++) {
677 switch(pollfd
[i
].revents
) {
679 ERR("Error returned in polling fd %d.", pollfd
[i
].fd
);
681 send_error(KCONSUMERD_POLL_ERROR
);
684 ERR("Polling fd %d tells it has hung up.", pollfd
[i
].fd
);
688 ERR("Polling fd %d tells fd is not open.", pollfd
[i
].fd
);
689 send_error(KCONSUMERD_POLL_NVAL
);
693 DBG("Urgent read on fd %d", pollfd
[i
].fd
);
695 ret
= read_subbuffer(local_kconsumerd_fd
[i
]);
696 /* it's ok to have an unavailable sub-buffer (FIXME : is it ?) */
704 /* If every buffer FD has hung up, we end the read loop here */
705 if (nb_fd
> 0 && num_hup
== nb_fd
) {
706 DBG("every buffer FD has hung up\n");
707 send_error(KCONSUMERD_POLL_HUP
);
711 /* Take care of low priority channels. */
713 for (i
= 0; i
< nb_fd
; i
++) {
714 switch(pollfd
[i
].revents
) {
716 DBG("Normal read on fd %d", pollfd
[i
].fd
);
717 ret
= read_subbuffer(local_kconsumerd_fd
[i
]);
718 /* it's ok to have an unavailable subbuffer (FIXME : is it ?) */
728 if (pollfd
!= NULL
) {
732 if (local_kconsumerd_fd
!= NULL
) {
733 free(local_kconsumerd_fd
);
734 local_kconsumerd_fd
= NULL
;
741 * usage function on stderr
743 static void usage(void)
745 fprintf(stderr
, "Usage: %s OPTIONS\n\nOptions:\n", progname
);
746 fprintf(stderr
, " -h, --help "
747 "Display this usage.\n");
748 fprintf(stderr
, " -c, --kconsumerd-cmd-sock PATH "
749 "Specify path for the command socket\n");
750 fprintf(stderr
, " -e, --kconsumerd-err-sock PATH "
751 "Specify path for the error socket\n");
752 fprintf(stderr
, " -d, --daemonize "
753 "Start as a daemon.\n");
754 fprintf(stderr
, " -q, --quiet "
755 "No output at all.\n");
756 fprintf(stderr
, " -v, --verbose "
757 "Verbose mode. Activate DBG() macro.\n");
758 fprintf(stderr
, " -V, --version "
759 "Show version number.\n");
/* Daemon argument parsing. */
765 static void parse_args(int argc
, char **argv
)
769 static struct option long_options
[] = {
770 { "kconsumerd-cmd-sock", 1, 0, 'c' },
771 { "kconsumerd-err-sock", 1, 0, 'e' },
772 { "daemonize", 0, 0, 'd' },
773 { "help", 0, 0, 'h' },
774 { "quiet", 0, 0, 'q' },
775 { "verbose", 0, 0, 'v' },
776 { "version", 0, 0, 'V' },
781 int option_index
= 0;
782 c
= getopt_long(argc
, argv
, "dhqvV" "c:e:", long_options
, &option_index
);
789 fprintf(stderr
, "option %s", long_options
[option_index
].name
);
791 fprintf(stderr
, " with arg %s\n", optarg
);
795 snprintf(command_sock_path
, PATH_MAX
, "%s", optarg
);
798 snprintf(error_sock_path
, PATH_MAX
, "%s", optarg
);
813 fprintf(stdout
, "%s\n", VERSION
);
826 int main(int argc
, char **argv
)
832 /* Parse arguments */
834 parse_args(argc
, argv
);
845 if (strlen(command_sock_path
) == 0) {
846 snprintf(command_sock_path
, PATH_MAX
,
847 KCONSUMERD_CMD_SOCK_PATH
);
849 if (strlen(error_sock_path
) == 0) {
850 snprintf(error_sock_path
, PATH_MAX
,
851 KCONSUMERD_ERR_SOCK_PATH
);
854 if (set_signal_handler() < 0) {
858 /* create the pipe to wake to polling thread when needed */
859 ret
= pipe(poll_pipe
);
861 perror("Error creating poll pipe");
865 /* Connect to the socket created by ltt-sessiond to report errors */
866 DBG("Connecting to error socket %s", error_sock_path
);
867 error_socket
= lttcomm_connect_unix_sock(error_sock_path
);
868 /* not a fatal error, but all communication with ltt-sessiond will fail */
869 if (error_socket
< 0) {
870 WARN("Cannot connect to error socket, is ltt-sessiond started ?");
873 /* Create the thread to manage the receive of fd */
874 ret
= pthread_create(&threads
[0], NULL
, thread_receive_fds
, (void *) NULL
);
876 perror("pthread_create");
880 /* Create thread to manage the polling/writing of traces */
881 ret
= pthread_create(&threads
[1], NULL
, thread_poll_fds
, (void *) NULL
);
883 perror("pthread_create");
887 for (i
= 0; i
< 2; i
++) {
888 ret
= pthread_join(threads
[i
], &status
);
890 perror("pthread_join");
895 send_error(KCONSUMERD_EXIT_SUCCESS
);
900 send_error(KCONSUMERD_EXIT_FAILURE
);