char command_sock_path[PATH_MAX]; /* Global command socket path */
char error_sock_path[PATH_MAX]; /* Global error path */
+/* the liblttkconsumerd context */
+struct kconsumerd_local_data *ctx;
+
/*
* sighandler
*
return;
}
- kconsumerd_should_exit();
+ kconsumerd_should_exit(ctx);
}
/*
}
}
+/*
+ * read_subbuffer
+ *
+ * Consume data on a file descriptor and write it on a trace file
+ */
+static int read_subbuffer(struct kconsumerd_fd *kconsumerd_fd)
+{
+ unsigned long len;
+ int err;
+ long ret = 0;
+ int infd = kconsumerd_fd->consumerd_fd;
+
+ DBG("In kconsumerd_read_subbuffer (infd : %d)", infd);
+ /* Get the next subbuffer */
+ err = kernctl_get_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ perror("Reserving sub buffer failed (everything is normal, "
+ "it is due to concurrency)");
+ goto end;
+ }
+
+ switch (DEFAULT_KERNEL_CHANNEL_OUTPUT) {
+ case LTTNG_EVENT_SPLICE:
+ /* read the whole subbuffer */
+ err = kernctl_get_padded_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+
+ /* splice the subbuffer to the tracefile */
+ ret = kconsumerd_on_read_subbuffer_splice(ctx, kconsumerd_fd, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error splicing to tracefile");
+ }
+ break;
+ case LTTNG_EVENT_MMAP:
+ /* read the used subbuffer size */
+ err = kernctl_get_subbuf_size(infd, &len);
+ if (err != 0) {
+ ret = errno;
+ perror("Getting sub-buffer len failed.");
+ goto end;
+ }
+ /* write the subbuffer to the tracefile */
+ ret = kconsumerd_on_read_subbuffer_mmap(ctx, kconsumerd_fd, len);
+ if (ret < 0) {
+ /*
+ * display the error but continue processing to try
+ * to release the subbuffer
+ */
+ ERR("Error writing to tracefile");
+ }
+ break;
+ default:
+ ERR("Unknown output method");
+ ret = -1;
+ }
+
+ err = kernctl_put_next_subbuf(infd);
+ if (err != 0) {
+ ret = errno;
+ if (errno == EFAULT) {
+ perror("Error in unreserving sub buffer\n");
+ } else if (errno == EIO) {
+ /* Should never happen with newer LTTng versions */
+ perror("Reader has been pushed by the writer, last sub-buffer corrupted.");
+ }
+ goto end;
+ }
+
+end:
+ return ret;
+}
/*
* main
snprintf(command_sock_path, PATH_MAX,
KCONSUMERD_CMD_SOCK_PATH);
}
- kconsumerd_set_command_socket_path(command_sock_path);
+ /* create the pipe to wake to receiving thread when needed */
+ ctx = kconsumerd_create(read_subbuffer);
+ if (ctx == NULL) {
+ goto error;
+ }
+
+ kconsumerd_set_command_socket_path(ctx, command_sock_path);
if (strlen(error_sock_path) == 0) {
snprintf(error_sock_path, PATH_MAX,
KCONSUMERD_ERR_SOCK_PATH);
goto error;
}
- /* create the pipe to wake to receiving thread when needed */
- ret = kconsumerd_init();
- if (ret < 0) {
- goto end;
- }
-
/* Connect to the socket created by ltt-sessiond to report errors */
DBG("Connecting to error socket %s", error_sock_path);
ret = lttcomm_connect_unix_sock(error_sock_path);
if (ret < 0) {
WARN("Cannot connect to error socket, is ltt-sessiond started ?");
}
- kconsumerd_set_error_socket(ret);
+ kconsumerd_set_error_socket(ctx, ret);
/* Create the thread to manage the receive of fd */
ret = pthread_create(&threads[0], NULL, kconsumerd_thread_receive_fds,
- (void *) NULL);
+ (void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
/* Create thread to manage the polling/writing of traces */
ret = pthread_create(&threads[1], NULL, kconsumerd_thread_poll_fds,
- (void *) NULL);
+ (void *) ctx);
if (ret != 0) {
perror("pthread_create");
goto error;
}
}
ret = EXIT_SUCCESS;
- kconsumerd_send_error(KCONSUMERD_EXIT_SUCCESS);
+ kconsumerd_send_error(ctx, KCONSUMERD_EXIT_SUCCESS);
goto end;
error:
ret = EXIT_FAILURE;
- kconsumerd_send_error(KCONSUMERD_EXIT_FAILURE);
+ kconsumerd_send_error(ctx, KCONSUMERD_EXIT_FAILURE);
end:
+ kconsumerd_destroy(ctx);
kconsumerd_cleanup();
return ret;