X-Git-Url: http://git.liburcu.org/?a=blobdiff_plain;f=src%2Fbin%2Flttng-sessiond%2Fconsumer.c;h=7c8f4e83093590e61ac4d4e565d8d0bc9ba7b5f1;hb=c8f690f85a292e7678c480e2a5dca80b4dd80f79;hp=41ad46d8689338f3c27c3fc80fb12dd46bbdb53f;hpb=6addfa379ee608b20cfe5e15d135bcb6a9724e90;p=lttng-tools.git diff --git a/src/bin/lttng-sessiond/consumer.c b/src/bin/lttng-sessiond/consumer.c index 41ad46d86..7c8f4e830 100644 --- a/src/bin/lttng-sessiond/consumer.c +++ b/src/bin/lttng-sessiond/consumer.c @@ -15,7 +15,6 @@ * Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. */ -#define _GNU_SOURCE #define _LGPL_SOURCE #include #include @@ -564,6 +563,8 @@ struct consumer_output *consumer_copy_output(struct consumer_output *obj) output->net_seq_index = obj->net_seq_index; memcpy(output->subdir, obj->subdir, PATH_MAX); output->snapshot = obj->snapshot; + output->relay_major_version = obj->relay_major_version; + output->relay_minor_version = obj->relay_minor_version; memcpy(&output->dst, &obj->dst, sizeof(output->dst)); ret = consumer_copy_sockets(output, obj); if (ret < 0) { @@ -714,7 +715,10 @@ int consumer_set_network_uri(struct consumer_output *obj, goto error; } - strncpy(obj->subdir, tmp_path, sizeof(obj->subdir)); + if (lttng_strncpy(obj->subdir, tmp_path, sizeof(obj->subdir))) { + ret = -LTTNG_ERR_INVALID; + goto error; + } DBG3("Consumer set network uri subdir path %s", tmp_path); } @@ -735,6 +739,7 @@ int consumer_send_fds(struct consumer_socket *sock, int *fds, size_t nb_fd) assert(fds); assert(sock); assert(nb_fd > 0); + assert(pthread_mutex_trylock(sock->lock) == EBUSY); ret = lttcomm_send_fds_unix_sock(*sock->fd_ptr, fds, nb_fd); if (ret < 0) { @@ -759,6 +764,7 @@ int consumer_send_msg(struct consumer_socket *sock, assert(msg); assert(sock); + assert(pthread_mutex_trylock(sock->lock) == EBUSY); ret = consumer_socket_send(sock, msg, sizeof(struct lttcomm_consumer_msg)); if (ret < 0) { @@ -981,6 +987,8 @@ error: /* * Send relayd socket to consumer associated with a session name. * + * The consumer socket lock must be held by the caller. + * * On success return positive value. On error, negative value. */ int consumer_send_relayd_socket(struct consumer_socket *consumer_sock, @@ -1085,7 +1093,11 @@ int consumer_set_subdir(struct consumer_output *consumer, goto error; } - strncpy(consumer->subdir, tmp_path, sizeof(consumer->subdir)); + if (lttng_strncpy(consumer->subdir, tmp_path, + sizeof(consumer->subdir))) { + ret = -EINVAL; + goto error; + } DBG2("Consumer subdir set to %s", consumer->subdir); error: @@ -1093,11 +1105,8 @@ error: } /* - * Ask the consumer if the data is ready to read (NOT pending) for the specific - * session id. - * - * This function has a different behavior with the consumer i.e. that it waits - * for a reply from the consumer if yes or no the data is pending. + * Ask the consumer if the data is pending for the specific session id. + * Returns 1 if data is pending, 0 otherwise, or < 0 on error. */ int consumer_is_data_pending(uint64_t session_id, struct consumer_output *consumer) @@ -1186,6 +1195,38 @@ end: return ret; } +/* + * Send a clear quiescent command to consumer using the given channel key. + * + * Return 0 on success else a negative value. + */ +int consumer_clear_quiescent_channel(struct consumer_socket *socket, uint64_t key) +{ + int ret; + struct lttcomm_consumer_msg msg; + + assert(socket); + + DBG2("Consumer clear quiescent channel key %" PRIu64, key); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_CLEAR_QUIESCENT_CHANNEL; + msg.u.clear_quiescent_channel.key = key; + + pthread_mutex_lock(socket->lock); + health_code_update(); + + ret = consumer_send_msg(socket, &msg); + if (ret < 0) { + goto end; + } + +end: + health_code_update(); + pthread_mutex_unlock(socket->lock); + return ret; +} + /* * Send a close metadata command to consumer using the given channel key. * Called with registry lock held. @@ -1261,7 +1302,7 @@ end: */ int consumer_push_metadata(struct consumer_socket *socket, uint64_t metadata_key, char *metadata_str, size_t len, - size_t target_offset) + size_t target_offset, uint64_t version) { int ret; struct lttcomm_consumer_msg msg; @@ -1277,6 +1318,7 @@ int consumer_push_metadata(struct consumer_socket *socket, msg.u.push_metadata.key = metadata_key; msg.u.push_metadata.target_offset = target_offset; msg.u.push_metadata.len = len; + msg.u.push_metadata.version = version; health_code_update(); ret = consumer_send_msg(socket, &msg); @@ -1356,7 +1398,7 @@ int consumer_snapshot_channel(struct consumer_socket *socket, uint64_t key, ret = run_as_mkdir_recursive(msg.u.snapshot_channel.pathname, S_IRWXU | S_IRWXG, uid, gid); if (ret < 0) { - if (ret != -EEXIST) { + if (errno != EEXIST) { ERR("Trace directory creation error"); goto error; } @@ -1373,3 +1415,117 @@ error: health_code_update(); return ret; } + +/* + * Ask the consumer the number of discarded events for a channel. + */ +int consumer_get_discarded_events(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *discarded) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer discarded events id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_DISCARDED_EVENTS; + msg.u.discarded_events.session_id = session_id; + msg.u.discarded_events.channel_key = channel_key; + + *discarded = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_discarded = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_discarded, + sizeof(consumer_discarded)); + if (ret < 0) { + ERR("get discarded events"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *discarded += consumer_discarded; + } + ret = 0; + DBG("Consumer discarded %" PRIu64 " events in session id %" PRIu64, + *discarded, session_id); + +end: + rcu_read_unlock(); + return ret; +} + +/* + * Ask the consumer the number of lost packets for a channel. + */ +int consumer_get_lost_packets(uint64_t session_id, uint64_t channel_key, + struct consumer_output *consumer, uint64_t *lost) +{ + int ret; + struct consumer_socket *socket; + struct lttng_ht_iter iter; + struct lttcomm_consumer_msg msg; + + assert(consumer); + + DBG3("Consumer lost packets id %" PRIu64, session_id); + + memset(&msg, 0, sizeof(msg)); + msg.cmd_type = LTTNG_CONSUMER_LOST_PACKETS; + msg.u.lost_packets.session_id = session_id; + msg.u.lost_packets.channel_key = channel_key; + + *lost = 0; + + /* Send command for each consumer */ + rcu_read_lock(); + cds_lfht_for_each_entry(consumer->socks->ht, &iter.iter, socket, + node.node) { + uint64_t consumer_lost = 0; + pthread_mutex_lock(socket->lock); + ret = consumer_socket_send(socket, &msg, sizeof(msg)); + if (ret < 0) { + pthread_mutex_unlock(socket->lock); + goto end; + } + + /* + * No need for a recv reply status because the answer to the + * command is the reply status message. + */ + ret = consumer_socket_recv(socket, &consumer_lost, + sizeof(consumer_lost)); + if (ret < 0) { + ERR("get lost packets"); + pthread_mutex_unlock(socket->lock); + goto end; + } + pthread_mutex_unlock(socket->lock); + *lost += consumer_lost; + } + ret = 0; + DBG("Consumer lost %" PRIu64 " packets in session id %" PRIu64, + *lost, session_id); + +end: + rcu_read_unlock(); + return ret; +}