Add a module to distribute messages to many analysis modules
[lttv.git] / lttv / lttv / sync / event_analysis_eval.c
CommitLineData
cdce23b3
BP
1/* This file is part of the Linux Trace Toolkit viewer
2 * Copyright (C) 2009 Benjamin Poirier <benjamin.poirier@polymtl.ca>
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License Version 2 as
6 * published by the Free Software Foundation;
7 *
8 * This program is distributed in the hope that it will be useful,
9 * but WITHOUT ANY WARRANTY; without even the implied warranty of
10 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 * GNU General Public License for more details.
12 *
13 * You should have received a copy of the GNU General Public License
14 * along with this program; if not, write to the Free Software
15 * Foundation, Inc., 59 Temple Place - Suite 330, Boston,
16 * MA 02111-1307, USA.
17 */
18
2bd4b3e4 19#define _GNU_SOURCE
d4721e1a 20#define _ISOC99_SOURCE
2bd4b3e4 21
cdce23b3
BP
22#ifdef HAVE_CONFIG_H
23#include <config.h>
24#endif
25
2bd4b3e4
BP
26#include <arpa/inet.h>
27#include <errno.h>
76be6fc2 28#include <math.h>
2bd4b3e4
BP
29#include <netinet/in.h>
30#include <stddef.h>
cdce23b3 31#include <stdlib.h>
2bd4b3e4
BP
32#include <stdio.h>
33#include <string.h>
34#include <sys/socket.h>
cdce23b3 35
2bd4b3e4
BP
36#include "lookup3.h"
37#include "sync_chain.h"
cdce23b3
BP
38
39#include "event_analysis_eval.h"
40
41
42// Functions common to all analysis modules
43static void initAnalysisEval(SyncState* const syncState);
44static void destroyAnalysisEval(SyncState* const syncState);
45
46static void analyzeMessageEval(SyncState* const syncState, Message* const
47 message);
48static void analyzeExchangeEval(SyncState* const syncState, Exchange* const
49 exchange);
50static void analyzeBroadcastEval(SyncState* const syncState, Broadcast* const
51 broadcast);
52static GArray* finalizeAnalysisEval(SyncState* const syncState);
53static void printAnalysisStatsEval(SyncState* const syncState);
54
55// Functions specific to this module
56static void registerAnalysisEval() __attribute__((constructor (102)));
2bd4b3e4
BP
57static guint ghfRttKeyHash(gconstpointer key);
58static gboolean gefRttKeyEqual(gconstpointer a, gconstpointer b);
59static void gdnDestroyRttKey(gpointer data);
60static void gdnDestroyDouble(gpointer data);
61static void readRttInfo(GHashTable* rttInfo, FILE* rttFile);
62static void positionStream(FILE* stream);
cdce23b3 63
76be6fc2
BP
64static void gfSum(gpointer data, gpointer userData);
65static void gfSumSquares(gpointer data, gpointer userData);
d4721e1a 66static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data);
76be6fc2 67
cdce23b3
BP
68
69static AnalysisModule analysisModuleEval= {
70 .name= "eval",
71 .initAnalysis= &initAnalysisEval,
72 .destroyAnalysis= &destroyAnalysisEval,
73 .analyzeMessage= &analyzeMessageEval,
74 .analyzeExchange= &analyzeExchangeEval,
75 .analyzeBroadcast= &analyzeBroadcastEval,
76 .finalizeAnalysis= &finalizeAnalysisEval,
77 .printAnalysisStats= &printAnalysisStatsEval,
78 .writeAnalysisGraphsPlots= NULL,
79 .writeAnalysisGraphsOptions= NULL,
80};
81
2bd4b3e4
BP
82static ModuleOption optionEvalRttFile= {
83 .longName= "eval-rtt-file",
84 .hasArg= REQUIRED_ARG,
85 {.arg= NULL},
86 .optionHelp= "specify the file containing rtt information",
87 .argHelp= "FILE",
88};
89
cdce23b3
BP
90
91/*
92 * Analysis module registering function
93 */
94static void registerAnalysisEval()
95{
96 g_queue_push_tail(&analysisModules, &analysisModuleEval);
2bd4b3e4 97 g_queue_push_tail(&moduleOptions, &optionEvalRttFile);
cdce23b3
BP
98}
99
100
101/*
102 * Analysis init function
103 *
104 * This function is called at the beginning of a synchronization run for a set
105 * of traces.
106 *
107 * Args:
108 * syncState container for synchronization data.
109 */
110static void initAnalysisEval(SyncState* const syncState)
111{
112 AnalysisDataEval* analysisData;
113 unsigned int i;
114
115 analysisData= malloc(sizeof(AnalysisDataEval));
116 syncState->analysisData= analysisData;
117
2bd4b3e4
BP
118 analysisData->rttInfo= g_hash_table_new_full(&ghfRttKeyHash,
119 &gefRttKeyEqual, &gdnDestroyRttKey, &gdnDestroyDouble);
120 if (optionEvalRttFile.arg)
121 {
122 FILE* rttStream;
123 int retval;
124
125 rttStream= fopen(optionEvalRttFile.arg, "r");
126 if (rttStream == NULL)
127 {
128 g_error(strerror(errno));
129 }
130
131 readRttInfo(analysisData->rttInfo, rttStream);
132
133 retval= fclose(rttStream);
134 if (retval == EOF)
135 {
136 g_error(strerror(errno));
137 }
138 }
cdce23b3
BP
139
140 if (syncState->stats)
141 {
76be6fc2 142 analysisData->stats= calloc(1, sizeof(AnalysisStatsEval));
cdce23b3
BP
143 analysisData->stats->broadcastDiffSum= 0.;
144
76be6fc2
BP
145 analysisData->stats->messageStats= malloc(syncState->traceNb *
146 sizeof(MessageStats*));
cdce23b3
BP
147 for (i= 0; i < syncState->traceNb; i++)
148 {
76be6fc2
BP
149 analysisData->stats->messageStats[i]= calloc(syncState->traceNb,
150 sizeof(MessageStats));
cdce23b3 151 }
d4721e1a
BP
152
153 analysisData->stats->exchangeRtt=
154 g_hash_table_new_full(&ghfRttKeyHash, &gefRttKeyEqual,
155 &gdnDestroyRttKey, &gdnDestroyDouble);
cdce23b3
BP
156 }
157}
158
159
160/*
161 * Analysis destroy function
162 *
163 * Free the analysis specific data structures
164 *
165 * Args:
166 * syncState container for synchronization data.
167 */
168static void destroyAnalysisEval(SyncState* const syncState)
169{
170 unsigned int i;
171 AnalysisDataEval* analysisData;
172
173 analysisData= (AnalysisDataEval*) syncState->analysisData;
174
175 if (analysisData == NULL || analysisData->rttInfo == NULL)
176 {
177 return;
178 }
179
2bd4b3e4 180 g_hash_table_destroy(analysisData->rttInfo);
cdce23b3
BP
181 analysisData->rttInfo= NULL;
182
183 if (syncState->stats)
184 {
185 for (i= 0; i < syncState->traceNb; i++)
186 {
76be6fc2 187 free(analysisData->stats->messageStats[i]);
cdce23b3 188 }
76be6fc2 189 free(analysisData->stats->messageStats);
d4721e1a
BP
190
191 g_hash_table_destroy(analysisData->stats->exchangeRtt);
192
cdce23b3
BP
193 free(analysisData->stats);
194 }
195
196 free(syncState->analysisData);
197 syncState->analysisData= NULL;
198}
199
200
201/*
202 * Perform analysis on an event pair.
203 *
76be6fc2
BP
204 * Check if there is message inversion or messages that are too fast.
205 *
cdce23b3
BP
206 * Args:
207 * syncState container for synchronization data
208 * message structure containing the events
209 */
210static void analyzeMessageEval(SyncState* const syncState, Message* const message)
211{
212 AnalysisDataEval* analysisData;
76be6fc2 213 MessageStats* messageStats;
d4721e1a 214 double* rtt;
76be6fc2
BP
215 double tt;
216 struct RttKey rttKey;
217
218 if (!syncState->stats)
219 {
220 return;
221 }
cdce23b3
BP
222
223 analysisData= (AnalysisDataEval*) syncState->analysisData;
76be6fc2
BP
224 messageStats=
225 &analysisData->stats->messageStats[message->outE->traceNum][message->inE->traceNum];
226
227 messageStats->total++;
228
229 tt= wallTimeSub(&message->inE->wallTime, &message->outE->wallTime);
230 if (tt <= 0)
231 {
232 messageStats->inversionNb++;
233 }
234
d4721e1a
BP
235 g_assert(message->inE->type == TCP);
236 rttKey.saddr=
237 message->inE->event.tcpEvent->segmentKey->connectionKey.saddr;
238 rttKey.daddr=
239 message->inE->event.tcpEvent->segmentKey->connectionKey.daddr;
240 rtt= g_hash_table_lookup(analysisData->rttInfo, &rttKey);
241 g_debug("rttInfo, looking up (%u, %u)->(%f)", rttKey.saddr,
242 rttKey.daddr, rtt ? *rtt : NAN);
76be6fc2 243
d4721e1a 244 if (rtt)
76be6fc2 245 {
d4721e1a
BP
246 g_debug("rttInfo, tt: %f rtt / 2: %f", tt, *rtt / 2.);
247 if (tt < *rtt / 2.)
76be6fc2
BP
248 {
249 messageStats->tooFastNb++;
250 }
251 }
252 else
253 {
254 messageStats->noRTTInfoNb++;
255 }
cdce23b3
BP
256}
257
258
259/*
260 * Perform analysis on multiple messages
261 *
76be6fc2
BP
262 * Measure the RTT
263 *
cdce23b3
BP
264 * Args:
265 * syncState container for synchronization data
266 * exchange structure containing the messages
267 */
268static void analyzeExchangeEval(SyncState* const syncState, Exchange* const exchange)
269{
d4721e1a
BP
270 AnalysisDataEval* analysisData= syncState->analysisData;
271 Message* m1= g_queue_peek_tail(exchange->acks);
272 Message* m2= exchange->message;
273 struct RttKey* rttKey;
274 double* rtt, * exchangeRtt;
cdce23b3 275
d4721e1a
BP
276 if (!syncState->stats)
277 {
278 return;
279 }
280
281 // (T2 - T1) - (T3 - T4)
282 rtt= malloc(sizeof(double));
283 *rtt= wallTimeSub(&m1->inE->wallTime, &m1->outE->wallTime) -
284 wallTimeSub(&m2->outE->wallTime, &m2->inE->wallTime);
285
286 g_assert(m1->inE->type == TCP);
287 rttKey= malloc(sizeof(struct RttKey));
288 rttKey->saddr=
289 MIN(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr,
290 m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr);
291 rttKey->daddr=
292 MAX(m1->inE->event.tcpEvent->segmentKey->connectionKey.saddr,
293 m1->inE->event.tcpEvent->segmentKey->connectionKey.daddr);
294 exchangeRtt= g_hash_table_lookup(analysisData->stats->exchangeRtt,
295 rttKey);
296
297 if (exchangeRtt)
298 {
299 if (*rtt < *exchangeRtt)
300 {
301 g_hash_table_replace(analysisData->stats->exchangeRtt, rttKey, rtt);
302 }
303 }
304 else
305 {
306 g_hash_table_insert(analysisData->stats->exchangeRtt, rttKey, rtt);
307 }
cdce23b3
BP
308}
309
310
311/*
312 * Perform analysis on muliple events
313 *
76be6fc2
BP
314 * Sum the broadcast differential delays
315 *
cdce23b3
BP
316 * Args:
317 * syncState container for synchronization data
318 * broadcast structure containing the events
319 */
320static void analyzeBroadcastEval(SyncState* const syncState, Broadcast* const broadcast)
321{
322 AnalysisDataEval* analysisData;
76be6fc2
BP
323 double sum= 0, squaresSum= 0;
324 double y;
325
326 if (!syncState->stats)
327 {
328 return;
329 }
cdce23b3
BP
330
331 analysisData= (AnalysisDataEval*) syncState->analysisData;
76be6fc2
BP
332
333 g_queue_foreach(broadcast->events, &gfSum, &sum);
334 g_queue_foreach(broadcast->events, &gfSumSquares, &squaresSum);
335
336 analysisData->stats->broadcastNb++;
337 // Because of numerical errors, this can at times be < 0
338 y= squaresSum / g_queue_get_length(broadcast->events) - pow(sum /
339 g_queue_get_length(broadcast->events), 2.);
340 if (y > 0)
341 {
342 analysisData->stats->broadcastDiffSum+= sqrt(y);
343 }
cdce23b3
BP
344}
345
346
347/*
348 * Finalize the factor calculations
349 *
350 * Since this module does not really calculate factors, identity factors are
351 * returned.
352 *
353 * Args:
354 * syncState container for synchronization data.
355 *
356 * Returns:
d4721e1a 357 * Factors[traceNb] identity factors for each trace
cdce23b3
BP
358 */
359static GArray* finalizeAnalysisEval(SyncState* const syncState)
360{
361 GArray* factors;
362 unsigned int i;
363
364 factors= g_array_sized_new(FALSE, FALSE, sizeof(Factors),
365 syncState->traceNb);
366 g_array_set_size(factors, syncState->traceNb);
367 for (i= 0; i < syncState->traceNb; i++)
368 {
369 Factors* e;
370
371 e= &g_array_index(factors, Factors, i);
372 e->drift= 1.;
373 e->offset= 0.;
374 }
375
376 return factors;
377}
378
379
380/*
381 * Print statistics related to analysis. Must be called after
382 * finalizeAnalysis.
383 *
384 * Args:
385 * syncState container for synchronization data.
386 */
387static void printAnalysisStatsEval(SyncState* const syncState)
388{
389 AnalysisDataEval* analysisData;
390 unsigned int i, j;
391
392 if (!syncState->stats)
393 {
394 return;
395 }
396
397 analysisData= (AnalysisDataEval*) syncState->analysisData;
398
399 printf("Synchronization evaluation analysis stats:\n");
400 printf("\tsum of broadcast differential delays: %g\n",
401 analysisData->stats->broadcastDiffSum);
76be6fc2
BP
402 printf("\taverage broadcast differential delays: %g\n",
403 analysisData->stats->broadcastDiffSum /
404 analysisData->stats->broadcastNb);
cdce23b3
BP
405
406 printf("\tIndividual evaluation:\n"
76be6fc2 407 "\t\tTrace pair Inversions Too fast (No RTT info) Total\n");
cdce23b3
BP
408
409 for (i= 0; i < syncState->traceNb; i++)
410 {
411 for (j= i + 1; j < syncState->traceNb; j++)
412 {
76be6fc2
BP
413 MessageStats* messageStats;
414 const char* format= "\t\t%3d - %-3d %-10u %-10u %-10u %u\n";
cdce23b3 415
76be6fc2 416 messageStats= &analysisData->stats->messageStats[i][j];
cdce23b3 417
76be6fc2
BP
418 printf(format, i, j, messageStats->inversionNb, messageStats->tooFastNb,
419 messageStats->noRTTInfoNb, messageStats->total);
cdce23b3 420
76be6fc2 421 messageStats= &analysisData->stats->messageStats[j][i];
cdce23b3 422
76be6fc2
BP
423 printf(format, j, i, messageStats->inversionNb, messageStats->tooFastNb,
424 messageStats->noRTTInfoNb, messageStats->total);
cdce23b3
BP
425 }
426 }
d4721e1a
BP
427
428 printf("\tRound-trip times:\n"
429 "\t\tHost pair RTT from exchanges RTTs from file (ms)\n");
430 g_hash_table_foreach(analysisData->stats->exchangeRtt,
431 &ghfPrintExchangeRtt, analysisData->rttInfo);
432}
433
434
435/*
436 * A GHFunc for g_hash_table_foreach()
437 *
438 * Args:
439 * key: RttKey* where saddr < daddr
440 * value: double*, RTT estimated from exchanges
441 * user_data GHashTable* rttInfo
442 */
443static void ghfPrintExchangeRtt(gpointer key, gpointer value, gpointer user_data)
444{
445 char addr1[16], addr2[16];
446 struct RttKey* rttKey1= key;
447 struct RttKey rttKey2= {rttKey1->daddr, rttKey1->saddr};
448 double* fileRtt1, *fileRtt2;
449 GHashTable* rttInfo= user_data;
450
451 convertIP(addr1, rttKey1->saddr);
452 convertIP(addr2, rttKey1->daddr);
453
454 fileRtt1= g_hash_table_lookup(rttInfo, rttKey1);
455 fileRtt2= g_hash_table_lookup(rttInfo, &rttKey2);
456
457 printf("\t\t(%15s, %-15s) %-18.3f ", addr1, addr2, *(double*) value * 1e3);
458
459 if (fileRtt1 || fileRtt2)
460 {
461 if (fileRtt1)
462 {
463 printf("%.3f", *fileRtt1 * 1e3);
464 }
465 if (fileRtt1 && fileRtt2)
466 {
467 printf(", ");
468 }
469 if (fileRtt2)
470 {
471 printf("%.3f", *fileRtt2 * 1e3);
472 }
473 }
474 else
475 {
476 printf("-");
477 }
478 printf("\n");
cdce23b3 479}
2bd4b3e4
BP
480
481
482/*
483 * A GHashFunc for g_hash_table_new()
484 *
485 * Args:
486 * key struct RttKey*
487 */
488static guint ghfRttKeyHash(gconstpointer key)
489{
490 struct RttKey* rttKey;
491 uint32_t a, b, c;
492
493 rttKey= (struct RttKey*) key;
494
495 a= rttKey->saddr;
496 b= rttKey->daddr;
497 c= 0;
498 final(a, b, c);
499
500 return c;
501}
502
503
504/*
505 * A GDestroyNotify function for g_hash_table_new_full()
506 *
507 * Args:
508 * data: struct RttKey*
509 */
510static void gdnDestroyRttKey(gpointer data)
511{
512 free(data);
513}
514
515
516/*
517 * A GDestroyNotify function for g_hash_table_new_full()
518 *
519 * Args:
520 * data: double*
521 */
522static void gdnDestroyDouble(gpointer data)
523{
524 free(data);
525}
526
527
528/*
529 * A GEqualFunc for g_hash_table_new()
530 *
531 * Args:
532 * a, b RttKey*
533 *
534 * Returns:
535 * TRUE if both values are equal
536 */
537static gboolean gefRttKeyEqual(gconstpointer a, gconstpointer b)
538{
539 const struct RttKey* rkA, * rkB;
540
541 rkA= (struct RttKey*) a;
542 rkB= (struct RttKey*) b;
543
544 if (rkA->saddr == rkB->saddr && rkA->daddr == rkB->daddr)
545 {
546 return TRUE;
547 }
548 else
549 {
550 return FALSE;
551 }
552}
553
554
555/*
556 * Read a file contain minimum round trip time values and fill an array with
557 * them. The file is formatted as such:
558 * <host1 IP> <host2 IP> <RTT in milliseconds>
559 * ip's should be in dotted quad format
560 *
561 * Args:
562 * rttInfo: double* rttInfo[RttKey], empty table, will be filled
563 * rttStream: stream from which to read
564 */
565static void readRttInfo(GHashTable* rttInfo, FILE* rttStream)
566{
567 char* line= NULL;
568 size_t len;
569 int retval;
570
571 positionStream(rttStream);
572 retval= getline(&line, &len, rttStream);
573 while(!feof(rttStream))
574 {
575 struct RttKey* rttKey;
576 char saddrDQ[20], daddrDQ[20];
577 double* rtt;
578 char tmp;
579 struct in_addr addr;
580 unsigned int i;
581 struct {
582 char* dq;
583 size_t offset;
584 } loopValues[] = {
585 {saddrDQ, offsetof(struct RttKey, saddr)},
586 {daddrDQ, offsetof(struct RttKey, daddr)}
587 };
588
589 if (retval == -1 && !feof(rttStream))
590 {
591 g_error(strerror(errno));
592 }
593
594 if (line[retval - 1] == '\n')
595 {
596 line[retval - 1]= '\0';
597 }
598
599 rtt= malloc(sizeof(double));
600 retval= sscanf(line, " %19s %19s %lf %c", saddrDQ, daddrDQ, rtt,
601 &tmp);
602 if (retval == EOF)
603 {
604 g_error(strerror(errno));
605 }
606 else if (retval != 3)
607 {
608 g_error("Error parsing RTT file, line was '%s'", line);
609 }
610
611 rttKey= malloc(sizeof(struct RttKey));
612 for (i= 0; i < sizeof(loopValues) / sizeof(*loopValues); i++)
613 {
614 retval= inet_aton(loopValues[i].dq, &addr);
615 if (retval == 0)
616 {
617 g_error("Error converting address '%s'", loopValues[i].dq);
618 }
619 *(uint32_t*) ((void*) rttKey + loopValues[i].offset)=
620 addr.s_addr;
621 }
622
76be6fc2 623 *rtt/= 1e3;
d4721e1a
BP
624 g_debug("rttInfo, Inserting (%u, %u)->(%f)", rttKey->saddr,
625 rttKey->daddr, *rtt);
2bd4b3e4
BP
626 g_hash_table_insert(rttInfo, rttKey, rtt);
627
628 positionStream(rttStream);
629 retval= getline(&line, &len, rttStream);
630 }
631
632 if (line)
633 {
634 free(line);
635 }
636}
637
638
/*
 * Advance stream over empty space, empty lines and lines that begin with '#'
 *
 * Spaces, tabs and newlines are consumed one character at a time; a '#'
 * causes the rest of that line to be discarded. The first other character
 * found is pushed back onto the stream.
 *
 * Args:
 *   stream:       stream, at exit, will be over the first non-empty character
 *                 of a line or be at EOF
 */
