remove unneeded dir attic/
[ltt-control.git] / trunk / ltt-control / lttd / lttd.c
CommitLineData
617de8e1 1/* lttd
2 *
3 * Linux Trace Toolkit Daemon
4 *
c2ffa20f 5 * This is a simple daemon that reads a few relay+debugfs channels and saves
6 * them in a trace.
617de8e1 7 *
31482529 8 * CPU hot-plugging is supported using inotify.
617de8e1 9 *
10 * Copyright 2005 -
11 * Mathieu Desnoyers <mathieu.desnoyers@polymtl.ca>
12 */
13
0bb647f5 14#ifdef HAVE_CONFIG_H
469206ed 15#include <config.h>
0bb647f5 16#endif
17
e54e1d5d 18#define _REENTRANT
0bb647f5 19#define _GNU_SOURCE
e54e1d5d 20#include <features.h>
617de8e1 21#include <stdio.h>
1d483eea 22#include <unistd.h>
617de8e1 23#include <errno.h>
24#include <sys/types.h>
25#include <sys/stat.h>
617de8e1 26#include <stdlib.h>
27#include <dirent.h>
28#include <string.h>
90ccaa9a 29#include <fcntl.h>
30#include <sys/poll.h>
1d483eea 31#include <sys/mman.h>
32#include <signal.h>
e54e1d5d 33#include <pthread.h>
357915bb 34#include <sys/syscall.h>
35#include <unistd.h>
36#include <asm/ioctls.h>
37
38#include <linux/version.h>
1d483eea 39
40/* Relayfs IOCTL */
41#include <asm/ioctl.h>
42#include <asm/types.h>
43
44/* Get the next sub buffer that can be read. */
766632ac 45#define RELAY_GET_SUBBUF _IOR(0xF5, 0x00,__u32)
1d483eea 46/* Release the oldest reserved (by "get") sub buffer. */
766632ac 47#define RELAY_PUT_SUBBUF _IOW(0xF5, 0x01,__u32)
1d483eea 48/* returns the number of sub buffers in the per cpu channel. */
766632ac 49#define RELAY_GET_N_SUBBUFS _IOR(0xF5, 0x02,__u32)
1d483eea 50/* returns the size of the sub buffers. */
766632ac 51#define RELAY_GET_SUBBUF_SIZE _IOR(0xF5, 0x03,__u32)
1d483eea 52
/*
 * inotify availability.
 *
 * When the kernel headers are 2.6.14 or newer, <sys/inotify.h> is used and
 * HAS_INOTIFY is defined. The raw syscall wrappers below are compiled out
 * (#if 0) because the C library is expected to provide them.
 *
 * On older headers, stub versions are defined instead: inotify_init() always
 * fails with -1, the two watch functions do nothing and return 0, and
 * HAS_INOTIFY is left undefined so the inotify code paths are not built.
 */
357915bb 53#if LINUX_VERSION_CODE >= KERNEL_VERSION(2,6,14)
7967f7c3 54#include <sys/inotify.h>
8d91577f 55#if 0 /* should now be provided by libc. */
357915bb 56/* From the inotify-tools 2.6 package */
57static inline int inotify_init (void)
58{
59 return syscall (__NR_inotify_init);
60}
61
62static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
63{
64 return syscall (__NR_inotify_add_watch, fd, name, mask);
65}
66
67static inline int inotify_rm_watch (int fd, __u32 wd)
68{
69 return syscall (__NR_inotify_rm_watch, fd, wd);
70}
8d91577f 71#endif //0
357915bb 72#define HAS_INOTIFY
73#else
/* Fallback stub: no inotify support, always reports failure. */
74static inline int inotify_init (void)
75{
76 return -1;
77}
1d483eea 78
/* Fallback stub: ignores its arguments and returns 0. */
357915bb 79static inline int inotify_add_watch (int fd, const char *name, __u32 mask)
80{
81 return 0;
82}
83
/* Fallback stub: ignores its arguments and returns 0. */
84static inline int inotify_rm_watch (int fd, __u32 wd)
85{
86 return 0;
87}
88#undef HAS_INOTIFY
89#endif
617de8e1 90
/*
 * Symbolic names for the four relay operations (get, put, number of
 * sub-buffers, sub-buffer size). The code shown in this file does not
 * reference these constants; it issues the ioctls through the RELAY_*
 * macros instead.
 * NOTE(review): GET_N_BUBBUFS looks like a typo for GET_N_SUBBUFS - confirm
 * nothing else depends on the current spelling before renaming.
 */
