Commit 92bc19a2 authored by Filippo Giunchedi

Imported Upstream version 0.39

parents
*.o
/relay
This diff is collapsed.
# Copyright 2013-2015 Fabian Groffen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Default optimisation/warning flags; only applied when CFLAGS is not
# already set by the caller.
CFLAGS ?= -O2 -Wall
# Version string taken from "git describe"; when that command fails the
# current date (date +%F) is used instead.
GIT_VERSION := $(shell git describe --abbrev=6 --dirty --always || date +%F)
# Hand the version to the C sources as the string macro GIT_VERSION.
GVCFLAGS += -DGIT_VERSION=\"$(GIT_VERSION)\"
# "override" appends these flags even when CFLAGS is given on the make
# command line: the version define, OpenSSL's compile flags and -pthread.
override CFLAGS += $(GVCFLAGS) `pkg-config openssl --cflags` -pthread
# Extra socket libraries, only added when "uname" reports SunOS.
SOCKET_LIBS =
ifeq ($(shell uname), SunOS)
SOCKET_LIBS += -lsocket -lnsl
endif
# Link against OpenSSL, the optional socket libraries and pthreads.
override LIBS += `pkg-config openssl --libs` $(SOCKET_LIBS) -pthread
# Object files that make up the relay binary.
OBJS = \
relay.o \
consistent-hash.o \
receptor.o \
dispatcher.o \
router.o \
queue.o \
server.o \
collector.o \
aggregator.o
# Link step: all objects ($^) into the target ($@).
relay: $(OBJS)
$(CC) -o $@ $(LDFLAGS) $^ $(LIBS)
# Release version: the quoted digits-and-dots string found on the line of
# relay.h that mentions VERSION.
VERSION = $(shell sed -n '/VERSION/s/^.*"\([0-9.]\+\)".*$$/\1/p' relay.h)
# Creates carbon-c-relay-$(VERSION).tar.gz from the git tag v$(VERSION),
# with all paths prefixed by carbon-c-relay-$(VERSION)/.
dist:
git archive \
--format=tar.gz \
--prefix=carbon-c-relay-$(VERSION)/ v$(VERSION) \
> carbon-c-relay-$(VERSION).tar.gz
# Removes the object files and the relay binary.
clean:
rm -f *.o relay
This diff is collapsed.
This diff is collapsed.
/*
* Copyright 2013-2015 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef AGGREGATOR_H
#define AGGREGATOR_H 1
#include <regex.h>
#include <pthread.h>
#include "server.h"
/* The kind of value a compute rule produces. */
enum _aggr_compute_type { SUM, CNT, MAX, MIN, AVG };

/* Running statistics of a single bucket. */
struct _bucket {
	long long int start;
	size_t cnt;
	double sum;
	double max;
	double min;
};

/* One concrete output metric of a compute rule, with its buckets; the
 * invocations of a rule form a singly linked list. */
struct _aggr_invocations {
	char *metric;                    /* actual name to emit */
	struct _bucket *buckets;
	struct _aggr_invocations *next;
};

/* One compute rule of an aggregator; the rules form a singly linked
 * list. */
struct _aggr_computes {
	enum _aggr_compute_type type;
	const char *metric;              /* name template of metric to produce */
	struct _aggr_invocations *invocations;
	struct _aggr_computes *next;
};

/* An aggregator: timing settings, counters, its list of compute rules
 * and the mutex named after the buckets.  Aggregators themselves are
 * chained through next. */
typedef struct _aggregator {
	unsigned short interval;         /* when to perform the aggregation */
	unsigned short expire;           /* when incoming metrics are no longer valid */
	unsigned char bucketcnt;
	size_t received;
	size_t sent;
	size_t dropped;
	struct _aggr_computes *computes;
	pthread_mutex_t bucketlock;
	struct _aggregator *next;
} aggregator;
/* Public aggregator interface.  The implementations are not part of this
 * header; the notes below only restate what the signatures show. */
/* Creates an aggregator from an interval and an expire value (units are
 * not visible here -- presumably seconds, TODO confirm). */
aggregator *aggregator_new(unsigned int interval, unsigned int expire);
/* Adds a compute rule to s, given a metric name and a type name. */
char aggregator_add_compute(aggregator *s, const char *metric, const char *type);
/* Feeds one metric to s; nmatch/pmatch are regex match results
 * (regmatch_t from <regex.h>). */
void aggregator_putmetric(aggregator *s, const char *metric, const char *firstspace, size_t nmatch, regmatch_t *pmatch);
/* Starts/stops aggregation; start takes a server named "submission". */
int aggregator_start(server *submission);
void aggregator_stop(void);
/* Counters queried by other modules. */
size_t aggregator_numaggregators(void);
size_t aggregator_numcomputes(void);
size_t aggregator_get_received(void);
size_t aggregator_get_sent(void);
size_t aggregator_get_dropped(void);
#endif
/*
* Copyright 2013-2015 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <assert.h>
#include "relay.h"
#include "dispatcher.h"
#include "server.h"
#include "aggregator.h"
#include "collector.h"
/* NULL-terminated array of dispatchers to report on; set by
 * collector_start(). */
static dispatcher **dispatchers;
/* Set to 1 by collector_start() when mode == DEBUG; the runner then logs
 * metrics instead of sending them. */
static char debug = 0;
/* Handle of the collector thread, joined in collector_stop(). */
static pthread_t collectorid;
/* Loop condition of the collector thread; cleared by collector_stop(). */
static char keep_running = 1;
/* Seconds between two collector cycles; not static, so other translation
 * units can change it. */
int collector_interval = 60;
/* Reload hand-over: collector_schedulereload() stores the clusters and
 * raises the flag, the collector thread picks them up and clears it.
 * NOTE(review): these plain variables are shared between threads without
 * any visible synchronisation. */
static char cluster_refresh_pending = 0;
static cluster *pending_clusters = NULL;
/**
 * Collects metrics from dispatchers and servers and emits them.
 *
 * Thread entry point used by collector_start().  Once every
 * collector_interval seconds it produces one line per statistic, each
 * prefixed with "carbon.relays.<hostname>." (dots in the hostname are
 * replaced by underscores).  In debug mode the lines are logged,
 * otherwise a strdup()'ed copy of each line is passed to server_send()
 * for the submission server.
 *
 * Servers of type CON_PIPE are reported as destination "internal" and
 * are left out of the totals.
 *
 * @param s  the submission server (server *) that receives the metrics
 * @return   always NULL
 */
