remove unused variables
[lttng-ust.git] / libustd / libustd.c
1 /* Copyright (C) 2009 Pierre-Marc Fournier
2 * 2010 Alexis Halle
3 *
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Lesser General Public
6 * License as published by the Free Software Foundation; either
7 * version 2.1 of the License, or (at your option) any later version.
8 *
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Lesser General Public License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public
15 * License along with this library; if not, write to the Free Software
16 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17 */
18
19 #define _GNU_SOURCE
20
21 #include <sys/shm.h>
22 #include <unistd.h>
23 #include <pthread.h>
24 #include <signal.h>
25
26 #include <stdlib.h>
27 #include <stdio.h>
28 #include <string.h>
29 #include <errno.h>
30 #include <assert.h>
31
32 #include "libustd.h"
33 #include "usterr.h"
34 #include "ustcomm.h"
35
/*
 * Return codes of get_subbuffer(). A negative return value means error.
 */
enum {
	GET_SUBBUF_DONE = 0,	/* buffer is finished, it won't produce data anymore */
	GET_SUBBUF_OK = 1,	/* got a subbuffer successfully */
	GET_SUBBUF_DIED = 2,	/* application died or the trace was not found */
};

/*
 * Return codes of put_subbuffer(). A negative return value means error.
 */
enum {
	PUT_SUBBUF_DIED = 0,	/* application died or the trace was not found */
	PUT_SUBBUF_OK = 1,	/* subbuffer was put back successfully */
	PUT_SUBBUF_PUSHED = 2,	/* the reader was pushed (buffer overflow) */
	PUT_SUBBUF_DONE = 3,	/* done with this buffer */
};

/* Bound applied when copying a socket path into sockaddr_un.sun_path. */
#define UNIX_PATH_MAX 108
51
52 int get_subbuffer(struct buffer_info *buf)
53 {
54 char *send_msg=NULL;
55 char *received_msg=NULL;
56 char *rep_code=NULL;
57 int retval;
58 int result;
59
60 asprintf(&send_msg, "get_subbuffer %s", buf->name);
61 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
62 if((result == -1 && (errno == ECONNRESET || errno == EPIPE)) || result == 0) {
63 DBG("app died while being traced");
64 retval = GET_SUBBUF_DIED;
65 goto end;
66 }
67 else if(result < 0) {
68 ERR("get_subbuffer: ustcomm_send_request failed");
69 retval = -1;
70 goto end;
71 }
72
73 result = sscanf(received_msg, "%as %ld", &rep_code, &buf->consumed_old);
74 if(result != 2 && result != 1) {
75 ERR("unable to parse response to get_subbuffer");
76 retval = -1;
77 free(received_msg);
78 goto end_rep;
79 }
80
81 if(!strcmp(rep_code, "OK")) {
82 DBG("got subbuffer %s", buf->name);
83 retval = GET_SUBBUF_OK;
84 }
85 else if(nth_token_is(received_msg, "END", 0) == 1) {
86 retval = GET_SUBBUF_DONE;
87 goto end_rep;
88 }
89 else if(!strcmp(received_msg, "NOTFOUND")) {
90 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
91 retval = GET_SUBBUF_DIED;
92 goto end_rep;
93 }
94 else {
95 DBG("error getting subbuffer %s", buf->name);
96 retval = -1;
97 }
98
99 /* FIXME: free correctly the stuff */
100 end_rep:
101 if(rep_code)
102 free(rep_code);
103 end:
104 if(send_msg)
105 free(send_msg);
106 if(received_msg)
107 free(received_msg);
108
109 return retval;
110 }
111
112 int put_subbuffer(struct buffer_info *buf)
113 {
114 char *send_msg=NULL;
115 char *received_msg=NULL;
116 char *rep_code=NULL;
117 int retval;
118 int result;
119
120 asprintf(&send_msg, "put_subbuffer %s %ld", buf->name, buf->consumed_old);
121 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
122 if(result < 0 && (errno == ECONNRESET || errno == EPIPE)) {
123 retval = PUT_SUBBUF_DIED;
124 goto end;
125 }
126 else if(result < 0) {
127 ERR("put_subbuffer: send_message failed");
128 retval = -1;
129 goto end;
130 }
131 else if(result == 0) {
132 /* Program seems finished. However this might not be
133 * the last subbuffer that has to be collected.
134 */
135 retval = PUT_SUBBUF_DIED;
136 goto end;
137 }
138
139 result = sscanf(received_msg, "%as", &rep_code);
140 if(result != 1) {
141 ERR("unable to parse response to put_subbuffer");
142 retval = -1;
143 goto end_rep;
144 }
145
146 if(!strcmp(rep_code, "OK")) {
147 DBG("subbuffer put %s", buf->name);
148 retval = PUT_SUBBUF_OK;
149 }
150 else if(!strcmp(received_msg, "NOTFOUND")) {
151 DBG("For buffer %s, the trace was not found. This likely means it was destroyed by the user.", buf->name);
152 /* However, maybe this was not the last subbuffer. So
153 * we return the program died.
154 */
155 retval = PUT_SUBBUF_DIED;
156 goto end_rep;
157 }
158 else {
159 DBG("put_subbuffer: received error, we were pushed");
160 retval = PUT_SUBBUF_PUSHED;
161 goto end_rep;
162 }
163
164 end_rep:
165 if(rep_code)
166 free(rep_code);
167
168 end:
169 if(send_msg)
170 free(send_msg);
171 if(received_msg)
172 free(received_msg);
173
174 return retval;
175 }
176
177 void decrement_active_buffers(void *arg)
178 {
179 struct libustd_instance *instance = arg;
180 pthread_mutex_lock(&instance->mutex);
181 instance->active_buffers--;
182 pthread_mutex_unlock(&instance->mutex);
183 }
184
185 struct buffer_info *connect_buffer(struct libustd_instance *instance, pid_t pid, const char *bufname)
186 {
187 struct buffer_info *buf;
188 char *send_msg;
189 char *received_msg;
190 int result;
191 struct shmid_ds shmds;
192
193 buf = (struct buffer_info *) malloc(sizeof(struct buffer_info));
194 if(buf == NULL) {
195 ERR("add_buffer: insufficient memory");
196 return NULL;
197 }
198
199 buf->name = bufname;
200 buf->pid = pid;
201
202 /* connect to app */
203 result = ustcomm_connect_app(buf->pid, &buf->conn);
204 if(result) {
205 WARN("unable to connect to process, it probably died before we were able to connect");
206 return NULL;
207 }
208
209 /* get pidunique */
210 asprintf(&send_msg, "get_pidunique");
211 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
212 free(send_msg);
213 if(result == -1) {
214 ERR("problem in ustcomm_send_request(get_pidunique)");
215 return NULL;
216 }
217 if(result == 0) {
218 goto error;
219 }
220
221 result = sscanf(received_msg, "%lld", &buf->pidunique);
222 if(result != 1) {
223 ERR("unable to parse response to get_pidunique");
224 return NULL;
225 }
226 free(received_msg);
227 DBG("got pidunique %lld", buf->pidunique);
228
229 /* get shmid */
230 asprintf(&send_msg, "get_shmid %s", buf->name);
231 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
232 free(send_msg);
233 if(result == -1) {
234 ERR("problem in ustcomm_send_request(get_shmid)");
235 return NULL;
236 }
237 if(result == 0) {
238 goto error;
239 }
240
241 result = sscanf(received_msg, "%d %d", &buf->shmid, &buf->bufstruct_shmid);
242 if(result != 2) {
243 ERR("unable to parse response to get_shmid (\"%s\")", received_msg);
244 return NULL;
245 }
246 free(received_msg);
247 DBG("got shmids %d %d", buf->shmid, buf->bufstruct_shmid);
248
249 /* get n_subbufs */
250 asprintf(&send_msg, "get_n_subbufs %s", buf->name);
251 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
252 free(send_msg);
253 if(result == -1) {
254 ERR("problem in ustcomm_send_request(g_n_subbufs)");
255 return NULL;
256 }
257 if(result == 0) {
258 goto error;
259 }
260
261 result = sscanf(received_msg, "%d", &buf->n_subbufs);
262 if(result != 1) {
263 ERR("unable to parse response to get_n_subbufs");
264 return NULL;
265 }
266 free(received_msg);
267 DBG("got n_subbufs %d", buf->n_subbufs);
268
269 /* get subbuf size */
270 asprintf(&send_msg, "get_subbuf_size %s", buf->name);
271 result = ustcomm_send_request(&buf->conn, send_msg, &received_msg);
272 free(send_msg);
273 if(result == -1) {
274 ERR("problem in ustcomm_send_request(get_subbuf_size)");
275 return NULL;
276 }
277 if(result == 0) {
278 goto error;
279 }
280
281 result = sscanf(received_msg, "%d", &buf->subbuf_size);
282 if(result != 1) {
283 ERR("unable to parse response to get_subbuf_size");
284 return NULL;
285 }
286 free(received_msg);
287 DBG("got subbuf_size %d", buf->subbuf_size);
288
289 /* attach memory */
290 buf->mem = shmat(buf->shmid, NULL, 0);
291 if(buf->mem == (void *) 0) {
292 PERROR("shmat");
293 return NULL;
294 }
295 DBG("successfully attached buffer memory");
296
297 buf->bufstruct_mem = shmat(buf->bufstruct_shmid, NULL, 0);
298 if(buf->bufstruct_mem == (void *) 0) {
299 PERROR("shmat");
300 return NULL;
301 }
302 DBG("successfully attached buffer bufstruct memory");
303
304 /* obtain info on the memory segment */
305 result = shmctl(buf->shmid, IPC_STAT, &shmds);
306 if(result == -1) {
307 PERROR("shmctl");
308 return NULL;
309 }
310 buf->memlen = shmds.shm_segsz;
311
312 if(instance->callbacks->on_open_buffer)
313 instance->callbacks->on_open_buffer(instance->callbacks, buf);
314
315 pthread_mutex_lock(&instance->mutex);
316 instance->active_buffers++;
317 pthread_mutex_unlock(&instance->mutex);
318
319 return buf;
320
321 error:
322 free(buf);
323 return NULL;
324 }
325
326 static void destroy_buffer(struct libustd_callbacks *callbacks,
327 struct buffer_info *buf)
328 {
329 int result;
330
331 result = ustcomm_close_app(&buf->conn);
332 if(result == -1) {
333 WARN("problem calling ustcomm_close_app");
334 }
335
336 result = shmdt(buf->mem);
337 if(result == -1) {
338 PERROR("shmdt");
339 }
340
341 result = shmdt(buf->bufstruct_mem);
342 if(result == -1) {
343 PERROR("shmdt");
344 }
345
346 if(callbacks->on_close_buffer)
347 callbacks->on_close_buffer(callbacks, buf);
348
349 free(buf);
350 }
351
352 int consumer_loop(struct libustd_instance *instance, struct buffer_info *buf)
353 {
354 int result;
355
356 pthread_cleanup_push(decrement_active_buffers, instance);
357
358 for(;;) {
359 /* get the subbuffer */
360 result = get_subbuffer(buf);
361 if(result == -1) {
362 ERR("error getting subbuffer");
363 continue;
364 }
365 else if(result == GET_SUBBUF_DONE) {
366 /* this is done */
367 break;
368 }
369 else if(result == GET_SUBBUF_DIED) {
370 finish_consuming_dead_subbuffer(instance->callbacks, buf);
371 break;
372 }
373
374 if(instance->callbacks->on_read_subbuffer)
375 instance->callbacks->on_read_subbuffer(instance->callbacks, buf);
376
377 /* put the subbuffer */
378 result = put_subbuffer(buf);
379 if(result == -1) {
380 ERR("unknown error putting subbuffer (channel=%s)", buf->name);
381 break;
382 }
383 else if(result == PUT_SUBBUF_PUSHED) {
384 ERR("Buffer overflow (channel=%s), reader pushed. This channel will not be usable passed this point.", buf->name);
385 break;
386 }
387 else if(result == PUT_SUBBUF_DIED) {
388 DBG("application died while putting subbuffer");
389 /* Skip the first subbuffer. We are not sure it is trustable
390 * because the put_subbuffer() did not complete.
391 */
392 if(instance->callbacks->on_put_error)
393 instance->callbacks->on_put_error(instance->callbacks, buf);
394
395 finish_consuming_dead_subbuffer(instance->callbacks, buf);
396 break;
397 }
398 else if(result == PUT_SUBBUF_DONE) {
399 /* Done with this subbuffer */
400 /* FIXME: add a case where this branch is used? Upon
401 * normal trace termination, at put_subbuf time, a
402 * special last-subbuffer code could be returned by
403 * the listener.
404 */
405 break;
406 }
407 else if(result == PUT_SUBBUF_OK) {
408 }
409 }
410
411 DBG("thread for buffer %s is stopping", buf->name);
412
413 /* FIXME: destroy, unalloc... */
414
415 pthread_cleanup_pop(1);
416
417 return 0;
418 }
419
/*
 * Arguments handed from start_consuming_buffer() to consumer_thread().
 * The structure and bufname are heap-allocated by start_consuming_buffer()
 * and freed by consumer_thread() when it finishes.
 */
