Fix: lttng-relayd: unhandled out of memory error
[lttng-tools.git] / src / bin / lttng-relayd / main.c
index ac03cf846a7ec7992d7c21f9423d0af6813f22db..69131a7bb374e9d73666449588fcf6ddc2251553 100644 (file)
@@ -233,6 +233,11 @@ int set_option(int opt, const char *arg, const char *optname)
                break;
        case 'g':
                tracing_group_name = strdup(arg);
+               if (tracing_group_name == NULL) {
+                       ret = -errno;
+                       PERROR("strdup");
+                       goto end;
+               }
                tracing_group_name_override = 1;
                break;
        case 'h':
@@ -1205,6 +1210,7 @@ int relay_add_stream(struct lttcomm_relayd_hdr *recv_hdr,
        stream->session_id = session->id;
        stream->index_fd = -1;
        stream->read_index_fd = -1;
+       stream->ctf_stream_id = -1ULL;
        lttng_ht_node_init_u64(&stream->node, stream->stream_handle);
        pthread_mutex_init(&stream->lock, NULL);
 
@@ -1941,9 +1947,10 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                DBG("Received live beacon for stream %" PRIu64, stream->stream_handle);
 
                /*
-                * Only flag a stream inactive when it has already received data.
+                * Only flag a stream inactive when it has already received data
+                * and no indexes are in flight.
                 */
-               if (stream->total_index_received > 0) {
+               if (stream->total_index_received > 0 && stream->indexes_in_flight == 0) {
                        stream->beacon_ts_end = be64toh(index_info.timestamp_end);
                }
                ret = 0;
@@ -1960,9 +1967,13 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        copy_index_control_data(index, &index_info);
+       if (stream->ctf_stream_id == -1ULL) {
+               stream->ctf_stream_id = be64toh(index_info.stream_id);
+       }
 
        if (index_created) {
                /*
@@ -1987,6 +1998,8 @@ int relay_recv_index(struct lttcomm_relayd_hdr *recv_hdr,
                        goto end_rcu_unlock;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 end_rcu_unlock:
@@ -2150,6 +2163,7 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                index_created = 1;
+               stream->indexes_in_flight++;
        }
 
        if (rotate_index || stream->index_fd < 0) {
@@ -2192,6 +2206,8 @@ static int handle_index_data(struct relay_stream *stream, uint64_t net_seq_num,
                        goto error;
                }
                stream->total_index_received++;
+               stream->indexes_in_flight--;
+               assert(stream->indexes_in_flight >= 0);
        }
 
 error:
@@ -2415,6 +2431,7 @@ void *relay_thread_worker(void *data)
        struct lttcomm_relayd_hdr recv_hdr;
        struct relay_local_data *relay_ctx = (struct relay_local_data *) data;
        struct lttng_ht *sessions_ht = relay_ctx->sessions_ht;
+       struct relay_index *index;
 
        DBG("[thread] Relay worker started");
 
@@ -2646,6 +2663,14 @@ error:
        }
        rcu_read_unlock();
 error_poll_create:
+       rcu_read_lock();
+       cds_lfht_for_each_entry(indexes_ht->ht, &iter.iter, index,
+                       index_n.node) {
+               health_code_update();
+               relay_index_delete(index);
+               relay_index_free_safe(index);
+       }
+       rcu_read_unlock();
        lttng_ht_destroy(indexes_ht);
 indexes_ht_error:
        lttng_ht_destroy(relay_connections_ht);
This page took 0.052983 seconds and 4 git commands to generate.