static void *
collector_runner(void *s)
{
	int i;
	size_t totticks;
	size_t totmetrics;
	size_t totqueued;
	size_t totstalls;
	size_t totdropped;
	size_t ticks;
	size_t metrics;
	size_t queued;
	size_t stalls;
	size_t dropped;
	size_t dispatchers_idle;
	size_t dispatchers_busy;
	time_t now;
	time_t nextcycle;
	char ipbuf[32];
	char *p;
	size_t numaggregators = aggregator_numaggregators();
	server *submission = (server *)s;
	server **srvs = NULL;
	char metric[METRIC_BUFSIZ];
	char *m;
	size_t sizem = 0;

	/* prepare hostname for graphite metrics */
	snprintf(metric, sizeof(metric), "carbon.relays.%s", relay_hostname);
	for (m = metric + strlen("carbon.relays."); *m != '\0'; m++)
		if (*m == '.')
			*m = '_';
	/* when the hostname got truncated m sits on the last byte of the
	 * buffer; step back so the separator and terminator still fit */
	if ((size_t)(m - metric) + 1 >= sizeof(metric))
		m = metric + sizeof(metric) - 2;
	*m++ = '.';
	*m = '\0';
	/* m now points right after the prefix, sizem is the room left */
	sizem = sizeof(metric) - (m - metric);

/* emits one formatted metric line: log it in debug mode, submit a copy
 * otherwise */
#define send(metric) \
	do { \
		if (debug) \
			logout("%s", metric); \
		else \
			server_send(submission, strdup(metric), 1); \
	} while (0)

	nextcycle = time(NULL) + collector_interval;
	while (keep_running) {
		/* pick up a server list scheduled via collector_schedulereload() */
		if (cluster_refresh_pending) {
			server **newservers = router_getservers(pending_clusters);
			free(srvs);
			srvs = newservers;
			cluster_refresh_pending = 0;
		}
		assert(srvs != NULL);

		/* wake up every second so shutdown and reloads are noticed
		 * quickly, but only report once per interval */
		sleep(1);
		now = time(NULL);
		if (nextcycle > now)
			continue;
		nextcycle += collector_interval;

		/* per-dispatcher statistics and their totals */
		totticks = 0;
		totmetrics = 0;
		dispatchers_idle = 0;
		dispatchers_busy = 0;
		for (i = 0; dispatchers[i] != NULL; i++) {
			if (dispatch_busy(dispatchers[i])) {
				dispatchers_busy++;
			} else {
				dispatchers_idle++;
			}
			totticks += ticks = dispatch_get_ticks(dispatchers[i]);
			totmetrics += metrics = dispatch_get_metrics(dispatchers[i]);
			snprintf(m, sizem, "dispatcher%d.metricsReceived %zu %zu\n",
					i + 1, metrics, (size_t)now);
			send(metric);
			snprintf(m, sizem, "dispatcher%d.wallTime_us %zu %zu\n",
					i + 1, ticks, (size_t)now);
			send(metric);
		}
		snprintf(m, sizem, "metricsReceived %zu %zu\n",
				totmetrics, (size_t)now);
		send(metric);
		snprintf(m, sizem, "dispatch_wallTime_us %zu %zu\n",
				totticks, (size_t)now);
		send(metric);
		snprintf(m, sizem, "dispatch_busy %zu %zu\n",
				dispatchers_busy, (size_t)now);
		send(metric);
		snprintf(m, sizem, "dispatch_idle %zu %zu\n",
				dispatchers_idle, (size_t)now);
		send(metric);

		/* per-destination statistics and their totals */
		totticks = 0;
		totmetrics = 0;
		totqueued = 0;
		totstalls = 0;
		totdropped = 0;
		for (i = 0; srvs[i] != NULL; i++) {
			if (server_ctype(srvs[i]) == CON_PIPE) {
				/* internal pipe: reported, but not part of the totals */
				snprintf(ipbuf, sizeof(ipbuf), "internal");
				ticks = server_get_ticks(srvs[i]);
				metrics = server_get_metrics(srvs[i]);
				queued = server_get_queue_len(srvs[i]);
				stalls = server_get_stalls(srvs[i]);
				dropped = server_get_dropped(srvs[i]);
			} else {
				snprintf(ipbuf, sizeof(ipbuf), "%s:%u",
						server_ip(srvs[i]), server_port(srvs[i]));
				/* dots would introduce extra levels in the metric path */
				for (p = ipbuf; *p != '\0'; p++)
					if (*p == '.')
						*p = '_';
				totticks += ticks = server_get_ticks(srvs[i]);
				totmetrics += metrics = server_get_metrics(srvs[i]);
				totqueued += queued = server_get_queue_len(srvs[i]);
				totstalls += stalls = server_get_stalls(srvs[i]);
				totdropped += dropped = server_get_dropped(srvs[i]);
			}
			snprintf(m, sizem, "destinations.%s.sent %zu %zu\n",
					ipbuf, metrics, (size_t)now);
			send(metric);
			snprintf(m, sizem, "destinations.%s.queued %zu %zu\n",
					ipbuf, queued, (size_t)now);
			send(metric);
			snprintf(m, sizem, "destinations.%s.stalls %zu %zu\n",
					ipbuf, stalls, (size_t)now);
			send(metric);
			snprintf(m, sizem, "destinations.%s.dropped %zu %zu\n",
					ipbuf, dropped, (size_t)now);
			send(metric);
			snprintf(m, sizem, "destinations.%s.wallTime_us %zu %zu\n",
					ipbuf, ticks, (size_t)now);
			send(metric);
		}
		snprintf(m, sizem, "metricsSent %zu %zu\n",
				totmetrics, (size_t)now);
		send(metric);
		snprintf(m, sizem, "metricsQueued %zu %zu\n",
				totqueued, (size_t)now);
		send(metric);
		snprintf(m, sizem, "metricStalls %zu %zu\n",
				totstalls, (size_t)now);
		send(metric);
		snprintf(m, sizem, "metricsDropped %zu %zu\n",
				totdropped, (size_t)now);
		send(metric);
		snprintf(m, sizem, "server_wallTime_us %zu %zu\n",
				totticks, (size_t)now);
		send(metric);
		snprintf(m, sizem, "connections %zu %zu\n",
				dispatch_get_accepted_connections(), (size_t)now);
		send(metric);
		snprintf(m, sizem, "disconnects %zu %zu\n",
				dispatch_get_closed_connections(), (size_t)now);
		send(metric);

		/* aggregator statistics, only when aggregators are configured */
		if (numaggregators > 0) {
			snprintf(m, sizem, "aggregators.metricsReceived %zu %zu\n",
					aggregator_get_received(), (size_t)now);
			send(metric);
			snprintf(m, sizem, "aggregators.metricsSent %zu %zu\n",
					aggregator_get_sent(), (size_t)now);
			send(metric);
			snprintf(m, sizem, "aggregators.metricsDropped %zu %zu\n",
					aggregator_get_dropped(), (size_t)now);
			send(metric);
		}

		if (debug)
			fflush(stdout);
	}

	/* the server list is owned by this thread, release it on shutdown */
	free(srvs);
	return NULL;
}
#undef send
/**
 * Writes messages about dropped events or high queue sizes.
 *
 * Thread entry point used by collector_start() in SUBMISSION mode.
 * Once every collector_interval seconds it warns about every server
 * with more than 150 queued metrics, and about metrics dropped by the
 * servers or the aggregators since the previous cycle.
 *
 * lastdropped and lastaggrdropped remember the totals of the previous
 * cycle so only the increase is reported.
 *
 * @param unused  ignored
 * @return        always NULL
 */
