Commit 84e9be9d authored by Bernd Zeimetz's avatar Bernd Zeimetz

Imported Upstream version 1.1

parent 7fcba211
......@@ -472,6 +472,151 @@ without the `send to`, the metric name can't be kept its original name,
for the output now directly goes to the cluster.
When carbon-c-relay is run without `-d` or `-s` arguments, statistics
will be produced and sent to the relay itself in the form of
`carbon.relays.<hostname>.*`. The hostname is determined on startup,
and can be overriden using the `-H` argument. While many metrics have a
similar name to what would produce, their values are
different. To obtain a more compatible set of values, the `-m` argument
can be used to make values non-cumulative, that is, they will report the
change compared to the previous value. By default, most values are
running counters which only increase over time. The use of the
nonNegativeDerivative() function from graphite is useful with these.
The default sending interval is 1 minute (60 seconds), but can be
overridden using the `-S` argument specified in seconds.
The following metrics are produced in the `carbon.relays.<hostname>`
* metricsReceived
The number of metrics that were received by the relay. Received here
means that they were seen and processed by any of the dispatchers.
* metricsSent
The number of metrics that were sent from the relay. This is a total
count for all servers combined. When incoming metrics are duplicated
by the cluster configuration, this counter will include all those
duplications. In other words, the amount of metrics that were
successfully sent to other systems. Note that metrics that are
processed (received) but still in the sending queue (queued) are not
included in this counter.
* metricsQueued
The total number of metrics that are currently in the queues for all
the server targets. This metric is not cumulative, for it is a sample
of the queue size, which can (and should) go up and down. Therefore
you should not use the derivative function for this metric.
* metricsDropped
The total number of metric that had to be dropped due to server queues
overflowing. A queue typically overflows when the server it tries to
send its metrics to is not reachable, or too slow in ingesting the
amount of metrics queued. This can be network or resource related,
and also greatly depends on the rate of metrics being sent to the
particular server.
* metricsBlackholed
The number of metrics that did not match any rule, or matched a rule
with blackhole as target. Depending on your configuration, a high
value might be an indication of a misconfiguration somewhere. These
metrics were received by the relay, but never sent anywhere, thus they
* metricStalls
The number of times the relay had to stall a client to indicate that
the downstream server cannot handle the stream of metrics. A stall is
only performed when the queue is full and the server is actually
receptive of metrics, but just too slow at the moment. Stalls
typically happen during micro-bursts, where the client typically is
unaware that it should stop sending more data, while it is able to.
* connections
The number of connect requests handled. This is an ever increasing
number just counting how many connections were accepted.
* disconnects
The number of disconnected clients. A disconnect either happens
because the client goes away, or due to an idle timeout in the relay.
The difference between this metric and connections is the amount of
connections actively held by the relay. In normal situations this
amount remains within reasonable bounds. Many connections, but few
disconnections typically indicate a possible connection leak in the
client. The idle connections disconnect in the relay here is to guard
against resource drain in such scenarios.
* dispatch\_busy
The number of dispatchers actively doing work at the moment of the
sample. This is just an indication of the work pressure on the relay.
* dispatch\_idle
The number of dispatchers sleeping at the moment of the sample. When
this number nears 0, dispatch\_busy should be high. When the
configured number of worker threads is low, this might mean more
worker threads should be added (if the system allows it) or the relay
is reaching its limits with regard to how much it can process. A
relay with no idle dispatchers will likely appear slow for clients,
for the relay has too much work to serve them instantly.
* dispatch\_wallTime\_us
The number of microseconds spent by the dispatchers to do their work.
In particular on multi-core systems, this value can be confusing,
however, it indicates how long the dispatchers were doing work
handling clients. It includes everything they do, from reading data
from a socket, cleaning up the input metric, to adding the metric to
the appropriate queues. The larger the configuration, and more
complex in terms of matches, the more time the dispatchers will spend
on the cpu.
* server\_wallTime\_us
The number of microseconds spent by the servers to send the metrics
from their queues. This value includes connection creation, reading
from the queue, and sending metrics over the network.
* dispatcherX
For each indivual dispatcher, the metrics received and blackholed plus
the wall clock time. The values are as described above.
* destinations.X
For all known destinations, the number of dropped, queued and sent
metrics plus the wall clock time spent. The values are as described
* aggregators.metricsReceived
The number of metrics that were matched an aggregator rule and were
accepted by the aggregator. When a metric matches multiple
aggregators, this value will reflect that. A metric is not counted
when it is considered syntactically invalid, e.g. no value was found.
* aggregators.metricsDropped
The number of metrics that were sent to an aggregator, but did not fit
timewise. This is either because the metric was too far in the past
or future. The expire after clause in aggregate statements controls
how long in the past metric values are accepted.
* aggregators.metricsSent
The number of metrics that were sent from the aggregators. These
metrics were produced and are the actual results of aggregations.
The original argument for building carbon-c-relay was speed, with
......@@ -26,6 +26,7 @@
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/resource.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include "relay.h"
cluster should-work-fine
fnv1a_ch replication 2
cluster really-doesnt-make-sense
fnv1a_ch replication 10
......@@ -43,6 +43,7 @@ enum rmode mode = NORMAL;
static char *config = NULL;
static int batchsize = 2500;
static int queuesize = 25000;
static unsigned short iotimeout = 600;
static dispatcher **workers = NULL;
static char workercnt = 0;
static cluster *clusters = NULL;
......@@ -161,7 +162,7 @@ hup_handler(int sig)
logout("reloading config from '%s'...\n", config);
if (router_readconfig(&newclusters, &newroutes,
config, queuesize, batchsize) == 0)
config, queuesize, batchsize, iotimeout) == 0)
logerr("failed to read configuration '%s', aborting reload\n",
......@@ -226,6 +227,7 @@ do_usage(int exitcode)
printf(" -b server send batch size, defaults to 2500\n");
printf(" -q server queue size, defaults to 25000\n");
printf(" -S statistics sending interval in seconds, defaults to 60\n");
printf(" -T IO timeout in milliseconds for server connections, defaults to 600\n");
printf(" -m send statistics like, e.g. not cumulative\n");
printf(" -c characters to allow next to [A-Za-z0-9], defaults to -_:#\n");
printf(" -d debug mode: currently writes statistics to log, prints hash\n"
......@@ -260,7 +262,7 @@ main(int argc, char * const argv[])
if (gethostname(relay_hostname, sizeof(relay_hostname)) < 0)
snprintf(relay_hostname, sizeof(relay_hostname), "");
while ((ch = getopt(argc, argv, ":hvdmstf:i:l:p:w:b:q:S:c:H:")) != -1) {
while ((ch = getopt(argc, argv, ":hvdmstf:i:l:p:w:b:q:S:T:c:H:")) != -1) {
switch (ch) {
case 'v':
......@@ -330,6 +332,17 @@ main(int argc, char * const argv[])
case 'T': {
int val = atoi(optarg);
if (val <= 0) {
fprintf(stderr, "error: server IO timeout needs to be a number >0\n");
} else if (val >= 60000) {
fprintf(stderr, "error: server IO timeout needs to be less than one minute\n");
iotimeout = (unsigned short)val;
} break;
case 'c':
allowed_chars = optarg;
......@@ -389,6 +402,8 @@ main(int argc, char * const argv[])
fprintf(relay_stdout, " server queue size = %d\n", queuesize);
fprintf(relay_stdout, " statistics submission interval = %ds\n",
fprintf(relay_stdout, " server connection IO timeout = %dms\n",
if (allowed_chars != NULL)
fprintf(relay_stdout, " extra allowed characters = %s\n",
......@@ -400,7 +415,7 @@ main(int argc, char * const argv[])
fprintf(relay_stdout, "\n");
if (router_readconfig(&clusters, &routes,
config, queuesize, batchsize) == 0)
config, queuesize, batchsize, iotimeout) == 0)
logerr("failed to read configuration '%s'\n", config);
return 1;
......@@ -410,11 +425,11 @@ main(int argc, char * const argv[])
numaggregators = aggregator_numaggregators();
numcomputes = aggregator_numcomputes();
#define dbg (mode == DEBUG || mode == DEBUGTEST ? 2 : 0)
if (numaggregators > 10) {
if (numaggregators > 10 && !dbg) {
fprintf(relay_stdout, "parsed configuration follows:\n"
"(%zd aggregations with %zd computations omitted "
"for brevity)\n", numaggregators, numcomputes);
router_printconfig(relay_stdout, 0 + dbg, clusters, routes);
router_printconfig(relay_stdout, 0, clusters, routes);
} else {
fprintf(relay_stdout, "parsed configuration follows:\n");
router_printconfig(relay_stdout, 1 + dbg, clusters, routes);
......@@ -505,7 +520,8 @@ main(int argc, char * const argv[])
/* server used for delivering metrics produced inside the relay,
* that is collector (statistics) and aggregator (aggregations) */
if ((internal_submission = server_new("internal", listenport, CON_PIPE,
NULL, 3000 + (numcomputes * 3), batchsize)) == NULL)
NULL, 3000 + (numcomputes * 3),
batchsize, iotimeout)) == NULL)
logerr("failed to create internal submission queue, shutting down\n");
keep_running = 0;
......@@ -18,7 +18,7 @@
#ifndef HAVE_RELAY_H
#define HAVE_RELAY_H 1
#define VERSION "0.45"
#define VERSION "1.1"
#define METRIC_BUFSIZ 8192
......@@ -21,6 +21,8 @@
#include <ctype.h>
#include <regex.h>
#include <sys/stat.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <errno.h>
#include <assert.h>
......@@ -260,7 +262,8 @@ determine_if_regex(route *r, char *pat, int flags)
router_readconfig(cluster **clret, route **rret,
const char *path, size_t queuesize, size_t batchsize)
const char *path, size_t queuesize, size_t batchsize,
unsigned short iotimeout)
FILE *cnf;
char *buf;
......@@ -607,7 +610,7 @@ router_readconfig(cluster **clret, route **rret,
*proto == 'f' ? CON_FILE :
*proto == 'u' ? CON_UDP : CON_TCP,
walk == (void *)1 ? NULL : walk,
queuesize, batchsize);
queuesize, batchsize, iotimeout);
if (newserver == NULL) {
logerr("failed to add server %s:%d (%s) "
"to cluster %s: %s\n", ip, port, proto,
......@@ -708,6 +711,20 @@ router_readconfig(cluster **clret, route **rret,
if (cl->type == FAILOVER)
} else if (cl->type == CARBON_CH || cl->type == FNV1A_CH) {
/* check that replication count is actually <= the
* number of servers */
size_t i = 0;
for (w = cl->>servers; w != NULL; w = w->next)
if (i <= cl->>repl_factor) {
logerr("invalid cluster '%s': replication count (%zd) is "
"larger than the number of servers (%zd)\n",
name, cl->>repl_factor, i);
return 0;
cl->name = strdup(name);
cl->next = NULL;
......@@ -34,7 +34,7 @@ typedef struct _route route;
#define RE_MAX_MATCHES 64
int router_readconfig(cluster **clret, route **rret, const char *path, size_t queuesize, size_t batchsize);
int router_readconfig(cluster **clret, route **rret, const char *path, size_t queuesize, size_t batchsize, unsigned short iotimeout);
void router_optimise(route **routes);
size_t router_rewrite_metric(char (*newmetric)[METRIC_BUFSIZ], char **newfirstspace, const char *metric, const char *firstspace, const char *replacement, const size_t nmatch, const regmatch_t *pmatch);
void router_printconfig(FILE *f, char mode, cluster *clusters, route *routes);
......@@ -43,6 +43,7 @@ struct _server {
int fd;
queue *queue;
size_t bsize;
short iotimeout;
const char **batch;
serv_ctype ctype;
pthread_t tid;
......@@ -80,8 +81,10 @@ server_queuereader(void *d)
const char **metric = self->batch;
struct timeval start, stop;
struct timeval timeout;
int timeoutms;
queue *queue;
char idle = 0;
size_t *secpos = NULL;
*metric = NULL;
self->metrics = 0;
......@@ -116,6 +119,34 @@ server_queuereader(void *d)
(!self->failover && LEN_CRITICAL(self->queue))))
size_t i;
if (self->secondariescnt > 0) {
if (secpos == NULL) {
secpos = malloc(sizeof(size_t) * self->secondariescnt);
if (secpos == NULL) {
logerr("server: failed to allocate memory "
"for secpos\n");
for (i = 0; i < self->secondariescnt; i++)
secpos[i] = i;
if (!self->failover) {
/* randomise the failover list such that in the
* grand scheme of things we don't punish the first
* working server in the list to deal with all
* traffic meant for a now failing server */
for (i = 0; i < self->secondariescnt; i++) {
size_t n = rand() % (self->secondariescnt - i);
if (n != i) {
size_t t = secpos[n];
secpos[n] = secpos[i];
secpos[i] = t;
/* offload data from our queue to our secondaries
* when doing so, observe the following:
* - avoid nodes that are in failure mode
......@@ -129,9 +160,9 @@ server_queuereader(void *d)
queue = NULL;
for (i = 0; i < self->secondariescnt; i++) {
/* both conditions below make sure we skip ourself */
if (self->secondaries[i]->failure)
if (self->secondaries[secpos[i]]->failure)
queue = self->secondaries[i]->queue;
queue = self->secondaries[secpos[i]]->queue;
if (!self->failover && LEN_CRITICAL(queue)) {
queue = NULL;
......@@ -251,8 +282,9 @@ server_queuereader(void *d)
fd_set fds;
FD_SET(self->fd, &fds);
timeout.tv_sec = 0;
timeout.tv_usec = (600 + (rand() % 100)) * 1000;
timeoutms = self->iotimeout + (rand() % 100);
timeout.tv_sec = timeoutms / 1000;
timeout.tv_usec = (timeoutms % 1000) * 1000;
ret = select(self->fd + 1, NULL, &fds, NULL, &timeout);
if (ret == 0) {
/* time limit expired */
......@@ -317,8 +349,9 @@ server_queuereader(void *d)
/* ensure we will break out of connections being stuck */
timeout.tv_sec = 0;
timeout.tv_usec = (600 + (rand() % 100)) * 1000;
timeoutms = self->iotimeout + (rand() % 100);
timeout.tv_sec = timeoutms / 1000;
timeout.tv_usec = (timeoutms % 1000) * 1000;
setsockopt(self->fd, SOL_SOCKET, SO_SNDTIMEO,
&timeout, sizeof(timeout));
......@@ -408,7 +441,8 @@ server_new(
serv_ctype ctype,
struct addrinfo *saddr,
size_t qsize,
size_t bsize)
size_t bsize,
unsigned short iotimeout)
server *ret;
......@@ -423,6 +457,7 @@ server_new(
ret->port = port;
ret->instance = NULL;
ret->bsize = bsize;
ret->iotimeout = iotimeout < 250 ? 600 : iotimeout;
if ((ret->batch = malloc(sizeof(char *) * (bsize + 1))) == NULL) {
return NULL;
......@@ -30,7 +30,8 @@ server *server_new(
serv_ctype ctype,
struct addrinfo *saddr,
size_t queuesize,
size_t batchsize);
size_t batchsize,
unsigned short iotimeout);
void server_add_secondaries(server *d, server **sec, size_t cnt);
void server_set_failover(server *d);
void server_set_instance(server *d, char *inst);
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment