Fix: Unexpected payload size in cmd_recv_stream_2_11
[lttng-tools.git] / src / common / consumer / consumer.c
index 26fc819f6ee64c6d572ff2012ec1d1ff25922434..301a8bc7ac52d3574c89ff73d5fa975ae75001a6 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
  * Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
  * Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
  *
@@ -8,6 +8,7 @@
  */
 
 #include "common/index/ctf-index.h"
+#include <stdint.h>
 #define _LGPL_SOURCE
 #include <assert.h>
 #include <poll.h>
@@ -3382,9 +3383,12 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
        ssize_t ret, written_bytes = 0;
        int rotation_ret;
        struct stream_subbuffer subbuffer = {};
+       enum get_next_subbuffer_status get_next_status;
 
        if (!locked_by_caller) {
                stream->read_subbuffer_ops.lock(stream);
+       } else {
+               stream->read_subbuffer_ops.assert_locked(stream);
        }
 
        if (stream->read_subbuffer_ops.on_wake_up) {
@@ -3407,14 +3411,20 @@ ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
                }
        }
 
-       ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
-       if (ret) {
-               if (ret == -ENODATA) {
-                       /* Not an error. */
-                       ret = 0;
-                       goto sleep_stream;
-               }
+       get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(
+                       stream, &subbuffer);
+       switch (get_next_status) {
+       case GET_NEXT_SUBBUFFER_STATUS_OK:
+               break;
+       case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
+               /* Not an error. */
+               ret = 0;
+               goto sleep_stream;
+       case GET_NEXT_SUBBUFFER_STATUS_ERROR:
+               ret = -1;
                goto end;
+       default:
+               abort();
        }
 
        ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
@@ -3554,18 +3564,22 @@ error:
  * This will create a relayd socket pair and add it to the relayd hash table.
  * The caller MUST acquire a RCU read side lock before calling it.
  */
- void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
-               struct lttng_consumer_local_data *ctx, int sock,
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+               int sock_type,
+               struct lttng_consumer_local_data *ctx,
+               int sock,
                struct pollfd *consumer_sockpoll,
-               struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
-               uint64_t relayd_session_id)
+               uint64_t sessiond_id,
+               uint64_t relayd_session_id,
+               uint32_t relayd_version_major,
+               uint32_t relayd_version_minor,
+               enum lttcomm_sock_proto relayd_socket_protocol)
 {
        int fd = -1, ret = -1, relayd_created = 0;
        enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
        struct consumer_relayd_sock_pair *relayd = NULL;
 
        assert(ctx);
-       assert(relayd_sock);
 
        DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
 
@@ -3634,54 +3648,25 @@ error:
        switch (sock_type) {
        case LTTNG_STREAM_CONTROL:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
-               ret = lttcomm_create_sock(&relayd->control_sock.sock);
-               /* Handle create_sock error. */
-               if (ret < 0) {
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto error;
-               }
-               /*
-                * Close the socket created internally by
-                * lttcomm_create_sock, so we can replace it by the one
-                * received from sessiond.
-                */
-               if (close(relayd->control_sock.sock.fd)) {
-                       PERROR("close");
-               }
+               ret = lttcomm_populate_sock_from_open_socket(
+                               &relayd->control_sock.sock, fd,
+                               relayd_socket_protocol);
 
-               /* Assign new file descriptor */
-               relayd->control_sock.sock.fd = fd;
                /* Assign version values. */
-               relayd->control_sock.major = relayd_sock->major;
-               relayd->control_sock.minor = relayd_sock->minor;
+               relayd->control_sock.major = relayd_version_major;
+               relayd->control_sock.minor = relayd_version_minor;
 
                relayd->relayd_session_id = relayd_session_id;
 
                break;
        case LTTNG_STREAM_DATA:
                /* Copy received lttcomm socket */
-               lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
-               ret = lttcomm_create_sock(&relayd->data_sock.sock);
-               /* Handle create_sock error. */
-               if (ret < 0) {
-                       ret_code = LTTCOMM_CONSUMERD_ENOMEM;
-                       goto error;
-               }
-               /*
-                * Close the socket created internally by
-                * lttcomm_create_sock, so we can replace it by the one
-                * received from sessiond.
-                */
-               if (close(relayd->data_sock.sock.fd)) {
-                       PERROR("close");
-               }
-
-               /* Assign new file descriptor */
-               relayd->data_sock.sock.fd = fd;
+               ret = lttcomm_populate_sock_from_open_socket(
+                               &relayd->data_sock.sock, fd,
+                               relayd_socket_protocol);
                /* Assign version values. */
-               relayd->data_sock.major = relayd_sock->major;
-               relayd->data_sock.minor = relayd_sock->minor;
+               relayd->data_sock.major = relayd_version_major;
+               relayd->data_sock.minor = relayd_version_minor;
                break;
        default:
                ERR("Unknown relayd socket type (%d)", sock_type);
@@ -3689,6 +3674,11 @@ error:
                goto error;
        }
 
+       if (ret < 0) {
+               ret_code = LTTCOMM_CONSUMERD_FATAL;
+               goto error;
+       }
+
        DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
                        sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
                        relayd->net_seq_idx, fd);
@@ -4392,7 +4382,11 @@ int consumer_clear_buffer(struct lttng_consumer_stream *stream)
                break;
        case LTTNG_CONSUMER32_UST:
        case LTTNG_CONSUMER64_UST:
-               lttng_ustconsumer_clear_buffer(stream);
+               ret = lttng_ustconsumer_clear_buffer(stream);
+               if (ret < 0) {
+                       ERR("Failed to clear ust stream (ret = %d)", ret);
+                       goto end;
+               }
                break;
        default:
                ERR("Unknown consumer_data type");
@@ -5242,3 +5236,8 @@ error_unlock:
        pthread_mutex_unlock(&stream->lock);
        goto end_rcu_unlock;
 }
+
+void lttng_consumer_sigbus_handle(void *addr)
+{
+       lttng_ustconsumer_sigbus_handle(addr);
+}
This page took 0.026061 seconds and 4 git commands to generate.