91enum {
92 GET_SUBBUF,
93 PUT_SUBBUF,
94 GET_N_BUBBUFS,
95 GET_SUBBUF_SIZE
96};
97
/* One relay channel file paired with the trace file its data is copied to. */
98struct fd_pair {
99 int channel; /* fd of the channel file (opened read-only, non-blocking) */
100 int trace; /* fd of the trace file the sub-buffers are written to */
1d483eea 101 unsigned int n_subbufs; /* filled in by the RELAY_GET_N_SUBBUFS ioctl */
102 unsigned int subbuf_size; /* filled in by the RELAY_GET_SUBBUF_SIZE ioctl */
103 void *mmap; /* only referenced by the disabled (#if 0) mmap code */
5ffb77aa 104 pthread_mutex_t mutex; /* held (via trylock) while one thread reads a sub-buffer */
617de8e1 105};
106
/* Growable array of channel/trace fd pairs. */
107struct channel_trace_fd {
108 struct fd_pair *pair; /* heap array, grown with realloc() */
109 int num_pairs; /* number of entries in pair[] */
110};
111
/*
 * One watched channel directory. Both paths are stored with a trailing '/'
 * so that an event's file name can be appended directly.
 */
357915bb 112struct inotify_watch {
113 int wd; /* value returned by inotify_add_watch() */
114 char path_channel[PATH_MAX]; /* channel directory being watched */
115 char path_trace[PATH_MAX]; /* matching trace output directory */
116};
117
/* Growable array of inotify watches. */
118struct inotify_watch_array {
119 struct inotify_watch *elem; /* heap array, grown with realloc() */
120 int num; /* number of entries in elem[] */
121};
122
f01152ea 123static __thread int thread_pipe[2];
31482529 124
125struct channel_trace_fd fd_pairs = { NULL, 0 };
126int inotify_fd = -1;
127struct inotify_watch_array inotify_watch_array = { NULL, 0 };
128
129/* protects fd_pairs and inotify_watch_array */
130pthread_rwlock_t fd_pairs_lock = PTHREAD_RWLOCK_INITIALIZER;
131
132
89565b43 133static char *trace_name = NULL;
134static char *channel_name = NULL;
135static int daemon_mode = 0;
136static int append_mode = 0;
137static unsigned long num_threads = 1;
1d483eea 138volatile static int quit_program = 0; /* For signal handler */
89565b43 139static int dump_flight_only = 0;
140static int dump_normal_only = 0;
083518b7 141static int verbose_mode = 0;
142
/* printf() to stdout, but only when verbose_mode is non-zero. The arguments
 * are not evaluated at all when verbose mode is off. */
#define printf_verbose(...)			\
	do {					\
		if (verbose_mode)		\
			printf(__VA_ARGS__);	\
	} while (0)