static size_t lastdropped = 0;
static size_t lastaggrdropped = 0;
static void *
collector_writer(void *unused)
{
	int elapsed = 0;
	int i;
	size_t queued;
	size_t totdropped;
	size_t numaggregators = aggregator_numaggregators();
	server **srvs = NULL;

	(void)unused;

	while (keep_running) {
		/* pick up a server list scheduled via collector_schedulereload() */
		if (cluster_refresh_pending) {
			server **newservers = router_getservers(pending_clusters);
			free(srvs);
			srvs = newservers;
			cluster_refresh_pending = 0;
		}
		assert(srvs != NULL);

		/* wake up every second so shutdown and reloads are noticed
		 * quickly, but only report once per interval */
		sleep(1);
		if (++elapsed < collector_interval)
			continue;
		elapsed = 0;

		totdropped = 0;
		for (i = 0; srvs[i] != NULL; i++) {
			queued = server_get_queue_len(srvs[i]);
			totdropped += server_get_dropped(srvs[i]);

			if (queued > 150)
				logout("warning: metrics queuing up "
						"for %s:%u: %zu metrics\n",
						server_ip(srvs[i]), server_port(srvs[i]), queued);
		}
		/* the total can shrink when the server list changed; comparing
		 * instead of subtracting avoids an unsigned wrap-around that
		 * would report a bogus, huge number of drops */
		if (totdropped > lastdropped)
			logout("warning: dropped %zu metrics\n", totdropped - lastdropped);
		lastdropped = totdropped;

		if (numaggregators > 0) {
			totdropped = aggregator_get_dropped();
			if (totdropped > lastaggrdropped)
				logout("warning: aggregator dropped %zu metrics\n",
						totdropped - lastaggrdropped);
			lastaggrdropped = totdropped;
		}
	}

	/* the server list is owned by this thread, release it on shutdown */
	free(srvs);
	return NULL;
}
/**
 * Schedules the clusters c to replace the ones the collector currently
 * reports on.  The replacement is performed at the next cycle of the
 * collector thread; collector_reloadcomplete() tells when that happened.
 *
 * Defined as a plain external function: a non-static "inline" definition
 * only provides an external symbol as long as a non-inline prototype is
 * in scope, which is easy to break.
 */
