Commit 97a6ece8 authored by Bernd Zeimetz's avatar Bernd Zeimetz

Imported Upstream version 1.11

parent 78e3e79c
# 2.0 (unreleased master branch)
### New Features
### Breaking Changes
### Enhancements
### Bugfixes
# 1.11 (23-03-2016)
### Enhancements
* **collector** UDP connections are now suffixed with `-udp` in
destination target
* **router** `send statistics to` construct was added to direct internal
statistics to a specific cluster
### Bugfixes
* [Issue #159](https://github.com/grobian/carbon-c-relay/issues/159)
corrupted statistics for file clusters
* [Issue #160](https://github.com/grobian/carbon-c-relay/issues/160)
metricsBlackholed stays zero when blackhole target is used
# 1.10 (09-03-2016)
### Breaking Changes
* **statistics** dispatch\_busy and dispatch\_idle have been replaced with
wallTime\_us and sleepTime\_us
### Bugfixes
* [Issue #152](https://github.com/grobian/carbon-c-relay/issues/152)
crash in aggregator\_expire for data-contained aggregations
# 1.9 (07-03-2016)
### Enhancements
* **statistics** dispatch\_busy is slightly more realistic now
### Bugfixes
* [Issue #153](https://github.com/grobian/carbon-c-relay/issues/153)
aggregator statistics are garbage with `-m`
# 1.8 (23-02-2016)
### New Features
* **relay** new flags `-D` for daemon mode and `-p` for pidfile
creation
### Enhancements
* **dispatcher** server stalling (to slow down too fast writers) is now
based on a random timeout
* **server** write timeout is now large enough to deal with upstream
relay stalling
* **relay** number of workers/dispatchers is now determined in a way
that doesn't need OpenMP any more
# 1.7 (29-01-2016)
### New Features
* **relay** new flag `-B` to set the listen backlog for TCP and UNIX
connections, [issue #143](https://github.com/grobian/carbon-c-relay/issues/143)
### Enhancements
* **dispatcher** switch from select() to poll() to fix crashes when too
many connections were made to the relay
* Misc (memory) leak fixes
# 1.6 (27-01-2016)
### Breaking Changes
* **relay** startup and shutdown messages are now better in line with
each other
### Enhancements
* **relay** fixed segfault when issuing `SIGHUP` under active load
# 1.5 (13-01-2016)
### Enhancements
* **aggregator** metrics are now written directly to dispatchers to
avoid overload of the internal\_submission queue, which is likely to
happen with many aggregates
* **collector** properly report file-based servers in statistics
* **collector** re-introduce the internal destination in statistics
# 1.4 (04-01-2016)
### New Features
* **collector** when run in debug and submission mode, there is an
iostat-like output
### Enhancements
* **relay** reloading config now no longer unconditionally starts the
aggregator
* **aggregator** misc cleanup/free fixes
* **relay** allow reloading aggregator
### Bugfixes
* [Issue #133](https://github.com/grobian/carbon-c-relay/issues/133)
_stub_aggregator metrics seen after a reload
# 1.3 (16-12-2015)
### Enhancements
* **consistent-hash** fix jump\_fnv1a\_ch metric submission, it didn't
work at all
### Bugfixes
* [Issue #126](https://github.com/grobian/carbon-c-relay/issues/126)
double free crash
* [Issue #131](https://github.com/grobian/carbon-c-relay/issues/131)
segfault using stddev in aggregator
* [Issue #132](https://github.com/grobian/carbon-c-relay/issues/132)
crash with glibc double free message
# 1.2 (10-12-2015)
### New Features
* **consistent-hash** new algorithm jump\_fnv1a\_ch for near perfect
distribution of metrics
* **distributiontest** test program used to examine the imbalance of
clusters for a given input metric; see
[graphite-project/carbon#485](https://github.com/graphite-project/carbon/issues/485)
### Enhancements
* **router** fix cluster checking with regard to replication count and
the number of servers to allow equal counts
### Bugfixes
* [Issue #126](https://github.com/grobian/carbon-c-relay/issues/126)
prevent calling read() too often
# 1.1 (25-11-2015)
### Enhancements
* **router** fix distribution of any\_of cluster if members have failed
# 1.0 (23-11-2015)
* many improvements
# 0.45 (05-11-2015)
* Many aggregator improvements, more flexible routing support.
# 0.44 (13-08-2015)
* Feature to set hash-keys for fnv1a\_ch.
# 0.43 (27-07-2015)
* Bugfix release for segfault when using any\_of clusters.
# 0.42 (24-07-2015)
* Reduced warning level for submission mode queue pileups. Allow
writing to a file (cluster type). Fix splay on aggregator not to
affect timestamps of input. No more dependency on openssl for md5.
# 0.40 (11-05-2015)
* Hefty optimisations on aggregations. Fix for UDP port closure.
......@@ -13,11 +13,7 @@
# limitations under the License.
CFLAGS ?= -O2 -Wall
# if your compiler doesn't support OpenMP, comment out this line, or
# define OPENMP_FLAGS to be empty
OPENMP_FLAGS ?= -fopenmp
override CC += $(OPENMP_FLAGS)
CFLAGS ?= -O2 -Wall -Wshadow
GIT_VERSION := $(shell git describe --abbrev=6 --dirty --always || date +%F)
GVCFLAGS += -DGIT_VERSION=\"$(GIT_VERSION)\"
......
......@@ -78,6 +78,9 @@ aggregate
[send to <cluster ...>]
[stop]
;
send statistics to <cluster ...>
[stop]
;
```
Multiple clusters can be defined, and need not be referenced by a
......@@ -198,6 +201,12 @@ possible. Like for match rules, it is possible to define multiple
cluster targets. Also, like match rules, the `stop` keyword applies to
control the flow of metrics in the matching process.
The special `send statistics to` construct is much like a `match` rule
which matches the (internal) statistics produced by the relay. It can
be used to avoid router loops when sending the statistics to a certain
destination. The `send statistics` construct can only be used once, but
multiple destinations can be used when required.
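For example, assuming a hypothetical cluster named `graphite` has been defined, the construct could be used as follows (a sketch; host names and ports are illustrative):

```
cluster graphite
    fnv1a_ch
        host1:2003
        host2:2003
    ;

# route the relay's own statistics only to the graphite cluster, and
# stop them from matching any further rules to avoid routing loops
send statistics to graphite
    stop
    ;
```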
Examples
--------
......@@ -567,21 +576,6 @@ namespace:
client. The disconnection of idle connections in the relay is there to
guard against resource drain in such scenarios.
* dispatch\_busy
The number of dispatchers actively doing work at the moment of the
sample. This is just an indication of the work pressure on the relay.
* dispatch\_idle
The number of dispatchers sleeping at the moment of the sample. When
this number nears 0, dispatch\_busy should be high. When the
configured number of worker threads is low, this might mean more
worker threads should be added (if the system allows it) or the relay
is reaching its limits with regard to how much it can process. A
relay with no idle dispatchers will likely appear slow for clients,
for the relay has too much work to serve them instantly.
* dispatch\_wallTime\_us
The number of microseconds spent by the dispatchers to do their work.
......@@ -591,7 +585,20 @@ namespace:
from a socket, cleaning up the input metric, to adding the metric to
the appropriate queues. The larger the configuration, and more
complex in terms of matches, the more time the dispatchers will spend
on the cpu.
on the cpu. Note that time they do /not/ spend on the cpu is also
included in this number: it is the pure wallclock time the dispatcher
was serving a client.
* dispatch\_sleepTime\_us
The number of microseconds spent by the dispatchers sleeping while
waiting for work. When this value gets small (or even zero) the
dispatcher has so much work that it no longer sleeps, and likely can
no longer process the work in a timely fashion. This value plus the
wallTime from above roughly sums to the total uptime of this
dispatcher. Therefore, expressing the wallTime as a percentage of
this sum gives the busyness percentage, which climbs to 100% as
sleepTime approaches 0.
* server\_wallTime\_us
......
......@@ -78,8 +78,6 @@ aggregator_new(
ret->computes = NULL;
ret->next = NULL;
pthread_mutex_init(&ret->bucketlock, NULL);
return ret;
}
......@@ -143,6 +141,7 @@ aggregator_add_compute(
ac->metric = strdup(metric);
memset(ac->invocations_ht, 0, sizeof(ac->invocations_ht));
ac->entries_needed = store;
pthread_rwlock_init(&ac->invlock, NULL);
ac->next = NULL;
return 0;
......@@ -197,7 +196,7 @@ aggregator_putmetric(
/* get value */
if ((v = strchr(firstspace + 1, ' ')) == NULL) {
/* metric includes \n */
if (mode == DEBUG || mode == DEBUGTEST)
if (mode & MODE_DEBUG)
logerr("aggregator: dropping incorrect metric: %s",
metric);
return;
......@@ -208,7 +207,6 @@ aggregator_putmetric(
val = atof(firstspace + 1);
epoch = atoll(v + 1);
pthread_mutex_lock(&s->bucketlock);
for (compute = s->computes; compute != NULL; compute = compute->next) {
if (nmatch == 0) {
ometric = compute->metric;
......@@ -232,11 +230,26 @@ aggregator_putmetric(
omhtbucket =
((omhash >> AGGR_HT_POW_SIZE) ^ omhash) &
(((unsigned int)1 << AGGR_HT_POW_SIZE) - 1);
invocation = compute->invocations_ht[omhtbucket];
for (; invocation != NULL; invocation = invocation->next)
if (invocation->hash == omhash &&
strcmp(ometric, invocation->metric) == 0) /* match */
#define find_invocation(o) \
invocation = compute->invocations_ht[omhtbucket]; \
for (; invocation != NULL; invocation = invocation->next) \
if (invocation->hash == omhash && \
strcmp(o, invocation->metric) == 0) /* match */ \
break;
pthread_rwlock_rdlock(&compute->invlock);
find_invocation(ometric);
/* switch to a write lock from here, since we've found what we
* were looking for (or are going to create it) and modify it */
pthread_rwlock_unlock(&compute->invlock);
pthread_rwlock_wrlock(&compute->invlock);
if (invocation == NULL) {
/* we need to recheck there wasn't someone else who did the
* same thing we want to do below */
find_invocation(ometric);
}
if (invocation == NULL) { /* no match, add */
int i;
time_t now;
......@@ -244,12 +257,14 @@ aggregator_putmetric(
if ((invocation = malloc(sizeof(*invocation))) == NULL) {
logerr("aggregator: out of memory creating %s from %s",
ometric, metric);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
if ((invocation->metric = strdup(ometric)) == NULL) {
logerr("aggregator: out of memory creating %s from %s",
ometric, metric);
free(invocation);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
invocation->hash = omhash;
......@@ -276,6 +291,7 @@ aggregator_putmetric(
ometric, metric);
free(invocation->metric);
free(invocation);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
for (i = 0; i < s->bucketcnt; i++) {
......@@ -295,17 +311,19 @@ aggregator_putmetric(
if (itime < 0) {
/* drop too old metric */
s->dropped++;
pthread_rwlock_unlock(&compute->invlock);
continue;
}
slot = itime / s->interval;
if (slot >= s->bucketcnt) {
if (mode == DEBUG || mode == DEBUGTEST)
if (mode & MODE_DEBUG)
logerr("aggregator: dropping metric too far in the "
"future (%lld > %lld): %s from %s", epoch,
invocation->buckets[s->bucketcnt - 1].start,
ometric, metric);
s->dropped++;
pthread_rwlock_unlock(&compute->invlock);
continue;
}
......@@ -339,8 +357,9 @@ aggregator_putmetric(
entries->values[bucket->cnt] = val;
}
bucket->cnt++;
pthread_rwlock_unlock(&compute->invlock);
}
pthread_mutex_unlock(&s->bucketlock);
return;
}
......@@ -370,6 +389,7 @@ aggregator_expire(void *sub)
double *values;
size_t len = 0;
int i;
size_t k;
unsigned char j;
int work;
char metric[METRIC_BUFSIZ];
......@@ -439,10 +459,9 @@ aggregator_expire(void *sub)
break;
case MEDN:
/* median == 50th percentile */
case PCTL: {
case PCTL:
/* nearest rank method */
size_t n =
(int)(((double)c->percentile/100.0 *
k = (int)(((double)c->percentile/100.0 *
(double)b->cnt) + 0.9);
values = b->entries.values;
/* TODO: lazy approach, in case
......@@ -456,16 +475,16 @@ aggregator_expire(void *sub)
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
values[n - 1],
values[k - 1],
ts);
} break;
break;
case VAR:
case SDEV: {
double avg = b->sum / (double)b->cnt;
double ksum = 0;
values = b->entries.values;
for (i = 0; i < b->cnt; i++)
ksum += pow(values[i] - avg, 2);
for (k = 0; k < b->cnt; k++)
ksum += pow(values[k] - avg, 2);
ksum /= (double)b->cnt;
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
......@@ -477,7 +496,15 @@ aggregator_expire(void *sub)
default:
assert(0); /* for compiler (len) */
}
if (write(s->fd, metric, len) != len) {
ts = write(s->fd, metric, len);
if (ts < 0) {
logerr("aggregator: failed to write to "
"pipe (fd=%d): %s\n",
s->fd, strerror(errno));
s->dropped++;
} else if (ts < len) {
logerr("aggregator: incomplete write on "
"pipe (fd=%d)\n", s->fd);
s->dropped++;
} else {
s->sent++;
......@@ -486,7 +513,7 @@ aggregator_expire(void *sub)
/* move the bucket to the end, to make room for
* new ones */
pthread_mutex_lock(&s->bucketlock);
pthread_rwlock_wrlock(&c->invlock);
b = &inv->buckets[0];
len = b->entries.size;
values = b->entries.values;
......@@ -499,23 +526,22 @@ aggregator_expire(void *sub)
s->interval;
b->entries.size = len;
b->entries.values = values;
pthread_mutex_unlock(&s->bucketlock);
pthread_rwlock_unlock(&c->invlock);
work++;
}
if (isempty) {
/* see if the remaining buckets are empty too */
pthread_mutex_lock(&s->bucketlock);
for (j = 0; j < s->bucketcnt; j++) {
if (inv->buckets[j].cnt != 0) {
isempty = 0;
pthread_mutex_unlock(&s->bucketlock);
break;
}
}
}
if (isempty) {
pthread_rwlock_wrlock(&c->invlock);
/* free and unlink */
if (c->entries_needed)
for (j = 0; j < s->bucketcnt; j++)
......@@ -532,7 +558,7 @@ aggregator_expire(void *sub)
free(inv);
inv = c->invocations_ht[i];
}
pthread_mutex_unlock(&s->bucketlock);
pthread_rwlock_unlock(&c->invlock);
} else {
lastinv = inv;
inv = inv->next;
......@@ -574,6 +600,8 @@ aggregator_expire(void *sub)
}
}
pthread_rwlock_destroy(&c->invlock);
s->computes = c->next;
free(c);
}
......@@ -661,9 +689,8 @@ aggregator_get_received(aggregator *a)
inline size_t
aggregator_get_received_sub(aggregator *aggrs)
{
size_t d = aggregator_get_received(aggrs);
size_t r = d - prevreceived;
prevreceived += d;
size_t r = aggregator_get_received(aggrs) - prevreceived;
prevreceived += r;
return r;
}
......@@ -688,9 +715,8 @@ aggregator_get_sent(aggregator *a)
inline size_t
aggregator_get_sent_sub(aggregator *aggrs)
{
size_t d = aggregator_get_sent(aggrs);
size_t r = d - prevsent;
prevsent += d;
size_t r = aggregator_get_sent(aggrs) - prevsent;
prevsent += r;
return r;
}
......@@ -717,8 +743,7 @@ aggregator_get_dropped(aggregator *a)
inline size_t
aggregator_get_dropped_sub(aggregator *aggrs)
{
size_t d = aggregator_get_dropped(aggrs);
size_t r = d - prevdropped;
prevdropped += d;
size_t r = aggregator_get_dropped(aggrs) - prevdropped;
prevdropped += r;
return r;
}
......@@ -57,9 +57,9 @@ typedef struct _aggregator {
} *invocations_ht[1 << AGGR_HT_POW_SIZE];
unsigned char entries_needed:1;
unsigned char percentile:7;
pthread_rwlock_t invlock;
struct _aggr_computes *next;
} *computes;
pthread_mutex_t bucketlock;
struct _aggregator *next;
} aggregator;
......
......@@ -29,9 +29,9 @@ extern int collector_interval;
#define timediff(X, Y) \
(Y.tv_sec > X.tv_sec ? (Y.tv_sec - X.tv_sec) * 1000 * 1000 + ((Y.tv_usec - X.tv_usec)) : Y.tv_usec - X.tv_usec)
void collector_start(dispatcher **d, cluster *c, aggregator *a, server *submission, char cum);
void collector_start(dispatcher **d, router *rtr, server *submission, char cum);
void collector_stop(void);
void collector_schedulereload(cluster *c, aggregator *a);
void collector_schedulereload(router *rtr);
char collector_reloadcomplete(void);
#endif
......@@ -470,7 +470,8 @@ ch_gethashpos(ch_ring *ring, const char *key, const char *end)
}
/**
* Frees the ring structure and its added nodes.
* Frees the ring structure and its added nodes, leaves the referenced
* servers untouched.
*/
void
ch_free(ch_ring *ring)
......@@ -480,7 +481,6 @@ ch_free(ch_ring *ring)
for (; ring->entries != NULL; ring->entries = ring->entries->next) {
if (ring->entries->malloced) {
server_shutdown(ring->entries->server);
free(ring->entries->server);
if (deletes == NULL) {
......
......@@ -53,6 +53,7 @@ typedef struct _connection {
destination dests[CONN_DESTS_SIZE];
size_t destlen;
struct timeval lastwork;
unsigned int maxsenddelay;
char hadwork:1;
char isaggr:1;
} connection;
......@@ -64,13 +65,14 @@ struct _dispatcher {
size_t metrics;
size_t blackholes;
size_t ticks;
size_t sleeps;
size_t prevmetrics;
size_t prevblackholes;
size_t prevticks;
enum { RUNNING, SLEEPING } state;
size_t prevsleeps;
char keep_running:1;
route *routes;
route *pending_routes;
router *rtr;
router *pending_rtr;
char route_refresh_pending:1;
char hold:1;
char *allowed_chars;
......@@ -293,9 +295,13 @@ inline static char
dispatch_process_dests(connection *conn, dispatcher *self, struct timeval now)
{
int i;
char force = timediff(conn->lastwork, now) > 1 * 1000 * 1000; /* 1 sec timeout */
char force;
if (conn->destlen > 0) {
if (conn->maxsenddelay == 0)
conn->maxsenddelay = ((rand() % 750) + 250) * 1000;
/* force after timeout */
force = timediff(conn->lastwork, now) > conn->maxsenddelay;
for (i = 0; i < conn->destlen; i++) {
if (server_send(conn->dests[i].dest, conn->dests[i].metric, force) == 0)
break;
......@@ -338,22 +344,16 @@ dispatch_process_dests(connection *conn, dispatcher *self, struct timeval now)
* block reads
*/
static int
dispatch_connection(connection *conn, dispatcher *self)
dispatch_connection(connection *conn, dispatcher *self, struct timeval start)
{
char *p, *q, *firstspace, *lastnl;
int len;
struct timeval start, stop;
gettimeofday(&start, NULL);
/* first try to resume any work being blocked */
if (dispatch_process_dests(conn, self, start) == 0) {
gettimeofday(&stop, NULL);
self->ticks += timediff(start, stop);
conn->takenby = 0;
return 0;
}
gettimeofday(&stop, NULL);
self->ticks += timediff(start, stop);
/* don't poll (read) when the last time we ran nothing happened,
* this is to avoid excessive CPU usage, issue #126 */
......@@ -363,7 +363,6 @@ dispatch_connection(connection *conn, dispatcher *self)
}
conn->hadwork = 0;
gettimeofday(&start, NULL);
len = -2;
/* try to read more data, if that succeeds, or we still have data
* left in the buffer, try to process the buffer */
......@@ -402,10 +401,10 @@ dispatch_connection(connection *conn, dispatcher *self)
*q = '\0'; /* can do this because we substract one from buf */
/* perform routing of this metric */
self->blackholes += router_route(
self->blackholes += router_route(self->rtr,
conn->dests, &conn->destlen, CONN_DESTS_SIZE,
conn->srcaddr,
conn->metric, firstspace, self->routes);
conn->metric, firstspace);
/* restart building new one from the start */
q = conn->metric;
......@@ -413,6 +412,7 @@ dispatch_connection(connection *conn, dispatcher *self)
conn->hadwork = 1;
gettimeofday(&conn->lastwork, NULL);
conn->maxsenddelay = 0;
/* send the metric to where it is supposed to go */
if (dispatch_process_dests(conn, self, conn->lastwork) == 0)
break;
......@@ -458,13 +458,13 @@ dispatch_connection(connection *conn, dispatcher *self)
memmove(conn->buf, lastnl + 1, conn->buflen);
}
}
gettimeofday(&stop, NULL);
self->ticks += timediff(start, stop);
if (len == -1 && (errno == EINTR ||
errno == EAGAIN ||
errno == EWOULDBLOCK))
{
/* nothing available/no work done */
struct timeval stop;
gettimeofday(&stop, NULL);
if (!conn->noexpire &&
timediff(conn->lastwork, stop) > IDLE_DISCONNECT_TIME)
{
......@@ -514,16 +514,16 @@ dispatch_runner(void *arg)
{
dispatcher *self = (dispatcher *)arg;
connection *conn;
int work;
int c;
self->metrics = 0;
self->blackholes = 0;
self->ticks = 0;
self->sleeps = 0;
self->prevmetrics = 0;
self->prevblackholes = 0;
self->prevticks = 0;
self->state = SLEEPING;
self->prevsleeps = 0;
if (self->type == LISTENER) {
struct pollfd ufds[sizeof(listeners) / sizeof(connection *)];
......@@ -558,15 +558,20 @@ dispatch_runner(void *arg)
}
}
} else if (self->type == CONNECTION) {
int work;
struct timeval start, stop;
while (self->keep_running) {
work = 0;
if (self->route_refresh_pending) {
self->routes = self->pending_routes;
self->pending_routes = NULL;
self->rtr = self->pending_rtr;
self->pending_rtr = NULL;
self->route_refresh_pending = 0;
self->hold = 0;
}
gettimeofday(&start, NULL);
pthread_rwlock_rdlock(&connectionslock);
for (c = 0; c < connectionslen; c++) {
conn = &(connections[c]);
......@@ -577,15 +582,19 @@ dispatch_runner(void *arg)
conn->takenby = 0;
continue;
}
self->state = RUNNING;
work += dispatch_connection(conn, self);
work += dispatch_connection(conn, self, start);
}
pthread_rwlock_unlock(&connectionslock);
gettimeofday(&stop, NULL);
self->ticks += timediff(start, stop);
self->state = SLEEPING;
/* nothing done, avoid spinlocking */
if (self->keep_running && work == 0)
if (self->keep_running && work == 0) {
gettimeofday(&start, NULL);
usleep((100 + (rand() % 200)) * 1000); /* 100ms - 300ms */
gettimeofday(&stop, NULL);
self->sleeps += timediff(start, stop);
}
}
} else {
logerr("huh? unknown self type!\n");
......@@ -599,7 +608,7 @@ dispatch_runner(void *arg)
* Returns its handle.
*/
static dispatcher *
dispatch_new(char id, enum conntype type, route *routes, char *allowed_chars)
dispatch_new(char id, enum conntype type, router *r, char *allowed_chars)
{
dispatcher *ret = malloc(sizeof(dispatcher));
......@@ -609,7 +618,7 @@ dispatch_new(char id, enum conntype type, route *routes, char *allowed_chars)
ret->id = id;
ret->type = type;
ret->keep_running = 1;
ret->routes = routes;
ret->rtr = r;
ret->route_refresh_pending = 0;
ret->hold = 0;
ret->allowed_chars = allowed_chars;
......@@ -639,10 +648,10 @@ dispatch_new_listener(void)
* existing connections.
*/
dispatcher *
dispatch_new_connection(route *routes, char *allowed_chars)
dispatch_new_connection(router *r, char *allowed_chars)
{
char id = globalid++;
return dispatch_new(id, CONNECTION, routes, allowed_chars);
return dispatch_new(id, CONNECTION, r, allowed_chars);
}
/**
......@@ -655,14 +664,22 @@ dispatch_stop(dispatcher *d)
}
/**
* Shuts down and frees up dispatcher d. Returns when the dispatcher
* has terminated.
* Shuts down dispatcher d. Returns when the dispatcher has terminated.
*/
void
dispatch_shutdown(dispatcher *d)
{
dispatch_stop(d);
pthread_join(d->tid, NULL);
}