Fix: Unbalanced rcu_read_unlock() on stream file creation failure
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index 43d7891a21aeefc4079fd89b953836f4f425b837..e957a9bda97d61fc41027c10fac163e68b824c32 100644 (file)
@@ -44,6 +44,7 @@
 #include <common/common.h>
 #include <common/compat/poll.h>
 #include <common/compat/socket.h>
+#include <common/compat/endian.h>
 #include <common/defaults.h>
 #include <common/daemonize.h>
 #include <common/futex.h>
@@ -232,6 +233,11 @@ int set_option(int opt, const char *arg, const char *optname)
                break;
        case 'g':
                tracing_group_name = strdup(arg);
+               if (tracing_group_name == NULL) {
+                       ret = -errno;
+                       PERROR("strdup");
+                       goto end;
+               }
                tracing_group_name_override = 1;
                break;
        case 'h':
@@ -250,7 +256,10 @@ int set_option(int opt, const char *arg, const char *optname)
                if (arg) {
                        lttng_opt_verbose = config_parse_value(arg);
                } else {
-                       lttng_opt_verbose += 1;
+                       /* Only 3 level of verbosity (-vvv). */
+                       if (lttng_opt_verbose < 3) {
+                               lttng_opt_verbose += 1;
+                       }
                }
                break;
        default:
