Commit 7408326e authored by Bernd Zeimetz's avatar Bernd Zeimetz

Merge branch 'master' into wheezy-backports

parents 7304a162 93a3956b
script: make
language: c
os:
- linux
- osx
compiler:
- gcc
- clang
matrix:
exclude:
# gcc is actually clang on OSX, so pointless to test twice
- os: osx
compiler: gcc
env:
global:
# The next declaration is the encrypted COVERITY_SCAN_TOKEN, created
# via the "travis encrypt" command using the project repo's public key
- secure: "H844xDJC5CUXhUyB1SFsGiiz/Q4viVSALkj/KnWAuagrbrzIXJoK4tRWpPxCzUar5QsdEua1WIXTBezPA6V97w2cis2OGUXtgsMd1fWnwlG0RwPRj2+LAVxJ+i5z1Asx8hkYOiZIv8y0VkBGldjOrD0l0pBcxB+CDZC/zsGSRtnMLOs+smfEjOF6JPGw8Ye0XJ+YKrfbHOXhfS7XTMwUnjgAV7lTH/aAlKuedc3mYsuC3WHJOMptYLXaiPdbPKoIXfLdmszP91SkThr4f0elyQskugzGM3D7M1tIFhfmordSD+DfzLvvdSu32hHUNQjHvAm7Pz8QmdLC2IjjE8y8MkIkEy2+wR/5AjacnKR8OgFzXgt+yIH+CBc6qbTi40dZ7Go33Q8uNyKJGrA1CyL195NS8q0reMbuS7WIxI0rNOy7cdw8MO0sNjeofcb1RW90U5u5qciRQpbwhFwMFMCuD0LL41mjfN+IHGNeUwp148Qibbvyu+d7y4Y1F7/CsdMpZVBH9Ae7CHXetN8biT81fJp1rn0uEGQqoPuxvKeePsL40MXu6wvlrSf5wLKV3aqp5lzbjoJsPrOwV5iVwMHrvdhoAFlLc6Y1aJkCTPPt7yjLHgLBisqRXLTZ9lcD3AG8ubTLr03DsseUUM3/tWvhWfVRfxzkuOHsDRLMfdY2Q20="
- CFLAGS="-O3 -Wall -Werror -Wshadow -pipe"
addons:
coverity_scan:
project:
name: "grobian/carbon-c-relay"
description: "Build submitted via Travis CI"
notification_email: grobian@gentoo.org
build_command_prepend: "make clean"
build_command: "make -j 4"
branch_pattern: master
# 2.0 (unreleased master branch)
# 2.4 (unreleased master branch)
# 2.3 (2016-11-07)
### Bugfixes
* [Issue #213](https://github.com/grobian/carbon-c-relay/issues/213)
Change to aggregates does not cause HUP to reload when more than 10
aggregates are defined.
* [Issue #214](https://github.com/grobian/carbon-c-relay/issues/214)
`-U` option doesn't set UDP receive buffer size.
* [Issue #218](https://github.com/grobian/carbon-c-relay/issues/218)
zeros inserted after some metrics.
* [Issue #219](https://github.com/grobian/carbon-c-relay/issues/219)
should fail if port is unavailable
* [Issue #224](https://github.com/grobian/carbon-c-relay/issues/224)
segfault during SIGHUP
# 2.2 (2016-09-11)
### New Features
* **relay** socket receive and send buffer sizes can be adjusted
the `-U` option was introduced to allow setting the socket
buffer size in bytes, [Issue #207](https://github.com/grobian/carbon-c-relay/issues/207).
### Breaking Changes
### Enhancements
* **relay** the listen backlog default got increased from 3 to 32
* **server** TCP\_NODELAY is now set to improve small writes
### Bugfixes
* [Issue #188](https://github.com/grobian/carbon-c-relay/issues/188)
SIGHUP leads to SIGSEGV when config didn't change, or a SIGHUP is
received while a previous HUP is being processed
* [Issue #193](https://github.com/grobian/carbon-c-relay/issues/193)
race condition in aggregator leads to crash
* [Issue #195](https://github.com/grobian/carbon-c-relay/issues/195)
assertion fails when reloading config
* [Issue #200](https://github.com/grobian/carbon-c-relay/issues/200)
use after free during shutdown in aggregator
* [Issue #203](https://github.com/grobian/carbon-c-relay/issues/203)
change default connection listen backlog
* [Issue #208](https://github.com/grobian/carbon-c-relay/issues/208)
TCP\_NODELAY should be off for connections relaying data
* [Issue #199](https://github.com/grobian/carbon-c-relay/issues/199)
Various race conditions reported by TSAN
# 2.1 (16-06-2016)
### Enhancements
* **router** the optimiser now tries harder to form groups of
consecutive rules that have a matching common pattern
### Bugfixes
* [Issue #180](https://github.com/grobian/carbon-c-relay/issues/180)
include directive possibly overrides previous included components
* [Issue #184](https://github.com/grobian/carbon-c-relay/issues/184)
router optimise doesn't work correctly with regex groups
# 2.0 (30-05-2016)
### New Features
* **router** `include` directive was added to add content of another
file, see also [Issue #165](https://github.com/grobian/carbon-c-relay/issues/165). The include can also use glob patterns, see [Pull #174](https://github.com/grobian/carbon-c-relay/pull/174)
* **server** the number of stalls performed on writes can now be
controlled (and also disabled) using the `-L` flag.
[Issue #172](https://github.com/grobian/carbon-c-relay/issues/172)
### Breaking Changes
### Enhancements
* **server** incomplete writes are now retried a couple of times before
they are considered fatal. This should reduce the amount
of messages in the logs about them, and be more like the
consumer expects, e.g. less sudden disconnects for the
client.
* **router** reloading the config now prints the difference between the
old and the new config in `diff -u` format.
* **router** reloading the config now maintains the queues for the
servers, such that unavailable servers don't get metrics
dropped.
### Bugfixes
* [Issue #154](https://github.com/grobian/carbon-c-relay/issues/154)
when a store becomes a bottleneck it shouldn't indefinitely stall
* [Issue #164](https://github.com/grobian/carbon-c-relay/issues/164)
config reload should re-use unmodified servers
# 1.11 (23-03-2016)
### New Features
* **router** `send statistics to` construct was added to direct internal
statistics to a specific cluster
### Enhancements
* **collector** UDP connections are now suffixed with `-udp` in
destination target
* **router** `send statistics to` construct was added to direct internal
statistics to a specific cluster
### Bugfixes
* [Issue #159](https://github.com/grobian/carbon-c-relay/issues/159)
......
......@@ -45,6 +45,14 @@ OBJS = \
# Link the relay binary from the object files listed in $(OBJS).
# $(LDFLAGS) are passed before the objects and $(LIBS) after them.
relay: $(OBJS)
$(CC) -o $@ $(LDFLAGS) $^ $(LIBS)
# Regenerate the roff manual page carbon-c-relay.1 from carbon-c-relay.md.
# The sed step drops the line carrying the Travis CI build badge before the
# Markdown is handed to ronn.
#
# `man` names an action, not a file, so it is declared phony: a file or
# directory called `man` must not make this rule look up to date.
#
# The output is written to a temporary file and only renamed over the real
# manual page once the pipeline succeeded.  Redirecting straight into
# carbon-c-relay.1 would truncate an existing manual page even when ronn is
# missing or fails.
.PHONY: man
man:
	sed -e '/travis-ci.org\/grobian\/carbon-c-relay.svg/d' carbon-c-relay.md | \
	ronn \
		--manual="Graphite data collection and visualisation" \
		--organization=Graphite \
		--roff \
		> carbon-c-relay.1.tmp && \
	mv -f carbon-c-relay.1.tmp carbon-c-relay.1
# Release version: the double-quoted run of digits and dots found on the
# line(s) of relay.h that mention VERSION.
# "One or more" is spelled [0-9.][0-9.]* because the \+ operator is a GNU
# extension that other sed implementations do not accept in basic regular
# expressions; the match is otherwise unchanged.
VERSION = $(shell sed -n '/VERSION/s/^.*"\([0-9.][0-9.]*\)".*$$/\1/p' relay.h)
dist:
git archive \
......
This diff is collapsed.
carbon-c-relay.md
\ No newline at end of file
......@@ -193,6 +193,10 @@ aggregator_putmetric(
struct _aggr_bucket *bucket;
struct _aggr_bucket_entries *entries;
/* do not accept new values when shutting down, issue #200 */
if (__sync_bool_compare_and_swap(&keep_running, 0, 0))
return;
/* get value */
if ((v = strchr(firstspace + 1, ' ')) == NULL) {
/* metric includes \n */
......@@ -202,7 +206,7 @@ aggregator_putmetric(
return;
}
s->received++;
__sync_add_and_fetch(&s->received, 1);
val = atof(firstspace + 1);
epoch = atoll(v + 1);
......@@ -237,21 +241,11 @@ aggregator_putmetric(
if (invocation->hash == omhash && \
strcmp(o, invocation->metric) == 0) /* match */ \
break;
pthread_rwlock_rdlock(&compute->invlock);
find_invocation(ometric);
/* switch to a write lock from here, since we've found what we
* were looking for (or are going to create it) and modify it */
pthread_rwlock_unlock(&compute->invlock);
pthread_rwlock_wrlock(&compute->invlock);
if (invocation == NULL) {
/* we need to recheck there wasn't someone else who did the
* same thing we want to do below */
find_invocation(ometric);
}
find_invocation(ometric);
if (invocation == NULL) { /* no match, add */
int i;
long long int i;
time_t now;
if ((invocation = malloc(sizeof(*invocation))) == NULL) {
......@@ -295,7 +289,8 @@ aggregator_putmetric(
continue;
}
for (i = 0; i < s->bucketcnt; i++) {
invocation->buckets[i].start = now + (i * s->interval);
invocation->buckets[i].start =
now + (i * (long long int)s->interval);
invocation->buckets[i].cnt = 0;
invocation->buckets[i].entries.size = 0;
invocation->buckets[i].entries.values = NULL;
......@@ -310,7 +305,7 @@ aggregator_putmetric(
itime = epoch - invocation->buckets[0].start;
if (itime < 0) {
/* drop too old metric */
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
......@@ -322,7 +317,7 @@ aggregator_putmetric(
"future (%lld > %lld): %s from %s", epoch,
invocation->buckets[s->bucketcnt - 1].start,
ometric, metric);
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
......@@ -403,14 +398,15 @@ aggregator_expire(void *sub)
/* send metrics for buckets that are completely past the
* expiry time, unless we are shutting down, then send
* metrics for all buckets that have completed */
now = time(NULL) + (keep_running ? 0 : s->expire - s->interval);
now = time(NULL) + (__sync_bool_compare_and_swap(&keep_running, 1, 1) ? 0 : s->expire - s->interval);
for (c = s->computes; c != NULL; c = c->next) {
pthread_rwlock_wrlock(&c->invlock);
for (i = 0; i < (1 << AGGR_HT_POW_SIZE); i++) {
lastinv = NULL;
isempty = 0;
for (inv = c->invocations_ht[i]; inv != NULL; ) {
while (inv->buckets[0].start +
(keep_running ? inv->expire : s->expire) < now)
(__sync_bool_compare_and_swap(&keep_running, 1, 1) ? inv->expire : s->expire) < now)
{
/* yay, let's produce something cool */
b = &inv->buckets[0];
......@@ -501,19 +497,18 @@ aggregator_expire(void *sub)
logerr("aggregator: failed to write to "
"pipe (fd=%d): %s\n",
s->fd, strerror(errno));
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
} else if (ts < len) {
logerr("aggregator: uncomplete write on "
"pipe (fd=%d)\n", s->fd);
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
} else {
s->sent++;
__sync_add_and_fetch(&s->sent, 1);
}
}
/* move the bucket to the end, to make room for
* new ones */
pthread_rwlock_wrlock(&c->invlock);
b = &inv->buckets[0];
len = b->entries.size;
values = b->entries.values;
......@@ -526,7 +521,6 @@ aggregator_expire(void *sub)
s->interval;
b->entries.size = len;
b->entries.values = values;
pthread_rwlock_unlock(&c->invlock);
work++;
}
......@@ -541,7 +535,6 @@ aggregator_expire(void *sub)
}
}
if (isempty) {
pthread_rwlock_wrlock(&c->invlock);
/* free and unlink */
if (c->entries_needed)
for (j = 0; j < s->bucketcnt; j++)
......@@ -558,18 +551,18 @@ aggregator_expire(void *sub)
free(inv);
inv = c->invocations_ht[i];
}
pthread_rwlock_unlock(&c->invlock);
} else {
lastinv = inv;
inv = inv->next;
}
}
}
pthread_rwlock_unlock(&c->invlock);
}
}
if (work == 0) {
if (!keep_running)
if (__sync_bool_compare_and_swap(&keep_running, 0, 0))
break;
/* nothing done, avoid spinlocking */
usleep(250 * 1000); /* 250ms */
......@@ -664,7 +657,7 @@ aggregator_start(aggregator *aggrs)
/**
 * Asks the aggregator thread to shut down and waits for it to exit.
 *
 * The keep_running flag is switched from 1 to 0 with an atomic
 * compare-and-swap, then the thread identified by aggregatorid is
 * joined.
 */
void
aggregator_stop(void)
{
	/* Only the atomic update is used to clear the flag: the readers of
	 * keep_running in this file go through
	 * __sync_bool_compare_and_swap() as well, and a plain store placed
	 * in front of this call would both race with them and leave the
	 * swap without effect (the expected value 1 would never match). */
	__sync_bool_compare_and_swap(&keep_running, 1, 0);
	pthread_join(aggregatorid, NULL);
}
......@@ -677,7 +670,7 @@ aggregator_get_received(aggregator *a)
size_t totreceived = 0;
for ( ; a != NULL; a = a->next)
totreceived += a->received;
totreceived += __sync_add_and_fetch(&a->received, 0);
return totreceived;
}
......@@ -703,7 +696,7 @@ aggregator_get_sent(aggregator *a)
size_t totsent = 0;
for ( ; a != NULL; a = a->next)
totsent += a->sent;
totsent += __sync_add_and_fetch(&a->sent, 0);
return totsent;
}
......@@ -731,7 +724,7 @@ aggregator_get_dropped(aggregator *a)
size_t totdropped = 0;
for ( ; a != NULL; a = a->next)
totdropped += a->dropped;
totdropped += __sync_add_and_fetch(&a->dropped, 0);
return totdropped;
}
......
This diff is collapsed.
This diff is collapsed.
......@@ -114,7 +114,7 @@ collector_runner(void *s)
server_send(submission, strdup(metric), 1);
nextcycle = time(NULL) + collector_interval;
while (keep_running) {
while (__sync_bool_compare_and_swap(&keep_running, 1, 1)) {
if (cluster_refresh_pending) {
char *stub = router_getcollectorstub(pending_router);
server **newservers = router_getservers(pending_router);
......@@ -225,6 +225,8 @@ collector_runner(void *s)
snprintf(destbuf, sizeof(destbuf), "%s:%u-udp",
server_ip(srvs[i]), server_port(srvs[i]));
break;
default:
assert(0); /* help code analysis tools */
}
for (p = destbuf; *p != '\0'; p++)
if (*p == '.')
......@@ -359,6 +361,8 @@ collector_writer(void *unused)
}
totconn = dispatch_get_accepted_connections();
totdisc = dispatch_get_closed_connections();
if (dticks == 0)
dticks = 1; /* for Coverity */
printf("%5zu %7zu " /* metrics in */
"%5zu %7zu " /* metrics out */
"%5zu %7zu " /* metrics dropped */
......@@ -458,6 +462,6 @@ collector_start(dispatcher **d, router *rtr, server *submission, char cum)
/**
 * Asks the collector thread to shut down and waits for it to exit.
 *
 * The keep_running flag is switched from 1 to 0 with an atomic
 * compare-and-swap, then the thread identified by collectorid is
 * joined.
 */
void
collector_stop(void)
{
	/* Only the atomic update is used to clear the flag: the collector
	 * loop polls keep_running through __sync_bool_compare_and_swap(),
	 * and a plain store placed in front of this call would both race
	 * with that read and leave the swap without effect (the expected
	 * value 1 would never match). */
	__sync_bool_compare_and_swap(&keep_running, 1, 0);
	pthread_join(collectorid, NULL);
}
......@@ -98,7 +98,7 @@ jump_bucketpos(unsigned long long int key, int bckcnt)
}
/* b cannot exceed the range of bckcnt, see while condition */
return (int)b;
return (unsigned int)b;
}
/**
......@@ -481,8 +481,6 @@ ch_free(ch_ring *ring)
for (; ring->entries != NULL; ring->entries = ring->entries->next) {
if (ring->entries->malloced) {
free(ring->entries->server);
if (deletes == NULL) {
w = deletes = ring->entries;
} else {
......@@ -491,12 +489,13 @@ ch_free(ch_ring *ring)
}
}
assert(w != NULL);
w->next = NULL;
while (deletes != NULL) {
w = deletes->next;
free(deletes);
deletes = w;
if (w != NULL) {
w->next = NULL;
while (deletes != NULL) {
w = deletes->next;
free(deletes);
deletes = w;
}
}
if (ring->entrylist != NULL)
......
carbon-c-relay (1.11-1~bpo7+1) wheezy-backports-sloppy; urgency=medium
carbon-c-relay (2.3-1~bpo7+1) wheezy-backports-sloppy; urgency=medium
* Rebuild for wheezy-backports-sloppy.
-- Bernd Zeimetz <bzed@debian.org> Mon, 04 Apr 2016 19:37:34 +0200
-- Bernd Zeimetz <bzed@debian.org> Fri, 02 Dec 2016 11:28:02 +0100
carbon-c-relay (2.3-1) unstable; urgency=medium
* [b994cee] Merge tag 'upstream/2.3'
Upstream version 2.3
-- Bernd Zeimetz <bzed@debian.org> Tue, 22 Nov 2016 21:52:53 +0100
carbon-c-relay (2.2-1) unstable; urgency=medium
* [e6c5142] Merge tag 'upstream/2.2'
Upstream version 2.2
-- Bernd Zeimetz <bzed@debian.org> Sat, 24 Sep 2016 11:49:41 +0200
carbon-c-relay (1.11-1) unstable; urgency=medium
......
This diff is collapsed.
......@@ -30,6 +30,7 @@ int dispatch_addlistener_udp(int sock);
void dispatch_removelistener(int sock);
int dispatch_addconnection(int sock);
int dispatch_addconnection_aggr(int sock);
void dispatch_set_bufsize(unsigned int sockbufsize);
dispatcher *dispatch_new_listener(void);
dispatcher *dispatch_new_connection(router *r, char *allowed_chars);
void dispatch_stop(dispatcher *d);
......
......@@ -38,7 +38,7 @@
#define SRVCNT 8
#define REPLCNT 2
enum rmode mode = NORMAL;
unsigned char mode = 0;
int
relaylog(enum logdst dest, const char *fmt, ...)
......@@ -88,6 +88,7 @@ int main(int argc, char *argv[]) {
NULL,
1024,
128,
4,
800
);
MD5((unsigned char *)ip, strlen(ip), md5);
......
include issues/issue10.conf;
include issues/issue16.conf;
match baz
send to blackhole
;
match foo
send to blackhole
;
include issue180-a.conf
;
match bar
send to blackhole
;
# 51 times the same aggr to trigger the optimiser
aggregate
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
every 1 seconds
expire after 2 seconds
compute sum write to
\1
stop;
aggregate
dothework.(foo|bar)\.(.+)
somethingelse.dothework\.(.+)
more.dothework.(yo.)
a.nonmatching.thing
whatever.dothework.(yo.)
every 1 seconds
expire after 2 seconds
compute sum write to
\1
stop;
cluster my_cluster forward 127.0.0.1:2003;
cluster my_homie_cluster forward 127.0.0.1:2004;
match ^awesome\.(.+)
send to
my_cluster
my_homie_cluster
stop
;
cluster out file /tmp/issue-211;
aggregate
^carbon\.relays\.[^.]+\.metricsReceived$
every 60 seconds
expire after 70 seconds
compute sum write to
collectd.sum.if_octets.total
stop;
match
^carbon\.relays\.
send to blackhole
stop;
match * send to out;
cluster graphite forward 127.0.0.1:2004;
aggregate
^1\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^2\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^3\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^4\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^5\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^6\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^7\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^8\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^9\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^10\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
aggregate
^11\.(foo|bar)\.some_instance\.([^.]+)\.(.+)
every 30 seconds
expire after 90 seconds
compute sum write to
sums.some_instance.\2.\3
send to graphite
;
rewrite abcdefg0 into b;
rewrite abcdefg1 into b;
rewrite abcdefg2 into b;
rewrite abcdefg3 into b;
rewrite abcdefg4 into b;
rewrite abcdefg5 into b;
rewrite abcdefg6 into b;
rewrite abcdefg7 into b;
rewrite abcdefg8 into b;
rewrite abcdefg9 into b;
rewrite abcdefg10 into b;
rewrite abcdefg11 into b;
rewrite abcdefg12 into b;
rewrite abcdefg13 into b;
rewrite abcdefg14 into b;
rewrite abcdefg15 into b;
rewrite abcdefg16 into b;
rewrite abcdefg17 into b;
rewrite abcdefg18 into b;
rewrite abcdefg19 into b;
rewrite abcdefg20 into b;
rewrite abcdefg21 into b;
rewrite abcdefg22 into b;
rewrite abcdefg23 into b;
rewrite abcdefg24 into b;
rewrite abcdefg25 into b;
rewrite abcdefg26 into b;
rewrite abcdefg27 into b;
rewrite abcdefg28 into b;
rewrite abcdefg29 into b;
rewrite abcdefg30 into b;
rewrite abcdefg31 into b;
rewrite abcdefg32 into b;
rewrite abcdefg33 into b;
rewrite abcdefg34 into b;
rewrite abcdefg35 into b;
rewrite abcdefg36 into b;
rewrite abcdefg37 into b;
rewrite abcdefg38 into b;
rewrite abcdefg39 into b;
rewrite abcdefg40 into b;
rewrite abcdefg41 into b;
rewrite abcdefg42 into b;
rewrite abcdefg43 into b;
rewrite abcdefg44 into b;
rewrite abcdefg45 into b;
rewrite abcdefg46 into b;
rewrite abcdefg47 into b;
rewrite abcdefg48 into b;
rewrite abcdefg49 into b;
rewrite abcdefg50 into b;
rewrite abcdefg51 into b;
rewrite abcdefg52 into b;
rewrite abcdefg53 into b;
rewrite abcdefg54 into b;
rewrite abcdefg55 into b;
rewrite abcdefg56 into b;
rewrite abcdefg57 into b;
rewrite abcdefg58 into b;