Commit 7fcba211 authored by Bernd Zeimetz

Imported Upstream version 0.45

parent 8ecff0a4
......@@ -29,7 +29,10 @@ ifeq ($(shell uname), SunOS)
SOCKET_LIBS += -lsocket -lnsl
endif
override LIBS += $(SOCKET_LIBS) -pthread
# should be accepted sort of anywhere
MATH_LIBS = -lm
override LIBS += $(SOCKET_LIBS) $(MATH_LIBS) -pthread
OBJS = \
relay.o \
......
This diff is collapsed.
......@@ -20,6 +20,7 @@
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <math.h>
#include <regex.h>
#include <pthread.h>
#include <assert.h>
......@@ -29,10 +30,14 @@
#include "server.h"
#include "router.h"
#include "aggregator.h"
#include "fnv1a.h"
static pthread_t aggregatorid;
static aggregator *aggregators = NULL;
static aggregator *lastaggr = NULL;
static size_t prevreceived = 0;
static size_t prevsent = 0;
static size_t prevdropped = 0;
static char keep_running = 1;
......@@ -50,10 +55,9 @@ aggregator_new(
if (ret == NULL)
return ret;
if (interval == 0 || expire == 0) {
free(ret);
return NULL;
}
assert(interval != 0);
assert(interval < expire);
if (aggregators == NULL) {
aggregators = lastaggr = ret;
......@@ -64,7 +68,7 @@ aggregator_new(
ret->interval = interval;
ret->expire = expire;
ret->tswhen = tswhen;
ret->bucketcnt = (expire / interval) * 2 + 1 ;
ret->bucketcnt = (expire + (interval - 1)) / interval + 1 + 1;
ret->received = 0;
ret->sent = 0;
ret->dropped = 0;
......@@ -88,6 +92,8 @@ aggregator_add_compute(
{
struct _aggr_computes *ac = s->computes;
enum _aggr_compute_type act;
char store = 0;
int pctl = 0;
if (strcmp(type, "sum") == 0) {
act = SUM;
......@@ -99,6 +105,24 @@ aggregator_add_compute(
act = MIN;
} else if (strcmp(type, "average") == 0 || strcmp(type, "avg") == 0) {
act = AVG;
} else if (strcmp(type, "median") == 0) {
act = MEDN;
pctl = 50;
store = 1;
} else if (strncmp(type, "percentile", strlen("percentile")) == 0) {
pctl = atoi(type + strlen("percentile"));
if (pctl > 100 || pctl <= 0) {
return -1;
} else {
act = PCTL;
store = 1;
}
} else if (strcmp(type, "variance") == 0) {
act = VAR;
store = 1;
} else if (strcmp(type, "stddev") == 0) {
act = SDEV;
store = 1;
} else {
return -1;
}
......@@ -112,13 +136,30 @@ aggregator_add_compute(
}
ac->type = act;
ac->percentile = (unsigned char)pctl;
ac->metric = strdup(metric);
memset(ac->invocations_ht, 0, sizeof(ac->invocations_ht));
ac->entries_needed = store;
ac->next = NULL;
return 0;
}
/**
 * Prefixes the output metric name of every compute registered on
 * aggregator s with stubname.
 *
 * Each compute's name is replaced by a freshly allocated string holding
 * stubname followed by the current name; the combined name is cut off
 * at METRIC_BUFSIZ - 1 characters.  The replacement is allocated before
 * the old name is released, so when memory runs out the compute keeps
 * its existing (unprefixed) name instead of ending up with a NULL
 * metric.
 */
void
aggregator_set_stub(
		aggregator *s,
		const char *stubname)
{
	struct _aggr_computes *ac;
	char newmetric[METRIC_BUFSIZ];
	char *stubbed;

	for (ac = s->computes; ac != NULL; ac = ac->next) {
		snprintf(newmetric, sizeof(newmetric), "%s%s", stubname, ac->metric);
		stubbed = strdup(newmetric);
		if (stubbed == NULL)
			continue;  /* out of memory: leave the old name in place */
		free((void *)ac->metric);
		ac->metric = stubbed;
	}
}
/**
* Adds a new metric to aggregator s. The value from the metric is put
* in the bucket matching the epoch contained in the metric. In cases
......@@ -138,7 +179,6 @@ aggregator_putmetric(
long long int epoch;
long long int itime;
int slot;
struct _bucket *bucket;
char newmetric[METRIC_BUFSIZ];
char *newfirstspace = NULL;
size_t len;
......@@ -148,6 +188,8 @@ aggregator_putmetric(
unsigned int omhtbucket;
struct _aggr_computes *compute;
struct _aggr_invocations *invocation;
struct _aggr_bucket *bucket;
struct _aggr_bucket_entries *entries;
/* get value */
if ((v = strchr(firstspace + 1, ' ')) == NULL) {
......@@ -180,9 +222,9 @@ aggregator_putmetric(
ometric = newmetric;
}
omhash = 2166136261UL; /* FNV1a */
omhash = FNV1A_32_OFFSET;
for (omp = ometric; *omp != '\0'; omp++)
omhash = (omhash ^ (unsigned int)*omp) * 16777619;
omhash = (omhash ^ (unsigned int)*omp) * FNV1A_32_PRIME;
omhtbucket =
((omhash >> AGGR_HT_POW_SIZE) ^ omhash) &
......@@ -215,14 +257,17 @@ aggregator_putmetric(
* aggregator is spammed with metrics, e.g. right after
* startup when other relays flush their queues. This
* approach shouldn't affect the timing of the buckets as
* requested in issue #72. */
* requested in issue #72.
* For consistency with other tools/old carbon-aggregator
* align the buckets to interval boundaries such that it is
* predictable what intervals will be taken, issue #104. */
time(&now);
now -= s->expire;
now = ((now - s->expire) / s->interval) * s->interval;
invocation->expire = s->expire + (rand() % s->interval);
/* allocate enough buckets to hold the past + future */
invocation->buckets =
malloc(sizeof(struct _bucket) * s->bucketcnt);
malloc(sizeof(struct _aggr_bucket) * s->bucketcnt);
if (invocation->buckets == NULL) {
logerr("aggregator: out of memory creating %s from %s",
ometric, metric);
......@@ -261,24 +306,45 @@ aggregator_putmetric(
bucket = &invocation->buckets[slot];
if (bucket->cnt == 0) {
bucket->cnt = 1;
bucket->sum = val;
bucket->max = val;
bucket->min = val;
} else {
bucket->cnt++;
bucket->sum += val;
if (bucket->max < val)
bucket->max = val;
if (bucket->min > val)
bucket->min = val;
}
entries = &bucket->entries;
if (compute->entries_needed) {
	/* this compute needs the individual samples, not just the
	 * running sum/min/max, so append val to the bucket's value
	 * array, growing it in steps of E_I_SZ entries when full */
	if (bucket->cnt == entries->size) {
#define E_I_SZ 64
		/* entries->size counts doubles, realloc wants bytes */
		double *new = realloc(entries->values,
				sizeof(double) * (entries->size + E_I_SZ));
		if (new == NULL) {
			logerr("aggregator: out of memory creating entry bucket "
					"(%s from %s)", ometric, metric);
		} else {
			entries->values = new;
			entries->size += E_I_SZ;
		}
	}
	/* when growing failed there is no room: the sample is left out
	 * of the value array */
	if (bucket->cnt < entries->size)
		entries->values[bucket->cnt] = val;
}
bucket->cnt++;
}
pthread_mutex_unlock(&s->bucketlock);
return;
}
/**
 * qsort(3) comparator that orders two doubles ascending.
 *
 * Returns a negative, zero or positive value when *l is respectively
 * less than, equal to, or greater than *r.  The result is built from
 * comparisons rather than from the difference *l - *r: converting that
 * double difference to int truncates every gap smaller than 1.0 to 0
 * (so distinct values compare as equal and end up unsorted) and is
 * undefined when the difference does not fit in an int.
 */
static inline int
cmp_entry(const void *l, const void *r)
{
	const double a = *(const double *)l;
	const double b = *(const double *)r;

	return (a > b) - (a < b);
}
/**
* Checks if the oldest bucket should be expired, if so, sends out
* computed aggregate metrics and moves the bucket to the end of the
......@@ -290,11 +356,13 @@ aggregator_expire(void *sub)
{
time_t now;
aggregator *s;
struct _bucket *b;
struct _aggr_bucket *b;
struct _aggr_computes *c;
struct _aggr_invocations *inv;
struct _aggr_invocations *lastinv;
double *values;
int i;
unsigned char j;
int work;
server *submission = (server *)sub;
char metric[METRIC_BUFSIZ];
......@@ -362,6 +430,43 @@ aggregator_expire(void *sub)
inv->metric,
b->sum / (double)b->cnt, ts);
break;
case MEDN:
/* median == 50th percentile */
case PCTL: {
/* nearest rank method */
size_t n =
(int)(((double)c->percentile/100.0 *
(double)b->cnt) + 0.9);
values = b->entries.values;
/* TODO: lazy approach, in case
* of 1 (first/last) or 2 buckets
* distance we could do a
* forward run picking the max
* entries and returning that
* iso sorting the full array */
qsort(values, b->cnt,
sizeof(double), cmp_entry);
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
values[n - 1],
b->start + s->interval);
} break;
case VAR:
case SDEV: {
double avg = b->sum / (double)b->cnt;
double ksum = 0;
values = b->entries.values;
for (i = 0; i < b->cnt; i++)
ksum += pow(values[i] - avg, 2);
ksum /= (double)b->cnt;
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
c->type == VAR ? ksum :
sqrt(ksum),
b->start + s->interval);
} break;
}
server_send(submission, strdup(metric), 1);
s->sent++;
......@@ -383,7 +488,6 @@ aggregator_expire(void *sub)
}
if (isempty) {
int j;
/* see if the remaining buckets are empty too */
pthread_mutex_lock(&s->bucketlock);
for (j = 0; j < s->bucketcnt; j++) {
......@@ -396,6 +500,10 @@ aggregator_expire(void *sub)
}
if (isempty) {
/* free and unlink */
if (c->entries_needed)
for (j = 0; j < s->bucketcnt; j++)
if (inv->buckets[j].entries.values)
free(inv->buckets[j].entries.values);
free(inv->metric);
free(inv->buckets);
if (lastinv != NULL) {
......@@ -498,6 +606,19 @@ aggregator_get_received(void)
return totreceived;
}
/**
 * Returns an approximate number of metrics received by all aggregators
 * since the last call to this function.
 *
 * aggregator_get_received() reports a running total, so the total seen
 * now is stored as the baseline for the next call.  Adding the total
 * onto the old baseline instead makes the baseline run ahead of the
 * counter, which yields wrong deltas from the third call onwards.
 */
inline size_t
aggregator_get_received_sub(void)
{
	size_t total = aggregator_get_received();
	size_t delta = total - prevreceived;

	prevreceived = total;
	return delta;
}
/**
* Returns an approximate number of metrics sent by all aggregators.
*/
......@@ -513,6 +634,19 @@ aggregator_get_sent(void)
return totsent;
}
/**
 * Returns an approximate number of metrics sent by all aggregators
 * since the last call to this function.
 *
 * aggregator_get_sent() reports a running total, so the total seen now
 * is stored as the baseline for the next call.  Adding the total onto
 * the old baseline instead makes the baseline run ahead of the counter,
 * which yields wrong deltas from the third call onwards.
 */
inline size_t
aggregator_get_sent_sub(void)
{
	size_t total = aggregator_get_sent();
	size_t delta = total - prevsent;

	prevsent = total;
	return delta;
}
/**
* Returns an approximate number of dropped metrics by all aggregators.
* Metrics are dropped if they are too much in the past (past expiry
......@@ -529,3 +663,16 @@ aggregator_get_dropped(void)
return totdropped;
}
/**
 * Returns an approximate number of metrics dropped by all aggregators
 * since the last call to this function.
 *
 * aggregator_get_dropped() reports a running total, so the total seen
 * now is stored as the baseline for the next call.  Adding the total
 * onto the old baseline instead makes the baseline run ahead of the
 * counter, which yields wrong deltas from the third call onwards.
 */
inline size_t
aggregator_get_dropped_sub(void)
{
	size_t total = aggregator_get_dropped();
	size_t delta = total - prevdropped;

	prevdropped = total;
	return delta;
}
......@@ -33,21 +33,28 @@ typedef struct _aggregator {
size_t sent;
size_t dropped;
struct _aggr_computes {
enum _aggr_compute_type { SUM, CNT, MAX, MIN, AVG } type;
enum _aggr_compute_type { SUM, CNT, MAX, MIN, AVG,
MEDN, PCTL, VAR, SDEV } type;
const char *metric; /* name template of metric to produce */
struct _aggr_invocations {
char *metric; /* actual name to emit */
unsigned int hash; /* to speed up matching */
unsigned short expire; /* expire + splay */
struct _bucket {
struct _aggr_bucket {
long long int start;
size_t cnt;
double sum;
double max;
double min;
struct _aggr_bucket_entries {
size_t size;
double *values;
} entries;
} *buckets;
struct _aggr_invocations *next;
} *invocations_ht[1 << AGGR_HT_POW_SIZE];
unsigned char entries_needed:1;
unsigned char percentile:7;
struct _aggr_computes *next;
} *computes;
pthread_mutex_t bucketlock;
......@@ -56,6 +63,7 @@ typedef struct _aggregator {
aggregator *aggregator_new(unsigned int interval, unsigned int expire, enum _aggr_timestamp tswhen);
char aggregator_add_compute(aggregator *s, const char *metric, const char *type);
void aggregator_set_stub(aggregator *s, const char *stubname);
void aggregator_putmetric(aggregator *s, const char *metric, const char *firstspace, size_t nmatch, regmatch_t *pmatch);
int aggregator_start(server *submission);
void aggregator_stop(void);
......@@ -64,5 +72,8 @@ size_t aggregator_numcomputes(void);
size_t aggregator_get_received(void);
size_t aggregator_get_sent(void);
size_t aggregator_get_dropped(void);
size_t aggregator_get_received_sub(void);
size_t aggregator_get_sent_sub(void);
size_t aggregator_get_dropped_sub(void);
#endif
......@@ -45,11 +45,13 @@ collector_runner(void *s)
int i;
size_t totticks;
size_t totmetrics;
size_t totblackholes;
size_t totqueued;
size_t totstalls;
size_t totdropped;
size_t ticks;
size_t metrics;
size_t blackholes;
size_t queued;
size_t stalls;
size_t dropped;
......@@ -65,6 +67,16 @@ collector_runner(void *s)
char metric[METRIC_BUFSIZ];
char *m;
size_t sizem = 0;
size_t (*s_ticks)(server *);
size_t (*s_metrics)(server *);
size_t (*s_stalls)(server *);
size_t (*s_dropped)(server *);
size_t (*d_ticks)(dispatcher *);
size_t (*d_metrics)(dispatcher *);
size_t (*d_blackholes)(dispatcher *);
size_t (*a_received)(void);
size_t (*a_sent)(void);
size_t (*a_dropped)(void);
/* prepare hostname for graphite metrics */
snprintf(metric, sizeof(metric), "carbon.relays.%s", relay_hostname);
......@@ -75,8 +87,33 @@ collector_runner(void *s)
*m = '\0';
sizem = sizeof(metric) - (m - metric);
/* setup functions to target what the user wants */
if (debug & 2) {
s_ticks = server_get_ticks_sub;
s_metrics = server_get_metrics_sub;
s_stalls = server_get_stalls_sub;
s_dropped = server_get_dropped_sub;
d_ticks = dispatch_get_ticks_sub;
d_metrics = dispatch_get_metrics_sub;
d_blackholes = dispatch_get_blackholes_sub;
a_received = aggregator_get_received_sub;
a_sent = aggregator_get_sent_sub;
a_dropped = aggregator_get_dropped_sub;
} else {
s_ticks = server_get_ticks;
s_metrics = server_get_metrics;
s_stalls = server_get_stalls;
s_dropped = server_get_dropped;
d_ticks = dispatch_get_ticks;
d_metrics = dispatch_get_metrics;
d_blackholes = dispatch_get_blackholes;
a_received = aggregator_get_received;
a_sent = aggregator_get_sent;
a_dropped = aggregator_get_dropped;
}
#define send(metric) \
if (debug) \
if (debug & 1) \
logout("%s", metric); \
else \
server_send(submission, strdup(metric), 1);
......@@ -98,6 +135,7 @@ collector_runner(void *s)
nextcycle += collector_interval;
totticks = 0;
totmetrics = 0;
totblackholes = 0;
dispatchers_idle = 0;
dispatchers_busy = 0;
for (i = 0; dispatchers[i] != NULL; i++) {
......@@ -106,11 +144,15 @@ collector_runner(void *s)
} else {
dispatchers_idle++;
}
totticks += ticks = dispatch_get_ticks(dispatchers[i]);
totmetrics += metrics = dispatch_get_metrics(dispatchers[i]);
totticks += ticks = d_ticks(dispatchers[i]);
totmetrics += metrics = d_metrics(dispatchers[i]);
totblackholes += blackholes = d_blackholes(dispatchers[i]);
snprintf(m, sizem, "dispatcher%d.metricsReceived %zd %zd\n",
i + 1, metrics, (size_t)now);
send(metric);
snprintf(m, sizem, "dispatcher%d.metricsBlackholed %zd %zd\n",
i + 1, blackholes, (size_t)now);
send(metric);
snprintf(m, sizem, "dispatcher%d.wallTime_us %zd %zd\n",
i + 1, ticks, (size_t)now);
send(metric);
......@@ -118,6 +160,9 @@ collector_runner(void *s)
snprintf(m, sizem, "metricsReceived %zd %zd\n",
totmetrics, (size_t)now);
send(metric);
snprintf(m, sizem, "metricsBlackholed %zd %zd\n",
totblackholes, (size_t)now);
send(metric);
snprintf(m, sizem, "dispatch_wallTime_us %zd %zd\n",
totticks, (size_t)now);
send(metric);
......@@ -137,11 +182,11 @@ collector_runner(void *s)
if (server_ctype(srvs[i]) == CON_PIPE) {
strncpy(ipbuf, "internal", sizeof(ipbuf));
ticks = server_get_ticks(srvs[i]);
metrics = server_get_metrics(srvs[i]);
ticks = s_ticks(srvs[i]);
metrics = s_metrics(srvs[i]);
queued = server_get_queue_len(srvs[i]);
stalls = server_get_stalls(srvs[i]);
dropped = server_get_dropped(srvs[i]);
stalls = s_stalls(srvs[i]);
dropped = s_dropped(srvs[i]);
} else {
snprintf(ipbuf, sizeof(ipbuf), "%s:%u",
server_ip(srvs[i]), server_port(srvs[i]));
......@@ -149,11 +194,11 @@ collector_runner(void *s)
if (*p == '.')
*p = '_';
totticks += ticks = server_get_ticks(srvs[i]);
totmetrics += metrics = server_get_metrics(srvs[i]);
totticks += ticks = s_ticks(srvs[i]);
totmetrics += metrics = s_metrics(srvs[i]);
totqueued += queued = server_get_queue_len(srvs[i]);
totstalls += stalls = server_get_stalls(srvs[i]);
totdropped += dropped = server_get_dropped(srvs[i]);
totstalls += stalls = s_stalls(srvs[i]);
totdropped += dropped = s_dropped(srvs[i]);
}
snprintf(m, sizem, "destinations.%s.sent %zd %zd\n",
ipbuf, metrics, (size_t)now);
......@@ -196,17 +241,17 @@ collector_runner(void *s)
if (numaggregators > 0) {
snprintf(m, sizem, "aggregators.metricsReceived %zd %zd\n",
aggregator_get_received(), (size_t)now);
a_received(), (size_t)now);
send(metric);
snprintf(m, sizem, "aggregators.metricsSent %zd %zd\n",
aggregator_get_sent(), (size_t)now);
a_sent(), (size_t)now);
send(metric);
snprintf(m, sizem, "aggregators.metricsDropped %zd %zd\n",
aggregator_get_dropped(), (size_t)now);
a_dropped(), (size_t)now);
send(metric);
}
if (debug)
if (debug & 1)
fflush(stdout);
}
......@@ -296,13 +341,14 @@ collector_reloadcomplete(void)
* Initialises and starts the collector.
*/
void
collector_start(dispatcher **d, cluster *c, server *submission)
collector_start(dispatcher **d, cluster *c, server *submission, char cum)
{
dispatchers = d;
collector_schedulereload(c);
if (mode == DEBUG || mode == DEBUGTEST)
debug = 1;
debug |= (cum ? 0 : 2);
if (mode != SUBMISSION) {
if (pthread_create(&collectorid, NULL, collector_runner, submission) != 0)
......
......@@ -28,7 +28,7 @@ extern int collector_interval;
#define timediff(X, Y) \
(Y.tv_sec > X.tv_sec ? (Y.tv_sec - X.tv_sec) * 1000 * 1000 + ((Y.tv_usec - X.tv_usec)) : Y.tv_usec - X.tv_usec)
void collector_start(dispatcher **d, cluster *c, server *submission);
void collector_start(dispatcher **d, cluster *c, server *submission, char cum);
void collector_stop(void);
void collector_schedulereload(cluster *c);
char collector_reloadcomplete(void);
......
......@@ -20,6 +20,7 @@
#include <string.h>
#include <assert.h>
#include "fnv1a.h"
#include "md5.h"
#include "server.h"
......@@ -69,10 +70,9 @@ carbon_hashpos(const char *key, const char *end)
/**
 * Computes a 16-bit position for the key bytes in [key, end): a 32-bit
 * hash of the key is folded to 16 bits by XOR-ing its upper half with
 * its lower half.
 * NOTE(review): this listing carries both the open-coded FNV1a loop
 * (with its own declaration of hash) and the fnv1a_32() call that
 * appears to replace it -- presumably a macro from fnv1a.h that seeds
 * and updates hash, confirm there.  Only one of the two forms should
 * remain in the real source.
 */
static unsigned short
fnv1a_hashpos(const char *key, const char *end)
{
unsigned int hash = 2166136261UL; /* FNV1a */
unsigned int hash;
for (; key < end; key++)
hash = (hash ^ (unsigned int)*key) * 16777619;
fnv1a_32(hash, key, key, end);
/* fold 32 bits into 16: high half XOR low half */
return (unsigned short)((hash >> 16) ^ (hash & (unsigned int)0xFFFF));
}
......
......@@ -26,6 +26,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <arpa/inet.h>
#include "relay.h"
#include "router.h"
......@@ -41,6 +42,7 @@ enum conntype {
typedef struct _connection {
int sock;
char takenby; /* -2: being setup, -1: free, 0: not taken, >0: tid */
char srcaddr[24]; /* string representation of source address */
char buf[METRIC_BUFSIZ];
int buflen;
char needmore:1;
......@@ -56,7 +58,11 @@ struct _dispatcher {
enum conntype type;
char id;
size_t metrics;
size_t blackholes;
size_t ticks;
size_t prevmetrics;
size_t prevblackholes;
size_t prevticks;
enum { RUNNING, SLEEPING } state;
char keep_running:1;
route *routes;
......@@ -163,6 +169,8 @@ int
dispatch_addconnection(int sock)
{
size_t c;
struct sockaddr_in6 saddr;
socklen_t saddr_len = sizeof(saddr);
pthread_rwlock_rdlock(&connectionslock);
for (c = 0; c < connectionslen; c++)
......@@ -202,6 +210,23 @@ dispatch_addconnection(int sock)
pthread_rwlock_unlock(&connectionslock);
}
/* figure out who's calling */
if (getpeername(sock, (struct sockaddr *)&saddr, &saddr_len) == 0) {
snprintf(connections[c].srcaddr, sizeof(connections[c].srcaddr),
"(unknown)");
switch (saddr.sin6_family) {
case PF_INET:
inet_ntop(saddr.sin6_family,
&((struct sockaddr_in *)&saddr)->sin_addr,
connections[c].srcaddr, sizeof(connections[c].srcaddr));
break;
case PF_INET6:
inet_ntop(saddr.sin6_family, &saddr.sin6_addr,
connections[c].srcaddr, sizeof(connections[c].srcaddr));
break;
}
}
(void) fcntl(sock, F_SETFL, O_NONBLOCK);
connections[c].sock = sock;
connections[c].buflen = 0;
......@@ -225,7 +250,7 @@ int
dispatch_addlistener_udp(int sock)
{
int conn = dispatch_addconnection(sock);
if (conn == -1)
return 1;
......@@ -306,9 +331,9 @@ dispatch_connection(connection *conn, dispatcher *self)
/* try to read more data, if that succeeds, or we still have data
* left in the buffer, try to process the buffer */
if (
(!conn->needmore && conn->buflen > 0) ||
(!conn->needmore && conn->buflen > 0) ||
(len = read(conn->sock,
conn->buf + conn->buflen,
conn->buf + conn->buflen,
(sizeof(conn->buf) - 1) - conn->buflen)) > 0
)
{
......@@ -340,9 +365,10 @@ dispatch_connection(connection *conn, dispatcher *self)
*q = '\0'; /* can do this because we substract one from buf */
/* perform routing of this metric */
conn->destlen =
router_route(conn->dests, CONN_DESTS_SIZE,
conn->metric, firstspace, self->routes);
self->blackholes += router_route(
conn->dests, &conn->destlen, CONN_DESTS_SIZE,
conn->srcaddr,
conn->metric, firstspace, self->routes);
/* restart building new one from the start */
q = conn->metric;
......@@ -458,7 +484,11 @@ dispatch_runner(void *arg)
int c;
self->metrics = 0;
self->blackholes = 0;
self->ticks = 0;
self->prevmetrics = 0;
self->prevblackholes = 0;
self->prevticks = 0;
self->state = SLEEPING;
if (self->type == LISTENER) {
......@@ -636,6 +666,18 @@ dispatch_get_ticks(dispatcher *self)
return self->ticks;
}
/**
 * Reports how much wall-clock time this dispatcher consumed since the
 * previous call to this function, and moves the remembered baseline
 * forward by that amount.
 */
inline size_t
dispatch_get_ticks_sub(dispatcher *self)
{
	/* sample the live counter exactly once */
	const size_t elapsed = self->ticks - self->prevticks;

	self->prevticks = self->prevticks + elapsed;

	return elapsed;
}
/**
* Returns the number of metrics dispatched since start.
*/
......@@ -645,6 +687,40 @@ dispatch_get_metrics(dispatcher *self)
return self->metrics;
}
/**
 * Reports how many metrics this dispatcher handled since the previous
 * call to this function, and moves the remembered baseline forward by
 * that amount.
 */
inline size_t
dispatch_get_metrics_sub(dispatcher *self)
{
	/* sample the live counter exactly once */
	const size_t handled = self->metrics - self->prevmetrics;

	self->prevmetrics = self->prevmetrics + handled;

	return handled;
}
/**
 * Gives the running count of metrics this dispatcher blackholed,
 * explicitly or implicitly, since it started.
 */
inline size_t
dispatch_get_blackholes(dispatcher *self)
{
	const size_t blackholed = self->blackholes;

	return blackholed;
}
/**
* Returns the number of metrics that were blackholed since last call to
* this function.
*/