* Send command. Fill up the header and append the data.
*/
static int send_command(struct lttcomm_sock *sock,
- enum lttcomm_sessiond_command cmd, void *data, size_t size,
+ enum lttcomm_relayd_command cmd, void *data, size_t size,
int flags)
{
int ret;
return ret;
}
+/*
+ * Send a RELAYD_CREATE_SESSION command to the relayd with the given socket and
+ * set session_id of the relayd if we have a successful reply from the relayd.
+ *
+ * On success, return 0 else a negative value being a lttng_error_code returned
+ * from the relayd.
+ */
+int relayd_create_session(struct lttcomm_sock *sock, uint64_t *session_id)
+{
+ int ret;
+ struct lttcomm_relayd_status_session reply;
+
+ assert(sock);
+ assert(session_id);
+
+ DBG("Relayd create session");
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_CREATE_SESSION, NULL, 0, 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.session_id = be64toh(reply.session_id);
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd create session replied error %d", ret);
+ goto error;
+ } else {
+ ret = 0;
+ *session_id = reply.session_id;
+ }
+
+ DBG("Relayd session created with id %" PRIu64, reply.session_id);
+
+error:
+ return ret;
+}
+
/*
* Add stream on the relayd and assign stream handle to the stream_id argument.
*
/*
* Check on the relayd side for a quiescent state on the control socket.
*/
-int relayd_quiescent_control(struct lttcomm_sock *sock)
+int relayd_quiescent_control(struct lttcomm_sock *sock,
+ uint64_t metadata_stream_id)
{
int ret;
+ struct lttcomm_relayd_quiescent_control msg;
struct lttcomm_relayd_generic_reply reply;
/* Code flow error. Safety net. */
DBG("Relayd checking quiescent control state");
+ msg.stream_id = htobe64(metadata_stream_id);
+
/* Send command */
- ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, NULL, 0, 0);
+ ret = send_command(sock, RELAYD_QUIESCENT_CONTROL, &msg, sizeof(msg), 0);
if (ret < 0) {
goto error;
}
error:
return ret;
}
+
+/*
+ * Begin a data pending command for a specific session id.
+ */
+int relayd_begin_data_pending(struct lttcomm_sock *sock, uint64_t id)
+{
+ int ret;
+ struct lttcomm_relayd_begin_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd begin data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_BEGIN_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+
+ /* Return session id or negative ret code. */
+ if (reply.ret_code != LTTNG_OK) {
+ ret = -reply.ret_code;
+ ERR("Relayd begin data pending replied error %d", ret);
+ goto error;
+ }
+
+ return 0;
+
+error:
+ return ret;
+}
+
+/*
+ * End a data pending command for a specific session id.
+ *
+ * Return 0 on success and set is_data_inflight to 0 if no data is being
+ * streamed or 1 if it is the case.
+ */
+int relayd_end_data_pending(struct lttcomm_sock *sock, uint64_t id,
+ unsigned int *is_data_inflight)
+{
+ int ret;
+ struct lttcomm_relayd_end_data_pending msg;
+ struct lttcomm_relayd_generic_reply reply;
+
+ /* Code flow error. Safety net. */
+ assert(sock);
+
+ DBG("Relayd end data pending");
+
+ msg.session_id = htobe64(id);
+
+ /* Send command */
+ ret = send_command(sock, RELAYD_END_DATA_PENDING, &msg, sizeof(msg), 0);
+ if (ret < 0) {
+ goto error;
+ }
+
+ /* Recevie response */
+ ret = recv_reply(sock, (void *) &reply, sizeof(reply));
+ if (ret < 0) {
+ goto error;
+ }
+
+ reply.ret_code = be32toh(reply.ret_code);
+ if (reply.ret_code < 0) {
+ ret = reply.ret_code;
+ goto error;
+ }
+
+ *is_data_inflight = reply.ret_code;
+
+ DBG("Relayd end data pending is data inflight: %d", reply.ret_code);
+
+ return 0;
+
+error:
+ return ret;
+}