static void positionStream(FILE* stream)
{
	int firstChar;
	char* line= NULL;
	size_t len= 0;

	for (;;)
	{
		firstChar= fgetc(stream);
		if (firstChar == EOF)
		{
			goto outEof;
		}
		else if (firstChar == '#')
		{
			// Comment: discard everything up to and including the newline
			if (getline(&line, &len, stream) == -1)
			{
				if (feof(stream))
				{
					goto outEof;
				}
				else
				{
					// The system message is data, never the format string
					g_error("%s", strerror(errno));
				}
			}
		}
		else if (firstChar != '\n' && firstChar != ' ' && firstChar != '\t')
		{
			// First meaningful character of a line
			break;
		}
	}

	// Give the character back so the caller reads the line from its start
	if (ungetc(firstChar, stream) == EOF)
	{
		g_error("Error: ungetc()");
	}

outEof:
	free(line);
}
76be6fc2
BP
695
696
697/*
698 * A GFunc for g_queue_foreach()
699 *
700 * Args:
701 * data Event*, a UDP broadcast event
702 * user_data double*, the running sum
703 *
704 * Returns:
705 * Adds the time of the event to the sum
706 */
707static void gfSum(gpointer data, gpointer userData)
708{
709 Event* event= (Event*) data;
710
711 *(double*) userData+= event->wallTime.seconds + event->wallTime.nanosec /
712 1e9;
713}
714
715
716/*
717 * A GFunc for g_queue_foreach()
718 *
719 * Args:
720 * data Event*, a UDP broadcast event
721 * user_data double*, the running sum
722 *
723 * Returns:
724 * Adds the square of the time of the event to the sum
725 */
726static void gfSumSquares(gpointer data, gpointer userData)
727{
728 Event* event= (Event*) data;
729
730 *(double*) userData+= pow(event->wallTime.seconds + event->wallTime.nanosec
731 / 1e9, 2.);
732}
This page took 0.051407 seconds and 4 git commands to generate.