1 /* Copyright (C) 2009 Pierre-Marc Fournier
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
21 #include <sys/epoll.h>
23 #include <sys/types.h>
35 #include <ust/ustconsumer.h>
40 #define GET_SUBBUF_OK 1
41 #define GET_SUBBUF_DONE 0
42 #define GET_SUBBUF_DIED 2
44 #define PUT_SUBBUF_OK 1
45 #define PUT_SUBBUF_DIED 0
46 #define PUT_SUBBUF_PUSHED 2
47 #define PUT_SUBBUF_DONE 3
49 #define UNIX_PATH_MAX 108
51 static int get_subbuffer(struct buffer_info
*buf
)
53 struct ustcomm_header _send_hdr
, *send_hdr
;
54 struct ustcomm_header _recv_hdr
, *recv_hdr
;
55 struct ustcomm_buffer_info _send_msg
, _recv_msg
;
56 struct ustcomm_buffer_info
*send_msg
, *recv_msg
;
59 send_hdr
= &_send_hdr
;
60 recv_hdr
= &_recv_hdr
;
61 send_msg
= &_send_msg
;
62 recv_msg
= &_recv_msg
;
64 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
65 buf
->channel
, buf
->channel_cpu
);
70 send_hdr
->command
= GET_SUBBUFFER
;
72 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
73 recv_hdr
, (char *)recv_msg
);
74 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
76 DBG("app died while being traced");
77 return GET_SUBBUF_DIED
;
78 } else if (result
< 0) {
79 ERR("get_subbuffer: ustcomm_req failed");
83 if (!recv_hdr
->result
) {
84 DBG("got subbuffer %s", buf
->name
);
85 buf
->consumed_old
= recv_msg
->consumed_old
;
87 } else if (recv_hdr
->result
== -ENODATA
) {
88 DBG("For buffer %s, the trace was not found. This likely means"
89 " it was destroyed by the user.", buf
->name
);
90 return GET_SUBBUF_DIED
;
93 DBG("error getting subbuffer %s", buf
->name
);
94 return recv_hdr
->result
;
97 static int put_subbuffer(struct buffer_info
*buf
)
99 struct ustcomm_header _send_hdr
, *send_hdr
;
100 struct ustcomm_header _recv_hdr
, *recv_hdr
;
101 struct ustcomm_buffer_info _send_msg
, *send_msg
;
104 send_hdr
= &_send_hdr
;
105 recv_hdr
= &_recv_hdr
;
106 send_msg
= &_send_msg
;
108 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
109 buf
->channel
, buf
->channel_cpu
);
114 send_hdr
->command
= PUT_SUBBUFFER
;
115 send_msg
->consumed_old
= buf
->consumed_old
;
117 result
= ustcomm_req(buf
->app_sock
, send_hdr
, (char *)send_msg
,
119 if ((result
< 0 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
121 DBG("app died while being traced");
122 return PUT_SUBBUF_DIED
;
123 } else if (result
< 0) {
124 ERR("put_subbuffer: ustcomm_req failed");
128 if (!recv_hdr
->result
) {
129 DBG("put subbuffer %s", buf
->name
);
130 return PUT_SUBBUF_OK
;
131 } else if (recv_hdr
->result
== -ENODATA
) {
132 DBG("For buffer %s, the trace was not found. This likely means"
133 " it was destroyed by the user.", buf
->name
);
134 return PUT_SUBBUF_DIED
;
137 DBG("error getting subbuffer %s", buf
->name
);
138 return recv_hdr
->result
;
141 void decrement_active_buffers(void *arg
)
143 struct ustconsumer_instance
*instance
= arg
;
144 pthread_mutex_lock(&instance
->mutex
);
145 instance
->active_buffers
--;
146 pthread_mutex_unlock(&instance
->mutex
);
149 static int get_pidunique(int sock
, s64
*pidunique
)
151 struct ustcomm_header _send_hdr
, *send_hdr
;
152 struct ustcomm_header _recv_hdr
, *recv_hdr
;
153 struct ustcomm_pidunique _recv_msg
, *recv_msg
;
156 send_hdr
= &_send_hdr
;
157 recv_hdr
= &_recv_hdr
;
158 recv_msg
= &_recv_msg
;
160 memset(send_hdr
, 0, sizeof(*send_hdr
));
162 send_hdr
->command
= GET_PIDUNIQUE
;
163 result
= ustcomm_req(sock
, send_hdr
, NULL
, recv_hdr
, (char *)recv_msg
);
167 if (recv_hdr
->result
< 0) {
168 ERR("App responded with error: %s", strerror(recv_hdr
->result
));
169 return recv_hdr
->result
;
172 *pidunique
= recv_msg
->pidunique
;
177 static int get_buf_shmid_pipe_fd(int sock
, struct buffer_info
*buf
,
178 int *buf_shmid
, int *buf_struct_shmid
,
181 struct ustcomm_header _send_hdr
, *send_hdr
;
182 struct ustcomm_header _recv_hdr
, *recv_hdr
;
183 struct ustcomm_buffer_info _send_msg
, *send_msg
;
184 struct ustcomm_buffer_info _recv_msg
, *recv_msg
;
185 int result
, recv_pipe_fd
;
187 send_hdr
= &_send_hdr
;
188 recv_hdr
= &_recv_hdr
;
189 send_msg
= &_send_msg
;
190 recv_msg
= &_recv_msg
;
192 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
193 buf
->channel
, buf
->channel_cpu
);
195 ERR("Failed to pack buffer info");
199 send_hdr
->command
= GET_BUF_SHMID_PIPE_FD
;
201 result
= ustcomm_send(sock
, send_hdr
, (char *)send_msg
);
203 ERR("Failed to send request");
206 result
= ustcomm_recv_fd(sock
, recv_hdr
, (char *)recv_msg
, &recv_pipe_fd
);
208 ERR("Failed to receive message and fd");
211 if (recv_hdr
->result
< 0) {
212 ERR("App responded with error %s", strerror(recv_hdr
->result
));
213 return recv_hdr
->result
;
216 *buf_shmid
= recv_msg
->buf_shmid
;
217 *buf_struct_shmid
= recv_msg
->buf_struct_shmid
;
218 *buf_pipe_fd
= recv_pipe_fd
;
223 static int get_subbuf_num_size(int sock
, struct buffer_info
*buf
,
224 int *subbuf_num
, int *subbuf_size
)
226 struct ustcomm_header _send_hdr
, *send_hdr
;
227 struct ustcomm_header _recv_hdr
, *recv_hdr
;
228 struct ustcomm_channel_info _send_msg
, *send_msg
;
229 struct ustcomm_channel_info _recv_msg
, *recv_msg
;
232 send_hdr
= &_send_hdr
;
233 recv_hdr
= &_recv_hdr
;
234 send_msg
= &_send_msg
;
235 recv_msg
= &_recv_msg
;
237 result
= ustcomm_pack_channel_info(send_hdr
, send_msg
, buf
->trace
,
243 send_hdr
->command
= GET_SUBBUF_NUM_SIZE
;
245 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
246 recv_hdr
, (char *)recv_msg
);
251 *subbuf_num
= recv_msg
->subbuf_num
;
252 *subbuf_size
= recv_msg
->subbuf_size
;
254 return recv_hdr
->result
;
258 static int notify_buffer_mapped(int sock
, struct buffer_info
*buf
)
260 struct ustcomm_header _send_hdr
, *send_hdr
;
261 struct ustcomm_header _recv_hdr
, *recv_hdr
;
262 struct ustcomm_buffer_info _send_msg
, *send_msg
;
265 send_hdr
= &_send_hdr
;
266 recv_hdr
= &_recv_hdr
;
267 send_msg
= &_send_msg
;
269 result
= ustcomm_pack_buffer_info(send_hdr
, send_msg
, buf
->trace
,
270 buf
->channel
, buf
->channel_cpu
);
275 send_hdr
->command
= NOTIFY_BUF_MAPPED
;
277 result
= ustcomm_req(sock
, send_hdr
, (char *)send_msg
,
283 return recv_hdr
->result
;
287 struct buffer_info
*connect_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
288 const char *trace
, const char *channel
,
291 struct buffer_info
*buf
;
293 struct shmid_ds shmds
;
295 buf
= (struct buffer_info
*) zmalloc(sizeof(struct buffer_info
));
297 ERR("add_buffer: insufficient memory");
301 buf
->trace
= strdup(trace
);
306 buf
->channel
= strdup(channel
);
311 result
= asprintf(&buf
->name
, "%s_%d", channel
, channel_cpu
);
312 if (result
< 0 || buf
->name
== NULL
) {
313 goto free_buf_channel
;
316 buf
->channel_cpu
= channel_cpu
;
319 result
= ustcomm_connect_app(buf
->pid
, &buf
->app_sock
);
321 WARN("unable to connect to process, it probably died before we were able to connect");
326 result
= get_pidunique(buf
->app_sock
, &buf
->pidunique
);
328 ERR("Failed to get pidunique");
332 /* get shmid and pipe fd */
333 result
= get_buf_shmid_pipe_fd(buf
->app_sock
, buf
, &buf
->shmid
,
334 &buf
->bufstruct_shmid
, &buf
->pipe_fd
);
336 ERR("Failed to get buf_shmid and pipe_fd");
340 fstat(buf
->pipe_fd
, &temp
);
341 if (!S_ISFIFO(temp
.st_mode
)) {
342 ERR("Didn't receive a fifo from the app");
348 /* get number of subbufs and subbuf size */
349 result
= get_subbuf_num_size(buf
->app_sock
, buf
, &buf
->n_subbufs
,
352 ERR("Failed to get subbuf number and size");
357 buf
->mem
= shmat(buf
->shmid
, NULL
, 0);
358 if(buf
->mem
== (void *) 0) {
362 DBG("successfully attached buffer memory");
364 buf
->bufstruct_mem
= shmat(buf
->bufstruct_shmid
, NULL
, 0);
365 if(buf
->bufstruct_mem
== (void *) 0) {
369 DBG("successfully attached buffer bufstruct memory");
371 /* obtain info on the memory segment */
372 result
= shmctl(buf
->shmid
, IPC_STAT
, &shmds
);
375 goto shmdt_bufstruct_mem
;
377 buf
->memlen
= shmds
.shm_segsz
;
379 /* Notify the application that we have mapped the buffer */
380 result
= notify_buffer_mapped(buf
->app_sock
, buf
);
382 goto shmdt_bufstruct_mem
;
385 if(instance
->callbacks
->on_open_buffer
)
386 instance
->callbacks
->on_open_buffer(instance
->callbacks
, buf
);
388 pthread_mutex_lock(&instance
->mutex
);
389 instance
->active_buffers
++;
390 pthread_mutex_unlock(&instance
->mutex
);
395 shmdt(buf
->bufstruct_mem
);
404 close(buf
->app_sock
);
420 static void destroy_buffer(struct ustconsumer_callbacks
*callbacks
,
421 struct buffer_info
*buf
)
425 result
= close(buf
->app_sock
);
427 WARN("problem calling ustcomm_close_app");
430 result
= shmdt(buf
->mem
);
435 result
= shmdt(buf
->bufstruct_mem
);
440 if(callbacks
->on_close_buffer
)
441 callbacks
->on_close_buffer(callbacks
, buf
);
446 int consumer_loop(struct ustconsumer_instance
*instance
, struct buffer_info
*buf
)
448 int result
, read_result
;
451 pthread_cleanup_push(decrement_active_buffers
, instance
);
454 read_result
= read(buf
->pipe_fd
, &read_buf
, 1);
455 /* get the subbuffer */
456 if (read_result
== 1) {
457 result
= get_subbuffer(buf
);
459 ERR("error getting subbuffer");
461 } else if (result
== GET_SUBBUF_DIED
) {
462 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
465 } else if ((read_result
== -1 && (errno
== ECONNRESET
|| errno
== EPIPE
)) ||
467 DBG("App died while being traced");
468 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
472 if(instance
->callbacks
->on_read_subbuffer
)
473 instance
->callbacks
->on_read_subbuffer(instance
->callbacks
, buf
);
475 /* put the subbuffer */
476 result
= put_subbuffer(buf
);
478 ERR("unknown error putting subbuffer (channel=%s)", buf
->name
);
481 else if(result
== PUT_SUBBUF_PUSHED
) {
482 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf
->name
);
485 else if(result
== PUT_SUBBUF_DIED
) {
486 DBG("application died while putting subbuffer");
487 /* Skip the first subbuffer. We are not sure it is trustable
488 * because the put_subbuffer() did not complete.
490 if(instance
->callbacks
->on_put_error
)
491 instance
->callbacks
->on_put_error(instance
->callbacks
, buf
);
493 finish_consuming_dead_subbuffer(instance
->callbacks
, buf
);
496 else if(result
== PUT_SUBBUF_DONE
) {
497 /* Done with this subbuffer */
498 /* FIXME: add a case where this branch is used? Upon
499 * normal trace termination, at put_subbuf time, a
500 * special last-subbuffer code could be returned by
505 else if(result
== PUT_SUBBUF_OK
) {
509 DBG("thread for buffer %s is stopping", buf
->name
);
511 /* FIXME: destroy, unalloc... */
513 pthread_cleanup_pop(1);
518 struct consumer_thread_args
{
523 struct ustconsumer_instance
*instance
;
526 void *consumer_thread(void *arg
)
528 struct buffer_info
*buf
;
529 struct consumer_thread_args
*args
= (struct consumer_thread_args
*) arg
;
533 if(args
->instance
->callbacks
->on_new_thread
)
534 args
->instance
->callbacks
->on_new_thread(args
->instance
->callbacks
);
536 /* Block signals that should be handled by the main thread. */
537 result
= sigemptyset(&sigset
);
539 PERROR("sigemptyset");
542 result
= sigaddset(&sigset
, SIGTERM
);
547 result
= sigaddset(&sigset
, SIGINT
);
552 result
= sigprocmask(SIG_BLOCK
, &sigset
, NULL
);
554 PERROR("sigprocmask");
558 buf
= connect_buffer(args
->instance
, args
->pid
, args
->trace
,
559 args
->channel
, args
->channel_cpu
);
561 ERR("failed to connect to buffer");
565 consumer_loop(args
->instance
, buf
);
567 destroy_buffer(args
->instance
->callbacks
, buf
);
571 if(args
->instance
->callbacks
->on_close_thread
)
572 args
->instance
->callbacks
->on_close_thread(args
->instance
->callbacks
);
574 free((void *)args
->channel
);
579 int start_consuming_buffer(struct ustconsumer_instance
*instance
, pid_t pid
,
580 const char *trace
, const char *channel
,
584 struct consumer_thread_args
*args
;
587 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s_%d", pid
, channel
,
590 args
= (struct consumer_thread_args
*) zmalloc(sizeof(struct consumer_thread_args
));
596 args
->trace
= strdup(trace
);
597 args
->channel
= strdup(channel
);
598 args
->channel_cpu
= channel_cpu
;
599 args
->instance
= instance
;
600 DBG("beginning2 of start_consuming_buffer: args: pid %d trace %s"
601 " bufname %s_%d", args
->pid
, args
->channel
, args
->channel_cpu
);
603 result
= pthread_create(&thr
, NULL
, consumer_thread
, args
);
605 ERR("pthread_create failed");
608 result
= pthread_detach(thr
);
610 ERR("pthread_detach failed");
613 DBG("end of start_consuming_buffer: args: pid %d trace %s "
614 "bufname %s_%d", args
->pid
, args
->channel
, args
->channel_cpu
);
618 static void process_client_cmd(int sock
, struct ustcomm_header
*req_header
,
619 char *recvbuf
, struct ustconsumer_instance
*instance
)
622 struct ustcomm_header _res_header
;
623 struct ustcomm_header
*res_header
= &_res_header
;
624 struct ustcomm_buffer_info
*buf_inf
;
626 DBG("Processing client command");
628 switch (req_header
->command
) {
631 buf_inf
= (struct ustcomm_buffer_info
*)recvbuf
;
632 result
= ustcomm_unpack_buffer_info(buf_inf
);
634 ERR("Couldn't unpack buffer info");
638 DBG("Going to consume trace %s buffer %s_%d in process %d",
639 buf_inf
->trace
, buf_inf
->channel
, buf_inf
->ch_cpu
,
641 result
= start_consuming_buffer(instance
, buf_inf
->pid
,
646 ERR("error in add_buffer");
650 res_header
->result
= 0;
653 res_header
->result
= 0;
654 /* Only there to force poll to return */
657 res_header
->result
= -EINVAL
;
658 WARN("unknown command: %d", req_header
->command
);
661 if (ustcomm_send(sock
, res_header
, NULL
) <= 0) {
662 ERR("couldn't send command response");
666 #define MAX_EVENTS 10
668 int ustconsumer_start_instance(struct ustconsumer_instance
*instance
)
670 struct ustcomm_header recv_hdr
;
671 char recv_buf
[USTCOMM_BUFFER_SIZE
];
672 struct ustcomm_sock
*epoll_sock
;
673 struct epoll_event events
[MAX_EVENTS
];
674 struct sockaddr addr
;
675 int result
, epoll_fd
, accept_fd
, nfds
, i
, addr_size
, timeout
;
677 if(!instance
->is_init
) {
678 ERR("libustconsumer instance not initialized");
681 epoll_fd
= instance
->epoll_fd
;
687 nfds
= epoll_wait(epoll_fd
, events
, MAX_EVENTS
, timeout
);
688 if (nfds
== -1 && errno
== EINTR
) {
690 } else if (nfds
== -1) {
691 PERROR("ustconsumer_start_instance: epoll_wait failed");
695 for (i
= 0; i
< nfds
; ++i
) {
696 epoll_sock
= (struct ustcomm_sock
*)events
[i
].data
.ptr
;
697 if (epoll_sock
== instance
->listen_sock
) {
698 addr_size
= sizeof(struct sockaddr
);
699 accept_fd
= accept(epoll_sock
->fd
,
701 (socklen_t
*)&addr_size
);
702 if (accept_fd
== -1) {
703 PERROR("ustconsumer_start_instance: "
707 ustcomm_init_sock(accept_fd
, epoll_fd
,
708 &instance
->connections
);
710 result
= ustcomm_recv(epoll_sock
->fd
, &recv_hdr
,
713 ustcomm_del_sock(epoll_sock
, 0);
715 process_client_cmd(epoll_sock
->fd
,
723 if (instance
->quit_program
) {
724 pthread_mutex_lock(&instance
->mutex
);
725 if(instance
->active_buffers
== 0) {
726 pthread_mutex_unlock(&instance
->mutex
);
729 pthread_mutex_unlock(&instance
->mutex
);
734 if(instance
->callbacks
->on_trace_end
)
735 instance
->callbacks
->on_trace_end(instance
);
737 ustconsumer_delete_instance(instance
);
742 /* FIXME: threads and connections !? */
743 void ustconsumer_delete_instance(struct ustconsumer_instance
*instance
)
745 if (instance
->is_init
) {
746 ustcomm_del_named_sock(instance
->listen_sock
, 0);
747 close(instance
->epoll_fd
);
750 pthread_mutex_destroy(&instance
->mutex
);
751 free(instance
->sock_path
);
755 /* FIXME: Do something about the fixed path length, maybe get rid
756 * of the whole concept and use a pipe?
758 int ustconsumer_stop_instance(struct ustconsumer_instance
*instance
, int send_msg
)
766 instance
->quit_program
= 1;
771 /* Send a message through the socket to force poll to return */
773 struct sockaddr_un addr
;
775 result
= fd
= socket(PF_UNIX
, SOCK_STREAM
, 0);
781 addr
.sun_family
= AF_UNIX
;
783 strncpy(addr
.sun_path
, instance
->sock_path
, UNIX_PATH_MAX
);
784 addr
.sun_path
[UNIX_PATH_MAX
-1] = '\0';
786 result
= connect(fd
, (struct sockaddr
*)&addr
, sizeof(addr
));
791 while(bytes
!= sizeof(msg
))
792 bytes
+= send(fd
, msg
, sizeof(msg
), 0);
799 struct ustconsumer_instance
800 *ustconsumer_new_instance(struct ustconsumer_callbacks
*callbacks
,
803 struct ustconsumer_instance
*instance
=
804 zmalloc(sizeof(struct ustconsumer_instance
));
809 instance
->callbacks
= callbacks
;
810 instance
->quit_program
= 0;
811 instance
->is_init
= 0;
812 instance
->active_buffers
= 0;
813 pthread_mutex_init(&instance
->mutex
, NULL
);
816 instance
->sock_path
= strdup(sock_path
);
818 instance
->sock_path
= NULL
;
824 static int init_ustconsumer_socket(struct ustconsumer_instance
*instance
)
828 if (instance
->sock_path
) {
829 if (asprintf(&name
, "%s", instance
->sock_path
) < 0) {
830 ERR("ustcomm_init_ustconsumer : asprintf failed (sock_path %s)",
831 instance
->sock_path
);
837 /* Only check if socket dir exists if we are using the default directory */
838 result
= ensure_dir_exists(SOCK_DIR
);
840 ERR("Unable to create socket directory %s", SOCK_DIR
);
844 if (asprintf(&name
, "%s/%s", SOCK_DIR
, "ustconsumer") < 0) {
845 ERR("ustcomm_init_ustconsumer : asprintf failed (%s/ustconsumer)",
852 instance
->epoll_fd
= epoll_create(MAX_EVENTS
);
853 if (instance
->epoll_fd
== -1) {
854 ERR("epoll_create failed, start instance bailing");
858 /* Create the named socket */
859 instance
->listen_sock
= ustcomm_init_named_socket(name
,
861 if(!instance
->listen_sock
) {
862 ERR("error initializing named socket at %s", name
);
866 CDS_INIT_LIST_HEAD(&instance
->connections
);
873 close(instance
->epoll_fd
);
880 int ustconsumer_init_instance(struct ustconsumer_instance
*instance
)
883 result
= init_ustconsumer_socket(instance
);
885 ERR("failed to initialize socket");
888 instance
->is_init
= 1;