@@ -382,8 +391,9 @@ int set_options(int argc, char **argv)
 
        /* assign default values */
        if (control_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_CONTROL_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_CONTROL_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_CONTROL_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
                        goto exit;
@@ -397,8 +407,9 @@ int set_options(int argc, char **argv)
                }
        }
        if (data_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_DATA_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_DATA_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_DATA_PORT);
                if (ret < 0) {
                        PERROR("asprintf default data address");
                        goto exit;
@@ -412,8 +423,9 @@ int set_options(int argc, char **argv)
                }
        }
        if (live_uri == NULL) {
-               ret = asprintf(&default_address, "tcp://0.0.0.0:%d",
-                               DEFAULT_NETWORK_VIEWER_PORT);
+               ret = asprintf(&default_address,
+                       "tcp://" DEFAULT_NETWORK_VIEWER_BIND_ADDRESS ":%d",
+                       DEFAULT_NETWORK_VIEWER_PORT);
                if (ret < 0) {
                        PERROR("asprintf default viewer control address");
                        goto exit;
@@ -729,7 +741,7 @@ static void try_close_stream(struct relay_session *session,
        pthread_mutex_unlock(&session->viewer_ready_lock);
 
        ret = stream_close(session, stream);
-       if (!ret) {
+       if (ret || session->snapshot) {
                /* Already close thus the ctf trace is being or has been destroyed. */
                goto end;
        }
@@ -822,6 +834,11 @@ restart:
                        revents = LTTNG_POLL_GETEV(&events, i);
                        pollfd = LTTNG_POLL_GETFD(&events, i);
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -1198,13 +1215,14 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->session_id = session->id;
        stream->index_fd = -1;
        stream->read_index_fd = -1;
+       stream->ctf_stream_id = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
        ret = utils_mkdir_recursive(stream->path_name, S_IRWXU | S_IRWXG);
        if (ret < 0) {
                ERR("relay creating output directory");
-               goto end;
+               goto err_free_stream;
        }
 
        /*
@@ -1215,7 +1233,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        stream->tracefile_size, 0, relayd_uid, relayd_gid, NULL);
        if (ret < 0) {
                ERR("Create output file");
-               goto end;
+               goto err_free_stream;
        }
        stream->fd = ret;
        if (stream->tracefile_size) {
@@ -1261,12 +1279,13 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
                        stream->stream_handle);
 
 end:
+       memset(&reply, 0, sizeof(reply));
        reply.handle = htobe64(stream->stream_handle);
        /* send the session id to the client or a negative return code on error */
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
                /* stream was not properly added to the ht, so free it */
-               free(stream);
+               stream_destroy(stream);
        } else {
                reply.ret_code = htobe32(LTTNG_OK);
        }
@@ -1283,9 +1302,7 @@ end_no_session:
        return ret;
 
 err_free_stream:
-       free(stream->path_name);
-       free(stream->channel_name);
-       free(stream);
+       stream_destroy(stream);
        return ret;
 }
 
@@ -1342,6 +1359,7 @@ int relay_close_stream(struct lttcomm_relayd_hdr *recv_hdr,
 end_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
        } else {
@@ -1367,6 +1385,7 @@ void relay_unknown_command(struct relay_connection *conn)
        struct lttcomm_relayd_generic_reply reply;
        int ret;
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_ERR_UNK);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
@@ -1392,6 +1411,7 @@ int relay_start(struct lttcomm_relayd_hdr *recv_hdr,
                ret = htobe32(LTTNG_ERR_UNK);
        }
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = ret;
        ret = conn->sock->ops->sendmsg(conn->sock, &reply,
                        sizeof(struct lttcomm_relayd_generic_reply), 0);
@@ -1553,6 +1573,7 @@ int relay_send_version(struct lttcomm_relayd_hdr *recv_hdr,
                goto end;
        }
 
+       memset(&reply, 0, sizeof(reply));
        reply.major = RELAYD_VERSION_COMM_MAJOR;
        reply.minor = RELAYD_VERSION_COMM_MINOR;
 
@@ -1652,6 +1673,7 @@ int relay_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
 end_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(ret);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (ret < 0) {
@@ -1715,6 +1737,7 @@ int relay_quiescent_control(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_OK);
        ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (ret < 0) {
@@ -1785,6 +1808,7 @@ int relay_begin_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
        reply.ret_code = htobe32(LTTNG_OK);
 
@@ -1849,7 +1873,7 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        cds_lfht_for_each_entry(relay_streams_ht->ht, &iter.iter, stream,
                        node.node) {
                if (stream->session_id == session_id &&
-                               !stream->data_pending_check_done) {
+                               !stream->data_pending_check_done && !stream->terminated_flag) {
                        is_data_inflight = 1;
                        DBG("Data is still in flight for stream %" PRIu64,
                                        stream->stream_handle);
@@ -1858,6 +1882,7 @@ int relay_end_data_pending(struct lttcomm_relayd_hdr *recv_hdr,
        }
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        /* All good, send back reply. */
        reply.ret_code = htobe32(is_data_inflight);
 
@@ -1925,9 +1950,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1944,9 +1970,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = be64toh(index_info.stream_id);
+       }
 
        if (index_created) {
                /*
@@ -1966,27 +1996,19 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
 
        /* Do we have a writable ready index to write on disk. */
        if (wr_index) {
-               /* Starting at 2.4, create the index file if none available. */
-               if (conn->minor >= 4 && stream->index_fd < 0) {
-                       ret = index_create_file(stream->path_name, stream->channel_name,
-                                       relayd_uid, relayd_gid, stream->tracefile_size,
-                                       stream->tracefile_count_current);
-                       if (ret < 0) {
-                               goto end_rcu_unlock;
-                       }
-                       stream->index_fd = ret;
-               }
-
                ret = relay_index_write(wr_index->fd, wr_index);
                if (ret < 0) {
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
        rcu_read_unlock();
 
+       memset(&reply, 0, sizeof(reply));
        if (ret < 0) {
                reply.ret_code = htobe32(LTTNG_ERR_UNK);
        } else {
@@ -2033,8 +2055,11 @@ int relay_streams_sent(struct lttcomm_relayd_hdr *recv_hdr,
        /*
         * Inform the viewer that there are new streams in the session.
         */
-       uatomic_set(&conn->session->new_streams, 1);
+       if (conn->session->viewer_refcount) {
+               uatomic_set(&conn->session->new_streams, 1);
+       }
 
+       memset(&reply, 0, sizeof(reply));
        reply.ret_code = htobe32(LTTNG_OK);
        send_ret = conn->sock->ops->sendmsg(conn->sock, &reply, sizeof(reply), 0);
        if (send_ret < 0) {
@@ -2141,6 +2166,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2183,6 +2209,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2294,7 +2322,7 @@ int relay_process_data(struct relay_connection *conn)
                                pthread_mutex_lock(&vstream->overwrite_lock);
                                vstream->abort_flag = 1;
                                pthread_mutex_unlock(&vstream->overwrite_lock);
-                               DBG("Streaming side setting abort_flag on stream %s_%lu\n",
+                               DBG("Streaming side setting abort_flag on stream %s_%" PRIu64 "\n",
                                                stream->channel_name, new_id);
                        } else if (vstream->tracefile_count_current ==
                                        stream->tracefile_count_current) {
@@ -2311,7 +2339,6 @@ int relay_process_data(struct relay_connection *conn)
                                stream->tracefile_size, stream->tracefile_count,
                                relayd_uid, relayd_gid, stream->fd,
                                &(stream->tracefile_count_current), &stream->fd);
-               stream->total_index_received = 0;
                pthread_mutex_unlock(&stream->viewer_stream_rotation_lock);
                if (ret < 0) {
                        ERR("Rotating stream output file");
@@ -2384,7 +2411,7 @@ static void destroy_connection(struct lttng_ht *relay_connections_ht,
        connection_delete(relay_connections_ht, conn);
 
        /* For the control socket, we try to destroy the session. */
-       if (conn->type == RELAY_CONTROL) {
+       if (conn->type == RELAY_CONTROL && conn->session) {
                destroy_session(conn->session, conn->sessions_ht);
        }
 
@@ -2406,6 +2433,7 @@ void *relay_thread_worker(void *data)
        struct lttcomm_relayd_hdr recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+       struct relay_index *index;
 
        DBG("[thread] Relay worker started");
 
@@ -2476,6 +2504,11 @@ restart:
 
                        health_code_update();
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Thread quit pipe has been closed. Killing thread. */
                        ret = check_thread_quit_pipe(pollfd, revents);
                        if (ret) {
@@ -2578,45 +2611,49 @@ restart:
 
                        health_code_update();
 
+                       if (!revents) {
+                               /* No activity for this FD (poll implementation). */
+                               continue;
+                       }
+
                        /* Skip the command pipe. It's handled in the first loop. */
                        if (pollfd == relay_conn_pipe[0]) {
                                continue;
                        }
 
-                       if (revents) {
-                               rcu_read_lock();
-                               conn = connection_find_by_sock(relay_connections_ht, pollfd);
-                               if (!conn) {
-                                       /* Skip it. Might be removed before. */
+                       rcu_read_lock();
+                       conn = connection_find_by_sock(relay_connections_ht, pollfd);
+                       if (!conn) {
+                               /* Skip it. Might be removed before. */
+                               rcu_read_unlock();
+                               continue;
+                       }
+
+                       if (revents & LPOLLIN) {
+                               if (conn->type != RELAY_DATA) {
                                        rcu_read_unlock();
                                        continue;
                                }
 
-                               if (revents & LPOLLIN) {
-                                       if (conn->type != RELAY_DATA) {
-                                               continue;
-                                       }
-
-                                       ret = relay_process_data(conn);
-                                       /* Connection closed */
-                                       if (ret < 0) {
-                                               cleanup_connection_pollfd(&events, pollfd);
-                                               destroy_connection(relay_connections_ht, conn);
-                                               DBG("Data connection closed with %d", pollfd);
-                                               /*
-                                                * Every goto restart call sets the last seen fd where
-                                                * here we don't really care since we gracefully
-                                                * continue the loop after the connection is deleted.
-                                                */
-                                       } else {
-                                               /* Keep last seen port. */
-                                               last_seen_data_fd = pollfd;
-                                               rcu_read_unlock();
-                                               goto restart;
-                                       }
+                               ret = relay_process_data(conn);
+                               /* Connection closed */
+                               if (ret < 0) {
+                                       cleanup_connection_pollfd(&events, pollfd);
+                                       destroy_connection(relay_connections_ht, conn);
+                                       DBG("Data connection closed with %d", pollfd);
+                                       /*
+                                        * Every goto restart call sets the last seen fd where
+                                        * here we don't really care since we gracefully
+                                        * continue the loop after the connection is deleted.
+                                        */
+                               } else {
+                                       /* Keep last seen port. */
+                                       last_seen_data_fd = pollfd;
+                                       rcu_read_unlock();
+                                       goto restart;
                                }
-                               rcu_read_unlock();
                        }
+                       rcu_read_unlock();
                }
                last_seen_data_fd = -1;
        }
@@ -2637,6 +2674,14 @@ error:
        }
        rcu_read_unlock();
 error_poll_create:
+       rcu_read_lock();
+       cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+                       index_n.node) {
+               health_code_update();
+               relay_index_delete(index);
+               relay_index_free_safe(index);
+       }
+       rcu_read_unlock();
        lttng_ht_destroy(indexes_ht);
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
This page took 0.029344 seconds and 4 git commands to generate.