struct consumer_thread_args {
	pid_t pid;	/* pid of the application to connect to */
	const char *bufname;	/* strdup()ed name of the buffer to consume */
	struct libustd_instance *instance;	/* instance the thread works for */
};
425
426 void *consumer_thread(void *arg)
427 {
428 struct buffer_info *buf;
429 struct consumer_thread_args *args = (struct consumer_thread_args *) arg;
430 int result;
431 sigset_t sigset;
432
433 DBG("GOT ARGS: pid %d bufname %s", args->pid, args->bufname);
434
435 if(args->instance->callbacks->on_new_thread)
436 args->instance->callbacks->on_new_thread(args->instance->callbacks);
437
438 /* Block signals that should be handled by the main thread. */
439 result = sigemptyset(&sigset);
440 if(result == -1) {
441 PERROR("sigemptyset");
442 goto end;
443 }
444 result = sigaddset(&sigset, SIGTERM);
445 if(result == -1) {
446 PERROR("sigaddset");
447 goto end;
448 }
449 result = sigaddset(&sigset, SIGINT);
450 if(result == -1) {
451 PERROR("sigaddset");
452 goto end;
453 }
454 result = sigprocmask(SIG_BLOCK, &sigset, NULL);
455 if(result == -1) {
456 PERROR("sigprocmask");
457 goto end;
458 }
459
460 buf = connect_buffer(args->instance, args->pid, args->bufname);
461 if(buf == NULL) {
462 ERR("failed to connect to buffer");
463 goto end;
464 }
465
466 consumer_loop(args->instance, buf);
467
468 destroy_buffer(args->instance->callbacks, buf);
469
470 end:
471
472 if(args->instance->callbacks->on_close_thread)
473 args->instance->callbacks->on_close_thread(args->instance->callbacks);
474
475 free((void *)args->bufname);
476 free(args);
477 return NULL;
478 }
479
480 int start_consuming_buffer(
481 struct libustd_instance *instance, pid_t pid, const char *bufname)
482 {
483 pthread_t thr;
484 struct consumer_thread_args *args;
485 int result;
486
487 DBG("beginning of start_consuming_buffer: args: pid %d bufname %s", pid, bufname);
488
489 args = (struct consumer_thread_args *) malloc(sizeof(struct consumer_thread_args));
490
491 args->pid = pid;
492 args->bufname = strdup(bufname);
493 args->instance = instance;
494 DBG("beginning2 of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
495
496 result = pthread_create(&thr, NULL, consumer_thread, args);
497 if(result == -1) {
498 ERR("pthread_create failed");
499 return -1;
500 }
501 result = pthread_detach(thr);
502 if(result == -1) {
503 ERR("pthread_detach failed");
504 return -1;
505 }
506 DBG("end of start_consuming_buffer: args: pid %d bufname %s", args->pid, args->bufname);
507
508 return 0;
509 }
510
511 int libustd_start_instance(struct libustd_instance *instance)
512 {
513 int result;
514 int timeout = -1;
515
516 if(!instance->is_init) {
517 ERR("libustd instance not initialized");
518 return 1;
519 }
520
521 /* app loop */
522 for(;;) {
523 char *recvbuf;
524
525 /* check for requests on our public socket */
526 result = ustcomm_ustd_recv_message(&instance->comm, &recvbuf, NULL, timeout);
527 if(result == -1 && errno == EINTR) {
528 /* Caught signal */
529 }
530 else if(result == -1) {
531 ERR("error in ustcomm_ustd_recv_message");
532 goto loop_end;
533 }
534 else if(result > 0) {
535 if(!strncmp(recvbuf, "collect", 7)) {
536 pid_t pid;
537 char *bufname;
538 int result;
539
540 result = sscanf(recvbuf, "%*s %d %50as", &pid, &bufname);
541 if(result != 2) {
542 ERR("parsing error: %s", recvbuf);
543 goto free_bufname;
544 }
545
546 result = start_consuming_buffer(instance, pid, bufname);
547 if(result < 0) {
548 ERR("error in add_buffer");
549 goto free_bufname;
550 }
551
552 free_bufname:
553 free(bufname);
554 }
555 else if(!strncmp(recvbuf, "exit", 4)) {
556 /* Only there to force poll to return */
557 }
558 else {
559 WARN("unknown command: %s", recvbuf);
560 }
561
562 free(recvbuf);
563 }
564
565 loop_end:
566
567 if(instance->quit_program) {
568 pthread_mutex_lock(&instance->mutex);
569 if(instance->active_buffers == 0) {
570 pthread_mutex_unlock(&instance->mutex);
571 break;
572 }
573 pthread_mutex_unlock(&instance->mutex);
574 timeout = 100;
575 }
576 }
577
578 if(instance->callbacks->on_trace_end)
579 instance->callbacks->on_trace_end(instance);
580
581 libustd_delete_instance(instance);
582
583 return 0;
584 }
585
586 void libustd_delete_instance(struct libustd_instance *instance)
587 {
588 if(instance->is_init)
589 ustcomm_fini_ustd(&instance->comm);
590
591 pthread_mutex_destroy(&instance->mutex);
592 free(instance->sock_path);
593 free(instance);
594 }
595
596 int libustd_stop_instance(struct libustd_instance *instance, int send_msg)
597 {
598 int result;
599 int fd;
600 int bytes = 0;
601
602 char msg[] = "exit";
603
604 instance->quit_program = 1;
605
606 if(!send_msg)
607 return 0;
608
609 /* Send a message through the socket to force poll to return */
610
611 struct sockaddr_un addr;
612
613 result = fd = socket(PF_UNIX, SOCK_STREAM, 0);
614 if(result == -1) {
615 PERROR("socket");
616 return 1;
617 }
618
619 addr.sun_family = AF_UNIX;
620
621 strncpy(addr.sun_path, instance->sock_path, UNIX_PATH_MAX);
622 addr.sun_path[UNIX_PATH_MAX-1] = '\0';
623
624 result = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
625 if(result == -1) {
626 PERROR("connect");
627 }
628
629 while(bytes != sizeof(msg))
630 bytes += send(fd, msg, sizeof(msg), 0);
631
632 close(fd);
633
634 return 0;
635 }
636
637 struct libustd_instance *libustd_new_instance(
638 struct libustd_callbacks *callbacks, char *sock_path)
639 {
640 struct libustd_instance *instance =
641 malloc(sizeof(struct libustd_instance));
642 if(!instance)
643 return NULL;
644
645 instance->callbacks = callbacks;
646 instance->quit_program = 0;
647 instance->is_init = 0;
648 instance->active_buffers = 0;
649 pthread_mutex_init(&instance->mutex, NULL);
650
651 if(sock_path)
652 instance->sock_path = strdup(sock_path);
653 else
654 instance->sock_path = NULL;
655
656 return instance;
657 }
658
659 int libustd_init_instance(struct libustd_instance *instance)
660 {
661 int result;
662 result = ustcomm_init_ustd(&instance->comm, instance->sock_path);
663 if(result == -1) {
664 ERR("failed to initialize socket");
665 return 1;
666 }
667 instance->is_init = 1;
668 return 0;
669 }
670
This page took 0.059975 seconds and 4 git commands to generate.