void
collector_schedulereload(cluster *c)
{
	/* store the clusters before raising the flag the thread polls */
	pending_clusters = c;
	cluster_refresh_pending = 1;
}
/**
 * Returns true if the clusters scheduled to be reloaded by a call to
 * collector_schedulereload() have been activated, that is, when the
 * collector thread has cleared the pending flag.
 *
 * Defined as a plain external function: a non-static "inline" definition
 * only provides an external symbol as long as a non-inline prototype is
 * in scope, which is easy to break.
 */
char
collector_reloadcomplete(void)
{
	return cluster_refresh_pending == 0;
}
/**
 * Initialises and starts the collector.
 *
 * Remembers the dispatchers, schedules the initial clusters for pickup
 * by the collector thread and spawns that thread: collector_writer in
 * SUBMISSION mode, collector_runner (fed with the submission server) in
 * every other mode.  A failure to create the thread is only logged.
 */
void
collector_start(dispatcher **d, cluster *c, server *submission)
{
	void *(*entry)(void *);
	void *arg;

	dispatchers = d;
	collector_schedulereload(c);

	if (mode == DEBUG)
		debug = 1;

	/* select the thread body and its argument for the current mode */
	if (mode == SUBMISSION) {
		entry = collector_writer;
		arg = NULL;
	} else {
		entry = collector_runner;
		arg = submission;
	}

	if (pthread_create(&collectorid, NULL, entry, arg) != 0)
		logerr("failed to start collector!\n");
}
/**
 * Shuts down the collector.
 *
 * Clears the flag the collector thread checks at the top of its loop and
 * then blocks until that thread has returned.  The thread sleeps one
 * second per iteration, so this call can block for about that long (plus
 * whatever a running cycle still needs).
 */
void
collector_stop(void)
{
/* the order matters: request termination first, then wait for it */
keep_running = 0;
pthread_join(collectorid, NULL);
}
/*
* Copyright 2013-2015 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef COLLECTOR_H
#define COLLECTOR_H 1
#include "dispatcher.h"
#include "router.h"
#include "server.h"
#include "relay.h"
/* Number of seconds between two collector cycles; defined in collector.c. */
extern int collector_interval;
/*
 * Difference between two timestamps X (earlier) and Y (later) in
 * microseconds.  Both operands need tv_sec and tv_usec members, as
 * struct timeval has.  When Y.tv_sec is not larger than X.tv_sec only
 * the tv_usec fields are subtracted.
 *
 * The arguments are parenthesised so expressions such as *ptr can be
 * passed; note that each argument is evaluated more than once.
 */
#define timediff(X, Y) \
	((Y).tv_sec > (X).tv_sec ? \
	 ((Y).tv_sec - (X).tv_sec) * 1000 * 1000 + ((Y).tv_usec - (X).tv_usec) : \
	 (Y).tv_usec - (X).tv_usec)
/* Starts the collector thread for dispatchers d, clusters c and the
 * submission server. */
void collector_start(dispatcher **d, cluster *c, server *submission);
/* Asks the collector thread to terminate and waits for it. */
void collector_stop(void);
/* Schedules clusters c to be picked up by the collector thread. */
void collector_schedulereload(cluster *c);
/* Returns non-zero once a scheduled reload has been picked up. */
char collector_reloadcomplete(void);
#endif
/*
* Copyright 2013-2015 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <openssl/md5.h>
#include <assert.h>
#include "server.h"
#define CH_RING struct _ch_ring
#include "consistent-hash.h"
/* This value is hardwired in the carbon sources, and necessary to get
 * fair (re)balancing of metrics in the hash ring. Because the value
 * seems reasonable, we use the same value for all hash implementations. */
