consumerd: send a buffer static sample on flush command
[lttng-tools.git] / src / bin / lttng-sessiond / thread.cpp
1 /*
2 * Copyright (C) 2018 Jérémie Galarneau <jeremie.galarneau@efficios.com>
3 *
4 * SPDX-License-Identifier: GPL-2.0-only
5 *
6 */
7
8 #include "thread.hpp"
9 #include <urcu/list.h>
10 #include <urcu/ref.h>
11 #include <pthread.h>
12 #include <common/macros.hpp>
13 #include <common/error.hpp>
14 #include <common/defaults.hpp>
15
16 namespace {
17 struct thread_list {
18 struct cds_list_head head;
19 pthread_mutex_t lock;
20 } thread_list = {
21 .head = CDS_LIST_HEAD_INIT(thread_list.head),
22 .lock = PTHREAD_MUTEX_INITIALIZER,
23 };
24 } /* namespace */
25
26 struct lttng_thread {
27 struct urcu_ref ref;
28 struct cds_list_head node;
29 pthread_t thread;
30 const char *name;
31 /* Main thread function */
32 lttng_thread_entry_point entry;
33 /*
34 * Thread-specific shutdown method. Allows threads to implement their
35 * own shutdown mechanism as some of them use a structured message
36 * passed through a command queue and some rely on a dedicated "quit"
37 * pipe.
38 */
39 lttng_thread_shutdown_cb shutdown;
40 lttng_thread_cleanup_cb cleanup;
41 /* Thread implementation-specific data. */
42 void *data;
43 };
44
45 static
46 void lttng_thread_destroy(struct lttng_thread *thread)
47 {
48 if (thread->cleanup) {
49 thread->cleanup(thread->data);
50 }
51 free(thread);
52 }
53
54 static
55 void lttng_thread_release(struct urcu_ref *ref)
56 {
57 lttng_thread_destroy(lttng::utils::container_of(ref, &lttng_thread::ref));
58 }
59
60 static
61 void *launch_thread(void *data)
62 {
63 void *ret;
64 struct lttng_thread *thread = (struct lttng_thread *) data;
65
66 logger_set_thread_name(thread->name, true);
67 DBG("Entering thread entry point");
68 ret = thread->entry(thread->data);
69 DBG("Thread entry point has returned");
70 return ret;
71 }
72
73 struct lttng_thread *lttng_thread_create(const char *name,
74 lttng_thread_entry_point entry,
75 lttng_thread_shutdown_cb shutdown,
76 lttng_thread_cleanup_cb cleanup,
77 void *thread_data)
78 {
79 int ret;
80 struct lttng_thread *thread;
81
82 thread = zmalloc<lttng_thread>();
83 if (!thread) {
84 goto error_alloc;
85 }
86
87 urcu_ref_init(&thread->ref);
88 CDS_INIT_LIST_HEAD(&thread->node);
89 /*
90 * Thread names are assumed to be statically allocated strings.
91 * It is unnecessary to copy this attribute.
92 */
93 thread->name = name;
94 thread->entry = entry;
95 thread->shutdown = shutdown;
96 thread->cleanup = cleanup;
97 thread->data = thread_data;
98
99 pthread_mutex_lock(&thread_list.lock);
100 /*
101 * Add the thread at the head of the list to shutdown threads in the
102 * opposite order of their creation. A reference is taken for the
103 * thread list which will be released on shutdown of the thread.
104 */
105 cds_list_add(&thread->node, &thread_list.head);
106 (void) lttng_thread_get(thread);
107
108 ret = pthread_create(&thread->thread, default_pthread_attr(),
109 launch_thread, thread);
110 if (ret) {
111 PERROR("Failed to create \"%s\" thread", thread->name);
112 goto error_pthread_create;
113 }
114
115 pthread_mutex_unlock(&thread_list.lock);
116 return thread;
117
118 error_pthread_create:
119 cds_list_del(&thread->node);
120 /* Release list reference. */
121 lttng_thread_put(thread);
122 pthread_mutex_unlock(&thread_list.lock);
123 /* Release initial reference. */
124 lttng_thread_put(thread);
125 error_alloc:
126 return NULL;
127 }
128
129 bool lttng_thread_get(struct lttng_thread *thread)
130 {
131 return urcu_ref_get_unless_zero(&thread->ref);
132 }
133
134 void lttng_thread_put(struct lttng_thread *thread)
135 {
136 if (!thread) {
137 return;
138 }
139 LTTNG_ASSERT(thread->ref.refcount);
140 urcu_ref_put(&thread->ref, lttng_thread_release);
141 }
142
143 const char *lttng_thread_get_name(const struct lttng_thread *thread)
144 {
145 return thread->name;
146 }
147
148 static
149 bool _lttng_thread_shutdown(struct lttng_thread *thread)
150 {
151 int ret;
152 void *status;
153 bool result = true;
154
155 DBG("Shutting down \"%s\" thread", thread->name);
156 if (thread->shutdown) {
157 result = thread->shutdown(thread->data);
158 if (!result) {
159 result = false;
160 goto end;
161 }
162 }
163
164 ret = pthread_join(thread->thread, &status);
165 if (ret) {
166 PERROR("Failed to join \"%s\" thread", thread->name);
167 result = false;
168 goto end;
169 }
170 DBG("Joined thread \"%s\"", thread->name);
171 end:
172 return result;
173 }
174
175 bool lttng_thread_shutdown(struct lttng_thread *thread)
176 {
177 const bool result = _lttng_thread_shutdown(thread);
178
179 if (result) {
180 /* Release the list's reference to the thread. */
181 pthread_mutex_lock(&thread_list.lock);
182 cds_list_del(&thread->node);
183 lttng_thread_put(thread);
184 pthread_mutex_unlock(&thread_list.lock);
185 }
186 return result;
187 }
188
189 void lttng_thread_list_shutdown_orphans(void)
190 {
191 struct lttng_thread *thread, *tmp;
192
193 pthread_mutex_lock(&thread_list.lock);
194 cds_list_for_each_entry_safe(thread, tmp, &thread_list.head, node) {
195 bool result;
196 const long ref = uatomic_read(&thread->ref.refcount);
197
198 if (ref != 1) {
199 /*
200 * Other external references to the thread exist, skip.
201 */
202 continue;
203 }
204
205 result = _lttng_thread_shutdown(thread);
206 if (!result) {
207 ERR("Failed to shutdown thread \"%s\"", thread->name);
208 }
209 }
210 pthread_mutex_unlock(&thread_list.lock);
211 }
This page took 0.034401 seconds and 4 git commands to generate.