* Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
*/
-#define _GNU_SOURCE
#define _LGPL_SOURCE
#include <assert.h>
#include <stdio.h>
output->net_seq_index = obj->net_seq_index;
memcpy(output->subdir, obj->subdir, PATH_MAX);
output->snapshot = obj->snapshot;
+ output->relay_major_version = obj->relay_major_version;
+ output->relay_minor_version = obj->relay_minor_version;
memcpy(&output->dst, &obj->dst, sizeof(output->dst));
ret = consumer_copy_sockets(output, obj);
if (ret < 0) {
ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname,
S_IRWXU | S_IRWXG, uid, gid);
if (ret < 0) {
- if (ret != -EEXIST) {
+ if (errno != EEXIST) {
ERR("Trace directory creation error");
goto error;
}
health_code_update();
return ret;
}
+
+/*
+ * Ask the consumer the number of discarded events for a channel.
+ */
+int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *discarded)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer discarded events id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS;
+ msg.u.discarded_events.session_id = session_id;
+ msg.u.discarded_events.channel_key = channel_key;
+
+ *discarded = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_discarded = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_discarded,
+ sizeof(consumer_discarded));
+ if (ret < 0) {
+ ERR("get discarded events");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *discarded += consumer_discarded;
+ }
+ ret = 0;
+ DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64,
+ *discarded, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}
+
+/*
+ * Ask the consumer the number of lost packets for a channel.
+ */
+int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key,
+ struct consumer_output *consumer, uint64_t *lost)
+{
+ int ret;
+ struct consumer_socket *socket;
+ struct lttng_ht_iter iter;
+ struct lttcomm_consumer_msg msg;
+
+ assert(consumer);
+
+ DBG3("Consumer lost packets id %" PRIu64, session_id);
+
+ memset(&msg, 0, sizeof(msg));
+ msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS;
+ msg.u.lost_packets.session_id = session_id;
+ msg.u.lost_packets.channel_key = channel_key;
+
+ *lost = 0;
+
+ /* Send command for each consumer */
+ rcu_read_lock();
+ cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket,
+ node.node) {
+ uint64_t consumer_lost = 0;
+ pthread_mutex_lock(socket->lock);
+ ret = consumer_socket_send(socket, &msg, sizeof(msg));
+ if (ret < 0) {
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+
+ /*
+ * No need for a recv reply status because the answer to the
+ * command is the reply status message.
+ */
+ ret = consumer_socket_recv(socket, &consumer_lost,
+ sizeof(consumer_lost));
+ if (ret < 0) {
+ ERR("get lost packets");
+ pthread_mutex_unlock(socket->lock);
+ goto end;
+ }
+ pthread_mutex_unlock(socket->lock);
+ *lost += consumer_lost;
+ }
+ ret = 0;
+ DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64,
+ *lost, session_id);
+
+end:
+ rcu_read_unlock();
+ return ret;
+}