/*
- * Copyright (C) 2011 Julien Desfossez <julien.desfossez@polymtl.ca>
+ * Copyright (C) 2011 EfficiOS Inc.
* Copyright (C) 2011 Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
* Copyright (C) 2012 David Goulet <dgoulet@efficios.com>
*
*/
#include "common/index/ctf-index.h"
+#include <stdint.h>
#define _LGPL_SOURCE
#include <assert.h>
#include <poll.h>
ssize_t ret, written_bytes = 0;
int rotation_ret;
struct stream_subbuffer subbuffer = {};
+ enum get_next_subbuffer_status get_next_status;
if (!locked_by_caller) {
stream->read_subbuffer_ops.lock(stream);
+ } else {
+ stream->read_subbuffer_ops.assert_locked(stream);
}
if (stream->read_subbuffer_ops.on_wake_up) {
}
}
- ret = stream->read_subbuffer_ops.get_next_subbuffer(stream, &subbuffer);
- if (ret) {
- if (ret == -ENODATA) {
- /* Not an error. */
- ret = 0;
- goto sleep_stream;
- }
+ get_next_status = stream->read_subbuffer_ops.get_next_subbuffer(
+ stream, &subbuffer);
+ switch (get_next_status) {
+ case GET_NEXT_SUBBUFFER_STATUS_OK:
+ break;
+ case GET_NEXT_SUBBUFFER_STATUS_NO_DATA:
+ /* Not an error. */
+ ret = 0;
+ goto sleep_stream;
+ case GET_NEXT_SUBBUFFER_STATUS_ERROR:
+ ret = -1;
goto end;
+ default:
+ abort();
}
ret = stream->read_subbuffer_ops.pre_consume_subbuffer(
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
- void consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
- struct lttng_consumer_local_data *ctx, int sock,
+void consumer_add_relayd_socket(uint64_t net_seq_idx,
+ int sock_type,
+ struct lttng_consumer_local_data *ctx,
+ int sock,
struct pollfd *consumer_sockpoll,
- struct lttcomm_relayd_sock *relayd_sock, uint64_t sessiond_id,
- uint64_t relayd_session_id)
+ uint64_t sessiond_id,
+ uint64_t relayd_session_id,
+ uint32_t relayd_version_major,
+ uint32_t relayd_version_minor,
+ enum lttcomm_sock_proto relayd_socket_protocol)
{
int fd = -1, ret = -1, relayd_created = 0;
enum lttcomm_return_code ret_code = LTTCOMM_CONSUMERD_SUCCESS;
struct consumer_relayd_sock_pair *relayd = NULL;
assert(ctx);
- assert(relayd_sock);
DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
switch (sock_type) {
case LTTNG_STREAM_CONTROL:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close");
- }
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->control_sock.sock, fd,
+ relayd_socket_protocol);
- /* Assign new file descriptor */
- relayd->control_sock.sock.fd = fd;
/* Assign version values. */
- relayd->control_sock.major = relayd_sock->major;
- relayd->control_sock.minor = relayd_sock->minor;
+ relayd->control_sock.major = relayd_version_major;
+ relayd->control_sock.minor = relayd_version_minor;
relayd->relayd_session_id = relayd_session_id;
break;
case LTTNG_STREAM_DATA:
/* Copy received lttcomm socket */
- lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
- ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Handle create_sock error. */
- if (ret < 0) {
- ret_code = LTTCOMM_CONSUMERD_ENOMEM;
- goto error;
- }
- /*
- * Close the socket created internally by
- * lttcomm_create_sock, so we can replace it by the one
- * received from sessiond.
- */
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close");
- }
-
- /* Assign new file descriptor */
- relayd->data_sock.sock.fd = fd;
+ ret = lttcomm_populate_sock_from_open_socket(
+ &relayd->data_sock.sock, fd,
+ relayd_socket_protocol);
/* Assign version values. */
- relayd->data_sock.major = relayd_sock->major;
- relayd->data_sock.minor = relayd_sock->minor;
+ relayd->data_sock.major = relayd_version_major;
+ relayd->data_sock.minor = relayd_version_minor;
break;
default:
ERR("Unknown relayd socket type (%d)", sock_type);
goto error;
}
+ if (ret < 0) {
+ ret_code = LTTCOMM_CONSUMERD_FATAL;
+ goto error;
+ }
+
DBG("Consumer %s socket created successfully with net idx %" PRIu64 " (fd: %d)",
sock_type == LTTNG_STREAM_CONTROL ? "control" : "data",
relayd->net_seq_idx, fd);
break;
case LTTNG_CONSUMER32_UST:
case LTTNG_CONSUMER64_UST:
- lttng_ustconsumer_clear_buffer(stream);
+ ret = lttng_ustconsumer_clear_buffer(stream);
+ if (ret < 0) {
+ ERR("Failed to clear ust stream (ret = %d)", ret);
+ goto end;
+ }
break;
default:
ERR("Unknown consumer_data type");
pthread_mutex_unlock(&stream->lock);
goto end_rcu_unlock;
}
+
+void lttng_consumer_sigbus_handle(void *addr)
+{
+ lttng_ustconsumer_sigbus_handle(addr);
+}