#define HASH_REPLICAS 100
/* One position on the hash ring.  ch_addnode() creates hash_replicas of
 * these per server and links them through next in ascending pos order. */
typedef struct _ring_entry {
/* 16-bit hash position, as returned by the *_hashpos functions */
unsigned short pos;
/* set on the first element of every array allocated by ch_addnode();
 * presumably used to find the pointers to free -- the freeing code is
 * not visible here, TODO confirm */
unsigned char malloced:1;
/* the server this position maps to */
server *server;
/* next entry on the ring, NULL at the end */
struct _ring_entry *next;
} ch_ring_entry;
/* The ring itself: which hash to use, how many positions each server
 * gets, and the head of the position list (NULL while empty). */
struct _ch_ring {
ch_type type;
unsigned char hash_replicas;
ch_ring_entry *entries;
};
/**
 * Maps the bytes in [key, end) onto a position in a 16-bit unsigned
 * integer space.  The position is made up of the first two bytes of
 * the MD5 digest of the key, the first byte being the most significant
 * one, so the result lies between 0 and 65535.
 */
static unsigned short
carbon_hashpos(const char *key, const char *end)
{
	unsigned char digest[MD5_DIGEST_LENGTH];
	const size_t keylen = (size_t)(end - key);

	MD5((const unsigned char *)key, keylen, digest);

	return (unsigned short)((digest[0] << 8) | digest[1]);
}
/**
 * Maps the bytes in [key, end) onto a position in a 16-bit unsigned
 * integer space using the 32-bit FNV1a hash.  The 32-bit hash is folded
 * to 16 bits by XOR-ing its upper and lower halves, so the result lies
 * between 0 and 65535.
 */
static unsigned short
fnv1a_hashpos(const char *key, const char *end)
{
	const unsigned int fnv_prime = 16777619U;
	unsigned int h = 2166136261U;  /* FNV1a offset basis */
	const char *c = key;

	while (c < end) {
		h ^= (unsigned int)*c++;
		h *= fnv_prime;
	}

	return (unsigned short)((h >> 16) ^ (h & 0xFFFFU));
}
/**
* Qsort comparator for ch_ring_entry structs on pos.
*/
static int
entrycmp(const void *l, const void *r)
{
return ((ch_ring_entry *)l)->pos - ((ch_ring_entry *)r)->pos;
}
/**
 * Allocates an empty hash ring of the given type.  The ring uses
 * HASH_REPLICAS positions per server and has no entries yet.  Returns
 * NULL when the allocation fails.
 */
ch_ring *
ch_new(ch_type type)
{
	ch_ring *ring = malloc(sizeof(*ring));

	if (ring != NULL) {
		ring->type = type;
		ring->hash_replicas = HASH_REPLICAS;
		ring->entries = NULL;
	}

	return ring;
}
/**
 * Computes the hash positions for the server name given. This is based
 * on the hashpos function. The server name usually is the IPv4
 * address. The port component is just stored and not used in the
 * carbon hash calculation in case of carbon_ch. The instance component
 * is used in the hash calculation of carbon_ch, it is ignored for
 * fnv1a_ch. Returns an updated ring.
 *
 * One array of ring->hash_replicas entries is allocated for server s,
 * every entry gets its own hash position, the array is sorted on
 * position and then linked into the ring's position-ordered list.
 *
 * Returns ring on success.  Returns NULL when ring is NULL or when the
 * allocation fails; in the latter case ring itself is left untouched.
 */
