Perform factor reduction as a modular step
[lttv.git] / lttv / lttv / sync / sync_chain_lttv.c
1 /* This file is part of the Linux Trace Toolkit viewer
2 * Copyright (C) 2009, 2010 Benjamin Poirier <benjamin.poirier@polymtl.ca>
3 *
4 * This program is free software: you can redistribute it and/or modify it
5 * under the terms of the GNU Lesser General Public License as published by
6 * the Free Software Foundation, either version 2.1 of the License, or (at
7 * your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful, but WITHOUT
10 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
11 * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
12 * License for more details.
13 *
14 * You should have received a copy of the GNU Lesser General Public License
15 * along with this program. If not, see <http://www.gnu.org/licenses/>.
16 */
17
18 #define _ISOC99_SOURCE
19
20 #ifdef HAVE_CONFIG_H
21 #include <config.h>
22 #endif
23
24 #include <errno.h>
25 #include <fcntl.h>
26 #include <math.h>
27 #include <stdio.h>
28 #include <stdlib.h>
29 #include <sys/resource.h>
30 #include <sys/stat.h>
31 #include <sys/types.h>
32 #include <sys/stat.h>
33 #include <unistd.h>
34
35 #include <lttv/module.h>
36 #include <lttv/option.h>
37
38
39 #include "event_processing_lttng_standard.h"
40 #include "event_processing_lttng_null.h"
41 #include "event_matching_tcp.h"
42 #include "event_matching_broadcast.h"
43 #include "event_matching_distributor.h"
44 #include "event_analysis_chull.h"
45 #include "event_analysis_linreg.h"
46 #include "event_analysis_eval.h"
47 #include "factor_reduction_accuracy.h"
48 #include "sync_chain.h"
49 #include "sync_chain_lttv.h"
50
51
52 static void init();
53 static void destroy();
54
55 static void gfAddModuleOption(gpointer data, gpointer user_data);
56 static void gfRemoveModuleOption(gpointer data, gpointer user_data);
57
58 static ModuleOption optionSync= {
59 .longName= "sync",
60 .hasArg= NO_ARG,
61 .optionHelp= "synchronize the time between the traces",
62 };
63 static ModuleOption optionSyncStats= {
64 .longName= "sync-stats",
65 .hasArg= NO_ARG,
66 .optionHelp= "print statistics about the time synchronization",
67 };
68 static ModuleOption optionSyncNull= {
69 .longName= "sync-null",
70 .hasArg= NO_ARG,
71 .optionHelp= "read the events but do not perform any processing",
72 };
73 static GString* analysisModulesNames;
74 static ModuleOption optionSyncAnalysis= {
75 .longName= "sync-analysis",
76 .hasArg= REQUIRED_ARG,
77 .optionHelp= "specify the algorithm to use for event analysis",
78 };
79 static GString* reductionModulesNames;
80 static ModuleOption optionSyncReduction= {
81 .longName= "sync-reduction",
82 .hasArg= REQUIRED_ARG,
83 .optionHelp= "specify the algorithm to use for factor reduction",
84 };
85 static ModuleOption optionSyncGraphs= {
86 .longName= "sync-graphs",
87 .hasArg= NO_ARG,
88 .optionHelp= "output gnuplot graph showing synchronization points",
89 };
90 static char graphsDir[20];
91 static ModuleOption optionSyncGraphsDir= {
92 .longName= "sync-graphs-dir",
93 .hasArg= REQUIRED_ARG,
94 .optionHelp= "specify the directory where to store the graphs",
95 };
96
97
98 /*
99 * Module init function
100 *
101 * This function is declared to be the module initialization function.
102 */
103 static void init()
104 {
105 int retval;
106 unsigned int i;
107 const struct
108 {
109 GQueue* modules;
110 ModuleOption* option;
111 size_t nameOffset;
112 GString** names;
113 void (*gfAppendName)(gpointer data, gpointer user_data);
114 } loopValues[]= {
115 {&analysisModules, &optionSyncAnalysis, offsetof(AnalysisModule,
116 name), &analysisModulesNames, &gfAppendAnalysisName},
117 {&reductionModules, &optionSyncReduction, offsetof(ReductionModule,
118 name), &reductionModulesNames, &gfAppendReductionName},
119 };
120
121 g_debug("Sync init");
122
123 /*
124 * Initialize event modules
125 * Call the "constructor" or initialization function of each event module
126 * so it can register itself. This must be done before elements in
127 * processingModules, matchingModules, analysisModules or moduleOptions
128 * are accessed.
129 */
130 registerProcessingLTTVStandard();
131 registerProcessingLTTVNull();
132
133 registerMatchingTCP();
134 registerMatchingBroadcast();
135 registerMatchingDistributor();
136
137 registerAnalysisCHull();
138 registerAnalysisLinReg();
139 registerAnalysisEval();
140
141 registerReductionAccuracy();
142
143 // Build module names lists for option and help string
144 for (i= 0; i < ARRAY_SIZE(loopValues); i++)
145 {
146 g_assert(g_queue_get_length(loopValues[i].modules) > 0);
147 loopValues[i].option->arg= (char*)(*(void**)
148 g_queue_peek_head(loopValues[i].modules) +
149 loopValues[i].nameOffset);
150 *loopValues[i].names= g_string_new("");
151 g_queue_foreach(loopValues[i].modules, loopValues[i].gfAppendName,
152 *loopValues[i].names);
153 // remove the last ", "
154 g_string_truncate(*loopValues[i].names, (*loopValues[i].names)->len -
155 2);
156 loopValues[i].option->argHelp= (*loopValues[i].names)->str;
157 }
158
159 retval= snprintf(graphsDir, sizeof(graphsDir), "graphs-%d", getpid());
160 if (retval > sizeof(graphsDir) - 1)
161 {
162 graphsDir[sizeof(graphsDir) - 1]= '\0';
163 }
164 optionSyncGraphsDir.arg= graphsDir;
165 optionSyncGraphsDir.argHelp= graphsDir;
166
167 g_queue_push_head(&moduleOptions, &optionSyncGraphsDir);
168 g_queue_push_head(&moduleOptions, &optionSyncGraphs);
169 g_queue_push_head(&moduleOptions, &optionSyncReduction);
170 g_queue_push_head(&moduleOptions, &optionSyncAnalysis);
171 g_queue_push_head(&moduleOptions, &optionSyncNull);
172 g_queue_push_head(&moduleOptions, &optionSyncStats);
173 g_queue_push_head(&moduleOptions, &optionSync);
174
175 g_queue_foreach(&moduleOptions, &gfAddModuleOption, NULL);
176 }
177
178
179 /*
180 * Module unload function
181 */
182 static void destroy()
183 {
184 g_debug("Sync destroy");
185
186 g_queue_foreach(&moduleOptions, &gfRemoveModuleOption, NULL);
187 g_string_free(analysisModulesNames, TRUE);
188 g_string_free(reductionModulesNames, TRUE);
189
190 g_queue_clear(&processingModules);
191 g_queue_clear(&matchingModules);
192 g_queue_clear(&analysisModules);
193 g_queue_clear(&reductionModules);
194 g_queue_clear(&moduleOptions);
195 }
196
197
198 /*
199 * Calculate a traceset's drift and offset values based on network events
200 *
201 * The individual correction factors are written out to each trace.
202 *
203 * Args:
204 * traceSetContext: traceset
205 *
206 * Returns:
207 * false if synchronization was not performed, true otherwise
208 */
209 bool syncTraceset(LttvTracesetContext* const traceSetContext)
210 {
211 SyncState* syncState;
212 struct timeval startTime, endTime;
213 struct rusage startUsage, endUsage;
214 GList* result;
215 unsigned int i;
216 AllFactors* allFactors;
217 GArray* factors;
218 double minOffset, minDrift;
219 unsigned int refFreqTrace;
220 int retval;
221
222 if (!optionSync.present)
223 {
224 g_debug("Not synchronizing traceset because option is disabled");
225 return false;
226 }
227
228 if (optionSyncStats.present)
229 {
230 gettimeofday(&startTime, 0);
231 getrusage(RUSAGE_SELF, &startUsage);
232 }
233
234 // Initialize data structures
235 syncState= malloc(sizeof(SyncState));
236
237 if (optionSyncStats.present)
238 {
239 syncState->stats= true;
240 }
241 else
242 {
243 syncState->stats= false;
244 }
245
246 if (!optionSyncNull.present && optionSyncGraphs.present)
247 {
248 // Create the graph directory right away in case the module initialization
249 // functions have something to write in it.
250 syncState->graphsDir= optionSyncGraphsDir.arg;
251 syncState->graphsStream= createGraphsDir(syncState->graphsDir);
252 }
253 else
254 {
255 syncState->graphsStream= NULL;
256 syncState->graphsDir= NULL;
257 }
258
259 // Identify and initialize modules
260 syncState->processingData= NULL;
261 if (optionSyncNull.present)
262 {
263 result= g_queue_find_custom(&processingModules, "LTTV-null",
264 &gcfCompareProcessing);
265 }
266 else
267 {
268 result= g_queue_find_custom(&processingModules, "LTTV-standard",
269 &gcfCompareProcessing);
270 }
271 g_assert(result != NULL);
272 syncState->processingModule= (ProcessingModule*) result->data;
273
274 syncState->matchingData= NULL;
275 result= g_queue_find_custom(&matchingModules, "TCP", &gcfCompareMatching);
276 g_assert(result != NULL);
277 syncState->matchingModule= (MatchingModule*) result->data;
278
279 syncState->analysisData= NULL;
280 result= g_queue_find_custom(&analysisModules, optionSyncAnalysis.arg,
281 &gcfCompareAnalysis);
282 if (result != NULL)
283 {
284 syncState->analysisModule= (AnalysisModule*) result->data;
285 }
286 else
287 {
288 g_error("Analysis module '%s' not found", optionSyncAnalysis.arg);
289 }
290
291 syncState->reductionData= NULL;
292 result= g_queue_find_custom(&reductionModules, optionSyncReduction.arg,
293 &gcfCompareReduction);
294 if (result != NULL)
295 {
296 syncState->reductionModule= (ReductionModule*) result->data;
297 }
298 else
299 {
300 g_error("Reduction module '%s' not found", optionSyncReduction.arg);
301 }
302
303 syncState->processingModule->initProcessing(syncState, traceSetContext);
304 if (!optionSyncNull.present)
305 {
306 syncState->matchingModule->initMatching(syncState);
307 syncState->analysisModule->initAnalysis(syncState);
308 syncState->reductionModule->initReduction(syncState);
309 }
310
311 // Process traceset
312 lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
313 lttv_process_traceset_middle(traceSetContext, ltt_time_infinite,
314 G_MAXULONG, NULL);
315 lttv_process_traceset_seek_time(traceSetContext, ltt_time_zero);
316
317 // Obtain, reduce, adjust and set correction factors
318 allFactors= syncState->processingModule->finalizeProcessing(syncState);
319 factors= syncState->reductionModule->finalizeReduction(syncState,
320 allFactors);
321 freeAllFactors(allFactors, syncState->traceNb);
322
323 /* The offsets are adjusted so the lowest one is 0. This is done because
324 * of a Lttv specific limitation: events cannot have negative times. By
325 * having non-negative offsets, events cannot be moved backwards to
326 * negative times.
327 */
328 minOffset= 0;
329 for (i= 0; i < syncState->traceNb; i++)
330 {
331 minOffset= MIN(g_array_index(factors, Factors, i).offset, minOffset);
332 }
333
334 for (i= 0; i < syncState->traceNb; i++)
335 {
336 g_array_index(factors, Factors, i).offset-= minOffset;
337 }
338
339 /* Because the timestamps are corrected at the TSC level (not at the
340 * LttTime level) all trace frequencies must be made equal. We use the
341 * frequency of the system with the lowest drift
342 */
343 minDrift= INFINITY;
344 refFreqTrace= 0;
345 for (i= 0; i < syncState->traceNb; i++)
346 {
347 if (g_array_index(factors, Factors, i).drift < minDrift)
348 {
349 minDrift= g_array_index(factors, Factors, i).drift;
350 refFreqTrace= i;
351 }
352 }
353 g_assert(syncState->traceNb == 0 || minDrift != INFINITY);
354
355 // Write the factors to the LttTrace structures
356 for (i= 0; i < syncState->traceNb; i++)
357 {
358 LttTrace* t;
359 Factors* traceFactors;
360
361 t= traceSetContext->traces[i]->t;
362 traceFactors= &g_array_index(factors, Factors, i);
363
364 t->drift= traceFactors->drift;
365 t->offset= traceFactors->offset;
366 t->start_freq= traceSetContext->traces[refFreqTrace]->t->start_freq;
367 t->freq_scale= traceSetContext->traces[refFreqTrace]->t->freq_scale;
368 t->start_time_from_tsc =
369 ltt_time_from_uint64(tsc_to_uint64(t->freq_scale, t->start_freq,
370 t->drift * t->start_tsc + t->offset));
371 }
372
373 g_array_free(factors, TRUE);
374
375 lttv_traceset_context_compute_time_span(traceSetContext,
376 &traceSetContext->time_span);
377
378 g_debug("traceset start %ld.%09ld end %ld.%09ld",
379 traceSetContext->time_span.start_time.tv_sec,
380 traceSetContext->time_span.start_time.tv_nsec,
381 traceSetContext->time_span.end_time.tv_sec,
382 traceSetContext->time_span.end_time.tv_nsec);
383
384 // Write graphs file
385 if (!optionSyncNull.present && optionSyncGraphs.present)
386 {
387 writeGraphsScript(syncState);
388
389 if (fclose(syncState->graphsStream) != 0)
390 {
391 g_error(strerror(errno));
392 }
393 }
394
395 if (!optionSyncNull.present && optionSyncStats.present)
396 {
397 printStats(syncState);
398
399 printf("Resulting synchronization factors:\n");
400 for (i= 0; i < syncState->traceNb; i++)
401 {
402 LttTrace* t;
403
404 t= traceSetContext->traces[i]->t;
405
406 printf("\ttrace %u drift= %g offset= %g (%f) start time= %ld.%09ld\n",
407 i, t->drift, t->offset, (double) tsc_to_uint64(t->freq_scale,
408 t->start_freq, t->offset) / NANOSECONDS_PER_SECOND,
409 t->start_time_from_tsc.tv_sec,
410 t->start_time_from_tsc.tv_nsec);
411 }
412 }
413
414 syncState->processingModule->destroyProcessing(syncState);
415 if (syncState->matchingModule != NULL)
416 {
417 syncState->matchingModule->destroyMatching(syncState);
418 }
419 if (syncState->analysisModule != NULL)
420 {
421 syncState->analysisModule->destroyAnalysis(syncState);
422 }
423 if (syncState->reductionModule != NULL)
424 {
425 syncState->reductionModule->destroyReduction(syncState);
426 }
427
428 free(syncState);
429
430 if (optionSyncStats.present)
431 {
432 gettimeofday(&endTime, 0);
433 retval= getrusage(RUSAGE_SELF, &endUsage);
434
435 timeDiff(&endTime, &startTime);
436 timeDiff(&endUsage.ru_utime, &startUsage.ru_utime);
437 timeDiff(&endUsage.ru_stime, &startUsage.ru_stime);
438
439 printf("Synchronization time:\n");
440 printf("\treal time: %ld.%06ld\n", endTime.tv_sec, endTime.tv_usec);
441 printf("\tuser time: %ld.%06ld\n", endUsage.ru_utime.tv_sec,
442 endUsage.ru_utime.tv_usec);
443 printf("\tsystem time: %ld.%06ld\n", endUsage.ru_stime.tv_sec,
444 endUsage.ru_stime.tv_usec);
445 }
446
447 return true;
448 }
449
450
451 /*
452 * A GFunc for g_queue_foreach()
453 *
454 * Args:
455 * data: ModuleOption*
456 * user_data: NULL
457 */
458 static void gfAddModuleOption(gpointer data, gpointer user_data)
459 {
460 ModuleOption* option= data;
461 LttvOptionType conversion[]= {
462 [NO_ARG]= LTTV_OPT_NONE,
463 [OPTIONAL_ARG]= LTTV_OPT_NONE,
464 [REQUIRED_ARG]= LTTV_OPT_STRING,
465 };
466 size_t fieldOffset[]= {
467 [NO_ARG]= offsetof(ModuleOption, present),
468 [REQUIRED_ARG]= offsetof(ModuleOption, arg),
469 };
470 static const char* argHelpNone= "none";
471
472 g_assert_cmpuint(sizeof(conversion) / sizeof(*conversion), ==,
473 HAS_ARG_COUNT);
474 if (option->hasArg == OPTIONAL_ARG)
475 {
476 g_warning("Parameters with optional arguments not supported by the "
477 "lttv option scheme, parameter '%s' will not be available",
478 option->longName);
479 }
480 else
481 {
482 lttv_option_add(option->longName, '\0', option->optionHelp,
483 option->argHelp ? option->argHelp : argHelpNone,
484 conversion[option->hasArg], (void*) option + fieldOffset[option->hasArg],
485 NULL, NULL);
486 }
487 }
488
489
490 /*
491 * A GFunc for g_queue_foreach()
492 *
493 * Args:
494 * data: ModuleOption*
495 * user_data: NULL
496 */
497 static void gfRemoveModuleOption(gpointer data, gpointer user_data)
498 {
499 lttv_option_remove(((ModuleOption*) data)->longName);
500 }
501
502
503 LTTV_MODULE("sync", "Synchronize traces", \
504 "Synchronizes a traceset based on the correspondance of network events", \
505 init, destroy, "option")
This page took 0.046584 seconds and 4 git commands to generate.