{
struct msghdr msg;
struct iovec iov[1];
- int ret, i, tmp2;
+ int ret = 0, i, tmp2;
struct cmsghdr *cmsg;
int nb_fd;
- char tmp[CMSG_SPACE(size)];
- struct lttcomm_kconsumerd_msg *buf;
+ char recv_fd[CMSG_SPACE(sizeof(int))];
+ struct lttcomm_kconsumerd_msg lkm;
+
/* the number of fds we are about to receive */
- nb_fd = size/sizeof(struct lttcomm_kconsumerd_msg);
+ nb_fd = size / sizeof(struct lttcomm_kconsumerd_msg);
- buf = malloc(size);
+ for (i = 0; i < nb_fd; i++) {
+ memset(&msg, 0, sizeof(msg));
- memset(&msg, 0, sizeof(msg));
+ /* Prepare to receive the structures */
+ iov[0].iov_base = &lkm;
+ iov[0].iov_len = sizeof(lkm);
+ msg.msg_iov = iov;
+ msg.msg_iovlen = 1;
- /* Prepare to receive the structures */
- iov[0].iov_base = buf;
- iov[0].iov_len = size;
- msg.msg_iov = iov;
- msg.msg_iovlen = 1;
+ msg.msg_control = recv_fd;
+ msg.msg_controllen = sizeof(recv_fd);
- msg.msg_control = tmp;
- msg.msg_controllen = sizeof(tmp);
+ DBG("Waiting to receive fd");
+ if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
+ perror("recvmsg");
+ continue;
+ }
- DBG("Waiting to receive fds");
- if ((ret = recvmsg(sfd, &msg, 0)) < 0) {
- perror("recvmsg");
- }
- if (ret != size) {
- ERR("Received only %d, expected %d", ret, size);
- send_error(KCONSUMERD_ERROR_RECV_FD);
- goto end;
- }
+ if (ret != (size / nb_fd)) {
+ ERR("Received only %d, expected %d", ret, size);
+ send_error(KCONSUMERD_ERROR_RECV_FD);
+ goto end;
+ }
- cmsg = CMSG_FIRSTHDR(&msg);
- if (!cmsg) {
- ERR("Invalid control message header");
- ret = -1;
- send_error(KCONSUMERD_ERROR_RECV_FD);
- goto end;
- }
+ cmsg = CMSG_FIRSTHDR(&msg);
+ if (!cmsg) {
+ ERR("Invalid control message header");
+ ret = -1;
+ send_error(KCONSUMERD_ERROR_RECV_FD);
+ goto end;
+ }
- /* if we received fds */
- if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
- DBG("Receive : expecting %d fds", nb_fd);
- for (i = 0; i < nb_fd; i++) {
+ /* if we received fds */
+ if (cmsg->cmsg_level == SOL_SOCKET && cmsg->cmsg_type == SCM_RIGHTS) {
switch (cmd_type) {
case ADD_STREAM:
- DBG("add_fd %s (%d)", buf[i].path_name, ((int *)CMSG_DATA(cmsg))[i]);
- ret = add_fd(&buf[i], ((int *)CMSG_DATA(cmsg))[i]);
+ DBG("add_fd %s (%d)", lkm.path_name, (CMSG_DATA(cmsg)[0]));
+ ret = add_fd(&lkm, (CMSG_DATA(cmsg)[0]));
if (ret < 0) {
send_error(KCONSUMERD_OUTFD_ERROR);
goto end;
}
break;
case UPDATE_STREAM:
- change_fd_state(buf[i].fd, buf[i].state);
+ change_fd_state(lkm.fd, lkm.state);
break;
default:
break;
}
+ /* flag to tell the polling thread to update its fd array */
+ update_fd_array = 1;
+ /* signal the poll thread */
+ tmp2 = write(poll_pipe[1], "4", 1);
+ } else {
+ ERR("Didn't received any fd");
+ send_error(KCONSUMERD_ERROR_RECV_FD);
+ ret = -1;
+ goto end;
}
- /* flag to tell the polling thread to update its fd array */
- update_fd_array = 1;
- /* signal the poll thread */
- tmp2 = write(poll_pipe[1], "4", 1);
- } else {
- ERR("Didn't received any fd");
- send_error(KCONSUMERD_ERROR_RECV_FD);
- ret = -1;
- goto end;
}
end:
DBG("consumerd_recv_fd thread exiting");
- if (buf != NULL) {
- free(buf);
- buf = NULL;
- }
return ret;
}
}
if (tmp.cmd_type == STOP) {
DBG("Received STOP command");
- quit = 1;
goto end;
}
/* we received a command to add or update fds */
end:
DBG("thread_receive_fds exiting");
+ quit = 1;
+ ret = write(poll_pipe[1], "4", 1);
+ if (ret < 0) {
+ perror("poll pipe write");
+ }
return NULL;
}
goto end;
}
+ /* No FDs and quit, cleanup the thread */
+ if (nb_fd == 0 && quit == 1) {
+ goto end;
+ }
+
/*
* if only the poll_pipe triggered poll to return just return to the
* beginning of the loop to update the array