2 #include "aggregators.h"
6 #define __USE_XOPEN_EXTENDED
12 static bool use_one_field(char *config_str
, int *num_fields
, char *fields
[])
16 fields
[0] = config_str
;
26 static bool use_two_fields(char *config_str
, int *num_fields
, char *fields
[])
30 char *comma
= strchr(config_str
, ',');
31 if(!comma
) return false;
34 fields
[0] = config_str
;
54 static bool avg_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
56 return use_one_field(config_str
, num_fields
, fields
);
59 static void avg_init(void *config_data
, void *_d
)
61 struct avg_data
*d
= _d
;
66 static void avg_add(void *config_data
, void *_d
, char *ch_data
[], double num_data
[])
68 struct avg_data
*d
= _d
;
69 if(!isnan(num_data
[0]))
71 d
->total
+= num_data
[0];
76 static void avg_dump(void *config_data
, void *_d
)
78 struct avg_data
*d
= _d
;
79 printf("%g", d
->total
/ d
->count
);
93 struct concat_config_data
99 static bool concat_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
103 char *comma
= strchr(config_str
, ',');
104 if(!comma
) return false;
107 struct concat_config_data
*c
= *config_data
= malloc(sizeof(struct concat_config_data
));
108 c
->delim
= strdup(config_str
);
109 c
->delim_len
= strlen(c
->delim
);
122 static void concat_init(void *_c
, void *_d
)
124 struct concat_data
*d
= _d
;
127 d
->concat_buf
= malloc(sizeof(char) * d
->buf_size
);
130 static void concat_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
132 struct concat_config_data
*c
= _c
;
133 struct concat_data
*d
= _d
;
134 int len
= strlen(ch_data
[0]);
135 RESIZE_ARRAY_IF_NECESSARY(d
->concat_buf
, d
->buf_size
, d
->buf_len
+ len
+ c
->delim_len
);
139 memcpy(d
->concat_buf
+ d
->buf_len
, c
->delim
, c
->delim_len
);
140 d
->buf_len
+= c
->delim_len
;
143 memcpy(d
->concat_buf
+ d
->buf_len
, ch_data
[0], len
);
147 static void concat_dump(void *_c
, void *_d
)
149 struct concat_data
*d
= _d
;
151 fwrite(d
->concat_buf
, sizeof(char), d
->buf_len
, stdout
);
155 static void concat_free(void *_c
, void *_d
)
157 struct concat_data
*d
= _d
;
170 static bool count_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
175 static void count_init(void *_c
, void *_d
)
177 struct count_data
*d
= _d
;
181 static void count_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
183 struct count_data
*d
= _d
;
187 static void count_dump(void *_c
, void *_d
)
189 struct count_data
*d
= _d
;
190 printf("%llu", d
->count
);
200 double sum_of_products
;
202 double sum_of_second
;
205 static bool cov_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
207 return use_two_fields(config_str
, num_fields
, fields
);
210 static void cov_init(void *_c
, void *_d
)
212 struct cov_data
*d
= _d
;
214 d
->sum_of_products
= 0;
216 d
->sum_of_second
= 0;
219 static void cov_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
221 struct cov_data
*d
= _d
;
222 if(!isnan(num_data
[0]) && !isnan(num_data
[1]))
225 d
->sum_of_products
+= num_data
[0] * num_data
[1];
226 d
->sum_of_first
+= num_data
[0];
227 d
->sum_of_second
+= num_data
[1];
231 static double cov_val(struct cov_data
*d
)
233 double cov
= (d
->sum_of_products
/ d
->count
) -
234 ((d
->sum_of_first
/ d
->count
) * (d
->sum_of_second
/ d
->count
));
238 static void cov_dump(void *_c
, void *_d
)
240 struct cov_data
*d
= _d
;
241 printf("%f", cov_val(d
));
253 static bool max_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
255 return use_one_field(config_str
, num_fields
, fields
);
258 static void max_init(void *_c
, void *_d
)
260 struct max_data
*d
= _d
;
264 static void max_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
266 struct max_data
*d
= _d
;
267 if(!isnan(num_data
[0]) && num_data
[0] > d
->max
)
268 d
->max
= num_data
[0];
271 static void max_dump(void *_c
, void *_d
)
273 struct max_data
*d
= _d
;
274 printf("%g", d
->max
);
286 static bool min_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
288 return use_one_field(config_str
, num_fields
, fields
);
291 static void min_init(void *_c
, void *_d
)
293 struct min_data
*d
= _d
;
297 static void min_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
299 struct min_data
*d
= _d
;
300 if(!isnan(num_data
[0]) && num_data
[0] < d
->min
)
301 d
->min
= num_data
[0];
304 static void min_dump(void *_c
, void *_d
)
306 struct min_data
*d
= _d
;
307 printf("%g", d
->min
);
319 static bool sum_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
321 return use_one_field(config_str
, num_fields
, fields
);
324 static void sum_init(void *_c
, void *_d
)
326 struct sum_data
*d
= _d
;
330 static void sum_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
332 struct sum_data
*d
= _d
;
333 if(!isnan(num_data
[0]))
334 d
->sum
+= num_data
[0];
337 static void sum_dump(void *_c
, void *_d
)
339 struct sum_data
*d
= _d
;
340 printf("%g", d
->sum
);
347 struct perc_config_data
359 static bool perc_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
363 char *comma
= strchr(config_str
, ',');
364 if(!comma
) return false;
367 struct perc_config_data
*c
= *config_data
= malloc(sizeof(struct perc_config_data
));
369 c
->percentile
= strtod(config_str
, &endp
);
370 if(endp
== config_str
) return false; /* failed to parse into number */
383 static void perc_init(void *_c
, void *_d
)
385 struct perc_data
*d
= _d
;
388 d
->values
= malloc(sizeof(*d
->values
) * d
->values_size
);
391 static void perc_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
393 struct perc_data
*d
= _d
;
394 if(!isnan(num_data
[0]))
396 RESIZE_ARRAY_IF_NECESSARY(d
->values
, d
->values_size
, d
->values_len
+1);
397 d
->values
[d
->values_len
++] = num_data
[0];
401 static int cmp_dbl(const void *s1
, const void *s2
)
403 double d1
= *(double*)s1
;
404 double d2
= *(double*)s2
;
405 if(d1
< d2
) return -1;
406 else if(d1
> d2
) return 1;
410 static void perc_dump(void *_c
, void *_d
)
412 struct perc_config_data
*c
= _c
;
413 struct perc_data
*d
= _d
;
414 qsort(d
->values
, d
->values_len
, sizeof(*d
->values
), cmp_dbl
);
415 double perc
= d
->values
[(int)floor((c
->percentile
/ 100) * d
->values_len
)];
419 static void perc_free(void *_c
, void *_d
)
421 struct concat_data
*d
= _d
;
429 static hash_val_t
str_hash_func(const void *_k
)
432 return hashlittle(k
, strlen(k
), 0);
444 /* this is just a pool of nodes */
445 int nodes_len
, nodes_size
;
448 /* this is just a pool of table_entry objs */
449 int entries_len
, entries_size
;
450 struct table_entry
*entries
;
453 static bool mode_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
455 return use_one_field(config_str
, num_fields
, fields
);
458 static void mode_init(void *_c
, void *_d
)
460 struct mode_data
*d
= _d
;
461 d
->hash_table
= hash_create(HASHCOUNT_T_MAX
, NULL
, str_hash_func
);
462 d
->nodes_size
= d
->entries_size
= 32;
463 d
->nodes_len
= d
->entries_len
= 0;
464 d
->nodes
= malloc(sizeof(*d
->nodes
) * d
->nodes_size
);
465 d
->entries
= malloc(sizeof(*d
->entries
) * d
->entries_size
);
468 static void mode_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
470 struct mode_data
*d
= _d
;
471 hnode_t
*node
= hash_lookup(d
->hash_table
, ch_data
[0]);
474 RESIZE_ARRAY_IF_NECESSARY(d
->nodes
, d
->nodes_size
, d
->nodes_len
+1);
475 RESIZE_ARRAY_IF_NECESSARY(d
->entries
, d
->entries_size
, d
->entries_len
+1);
476 struct table_entry
*entry
= &d
->entries
[d
->entries_len
++];
478 node
= hnode_init(&d
->nodes
[d
->nodes_len
++], entry
);
479 hash_insert(d
->hash_table
, node
, strdup(ch_data
[0]));
481 struct table_entry
*entry
= hnode_get(node
);
485 static void mode_dump(void *_c
, void *_d
)
487 printf("Mode dump!\n");
488 struct mode_data
*d
= _d
;
490 hash_scan_begin(&scan
, d
->hash_table
);
493 char *max_val
= NULL
;
494 while((node
= hash_scan_next(&scan
)))
496 struct table_entry
*entry
= hnode_get(node
);
497 if(entry
->count
> max_num
)
499 max_num
= entry
->count
;
500 max_val
= (char*)hnode_getkey(node
);
504 fputs(max_val
, stdout
);
508 static void mode_free(void *_c
, void *_d
)
510 struct mode_data
*d
= _d
;
512 /* free all the hash keys */
514 hash_scan_begin(&scan
, d
->hash_table
);
516 while((node
= hash_scan_next(&scan
)))
518 hash_scan_delete(d
->hash_table
, node
);
519 free((void*)hnode_getkey(node
));
522 hash_destroy(d
->hash_table
);
524 /* free the pools of nodes and entries */
537 double sum_of_squares
;
541 static bool var_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
543 return use_one_field(config_str
, num_fields
, fields
);
546 static void var_init(void *_c
, void *_d
)
548 struct var_data
*d
= _d
;
550 d
->sum_of_squares
= 0;
554 static void var_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
556 struct var_data
*d
= _d
;
557 if(!isnan(num_data
[0]))
560 d
->sum_of_squares
+= num_data
[0] * num_data
[0];
561 d
->sum
+= num_data
[0];
565 static double var_val(struct var_data
*d
)
567 double avg
= d
->sum
/ d
->count
;
568 double var
= (d
->sum_of_squares
/ d
->count
) - (avg
* avg
);
572 static void var_dump(void *_c
, void *_d
)
574 struct var_data
*d
= _d
;
575 printf("%g", var_val(d
));
584 struct cov_data cov_data
;
585 struct var_data var_data1
;
586 struct var_data var_data2
;
589 static bool corr_parse_args(void **config_data
, char *config_str
, int *num_fields
, char **fields
)
591 return use_two_fields(config_str
, num_fields
, fields
);
594 static void corr_init(void *_c
, void *_d
)
596 struct corr_data
*d
= _d
;
597 cov_init(_c
, &d
->cov_data
);
598 var_init(_c
, &d
->var_data1
);
599 var_init(_c
, &d
->var_data2
);
602 static void corr_add(void *_c
, void *_d
, char *ch_data
[], double num_data
[])
604 struct corr_data
*d
= _d
;
605 cov_add(NULL
, &d
->cov_data
, ch_data
, num_data
);
606 var_add(NULL
, &d
->var_data1
, ch_data
, num_data
);
607 var_add(NULL
, &d
->var_data2
, ch_data
+1, num_data
+1);
610 static void corr_dump(void *_c
, void *_d
)
612 struct corr_data
*d
= _d
;
613 double cov
= cov_val(&d
->cov_data
);
614 double var1
= var_val(&d
->var_data1
);
615 double var2
= var_val(&d
->var_data2
);
616 double corr
= cov
/ sqrt(var1
* var2
);
620 struct aggregator aggregators
[] = {
621 {"average", "avg", sizeof(struct avg_data
),
622 avg_parse_args
, avg_init
, avg_add
, avg_dump
, NULL
},
623 {"concatenate", "concat", sizeof(struct concat_data
),
624 concat_parse_args
, concat_init
, concat_add
, concat_dump
, concat_free
},
625 {"count", "ct", sizeof(struct count_data
),
626 count_parse_args
, count_init
, count_add
, count_dump
, NULL
},
627 {"correlation", "corr", sizeof(struct corr_data
),
628 corr_parse_args
, corr_init
, corr_add
, corr_dump
, NULL
},
629 {"covariance", "cov", sizeof(struct cov_data
),
630 cov_parse_args
, cov_init
, cov_add
, cov_dump
, NULL
},
631 {"maximum", "max", sizeof(struct max_data
),
632 max_parse_args
, max_init
, max_add
, max_dump
, NULL
},
633 {"minimum", "min", sizeof(struct min_data
),
634 min_parse_args
, min_init
, min_add
, min_dump
, NULL
},
635 {"mode", "mode", sizeof(struct mode_data
),
636 mode_parse_args
, mode_init
, mode_add
, mode_dump
, mode_free
},
637 {"percentile", "perc", sizeof(struct perc_data
),
638 perc_parse_args
, perc_init
, perc_add
, perc_dump
, perc_free
},
639 {"sum", "sum", sizeof(struct sum_data
),
640 sum_parse_args
, sum_init
, sum_add
, sum_dump
, NULL
},
641 {"variance", "var", sizeof(struct var_data
),
642 var_parse_args
, var_init
, var_add
, var_dump
, NULL
},
643 {NULL
, NULL
, 0, NULL
, NULL
, NULL
, NULL
, NULL
}