Relayd "unique" ids wrap every 32-bit, and in some cases, negative
values are considered as error.
Change this to make the error value specifically -1ULL, use a direct
comparison (since we use an unsigned 64-bit integer, comparison with 0
becomes incorrect).
Since we now use a 64-bit ID, it is assumed to _never_ wrap-around
(remember, value -1ULL is an _error_). Therefore,
consumer_add_relayd_socket() can become much more strict than it was:
instead of accepting re-use of net_seq_idx, we can now assert that upon
LTTNG_STREAM_CONTROL socket, we have indeed allocated a relayd object,
and upon LTTNG_STREAM_DATA, we have found a relayd object.
Signed-off-by: Mathieu Desnoyers <mathieu.desnoyers@efficios.com>
Signed-off-by: David Goulet <dgoulet@efficios.com>
-static void steal_stream_key(int key, struct lttng_ht *ht)
+static void steal_stream_key(uint64_t key, struct lttng_ht *ht)
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
{
struct lttng_consumer_stream *stream;
rcu_read_lock();
stream = find_stream(key, ht);
if (stream) {
+ stream->key = (uint64_t) -1ULL;
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
/*
* We don't want the lookup to match, but we still need
* to iterate on this stream when iterating over the hash table. Just
* change the node key.
*/
- stream->node.key = -1ULL;
+ stream->node.key = (uint64_t) -1ULL;
* It's atomically set without having the stream mutex locked which is fine
* because we handle the write/read race with a pipe wakeup for each thread.
*/
* It's atomically set without having the stream mutex locked which is fine
* because we handle the write/read race with a pipe wakeup for each thread.
*/
-static void update_endpoint_status_by_netidx(int net_seq_idx,
+static void update_endpoint_status_by_netidx(uint64_t net_seq_idx,
enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
enum consumer_endpoint_status status)
{
struct lttng_ht_iter iter;
struct lttng_consumer_stream *stream;
- DBG("Consumer set delete flag on stream by idx %d", net_seq_idx);
+ DBG("Consumer set delete flag on stream by idx %" PRIu64, net_seq_idx);
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
struct lttng_consumer_local_data *ctx)
{
static void cleanup_relayd(struct consumer_relayd_sock_pair *relayd,
struct lttng_consumer_local_data *ctx)
{
* Allocate and return a consumer relayd socket.
*/
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
* Allocate and return a consumer relayd socket.
*/
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
{
struct consumer_relayd_sock_pair *obj = NULL;
{
struct consumer_relayd_sock_pair *obj = NULL;
- /* Negative net sequence index is a failure */
- if (net_seq_idx < 0) {
+ /* net sequence index of -1 is a failure */
+ if (net_seq_idx == (uint64_t) -1ULL) {
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
rcu_read_lock();
/* Flag that the current stream if set for network streaming. */
- if (stream->net_seq_idx != -1) {
+ if (stream->net_seq_idx != (uint64_t) -1ULL) {
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
relayd = consumer_find_relayd(stream->net_seq_idx);
if (relayd == NULL) {
goto end;
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
* This will create a relayd socket pair and add it to the relayd hash table.
* The caller MUST acquire a RCU read side lock before calling it.
*/
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll,
struct lttcomm_relayd_sock *relayd_sock, unsigned int sessiond_id)
assert(ctx);
assert(relayd_sock);
assert(ctx);
assert(relayd_sock);
- DBG("Consumer adding relayd socket (idx: %d)", net_seq_idx);
+ DBG("Consumer adding relayd socket (idx: %" PRIu64 ")", net_seq_idx);
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
/* Get relayd reference if exists. */
relayd = consumer_find_relayd(net_seq_idx);
if (relayd == NULL) {
+ assert(sock_type == LTTNG_STREAM_CONTROL);
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
/* Not found. Allocate one. */
relayd = consumer_allocate_relayd_sock_pair(net_seq_idx);
if (relayd == NULL) {
* we can notify the session daemon and continue our work without
* killing everything.
*/
* we can notify the session daemon and continue our work without
* killing everything.
*/
+ } else {
+ /*
+ * relayd key should never be found for control socket.
+ */
+ assert(sock_type != LTTNG_STREAM_CONTROL);
}
/* First send a status message before receiving the fds. */
}
/* First send a status message before receiving the fds. */
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->control_sock.sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->control_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->control_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->control_sock.sock.fd >= 0) {
- if (close(relayd->control_sock.sock.fd)) {
- PERROR("close relayd control socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->control_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
/* Assign new file descriptor */
relayd->control_sock.sock.fd = fd;
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->data_sock.sock);
/* Copy received lttcomm socket */
lttcomm_copy_sock(&relayd->data_sock.sock, &relayd_sock->sock);
ret = lttcomm_create_sock(&relayd->data_sock.sock);
- /* Immediately try to close the created socket if valid. */
- if (relayd->data_sock.sock.fd >= 0) {
- if (close(relayd->data_sock.sock.fd)) {
- PERROR("close relayd data socket");
- }
- }
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
/* Handle create_sock error. */
if (ret < 0) {
goto error;
}
+ /*
+ * Close the socket created internally by
+ * lttcomm_create_sock, so we can replace it by the one
+ * received from sessiond.
+ */
+ if (close(relayd->data_sock.sock.fd)) {
+ PERROR("close");
+ }
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
/* Assign new file descriptor */
relayd->data_sock.sock.fd = fd;
/* UID and GID of the channel. */
uid_t uid;
gid_t gid;
/* UID and GID of the channel. */
uid_t uid;
gid_t gid;
- /* Relayd id of the channel. -1 if it does not apply. */
- int64_t relayd_id;
+ /* Relayd id of the channel. -1ULL if it does not apply. */
+ uint64_t relayd_id;
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
/*
* Number of streams NOT initialized yet. This is used in order to not
* delete this channel if streams are getting initialized.
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
*/
struct consumer_relayd_sock_pair {
/* Network sequence number. */
/* Number of stream associated with this relayd */
unsigned int refcount;
/* Number of stream associated with this relayd */
unsigned int refcount;
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
/* lttng-relayd consumer command */
struct consumer_relayd_sock_pair *consumer_allocate_relayd_sock_pair(
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
struct consumer_relayd_sock_pair *consumer_find_relayd(uint64_t key);
struct lttng_consumer_channel *consumer_find_channel(uint64_t key);
int consumer_handle_stream_before_relayd(struct lttng_consumer_stream *stream,
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
ssize_t lttng_consumer_read_subbuffer(struct lttng_consumer_stream *stream,
struct lttng_consumer_local_data *ctx);
int lttng_consumer_on_recv_stream(struct lttng_consumer_stream *stream);
-int consumer_add_relayd_socket(int net_seq_idx, int sock_type,
+int consumer_add_relayd_socket(uint64_t net_seq_idx, int sock_type,
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
unsigned int sessiond_id);
struct lttng_consumer_local_data *ctx, int sock,
struct pollfd *consumer_sockpoll, struct lttcomm_relayd_sock *relayd_sock,
unsigned int sessiond_id);
*/
static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
const char *pathname, const char *name, uid_t uid, gid_t gid,
*/
static struct lttng_consumer_channel *allocate_channel(uint64_t session_id,
const char *pathname, const char *name, uid_t uid, gid_t gid,
- int relayd_id, uint64_t key, enum lttng_event_output output,
+ uint64_t relayd_id, uint64_t key, enum lttng_event_output output,
uint64_t tracefile_size, uint64_t tracefile_count,
uint64_t session_id_per_pid, unsigned int monitor)
{
uint64_t tracefile_size, uint64_t tracefile_count,
uint64_t session_id_per_pid, unsigned int monitor)
{