#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>
+#include <inttypes.h>
#include <urcu/futex.h>
#include <urcu/uatomic.h>
#include <unistd.h>
return NULL;
}
+/*
+ * Return nonzero if stream needs to be closed.
+ */
+static
+int close_stream_check(struct relay_stream *stream)
+{
+
+ if (stream->close_flag && stream->prev_seq == stream->last_net_seq_num) {
+ return 1;
+ }
+ return 0;
+}
+
/*
* This thread manages the listening for new connections on the network
*/
return;
}
- DBG("Relay deleting session %lu", cmd->session->id);
+ DBG("Relay deleting session %" PRIu64, cmd->session->id);
free(cmd->session->sock);
rcu_read_lock();
rcu_read_lock();
stream->stream_handle = ++last_relay_stream_id;
- stream->seq = 0;
+ stream->prev_seq = -1ULL;
stream->session = session;
root_path = create_output_path(stream_info.pathname);
return ret;
}
+/*
+ * relay_close_stream: close a specific stream
+ */
+static
+int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
+ struct relay_command *cmd, struct lttng_ht *streams_ht)
+{
+ struct relay_session *session = cmd->session;
+ struct lttcomm_relayd_close_stream stream_info;
+ struct lttcomm_relayd_generic_reply reply;
+ struct relay_stream *stream;
+ int ret, send_ret;
+ struct lttng_ht_node_ulong *node;
+ struct lttng_ht_iter iter;
+
+ DBG("Close stream received");
+
+ if (!session || session->version_check_done == 0) {
+ ERR("Trying to close a stream before version check");
+ ret = -1;
+ goto end_no_session;
+ }
+
+ ret = cmd->sock->ops->recvmsg(cmd->sock, &stream_info,
+ sizeof(struct lttcomm_relayd_close_stream), MSG_WAITALL);
+ if (ret < sizeof(struct lttcomm_relayd_close_stream)) {
+ ERR("Relay didn't receive valid add_stream struct size : %d", ret);
+ ret = -1;
+ goto end_no_session;
+ }
+
+ rcu_read_lock();
+ lttng_ht_lookup(streams_ht,
+ (void *)((unsigned long) be64toh(stream_info.stream_id)),
+ &iter);
+ node = lttng_ht_iter_get_node_ulong(&iter);
+ if (node == NULL) {
+ DBG("Relay stream %" PRIu64 " not found", be64toh(stream_info.stream_id));
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream = caa_container_of(node, struct relay_stream, stream_n);
+ if (!stream) {
+ ret = -1;
+ goto end_unlock;
+ }
+
+ stream->close_flag = 1;
+
+ if (close_stream_check(stream)) {
+ int delret;
+
+ close(stream->fd);
+ delret = lttng_ht_del(streams_ht, &iter);
+ assert(!delret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
+ DBG("Closed tracefile %d from close stream", stream->fd);
+ }
+
+end_unlock:
+ rcu_read_unlock();
+
+ if (ret < 0) {
+ reply.ret_code = htobe32(LTTCOMM_ERR);
+ } else {
+ reply.ret_code = htobe32(LTTCOMM_OK);
+ }
+ send_ret = cmd->sock->ops->sendmsg(cmd->sock, &reply,
+ sizeof(struct lttcomm_relayd_generic_reply), 0);
+ if (send_ret < 0) {
+ ERR("Relay sending stream id");
+ }
+
+end_no_session:
+ return ret;
+}
+
/*
* relay_unknown_command: send -1 if received unknown command
*/
&iter);
node = lttng_ht_iter_get_node_ulong(&iter);
if (node == NULL) {
- DBG("Relay stream %lu not found", stream_id);
+ DBG("Relay stream %" PRIu64 " not found", stream_id);
ret = NULL;
goto end;
}
data_buffer_size = data_size;
}
memset(data_buffer, 0, data_size);
- DBG2("Relay receiving metadata, waiting for %lu bytes", data_size);
+ DBG2("Relay receiving metadata, waiting for %" PRIu64 " bytes", data_size);
ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size,
MSG_WAITALL);
if (ret < 0 || ret != data_size) {
goto end;
}
session->id = ++last_relay_session_id;
- DBG("Created session %lu", session->id);
+ DBG("Created session %" PRIu64, session->id);
cmd->session = session;
}
session->version_check_done = 1;
case RELAYD_VERSION:
ret = relay_send_version(recv_hdr, cmd);
break;
+ case RELAYD_CLOSE_STREAM:
+ ret = relay_close_stream(recv_hdr, cmd, streams_ht);
+ break;
case RELAYD_UPDATE_SYNC_INFO:
default:
ERR("Received unknown command (%u)", be32toh(recv_hdr->cmd));
struct relay_stream *stream;
struct lttcomm_relayd_data_hdr data_hdr;
uint64_t stream_id;
+ uint64_t net_seq_num;
uint32_t data_size;
ret = cmd->sock->ops->recvmsg(cmd->sock, &data_hdr,
}
memset(data_buffer, 0, data_size);
- DBG3("Receiving data of size %u for stream id %zu", data_size, stream_id);
+ net_seq_num = be64toh(data_hdr.net_seq_num);
+
+ DBG3("Receiving data of size %u for stream id %" PRIu64 " seqnum %" PRIu64,
+ data_size, stream_id, net_seq_num);
ret = cmd->sock->ops->recvmsg(cmd->sock, data_buffer, data_size, MSG_WAITALL);
if (ret <= 0) {
ret = -1;
ret = -1;
goto end_unlock;
}
- DBG2("Relay wrote %d bytes to tracefile for stream id %lu", ret, stream->stream_handle);
+ DBG2("Relay wrote %d bytes to tracefile for stream id %" PRIu64,
+ ret, stream->stream_handle);
+
+ stream->prev_seq = net_seq_num;
+
+ /* Check if we need to close the FD */
+ if (close_stream_check(stream)) {
+ struct lttng_ht_iter iter;
+
+ close(stream->fd);
+ iter.iter.node = &stream->stream_n.node;
+ ret = lttng_ht_del(streams_ht, &iter);
+ assert(!ret);
+ call_rcu(&stream->rcu_node,
+ deferred_free_stream);
+ DBG("Closed tracefile %d after recv data", stream->fd);
+ }
end_unlock:
rcu_read_unlock();
DBG("Control connection closed with %d", pollfd);
} else {
if (relay_connection->session) {
- DBG2("Relay worker receiving data for session : %lu",
+ DBG2("Relay worker receiving data for session : %" PRIu64,
relay_connection->session->id);
}
ret = relay_process_control(&recv_hdr,