617de8e1 148
149/* Args :
150 *
151 * -t directory Directory name of the trace to write to. Will be created.
c2ffa20f 152 * -c directory Root directory of the debugfs trace channels.
617de8e1 153 * -d Run in background (daemon).
083518b7 154 * -a Trace append mode.
155 * -s Send SIGUSR1 to parent when ready for IO.
617de8e1 156 */
157void show_arguments(void)
158{
159 printf("Please use the following arguments :\n");
160 printf("\n");
161 printf("-t directory Directory name of the trace to write to.\n"
162 " It will be created.\n");
c2ffa20f 163 printf("-c directory Root directory of the debugfs trace channels.\n");
617de8e1 164 printf("-d Run in background (daemon).\n");
90ccaa9a 165 printf("-a Append to an possibly existing trace.\n");
5ffb77aa 166 printf("-N Number of threads to start.\n");
89565b43 167 printf("-f Dump only flight recorder channels.\n");
168 printf("-n Dump only normal channels.\n");
083518b7 169 printf("-v Verbose mode.\n");
617de8e1 170 printf("\n");
171}
172
173
174/* parse_arguments
175 *
176 * Parses the command line arguments.
177 *
178 * Returns 1 if the arguments were correct, but doesn't ask for program
179 * continuation. Returns -1 if the arguments are incorrect, or 0 if OK.
180 */
181int parse_arguments(int argc, char **argv)
182{
183 int ret = 0;
184 int argn = 1;
185
186 if(argc == 2) {
187 if(strcmp(argv[1], "-h") == 0) {
188 return 1;
189 }
190 }
191
90ccaa9a 192 while(argn < argc) {
617de8e1 193
194 switch(argv[argn][0]) {
195 case '-':
196 switch(argv[argn][1]) {
197 case 't':
90ccaa9a 198 if(argn+1 < argc) {
199 trace_name = argv[argn+1];
200 argn++;
201 }
617de8e1 202 break;
203 case 'c':
90ccaa9a 204 if(argn+1 < argc) {
205 channel_name = argv[argn+1];
206 argn++;
207 }
617de8e1 208 break;
209 case 'd':
210 daemon_mode = 1;
211 break;
90ccaa9a 212 case 'a':
213 append_mode = 1;
214 break;
5ffb77aa 215 case 'N':
e54e1d5d 216 if(argn+1 < argc) {
217 num_threads = strtoul(argv[argn+1], NULL, 0);
218 argn++;
219 }
220 break;
89565b43 221 case 'f':
222 dump_flight_only = 1;
223 break;
224 case 'n':
225 dump_normal_only = 1;
226 break;
083518b7 227 case 'v':
228 verbose_mode = 1;
229 break;
617de8e1 230 default:
231 printf("Invalid argument '%s'.\n", argv[argn]);
232 printf("\n");
233 ret = -1;
234 }
235 break;
236 default:
237 printf("Invalid argument '%s'.\n", argv[argn]);
238 printf("\n");
239 ret = -1;
240 }
241 argn++;
242 }
243
244 if(trace_name == NULL) {
245 printf("Please specify a trace name.\n");
246 printf("\n");
247 ret = -1;
248 }
249
250 if(channel_name == NULL) {
251 printf("Please specify a channel name.\n");
252 printf("\n");
253 ret = -1;
254 }
255
256 return ret;
257}
258
259void show_info(void)
260{
15061ecb 261 printf("Linux Trace Toolkit Trace Daemon " VERSION "\n");
617de8e1 262 printf("\n");
c2ffa20f 263 printf("Reading from debugfs directory : %s\n", channel_name);
617de8e1 264 printf("Writing to trace directory : %s\n", trace_name);
265 printf("\n");
266}
267
268
1d483eea 269/* signal handling */
270
271static void handler(int signo)
272{
273 printf("Signal %d received : exiting cleanly\n", signo);
274 quit_program = 1;
275}
276
277
357915bb 278int open_buffer_file(char *filename, char *path_channel, char *path_trace,
279 struct channel_trace_fd *fd_pairs)
280{
281 int open_ret = 0;
282 int ret = 0;
283 struct stat stat_buf;
284
285 if(strncmp(filename, "flight-", sizeof("flight-")-1) != 0) {
286 if(dump_flight_only) {
083518b7 287 printf_verbose("Skipping normal channel %s\n",
288 path_channel);
357915bb 289 return 0;
290 }
291 } else {
292 if(dump_normal_only) {
083518b7 293 printf_verbose("Skipping flight channel %s\n",
294 path_channel);
357915bb 295 return 0;
296 }
297 }
083518b7 298 printf_verbose("Opening file.\n");
357915bb 299
300 fd_pairs->pair = realloc(fd_pairs->pair,
301 ++fd_pairs->num_pairs * sizeof(struct fd_pair));
302
303 /* Open the channel in read mode */
304 fd_pairs->pair[fd_pairs->num_pairs-1].channel =
305 open(path_channel, O_RDONLY | O_NONBLOCK);
306 if(fd_pairs->pair[fd_pairs->num_pairs-1].channel == -1) {
307 perror(path_channel);
308 fd_pairs->num_pairs--;
309 return 0; /* continue */
310 }
311 /* Open the trace in write mode, only append if append_mode */
312 ret = stat(path_trace, &stat_buf);
313 if(ret == 0) {
314 if(append_mode) {
083518b7 315 printf_verbose("Appending to file %s as requested\n",
316 path_trace);
357915bb 317
318 fd_pairs->pair[fd_pairs->num_pairs-1].trace =
42e99028 319 open(path_trace, O_WRONLY,
357915bb 320 S_IRWXU|S_IRWXG|S_IRWXO);
357915bb 321 if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
322 perror(path_trace);
323 }
42e99028 324 ret = lseek(fd_pairs->pair[fd_pairs->num_pairs-1].trace,
325 0, SEEK_END);
326 if (ret < 0) {
327 perror(path_trace);
328 }
357915bb 329 } else {
330 printf("File %s exists, cannot open. Try append mode.\n", path_trace);
331 open_ret = -1;
332 goto end;
333 }
334 } else {
335 if(errno == ENOENT) {
336 fd_pairs->pair[fd_pairs->num_pairs-1].trace =
337 open(path_trace, O_WRONLY|O_CREAT|O_EXCL,
338 S_IRWXU|S_IRWXG|S_IRWXO);
339 if(fd_pairs->pair[fd_pairs->num_pairs-1].trace == -1) {
340 perror(path_trace);
341 }
342 }
343 }
344end:
345 return open_ret;
346}
1d483eea 347
617de8e1 348int open_channel_trace_pairs(char *subchannel_name, char *subtrace_name,
357915bb 349 struct channel_trace_fd *fd_pairs, int *inotify_fd,
350 struct inotify_watch_array *iwatch_array)
617de8e1 351{
352 DIR *channel_dir = opendir(subchannel_name);
353 struct dirent *entry;
354 struct stat stat_buf;
355 int ret;
356 char path_channel[PATH_MAX];
357 int path_channel_len;
358 char *path_channel_ptr;
359 char path_trace[PATH_MAX];
360 int path_trace_len;
361 char *path_trace_ptr;
002f91bb 362 int open_ret = 0;
617de8e1 363
364 if(channel_dir == NULL) {
365 perror(subchannel_name);
d304b1dd 366 open_ret = ENOENT;
002f91bb 367 goto end;
617de8e1 368 }
369
083518b7 370 printf_verbose("Creating trace subdirectory %s\n", subtrace_name);
617de8e1 371 ret = mkdir(subtrace_name, S_IRWXU|S_IRWXG|S_IRWXO);
372 if(ret == -1) {
b1e3e7c7 373 if(errno != EEXIST) {
90ccaa9a 374 perror(subtrace_name);
002f91bb 375 open_ret = -1;
d304b1dd 376 goto end;
90ccaa9a 377 }
617de8e1 378 }
379
380 strncpy(path_channel, subchannel_name, PATH_MAX-1);
381 path_channel_len = strlen(path_channel);
382 path_channel[path_channel_len] = '/';
383 path_channel_len++;
384 path_channel_ptr = path_channel + path_channel_len;
385
386 strncpy(path_trace, subtrace_name, PATH_MAX-1);
387 path_trace_len = strlen(path_trace);
388 path_trace[path_trace_len] = '/';
389 path_trace_len++;
390 path_trace_ptr = path_trace + path_trace_len;
391
357915bb 392#ifdef HAS_INOTIFY
393 iwatch_array->elem = realloc(iwatch_array->elem,
394 ++iwatch_array->num * sizeof(struct inotify_watch));
395
083518b7 396 printf_verbose("Adding inotify for channel %s\n", path_channel);
357915bb 397 iwatch_array->elem[iwatch_array->num-1].wd = inotify_add_watch(*inotify_fd, path_channel, IN_CREATE);
398 strcpy(iwatch_array->elem[iwatch_array->num-1].path_channel, path_channel);
399 strcpy(iwatch_array->elem[iwatch_array->num-1].path_trace, path_trace);
083518b7 400 printf_verbose("Added inotify for channel %s, wd %u\n",
401 iwatch_array->elem[iwatch_array->num-1].path_channel,
357915bb 402 iwatch_array->elem[iwatch_array->num-1].wd);
403#endif
404
617de8e1 405 while((entry = readdir(channel_dir)) != NULL) {
406
407 if(entry->d_name[0] == '.') continue;
408
409 strncpy(path_channel_ptr, entry->d_name, PATH_MAX - path_channel_len);
410 strncpy(path_trace_ptr, entry->d_name, PATH_MAX - path_trace_len);
411
412 ret = stat(path_channel, &stat_buf);
413 if(ret == -1) {
414 perror(path_channel);
415 continue;
416 }
417
083518b7 418 printf_verbose("Channel file : %s\n", path_channel);
617de8e1 419
420 if(S_ISDIR(stat_buf.st_mode)) {
421
083518b7 422 printf_verbose("Entering channel subdirectory...\n");
357915bb 423 ret = open_channel_trace_pairs(path_channel, path_trace, fd_pairs,
424 inotify_fd, iwatch_array);
617de8e1 425 if(ret < 0) continue;
90ccaa9a 426 } else if(S_ISREG(stat_buf.st_mode)) {
357915bb 427 open_ret = open_buffer_file(entry->d_name, path_channel, path_trace,
428 fd_pairs);
429 if(open_ret)
430 goto end;
617de8e1 431 }
617de8e1 432 }
433
d304b1dd 434end:
617de8e1 435 closedir(channel_dir);
436
d304b1dd 437 return open_ret;
617de8e1 438}
439
1d483eea 440
441int read_subbuffer(struct fd_pair *pair)
442{
f01152ea 443 unsigned int consumed_old;
444 int err;
445 long ret;
4e1c69e6 446 unsigned long len;
447 off_t offset;
1d483eea 448
449
f01152ea 450 err = ioctl(pair->channel, RELAY_GET_SUBBUF, &consumed_old);
083518b7 451 printf_verbose("cookie : %u\n", consumed_old);
469206ed 452 if(err != 0) {
5ffb77aa 453 ret = errno;
30478a4d 454 perror("Reserving sub buffer failed (everything is normal, it is due to concurrency)");
469206ed 455 goto get_error;
1d483eea 456 }
f01152ea 457#if 0
469206ed 458 err = TEMP_FAILURE_RETRY(write(pair->trace,
e4bed64a 459 pair->mmap
460 + (consumed_old & ((pair->n_subbufs * pair->subbuf_size)-1)),
1d483eea 461 pair->subbuf_size));
469206ed 462
463 if(err < 0) {
5ffb77aa 464 ret = errno;
1d483eea 465 perror("Error in writing to file");
469206ed 466 goto write_error;
1d483eea 467 }
f01152ea 468#endif //0
469 len = pair->subbuf_size;
470 offset = 0;
471 while (len > 0) {
4e1c69e6 472 printf_verbose("splice chan to pipe offset %lu\n",
473 (unsigned long)offset);
f01152ea 474 ret = splice(pair->channel, &offset, thread_pipe[1], NULL,
475 len, SPLICE_F_MOVE);
083518b7 476 printf_verbose("splice chan to pipe ret %ld\n", ret);
f01152ea 477 if (ret < 0) {
478 perror("Error in relay splice");
479 goto write_error;
480 }
481 ret = splice(thread_pipe[0], NULL, pair->trace, NULL,
482 ret, SPLICE_F_MOVE);
083518b7 483 printf_verbose("splice pipe to file %ld\n", ret);
f01152ea 484 if (ret < 0) {
485 perror("Error in file splice");
486 goto write_error;
487 }
488 len -= ret;
489 }
490
a7eb8aa2 491#if 0
492 err = fsync(pair->trace);
493 if(err < 0) {
494 ret = errno;
495 perror("Error in writing to file");
496 goto write_error;
497 }
498#endif //0
469206ed 499write_error:
f01152ea 500 ret = 0;
c2ffa20f 501 err = ioctl(pair->channel, RELAY_PUT_SUBBUF, &consumed_old);
469206ed 502 if(err != 0) {
5ffb77aa 503 ret = errno;
30478a4d 504 if(errno == EFAULT) {
5ffb77aa 505 perror("Error in unreserving sub buffer\n");
30478a4d 506 } else if(errno == EIO) {
4f31148b 507 perror("Reader has been pushed by the writer, last subbuffer corrupted.");
ec8cce5a 508 /* FIXME : we may delete the last written buffer if we wish. */
4f31148b 509 }
469206ed 510 goto get_error;
1d483eea 511 }
512
469206ed 513get_error:
514 return ret;
1d483eea 515}
516
517
357915bb 518int map_channels(struct channel_trace_fd *fd_pairs,
519 int idx_begin, int idx_end)
617de8e1 520{
1d483eea 521 int i,j;
e54e1d5d 522 int ret=0;
1d483eea 523
469206ed 524 if(fd_pairs->num_pairs <= 0) {
525 printf("No channel to read\n");
526 goto end;
527 }
528
1d483eea 529 /* Get the subbuf sizes and number */
530
357915bb 531 for(i=idx_begin;i<idx_end;i++) {
1d483eea 532 struct fd_pair *pair = &fd_pairs->pair[i];
90ccaa9a 533
c2ffa20f 534 ret = ioctl(pair->channel, RELAY_GET_N_SUBBUFS,
1d483eea 535 &pair->n_subbufs);
536 if(ret != 0) {
537 perror("Error in getting the number of subbuffers");
538 goto end;
539 }
c2ffa20f 540 ret = ioctl(pair->channel, RELAY_GET_SUBBUF_SIZE,
1d483eea 541 &pair->subbuf_size);
542 if(ret != 0) {
543 perror("Error in getting the size of the subbuffers");
544 goto end;
545 }
5ffb77aa 546 ret = pthread_mutex_init(&pair->mutex, NULL); /* Fast mutex */
547 if(ret != 0) {
548 perror("Error in mutex init");
549 goto end;
550 }
1d483eea 551 }
552
f01152ea 553#if 0
1d483eea 554 /* Mmap each FD */
357915bb 555 for(i=idx_begin;i<idx_end;i++) {
1d483eea 556 struct fd_pair *pair = &fd_pairs->pair[i];
557
558 pair->mmap = mmap(0, pair->subbuf_size * pair->n_subbufs, PROT_READ,
559 MAP_SHARED, pair->channel, 0);
560 if(pair->mmap == MAP_FAILED) {
561 perror("Mmap error");
562 goto munmap;
563 }
564 }
565
5ffb77aa 566 goto end; /* success */
1d483eea 567
e54e1d5d 568 /* Error handling */
569 /* munmap only the successfully mmapped indexes */
570munmap:
571 /* Munmap each FD */
357915bb 572 for(j=idx_begin;j<i;j++) {
e54e1d5d 573 struct fd_pair *pair = &fd_pairs->pair[j];
574 int err_ret;
575
576 err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
577 if(err_ret != 0) {
578 perror("Error in munmap");
579 }
580 ret |= err_ret;
581 }
582
f01152ea 583#endif //0
e54e1d5d 584end:
585 return ret;
e54e1d5d 586}
587
e54e1d5d 588int unmap_channels(struct channel_trace_fd *fd_pairs)
589{
590 int j;
591 int ret=0;
592
593 /* Munmap each FD */
594 for(j=0;j<fd_pairs->num_pairs;j++) {
595 struct fd_pair *pair = &fd_pairs->pair[j];
596 int err_ret;
597
f01152ea 598#if 0
e54e1d5d 599 err_ret = munmap(pair->mmap, pair->subbuf_size * pair->n_subbufs);
600 if(err_ret != 0) {
601 perror("Error in munmap");
602 }
603 ret |= err_ret;
f01152ea 604#endif //0
5ffb77aa 605 err_ret = pthread_mutex_destroy(&pair->mutex);
606 if(err_ret != 0) {
607 perror("Error in mutex destroy");
608 }
609 ret |= err_ret;
e54e1d5d 610 }
611
612 return ret;
613}
614
357915bb 615#ifdef HAS_INOTIFY
616/* Inotify event arrived.
617 *
618 * Only support add file for now.
619 */
620
621int read_inotify(int inotify_fd,
622 struct channel_trace_fd *fd_pairs,
623 struct inotify_watch_array *iwatch_array)
624{
625 char buf[sizeof(struct inotify_event) + PATH_MAX];
626 char path_channel[PATH_MAX];
627 char path_trace[PATH_MAX];
628 ssize_t len;
629 struct inotify_event *ievent;
630 size_t offset;
631 unsigned int i;
632 int ret;
633 int old_num;
634
635 offset = 0;
636 len = read(inotify_fd, buf, sizeof(struct inotify_event) + PATH_MAX);
637 if(len < 0) {
31482529 638
639 if(errno == EAGAIN)
640 return 0; /* another thread got the data before us */
641
357915bb 642 printf("Error in read from inotify FD %s.\n", strerror(len));
643 return -1;
644 }
645 while(offset < len) {
646 ievent = (struct inotify_event *)&(buf[offset]);
647 for(i=0; i<iwatch_array->num; i++) {
648 if(iwatch_array->elem[i].wd == ievent->wd &&
649 ievent->mask == IN_CREATE) {
083518b7 650 printf_verbose(
651 "inotify wd %u event mask : %u for %s%s\n",
357915bb 652 ievent->wd, ievent->mask,
083518b7 653 iwatch_array->elem[i].path_channel,
654 ievent->name);
357915bb 655 old_num = fd_pairs->num_pairs;
656 strcpy(path_channel, iwatch_array->elem[i].path_channel);
657 strcat(path_channel, ievent->name);
658 strcpy(path_trace, iwatch_array->elem[i].path_trace);
659 strcat(path_trace, ievent->name);
660 if(ret = open_buffer_file(ievent->name, path_channel,
661 path_trace, fd_pairs)) {
662 printf("Error opening buffer file\n");
663 return -1;
664 }
665 if(ret = map_channels(fd_pairs, old_num, fd_pairs->num_pairs)) {
666 printf("Error mapping channel\n");
667 return -1;
668 }
669
670 }
671 }
672 offset += sizeof(*ievent) + ievent->len;
673 }
674}
675#endif //HAS_INOTIFY
e54e1d5d 676
677/* read_channels
5ffb77aa 678 *
679 * Thread worker.
e54e1d5d 680 *
c2ffa20f 681 * Read the debugfs channels and write them in the paired tracefiles.
e54e1d5d 682 *
683 * @fd_pairs : paired channels and trace files.
684 *
357915bb 685 * returns 0 on success, -1 on error.
e54e1d5d 686 *
687 * Note that the high priority polled channels are consumed first. We then poll
688 * again to see if these channels are still in priority. Only when no
689 * high priority channel is left, we start reading low priority channels.
690 *
691 * Note that a channel is considered high priority when the buffer is almost
692 * full.
693 */
694
31482529 695int read_channels(unsigned long thread_num, struct channel_trace_fd *fd_pairs,
357915bb 696 int inotify_fd, struct inotify_watch_array *iwatch_array)
e54e1d5d 697{
357915bb 698 struct pollfd *pollfd = NULL;
31482529 699 int num_pollfd;
e54e1d5d 700 int i,j;
701 int num_rdy, num_hup;
702 int high_prio;
5ffb77aa 703 int ret = 0;
357915bb 704 int inotify_fds;
705 unsigned int old_num;
e54e1d5d 706
357915bb 707#ifdef HAS_INOTIFY
708 inotify_fds = 1;
709#else
710 inotify_fds = 0;
711#endif
712
31482529 713 pthread_rwlock_rdlock(&fd_pairs_lock);
714
357915bb 715 /* Start polling the FD. Keep one fd for inotify */
716 pollfd = malloc((inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
717
718#ifdef HAS_INOTIFY
719 pollfd[0].fd = inotify_fd;
720 pollfd[0].events = POLLIN|POLLPRI;
721#endif
90ccaa9a 722
723 for(i=0;i<fd_pairs->num_pairs;i++) {
357915bb 724 pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
725 pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
90ccaa9a 726 }
31482529 727 num_pollfd = inotify_fds + fd_pairs->num_pairs;
728
729
730 pthread_rwlock_unlock(&fd_pairs_lock);
731
90ccaa9a 732 while(1) {
4f45ea55 733 high_prio = 0;
1d483eea 734 num_hup = 0;
735#ifdef DEBUG
736 printf("Press a key for next poll...\n");
737 char buf[1];
738 read(STDIN_FILENO, &buf, 1);
31482529 739 printf("Next poll (polling %d fd) :\n", num_pollfd);
1d483eea 740#endif //DEBUG
357915bb 741
1d483eea 742 /* Have we received a signal ? */
743 if(quit_program) break;
744
31482529 745 num_rdy = poll(pollfd, num_pollfd, -1);
746
90ccaa9a 747 if(num_rdy == -1) {
748 perror("Poll error");
1d483eea 749 goto free_fd;
90ccaa9a 750 }
751
083518b7 752 printf_verbose("Data received\n");
357915bb 753#ifdef HAS_INOTIFY
754 switch(pollfd[0].revents) {
755 case POLLERR:
083518b7 756 printf_verbose(
757 "Error returned in polling inotify fd %d.\n",
758 pollfd[0].fd);
357915bb 759 break;
760 case POLLHUP:
083518b7 761 printf_verbose(
762 "Polling inotify fd %d tells it has hung up.\n",
763 pollfd[0].fd);
357915bb 764 break;
765 case POLLNVAL:
083518b7 766 printf_verbose(
767 "Polling inotify fd %d tells fd is not open.\n",
768 pollfd[0].fd);
357915bb 769 break;
770 case POLLPRI:
771 case POLLIN:
083518b7 772 printf_verbose(
773 "Polling inotify fd %d : data ready.\n",
774 pollfd[0].fd);
31482529 775
776 pthread_rwlock_wrlock(&fd_pairs_lock);
357915bb 777 read_inotify(inotify_fd, fd_pairs, iwatch_array);
31482529 778 pthread_rwlock_unlock(&fd_pairs_lock);
779
357915bb 780 break;
781 }
782#endif
90ccaa9a 783
31482529 784 for(i=inotify_fds;i<num_pollfd;i++) {
90ccaa9a 785 switch(pollfd[i].revents) {
786 case POLLERR:
083518b7 787 printf_verbose(
788 "Error returned in polling fd %d.\n",
789 pollfd[i].fd);
1d483eea 790 num_hup++;
90ccaa9a 791 break;
792 case POLLHUP:
083518b7 793 printf_verbose(
794 "Polling fd %d tells it has hung up.\n",
795 pollfd[i].fd);
1d483eea 796 num_hup++;
90ccaa9a 797 break;
798 case POLLNVAL:
083518b7 799 printf_verbose(
800 "Polling fd %d tells fd is not open.\n",
801 pollfd[i].fd);
1d483eea 802 num_hup++;
90ccaa9a 803 break;
804 case POLLPRI:
31482529 805 pthread_rwlock_rdlock(&fd_pairs_lock);
357915bb 806 if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
083518b7 807 printf_verbose(
808 "Urgent read on fd %d\n",
809 pollfd[i].fd);
5ffb77aa 810 /* Take care of high priority channels first. */
811 high_prio = 1;
812 /* it's ok to have an unavailable subbuffer */
357915bb 813 ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
30478a4d 814 if(ret == EAGAIN) ret = 0;
cdad9787 815
357915bb 816 ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
5ffb77aa 817 if(ret)
818 printf("Error in mutex unlock : %s\n", strerror(ret));
819 }
31482529 820 pthread_rwlock_unlock(&fd_pairs_lock);
90ccaa9a 821 break;
4f45ea55 822 }
90ccaa9a 823 }
357915bb 824 /* If every buffer FD has hung up, we end the read loop here */
31482529 825 if(num_hup == num_pollfd - inotify_fds) break;
90ccaa9a 826
4f45ea55 827 if(!high_prio) {
31482529 828 for(i=inotify_fds;i<num_pollfd;i++) {
4f45ea55 829 switch(pollfd[i].revents) {
830 case POLLIN:
31482529 831 pthread_rwlock_rdlock(&fd_pairs_lock);
357915bb 832 if(pthread_mutex_trylock(&fd_pairs->pair[i-inotify_fds].mutex) == 0) {
5ffb77aa 833 /* Take care of low priority channels. */
083518b7 834 printf_verbose(
835 "Normal read on fd %d\n",
836 pollfd[i].fd);
5ffb77aa 837 /* it's ok to have an unavailable subbuffer */
357915bb 838 ret = read_subbuffer(&fd_pairs->pair[i-inotify_fds]);
30478a4d 839 if(ret == EAGAIN) ret = 0;
cdad9787 840
357915bb 841 ret = pthread_mutex_unlock(&fd_pairs->pair[i-inotify_fds].mutex);
5ffb77aa 842 if(ret)
843 printf("Error in mutex unlock : %s\n", strerror(ret));
844 }
31482529 845 pthread_rwlock_unlock(&fd_pairs_lock);
4f45ea55 846 break;
847 }
848 }
90ccaa9a 849 }
850
31482529 851 /* Update pollfd array if an entry was added to fd_pairs */
852 pthread_rwlock_rdlock(&fd_pairs_lock);
853 if((inotify_fds + fd_pairs->num_pairs) != num_pollfd) {
854 pollfd = realloc(pollfd,
855 (inotify_fds + fd_pairs->num_pairs) * sizeof(struct pollfd));
856 for(i=num_pollfd-inotify_fds;i<fd_pairs->num_pairs;i++) {
857 pollfd[inotify_fds+i].fd = fd_pairs->pair[i].channel;
858 pollfd[inotify_fds+i].events = POLLIN|POLLPRI;
859 }
860 num_pollfd = fd_pairs->num_pairs + inotify_fds;
861 }
862 pthread_rwlock_unlock(&fd_pairs_lock);
863
864 /* NB: If the fd_pairs structure is updated by another thread from this
865 * point forward, the current thread will wait in the poll without
866 * monitoring the new channel. However, this thread will add the
867 * new channel on next poll (and this should not take too much time
868 * on a loaded system).
869 *
870 * This event is quite unlikely and can only occur if a CPU is
871 * hot-plugged while multple lttd threads are running.
872 */
90ccaa9a 873 }
874
1d483eea 875free_fd:
90ccaa9a 876 free(pollfd);
877
1d483eea 878end:
357915bb 879 return ret;
617de8e1 880}
881
882
357915bb 883void close_channel_trace_pairs(struct channel_trace_fd *fd_pairs, int inotify_fd,
884 struct inotify_watch_array *iwatch_array)
617de8e1 885{
90ccaa9a 886 int i;
887 int ret;
617de8e1 888
90ccaa9a 889 for(i=0;i<fd_pairs->num_pairs;i++) {
890 ret = close(fd_pairs->pair[i].channel);
891 if(ret == -1) perror("Close error on channel");
892 ret = close(fd_pairs->pair[i].trace);
893 if(ret == -1) perror("Close error on trace");
894 }
895 free(fd_pairs->pair);
357915bb 896 free(iwatch_array->elem);
897}
898
899/* Thread worker */
900void * thread_main(void *arg)
901{
31482529 902 long ret;
903 unsigned long thread_num = (unsigned long)arg;
904
f01152ea 905 ret = pipe(thread_pipe);
906 if (ret < 0) {
907 perror("Error creating pipe");
908 return (void*)ret;
909 }
31482529 910 ret = read_channels(thread_num, &fd_pairs, inotify_fd, &inotify_watch_array);
f01152ea 911 close(thread_pipe[0]); /* close read end */
912 close(thread_pipe[1]); /* close write end */
31482529 913 return (void*)ret;
914}
915
916
917int channels_init()
918{
357915bb 919 int ret = 0;
357915bb 920
921 inotify_fd = inotify_init();
31482529 922 fcntl(inotify_fd, F_SETFL, O_NONBLOCK);
357915bb 923
924 if(ret = open_channel_trace_pairs(channel_name, trace_name, &fd_pairs,
925 &inotify_fd, &inotify_watch_array))
926 goto close_channel;
c928825d 927 if (fd_pairs.num_pairs == 0) {
928 printf("No channel available for reading, exiting\n");
929 ret = -ENOENT;
930 goto close_channel;
931 }
357915bb 932 if(ret = map_channels(&fd_pairs, 0, fd_pairs.num_pairs))
933 goto close_channel;
31482529 934 return 0;
357915bb 935
936close_channel:
937 close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
938 if(inotify_fd >= 0)
939 close(inotify_fd);
31482529 940 return ret;
617de8e1 941}
942
31482529 943
617de8e1 944int main(int argc, char ** argv)
945{
e54e1d5d 946 int ret = 0;
1d483eea 947 struct sigaction act;
5ffb77aa 948 pthread_t *tids;
31482529 949 unsigned long i;
5ffb77aa 950 void *tret;
617de8e1 951
952 ret = parse_arguments(argc, argv);
953
954 if(ret != 0) show_arguments();
955 if(ret < 0) return EINVAL;
956 if(ret > 0) return 0;
957
958 show_info();
959
1d483eea 960 /* Connect the signal handlers */
961 act.sa_handler = handler;
962 act.sa_flags = 0;
963 sigemptyset(&(act.sa_mask));
964 sigaddset(&(act.sa_mask), SIGTERM);
965 sigaddset(&(act.sa_mask), SIGQUIT);
966 sigaddset(&(act.sa_mask), SIGINT);
967 sigaction(SIGTERM, &act, NULL);
968 sigaction(SIGQUIT, &act, NULL);
969 sigaction(SIGINT, &act, NULL);
970
31482529 971 if(ret = channels_init())
972 return ret;
973
06cb3ad3 974 if(daemon_mode) {
975 ret = daemon(0, 0);
976
977 if(ret == -1) {
978 perror("An error occured while daemonizing.");
979 exit(-1);
980 }
981 }
982
5ffb77aa 983 tids = malloc(sizeof(pthread_t) * num_threads);
984 for(i=0; i<num_threads; i++) {
ae410d24 985
357915bb 986 ret = pthread_create(&tids[i], NULL, thread_main, (void*)i);
5ffb77aa 987 if(ret) {
988 perror("Error creating thread");
989 break;
990 }
991 }
617de8e1 992
5ffb77aa 993 for(i=0; i<num_threads; i++) {
994 ret = pthread_join(tids[i], &tret);
995 if(ret) {
996 perror("Error joining thread");
997 break;
998 }
31482529 999 if((long)tret != 0) {
1000 printf("Error %s occured in thread %u\n",
1001 strerror((long)tret), i);
5ffb77aa 1002 }
1003 }
1004
1005 free(tids);
31482529 1006 ret = unmap_channels(&fd_pairs);
1007 close_channel_trace_pairs(&fd_pairs, inotify_fd, &inotify_watch_array);
1008 if(inotify_fd >= 0)
1009 close(inotify_fd);
5ffb77aa 1010
617de8e1 1011 return ret;
1012}
This page took 0.071128 seconds and 4 git commands to generate.