ch_ring *
ch_addnode(ch_ring *ring, server *s)
{
int i;
char buf[256];
ch_ring_entry *entries;
if (ring == NULL)
return NULL;
/* all positions of one server live in a single allocation */
entries =
(ch_ring_entry *)malloc(sizeof(ch_ring_entry) * ring->hash_replicas);
if (entries == NULL)
return NULL;
switch (ring->type) {
case CARBON:
for (i = 0; i < ring->hash_replicas; i++) {
char *instance = server_instance(s);
/* this format is actually Python's tuple format that is
 * used in serialised form as input for the hash */
snprintf(buf, sizeof(buf), "('%s', %s%s%s):%d",
server_ip(s),
instance == NULL ? "" : "'",
instance == NULL ? "None" : instance,
instance == NULL ? "" : "'",
i);
/* TODO:
 * https://github.com/graphite-project/carbon/commit/024f9e67ca47619438951c59154c0dec0b0518c7
 * Question is how harmful the collision is -- it will probably
 * change the location of some metrics */
entries[i].pos = carbon_hashpos(buf, buf + strlen(buf));
entries[i].server = s;
entries[i].next = NULL;
entries[i].malloced = 0;
}
break;
case FNV1a:
for (i = 0; i < ring->hash_replicas; i++) {
/* take all server info into account, such that
 * different port numbers for the same hosts will work
 * (unlike CARBON) */
snprintf(buf, sizeof(buf), "%d-%s:%u",
i, server_ip(s), server_port(s));
entries[i].pos = fnv1a_hashpos(buf, buf + strlen(buf));
entries[i].server = s;
entries[i].next = NULL;
entries[i].malloced = 0;
}
break;
}
/* sort to allow merge joins later down the road */
qsort(entries, ring->hash_replicas, sizeof(ch_ring_entry), *entrycmp);
/* flag the element that holds the address returned by malloc; this has
 * to happen after sorting, since sorting moves the elements around */
entries[0].malloced = 1;
if (ring->entries == NULL) {
/* empty ring: the sorted array, chained in order, is the ring */
for (i = 1; i < ring->hash_replicas; i++)
entries[i - 1].next = &entries[i];
ring->entries = entries;
} else {
/* merge-join the two rings */
ch_ring_entry *w, *last;
i = 0;
last = NULL;
assert(ring->hash_replicas > 0);
/* w walks the existing list, i the new array, last is the entry most
 * recently placed in the merged list (NULL while at the head) */
for (w = ring->entries; w != NULL && i < ring->hash_replicas; ) {
if (w->pos < entries[i].pos) {
last = w;
w = w->next;
} else {
/* new entry goes before w; on equal positions the new entry
 * therefore precedes the existing one */
entries[i].next = w;
if (last == NULL) {
ring->entries = &entries[i];
} else {
last->next = &entries[i];
}
last = &entries[i];
i++;
}
}
if (w != NULL) {
/* all new entries are placed, the rest of the old list follows */
last->next = w;
} else {
/* old list exhausted: append the remaining new entries in order */
last->next = &entries[i];
for (i = i + 1; i < ring->hash_replicas; i++)
entries[i - 1].next = &entries[i];
}
}
return ring;
}
/**
* Retrieve the nodes responsible for storing the given metric. The
* replcnt argument specifies how many hosts should be retrieved.
* Results are stored in ret, an array of ch_ring pointers. The
* caller is responsible for ensuring that ret is large enough to store
* replcnt pointers.
*/
void
ch_get_nodes(
destination ret[],
ch_ring *ring,
const char replcnt,
const char *metric,
const char *firstspace)
{
ch_ring_entry *w;
unsigned short pos = 0;
int i, j;
switch (ring->type) {
case CARBON:
pos = carbon_hashpos(metric, firstspace);
break;
case FNV1a:
pos = fnv1a_hashpos(metric, firstspace);
break;
}
assert(ring->entries);
/* implement behaviour of Python's bisect_left on the ring (used in
* carbon hash source), one day we might want to implement it as
* real binary search iso forward pointer chasing */
for (w = ring->entries, i = 0; w != NULL; i++, w = w->next)
if (w->pos >= pos)
break;
/* now fetch enough unique servers to match the requested count */
for (i = 0; i < replcnt; i++, w = w->next) {
if (w == NULL)
w = ring->entries;
for (j = i - 1; j >= 0; j--) {
if (ret[j].dest == w->server) {
j = i;
break;
}
}
if (j == i) {
i--;
continue;