Commit 2145a80e authored by Bernd Zeimetz's avatar Bernd Zeimetz

New upstream version 2.2

parent 97a6ece8
script: make
language: c
os:
- linux
- osx
compiler:
- gcc
- clang
matrix:
exclude:
# gcc is actually clang on OSX, so pointless to test twice
- os: osx
compiler: gcc
env:
global:
# The next declaration is the encrypted COVERITY_SCAN_TOKEN, created
# via the "travis encrypt" command using the project repo's public key
- secure: "H844xDJC5CUXhUyB1SFsGiiz/Q4viVSALkj/KnWAuagrbrzIXJoK4tRWpPxCzUar5QsdEua1WIXTBezPA6V97w2cis2OGUXtgsMd1fWnwlG0RwPRj2+LAVxJ+i5z1Asx8hkYOiZIv8y0VkBGldjOrD0l0pBcxB+CDZC/zsGSRtnMLOs+smfEjOF6JPGw8Ye0XJ+YKrfbHOXhfS7XTMwUnjgAV7lTH/aAlKuedc3mYsuC3WHJOMptYLXaiPdbPKoIXfLdmszP91SkThr4f0elyQskugzGM3D7M1tIFhfmordSD+DfzLvvdSu32hHUNQjHvAm7Pz8QmdLC2IjjE8y8MkIkEy2+wR/5AjacnKR8OgFzXgt+yIH+CBc6qbTi40dZ7Go33Q8uNyKJGrA1CyL195NS8q0reMbuS7WIxI0rNOy7cdw8MO0sNjeofcb1RW90U5u5qciRQpbwhFwMFMCuD0LL41mjfN+IHGNeUwp148Qibbvyu+d7y4Y1F7/CsdMpZVBH9Ae7CHXetN8biT81fJp1rn0uEGQqoPuxvKeePsL40MXu6wvlrSf5wLKV3aqp5lzbjoJsPrOwV5iVwMHrvdhoAFlLc6Y1aJkCTPPt7yjLHgLBisqRXLTZ9lcD3AG8ubTLr03DsseUUM3/tWvhWfVRfxzkuOHsDRLMfdY2Q20="
- CFLAGS="-O3 -Wall -Werror -Wshadow -pipe"
addons:
coverity_scan:
project:
name: "grobian/carbon-c-relay"
description: "Build submitted via Travis CI"
notification_email: grobian@gentoo.org
build_command_prepend: "make clean"
build_command: "make -j 4"
branch_pattern: master
# 2.0 (unreleased master branch)
# 2.3 (unreleased master branch)
# 2.2 (2016-09-11)
### New Features
* **relay** socket receive and send buffer sizes can be adjusted
the `-U` option was introduced to allow setting the socket
buffer size in bytes, [Issue #207](https://github.com/grobian/carbon-c-relay/issues/207).
### Breaking Changes
### Enhancements
* **relay** the listen backlog default got increased from 3 to 32
* **server** TCP\_NODELAY is now set to improve small writes
### Bugfixes
* [Issue #188](https://github.com/grobian/carbon-c-relay/issues/188)
SIGHUP leads to SIGSEGV when config didn't change, or a SIGHUP is
received while a previous HUP is being processed
* [Issue #193](https://github.com/grobian/carbon-c-relay/issues/193)
race condition in aggregator leads to crash
* [Issue #195](https://github.com/grobian/carbon-c-relay/issues/195)
assertion fails when reloading config
* [Issue #200](https://github.com/grobian/carbon-c-relay/issues/200)
use after free during shutdown in aggregator
* [Issue #203](https://github.com/grobian/carbon-c-relay/issues/203)
change default connection listen backlog
* [Issue #208](https://github.com/grobian/carbon-c-relay/issues/208)
TCP\_NODELAY should be off for connections relaying data
* [Issue #199](https://github.com/grobian/carbon-c-relay/issues/199)
Various race conditions reported by TSAN
# 2.1 (16-06-2016)
### Enhancements
* **router** the optimiser now tries harder to form groups of
consecutive rules that have a matching common pattern
### Bugfixes
* [Issue #180](https://github.com/grobian/carbon-c-relay/issues/180)
include directive possibly overrides previous included components
* [Issue #184](https://github.com/grobian/carbon-c-relay/issues/184)
router optimise doesn't work correctly with regex groups
# 2.0 (30-05-2016)
### New Features
* **router** `include` directive was added to add content of another
file, see also [Issue #165](https://github.com/grobian/carbon-c-relay/issues/165). The include can also use glob patterns, see [Pull #174](https://github.com/grobian/carbon-c-relay/pull/174)
* **server** the number of stalls performed on writes can now be
controlled (and also disabled) using the `-L` flag.
[Issue #172](https://github.com/grobian/carbon-c-relay/issues/172)
### Breaking Changes
### Enhancements
* **server** incomplete writes are now retried a couple of times before
they are considered fatal. This should reduce the amount
of messages in the logs about them, and be more like the
consumer expects, e.g. less sudden disconnects for the
client.
* **router** reloading the config now prints the difference between the
old and the new config in `diff -u` format.
* **router** reloading the config now maintains the queues for the
servers, such that unavailable servers don't get metrics
dropped.
### Bugfixes
* [Issue #154](https://github.com/grobian/carbon-c-relay/issues/154)
when a store becomes a bottleneck it shouldn't indefinitely stall
* [Issue #164](https://github.com/grobian/carbon-c-relay/issues/164)
config reload should re-use unmodified servers
# 1.11 (23-03-2016)
### New Features
* **router** `send statistics to` construct was added to direct internal
statistics to a specific cluster
### Enhancements
* **collector** UDP connections are now suffixed with `-udp` in
destination target
* **router** `send statistics to` construct was added to direct internal
statistics to a specific cluster
### Bugfixes
* [Issue #159](https://github.com/grobian/carbon-c-relay/issues/159)
......
......@@ -45,6 +45,14 @@ OBJS = \
relay: $(OBJS)
$(CC) -o $@ $(LDFLAGS) $^ $(LIBS)
man:
sed -e '/travis-ci.org\/grobian\/carbon-c-relay.svg/d' carbon-c-relay.md | \
ronn \
--manual="Graphite data collection and visualisation" \
--organization=Graphite \
--roff \
> carbon-c-relay.1
VERSION = $(shell sed -n '/VERSION/s/^.*"\([0-9.]\+\)".*$$/\1/p' relay.h)
dist:
git archive \
......
This diff is collapsed.
carbon-c-relay.md
\ No newline at end of file
......@@ -193,6 +193,10 @@ aggregator_putmetric(
struct _aggr_bucket *bucket;
struct _aggr_bucket_entries *entries;
/* do not accept new values when shutting down, issue #200 */
if (__sync_bool_compare_and_swap(&keep_running, 0, 0))
return;
/* get value */
if ((v = strchr(firstspace + 1, ' ')) == NULL) {
/* metric includes \n */
......@@ -202,7 +206,7 @@ aggregator_putmetric(
return;
}
s->received++;
__sync_add_and_fetch(&s->received, 1);
val = atof(firstspace + 1);
epoch = atoll(v + 1);
......@@ -237,21 +241,11 @@ aggregator_putmetric(
if (invocation->hash == omhash && \
strcmp(o, invocation->metric) == 0) /* match */ \
break;
pthread_rwlock_rdlock(&compute->invlock);
find_invocation(ometric);
/* switch to a write lock from here, since we've found what we
* were looking for (or are going to create it) and modify it */
pthread_rwlock_unlock(&compute->invlock);
pthread_rwlock_wrlock(&compute->invlock);
if (invocation == NULL) {
/* we need to recheck there wasn't someone else who did the
* same thing we want to do below */
find_invocation(ometric);
}
find_invocation(ometric);
if (invocation == NULL) { /* no match, add */
int i;
long long int i;
time_t now;
if ((invocation = malloc(sizeof(*invocation))) == NULL) {
......@@ -295,7 +289,8 @@ aggregator_putmetric(
continue;
}
for (i = 0; i < s->bucketcnt; i++) {
invocation->buckets[i].start = now + (i * s->interval);
invocation->buckets[i].start =
now + (i * (long long int)s->interval);
invocation->buckets[i].cnt = 0;
invocation->buckets[i].entries.size = 0;
invocation->buckets[i].entries.values = NULL;
......@@ -310,7 +305,7 @@ aggregator_putmetric(
itime = epoch - invocation->buckets[0].start;
if (itime < 0) {
/* drop too old metric */
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
......@@ -322,7 +317,7 @@ aggregator_putmetric(
"future (%lld > %lld): %s from %s", epoch,
invocation->buckets[s->bucketcnt - 1].start,
ometric, metric);
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
pthread_rwlock_unlock(&compute->invlock);
continue;
}
......@@ -403,14 +398,15 @@ aggregator_expire(void *sub)
/* send metrics for buckets that are completely past the
* expiry time, unless we are shutting down, then send
* metrics for all buckets that have completed */
now = time(NULL) + (keep_running ? 0 : s->expire - s->interval);
now = time(NULL) + (__sync_bool_compare_and_swap(&keep_running, 1, 1) ? 0 : s->expire - s->interval);
for (c = s->computes; c != NULL; c = c->next) {
pthread_rwlock_wrlock(&c->invlock);
for (i = 0; i < (1 << AGGR_HT_POW_SIZE); i++) {
lastinv = NULL;
isempty = 0;
for (inv = c->invocations_ht[i]; inv != NULL; ) {
while (inv->buckets[0].start +
(keep_running ? inv->expire : s->expire) < now)
(__sync_bool_compare_and_swap(&keep_running, 1, 1) ? inv->expire : s->expire) < now)
{
/* yay, let's produce something cool */
b = &inv->buckets[0];
......@@ -501,19 +497,18 @@ aggregator_expire(void *sub)
logerr("aggregator: failed to write to "
"pipe (fd=%d): %s\n",
s->fd, strerror(errno));
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
} else if (ts < len) {
logerr("aggregator: uncomplete write on "
"pipe (fd=%d)\n", s->fd);
s->dropped++;
__sync_add_and_fetch(&s->dropped, 1);
} else {
s->sent++;
__sync_add_and_fetch(&s->sent, 1);
}
}
/* move the bucket to the end, to make room for
* new ones */
pthread_rwlock_wrlock(&c->invlock);
b = &inv->buckets[0];
len = b->entries.size;
values = b->entries.values;
......@@ -526,7 +521,6 @@ aggregator_expire(void *sub)
s->interval;
b->entries.size = len;
b->entries.values = values;
pthread_rwlock_unlock(&c->invlock);
work++;
}
......@@ -541,7 +535,6 @@ aggregator_expire(void *sub)
}
}
if (isempty) {
pthread_rwlock_wrlock(&c->invlock);
/* free and unlink */
if (c->entries_needed)
for (j = 0; j < s->bucketcnt; j++)
......@@ -558,18 +551,18 @@ aggregator_expire(void *sub)
free(inv);
inv = c->invocations_ht[i];
}
pthread_rwlock_unlock(&c->invlock);
} else {
lastinv = inv;
inv = inv->next;
}
}
}
pthread_rwlock_unlock(&c->invlock);
}
}
if (work == 0) {
if (!keep_running)
if (__sync_bool_compare_and_swap(&keep_running, 0, 0))
break;
/* nothing done, avoid spinlocking */
usleep(250 * 1000); /* 250ms */
......@@ -664,7 +657,7 @@ aggregator_start(aggregator *aggrs)
void
aggregator_stop(void)
{
keep_running = 0;
__sync_bool_compare_and_swap(&keep_running, 1, 0);
pthread_join(aggregatorid, NULL);
}
......@@ -677,7 +670,7 @@ aggregator_get_received(aggregator *a)
size_t totreceived = 0;
for ( ; a != NULL; a = a->next)
totreceived += a->received;
totreceived += __sync_add_and_fetch(&a->received, 0);
return totreceived;
}
......@@ -703,7 +696,7 @@ aggregator_get_sent(aggregator *a)
size_t totsent = 0;
for ( ; a != NULL; a = a->next)
totsent += a->sent;
totsent += __sync_add_and_fetch(&a->sent, 0);
return totsent;
}
......@@ -731,7 +724,7 @@ aggregator_get_dropped(aggregator *a)
size_t totdropped = 0;
for ( ; a != NULL; a = a->next)
totdropped += a->dropped;
totdropped += __sync_add_and_fetch(&a->dropped, 0);
return totdropped;
}
......
This diff is collapsed.
This diff is collapsed.
......@@ -114,7 +114,7 @@ collector_runner(void *s)
server_send(submission, strdup(metric), 1);
nextcycle = time(NULL) + collector_interval;
while (keep_running) {
while (__sync_bool_compare_and_swap(&keep_running, 1, 1)) {
if (cluster_refresh_pending) {
char *stub = router_getcollectorstub(pending_router);
server **newservers = router_getservers(pending_router);
......@@ -225,6 +225,8 @@ collector_runner(void *s)
snprintf(destbuf, sizeof(destbuf), "%s:%u-udp",
server_ip(srvs[i]), server_port(srvs[i]));
break;
default:
assert(0); /* help code analysis tools */
}
for (p = destbuf; *p != '\0'; p++)
if (*p == '.')
......@@ -359,6 +361,8 @@ collector_writer(void *unused)
}
totconn = dispatch_get_accepted_connections();
totdisc = dispatch_get_closed_connections();
if (dticks == 0)
dticks = 1; /* for Coverity */
printf("%5zu %7zu " /* metrics in */
"%5zu %7zu " /* metrics out */
"%5zu %7zu " /* metrics dropped */
......@@ -458,6 +462,6 @@ collector_start(dispatcher **d, router *rtr, server *submission, char cum)
void
collector_stop(void)
{
keep_running = 0;
__sync_bool_compare_and_swap(&keep_running, 1, 0);
pthread_join(collectorid, NULL);
}
......@@ -84,7 +84,7 @@ fnv1a_hashpos(const char *key, const char *end)
* Computes the bucket number for key in the range [0, bckcnt). The
* algorithm used is the jump consistent hash by Lamping and Veach.
*/
static unsigned int
static int
jump_bucketpos(unsigned long long int key, int bckcnt)
{
long long int b = -1, j = 0;
......@@ -481,8 +481,6 @@ ch_free(ch_ring *ring)
for (; ring->entries != NULL; ring->entries = ring->entries->next) {
if (ring->entries->malloced) {
free(ring->entries->server);
if (deletes == NULL) {
w = deletes = ring->entries;
} else {
......@@ -491,12 +489,13 @@ ch_free(ch_ring *ring)
}
}
assert(w != NULL);
w->next = NULL;
while (deletes != NULL) {
w = deletes->next;
free(deletes);
deletes = w;
if (w != NULL) {
w->next = NULL;
while (deletes != NULL) {
w = deletes->next;
free(deletes);
deletes = w;
}
}
if (ring->entrylist != NULL)
......
This diff is collapsed.
......@@ -30,7 +30,7 @@ int dispatch_addlistener_udp(int sock);
void dispatch_removelistener(int sock);
int dispatch_addconnection(int sock);
int dispatch_addconnection_aggr(int sock);
dispatcher *dispatch_new_listener(void);
dispatcher *dispatch_new_listener(unsigned int sockbufsize);
dispatcher *dispatch_new_connection(router *r, char *allowed_chars);
void dispatch_stop(dispatcher *d);
void dispatch_shutdown(dispatcher *d);
......
......@@ -38,7 +38,7 @@
#define SRVCNT 8
#define REPLCNT 2
enum rmode mode = NORMAL;
unsigned char mode = 0;
int
relaylog(enum logdst dest, const char *fmt, ...)
......@@ -88,6 +88,7 @@ int main(int argc, char *argv[]) {
NULL,
1024,
128,
4,
800
);
MD5((unsigned char *)ip, strlen(ip), md5);
......
include issues/issue10.conf;
include issues/issue16.conf;
match baz
send to blackhole
;
match foo
send to blackhole
;
include issue180-a.conf
;
match bar
send to blackhole
;
# 51 times the same aggr to trigger the optimiser
aggregate
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
(foo|bar)\.(.+)
every 1 seconds
expire after 2 seconds
compute sum write to
\1
stop;
aggregate
dothework.(foo|bar)\.(.+)
somethingelse.dothework\.(.+)
more.dothework.(yo.)
a.nonmatching.thing
whatever.dothework.(yo.)
every 1 seconds
expire after 2 seconds
compute sum write to
\1
stop;
cluster my_cluster forward 127.0.0.1:2003;
cluster my_homie_cluster forward 127.0.0.1:2004;
match ^awesome\.(.+)
send to
my_cluster
my_homie_cluster
stop
;
/*
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* Simple example of how to use the routing functionality of the relay
* https://github.com/grobian/carbon-c-relay/issues/192
*
* compile with something like:
* gcc -o routertest routertest.c router.c server.c consistent-hash.c \
* md5.c queue.c dispatcher.c aggregator.c -lnsl -lsocket -lm
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "relay.h"
#include "server.h"
#include "router.h"
unsigned char mode = 0;
/**
 * Logging stub for this test program: the message is discarded and 0
 * is always returned.
 *
 * NOTE(review): presumably the relay sources linked into this program
 * call relaylog(), which is why a definition must be supplied here --
 * confirm against relay.h.
 *
 * @param dest  log destination, ignored
 * @param fmt   printf-style format string, ignored
 * @return always 0
 */
int
relaylog(enum logdst dest, const char *fmt, ...)
{
	(void)dest;
	(void)fmt;
	return 0;
}
#define DESTS_SIZE 32
/**
 * Reads the routing configuration from "test.conf", routes the single
 * metric "bla.bla.bla" with source address 127.0.0.1, and prints one
 * line per destination returned by the router.
 *
 * @return 0 on success; the process exits with status 1 when the
 *         configuration cannot be read or the metric string cannot be
 *         allocated
 */
int main(void) {
	size_t len = 0;
	size_t i;
	destination dests[DESTS_SIZE];
	char *metric;
	char *firstspace;
	server *s;
	router *rtr;

	/* arguments, in prototype order: no original router, config path,
	 * queuesize, batchsize, maxstalls, iotimeout, sockbufsize
	 * NOTE(review): sockbufsize 0 presumably keeps the system default
	 * socket buffer size -- confirm against the server implementation */
	rtr = router_readconfig(NULL, "test.conf", 100, 10, 1, 100, 0);
	if (rtr == NULL)
		exit(1);

	metric = strdup("bla.bla.bla");
	if (metric == NULL)
		exit(1);
	/* the metric has no value/timestamp part, so firstspace points at
	 * the terminating NUL */
	firstspace = metric + strlen(metric);

	(void)router_route(rtr, dests, &len, DESTS_SIZE, "127.0.0.1", metric, firstspace);

	for (i = 0; i < len; i++) {
		/* the metric stored in each destination is released here, as
		 * in the original code; only the server details are printed */
		free((char *)dests[i].metric);
		s = dests[i].dest;
		/* i is a size_t, hence %zu */
		printf("%zu: %s:%d=%s\n", i,
				server_ip(s), server_port(s), server_instance(s));
	}

	return 0;
}
......@@ -79,8 +79,8 @@ queue_destroy(queue *q)
/**
* Enqueues the string pointed to by p at queue q. If the queue is
* full, the oldest entry is dropped. For this reason, enqueuing will
* never fail. This function assumes the pointer p is a private copy
* for this queue, and hence will be freed once processed.
* never fail. This function assumes the pointer p is a copy for this
* queue, that is returned on dequeue, or freed when dropped.
*/
void
queue_enqueue(queue *q, const char *p)
......@@ -200,7 +200,11 @@ queue_putback(queue *q, const char *p)
inline size_t
queue_len(queue *q)
{
return q->len;
size_t len;
pthread_mutex_lock(&q->lock);
len = q->len;
pthread_mutex_unlock(&q->lock);
return len;
}
/**
......@@ -210,7 +214,7 @@ queue_len(queue *q)
inline size_t
queue_free(queue *q)
{
return q->end - q->len;
return q->end - queue_len(q);
}
/**
......
This diff is collapsed.
......@@ -18,7 +18,7 @@
#ifndef HAVE_RELAY_H
#define HAVE_RELAY_H 1
#define VERSION "1.11"
#define VERSION "2.2"
#define METRIC_BUFSIZ 8192
......
This diff is collapsed.
......@@ -34,8 +34,11 @@ typedef struct _router router;
#define RE_MAX_MATCHES 64
router *router_readconfig(const char *path, size_t queuesize, size_t batchsize, unsigned short iotimeout);
router *router_readconfig(router *orig, const char *path, size_t queuesize, size_t batchsize, int maxstalls, unsigned short iotimeout, unsigned int sockbufsize);
void router_optimise(router *r);
char router_printdiffs(router *old, router *new, FILE *out);
void router_transplant_queues(router *new, router *old);
char router_start(router *r);
size_t router_rewrite_metric(char (*newmetric)[METRIC_BUFSIZ], char **newfirstspace, const char *metric, const char *firstspace, const char *replacement, const size_t nmatch, const regmatch_t *pmatch);
void router_printconfig(router *r, FILE *f, char mode);
char router_route(router *r, destination ret[], size_t *retcnt, size_t retsize, char *srcaddr, char *metric, char *firstspace);
......@@ -43,6 +46,7 @@ void router_test(router *r, char *metric_path);
server **router_getservers(router *r);
aggregator *router_getaggregators(router *r);
char *router_getcollectorstub(router *r);
void router_shutdown(router *r);
void router_free(router *r);
#endif
This diff is collapsed.
......@@ -22,6 +22,8 @@
#include "relay.h"
#define SERVER_STALL_BITS 4 /* 0 up to 15 */
typedef struct _server server;
server *server_new(
......@@ -31,13 +33,17 @@ server *server_new(
struct addrinfo *saddr,
size_t queuesize,
size_t batchsize,
unsigned short iotimeout);
int maxstalls,
unsigned short iotimeout,
unsigned int sockbufsize);
char server_start(server *s);
void server_add_secondaries(server *d, server **sec, size_t cnt);
void server_set_failover(server *d);
void server_set_instance(server *d, char *inst);
char server_send(server *s, const char *d, char force);
void server_shutdown(server *s);
void server_free(server *s);
void server_swap_queue(server *l, server *r);
const char *server_ip(server *s);
unsigned short server_port(server *s);
char *server_instance(server *s);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment