Commit 78e3e79c authored by Bernd Zeimetz's avatar Bernd Zeimetz

Imported Upstream version 1.7

parent 021c14cc
# Copyright 2013-2015 Fabian Groffen
# Copyright 2013-2016 Fabian Groffen
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
......
......@@ -55,7 +55,7 @@ cluster <name>
<host[:port][=instance] [proto <udp | tcp>]> ...
;
cluster <name>
file
file [ip]
</path/to/file> ...
;
match
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -23,6 +23,7 @@
#include <math.h>
#include <regex.h>
#include <pthread.h>
#include <errno.h>
#include <assert.h>
#include "relay.h"
......@@ -33,8 +34,6 @@
#include "fnv1a.h"
static pthread_t aggregatorid;
static aggregator *aggregators = NULL;
static aggregator *lastaggr = NULL;
static size_t prevreceived = 0;
static size_t prevsent = 0;
static size_t prevdropped = 0;
......@@ -52,6 +51,7 @@ aggregator_new(
enum _aggr_timestamp tswhen)
{
aggregator *ret = malloc(sizeof(aggregator));
int intconn[2];
if (ret == NULL)
return ret;
......@@ -59,11 +59,14 @@ aggregator_new(
assert(interval != 0);
assert(interval < expire);
if (aggregators == NULL) {
aggregators = lastaggr = ret;
} else {
lastaggr = lastaggr->next = ret;
if (pipe(intconn) < 0) {
logerr("failed to create pipe for aggregator: %s\n",
strerror(errno));
free(ret);
return NULL;
}
ret->disp_conn = dispatch_addconnection_aggr(intconn[0]);
ret->fd = intconn[1];
ret->interval = interval;
ret->expire = expire;
......@@ -357,6 +360,7 @@ cmp_entry(const void *l, const void *r)
static void *
aggregator_expire(void *sub)
{
aggregator *aggrs = (aggregator *)sub;
time_t now;
aggregator *s;
struct _aggr_bucket *b;
......@@ -364,11 +368,10 @@ aggregator_expire(void *sub)
struct _aggr_invocations *inv;
struct _aggr_invocations *lastinv;
double *values;
size_t len;
size_t len = 0;
int i;
unsigned char j;
int work;
server *submission = (server *)sub;
char metric[METRIC_BUFSIZ];
char isempty;
long long int ts = 0;
......@@ -376,7 +379,7 @@ aggregator_expire(void *sub)
while (1) {
work = 0;
for (s = aggregators; s != NULL; s = s->next) {
for (s = aggrs; s != NULL; s = s->next) {
/* send metrics for buckets that are completely past the
* expiry time, unless we are shutting down, then send
* metrics for all buckets that have completed */
......@@ -409,27 +412,27 @@ aggregator_expire(void *sub)
}
switch (c->type) {
case SUM:
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->sum, ts);
break;
case CNT:
snprintf(metric, sizeof(metric),
"%s %zd %lld\n",
len = snprintf(metric, sizeof(metric),
"%s %zu %lld\n",
inv->metric, b->cnt, ts);
break;
case MAX:
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->max, ts);
break;
case MIN:
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->min, ts);
break;
case AVG:
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
b->sum / (double)b->cnt, ts);
......@@ -450,7 +453,7 @@ aggregator_expire(void *sub)
* iso sorting the full array */
qsort(values, b->cnt,
sizeof(double), cmp_entry);
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
values[n - 1],
......@@ -464,16 +467,21 @@ aggregator_expire(void *sub)
for (i = 0; i < b->cnt; i++)
ksum += pow(values[i] - avg, 2);
ksum /= (double)b->cnt;
snprintf(metric, sizeof(metric),
len = snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
c->type == VAR ? ksum :
sqrt(ksum),
ts);
} break;
default:
assert(0); /* for compiler (len) */
}
if (write(s->fd, metric, len) != len) {
s->dropped++;
} else {
s->sent++;
}
server_send(submission, strdup(metric), 1);
s->sent++;
}
/* move the bucket to the end, to make room for
......@@ -543,7 +551,7 @@ aggregator_expire(void *sub)
}
/* free up value buckets */
for (s = aggregators; s != NULL; s = s->next) {
while ((s = aggrs) != NULL) {
while (s->computes != NULL) {
c = s->computes;
......@@ -562,13 +570,15 @@ aggregator_expire(void *sub)
free(inv->buckets);
inv = invocation->next;
free(inv);
free(invocation);
}
}
s->computes = c->next;
free(c);
}
aggrs = aggrs->next;
free(s);
}
return NULL;
......@@ -578,12 +588,12 @@ aggregator_expire(void *sub)
* Returns the number of aggregators defined.
*/
size_t
aggregator_numaggregators(void)
aggregator_numaggregators(aggregator *aggrs)
{
size_t totaggregators = 0;
aggregator *a;
for (a = aggregators; a != NULL; a = a->next)
for (a = aggrs; a != NULL; a = a->next)
totaggregators++;
return totaggregators;
......@@ -593,13 +603,13 @@ aggregator_numaggregators(void)
* Returns the total number of computations defined.
*/
size_t
aggregator_numcomputes(void)
aggregator_numcomputes(aggregator *aggrs)
{
size_t totcomputes = 0;
aggregator *a;
struct _aggr_computes *c;
for (a = aggregators; a != NULL; a = a->next)
for (a = aggrs; a != NULL; a = a->next)
for (c = a->computes; c != NULL; c = c->next)
totcomputes++;
......@@ -611,9 +621,10 @@ aggregator_numcomputes(void)
* failed, true otherwise.
*/
int
aggregator_start(server *submission)
aggregator_start(aggregator *aggrs)
{
if (pthread_create(&aggregatorid, NULL, aggregator_expire, submission) != 0)
keep_running = 1;
if (pthread_create(&aggregatorid, NULL, aggregator_expire, aggrs) != 0)
return 0;
return 1;
......@@ -633,12 +644,11 @@ aggregator_stop(void)
* Returns an approximate number of received metrics by all aggregators.
*/
size_t
aggregator_get_received(void)
aggregator_get_received(aggregator *a)
{
size_t totreceived = 0;
aggregator *a;
for (a = aggregators; a != NULL; a = a->next)
for ( ; a != NULL; a = a->next)
totreceived += a->received;
return totreceived;
......@@ -649,9 +659,9 @@ aggregator_get_received(void)
* since the last call to this function.
*/
inline size_t
aggregator_get_received_sub()
aggregator_get_received_sub(aggregator *aggrs)
{
size_t d = aggregator_get_received();
size_t d = aggregator_get_received(aggrs);
size_t r = d - prevreceived;
prevreceived += d;
return r;
......@@ -661,12 +671,11 @@ aggregator_get_received_sub()
* Returns an approximate number of metrics sent by all aggregators.
*/
size_t
aggregator_get_sent(void)
aggregator_get_sent(aggregator *a)
{
size_t totsent = 0;
aggregator *a;
for (a = aggregators; a != NULL; a = a->next)
for ( ; a != NULL; a = a->next)
totsent += a->sent;
return totsent;
......@@ -677,9 +686,9 @@ aggregator_get_sent(void)
* since the last call to this function.
*/
inline size_t
aggregator_get_sent_sub()
aggregator_get_sent_sub(aggregator *aggrs)
{
size_t d = aggregator_get_sent();
size_t d = aggregator_get_sent(aggrs);
size_t r = d - prevsent;
prevsent += d;
return r;
......@@ -691,12 +700,11 @@ aggregator_get_sent_sub()
* time) or if they are too much in the future.
*/
size_t
aggregator_get_dropped(void)
aggregator_get_dropped(aggregator *a)
{
size_t totdropped = 0;
aggregator *a;
for (a = aggregators; a != NULL; a = a->next)
for ( ; a != NULL; a = a->next)
totdropped += a->dropped;
return totdropped;
......@@ -707,9 +715,9 @@ aggregator_get_dropped(void)
* since the last call to this function.
*/
inline size_t
aggregator_get_dropped_sub()
aggregator_get_dropped_sub(aggregator *aggrs)
{
size_t d = aggregator_get_dropped();
size_t d = aggregator_get_dropped(aggrs);
size_t r = d - prevdropped;
prevdropped += d;
return r;
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -29,6 +29,8 @@ typedef struct _aggregator {
unsigned short expire; /* when incoming metrics are no longer valid */
enum _aggr_timestamp { TS_START, TS_MIDDLE, TS_END } tswhen;
unsigned char bucketcnt;
int disp_conn;
int fd;
size_t received;
size_t sent;
size_t dropped;
......@@ -65,15 +67,15 @@ aggregator *aggregator_new(unsigned int interval, unsigned int expire, enum _agg
char aggregator_add_compute(aggregator *s, const char *metric, const char *type);
void aggregator_set_stub(aggregator *s, const char *stubname);
void aggregator_putmetric(aggregator *s, const char *metric, const char *firstspace, size_t nmatch, regmatch_t *pmatch);
int aggregator_start(server *submission);
int aggregator_start(aggregator *aggrs);
void aggregator_stop(void);
size_t aggregator_numaggregators(void);
size_t aggregator_numcomputes(void);
size_t aggregator_get_received(void);
size_t aggregator_get_sent(void);
size_t aggregator_get_dropped(void);
size_t aggregator_get_received_sub(void);
size_t aggregator_get_sent_sub(void);
size_t aggregator_get_dropped_sub(void);
size_t aggregator_numaggregators(aggregator *agrs);
size_t aggregator_numcomputes(aggregator *aggrs);
size_t aggregator_get_received(aggregator *aggrs);
size_t aggregator_get_sent(aggregator *aggrs);
size_t aggregator_get_dropped(aggregator *aggrs);
size_t aggregator_get_received_sub(aggregator *aggrs);
size_t aggregator_get_sent_sub(aggregator *aggrs);
size_t aggregator_get_dropped_sub(aggregator *aggrs);
#endif
This diff is collapsed.
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,6 +20,7 @@
#include "dispatcher.h"
#include "router.h"
#include "aggregator.h"
#include "server.h"
#include "relay.h"
......@@ -28,9 +29,9 @@ extern int collector_interval;
#define timediff(X, Y) \
(Y.tv_sec > X.tv_sec ? (Y.tv_sec - X.tv_sec) * 1000 * 1000 + ((Y.tv_usec - X.tv_usec)) : Y.tv_usec - X.tv_usec)
void collector_start(dispatcher **d, cluster *c, server *submission, char cum);
void collector_start(dispatcher **d, cluster *c, aggregator *a, server *submission, char cum);
void collector_stop(void);
void collector_schedulereload(cluster *c);
void collector_schedulereload(cluster *c, aggregator *a);
char collector_reloadcomplete(void);
#endif
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -445,6 +445,8 @@ ch_printhashring(ch_ring *ring, FILE *f)
column = 0;
}
}
if (column != 0)
fprintf(f, "\n");
}
unsigned short
......@@ -477,10 +479,10 @@ ch_free(ch_ring *ring)
ch_ring_entry *w = NULL;
for (; ring->entries != NULL; ring->entries = ring->entries->next) {
server_shutdown(ring->entries->server);
free(ring->entries->server);
if (ring->entries->malloced) {
server_shutdown(ring->entries->server);
free(ring->entries->server);
if (deletes == NULL) {
w = deletes = ring->entries;
} else {
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -20,6 +20,7 @@
#include <fcntl.h>
#include <string.h>
#include <errno.h>
#include <poll.h>
#include <pthread.h>
#include <sys/uio.h>
#include <sys/time.h>
......@@ -53,6 +54,7 @@ typedef struct _connection {
size_t destlen;
struct timeval lastwork;
char hadwork:1;
char isaggr:1;
} connection;
struct _dispatcher {
......@@ -70,6 +72,7 @@ struct _dispatcher {
route *routes;
route *pending_routes;
char route_refresh_pending:1;
char hold:1;
char *allowed_chars;
};
......@@ -127,7 +130,7 @@ dispatch_addlistener(int sock)
if (c == sizeof(listeners) / sizeof(connection *)) {
free(newconn);
logerr("cannot add new listener: "
"no more free listener slots (max = %zd)\n",
"no more free listener slots (max = %zu)\n",
sizeof(listeners) / sizeof(connection *));
return 1;
}
......@@ -193,7 +196,7 @@ dispatch_addconnection(int sock)
sizeof(connection) * (connectionslen + CONNGROWSZ));
if (newlst == NULL) {
logerr("cannot add new connection: "
"out of memory allocating more slots (max = %zd)\n",
"out of memory allocating more slots (max = %zu)\n",
connectionslen);
pthread_rwlock_unlock(&connectionslock);
......@@ -234,6 +237,7 @@ dispatch_addconnection(int sock)
connections[c].buflen = 0;
connections[c].needmore = 0;
connections[c].noexpire = 0;
connections[c].isaggr = 0;
connections[c].destlen = 0;
gettimeofday(&connections[c].lastwork, NULL);
connections[c].hadwork = 1; /* force first iteration before stalling */
......@@ -243,6 +247,27 @@ dispatch_addconnection(int sock)
return c;
}
/**
* Adds a connection which we know is from an aggregator, so direct
* pipe. This is different from normal connections that we don't want
* to count them, never expire them, and want to recognise them when
* we're doing reloads.
*/
int
dispatch_addconnection_aggr(int sock)
{
int conn = dispatch_addconnection(sock);
if (conn == -1)
return 1;
connections[conn].noexpire = 1;
connections[conn].isaggr = 1;
acceptedconnections--;
return 0;
}
/**
* Adds a pseudo-listener for datagram (UDP) sockets, which is pseudo,
* for in fact it adds a new connection, but makes sure that connection
......@@ -501,32 +526,22 @@ dispatch_runner(void *arg)
self->state = SLEEPING;
if (self->type == LISTENER) {
fd_set fds;
int maxfd = -1;
struct timeval tv;
struct pollfd ufds[sizeof(listeners) / sizeof(connection *)];
while (self->keep_running) {
FD_ZERO(&fds);
tv.tv_sec = 0;
tv.tv_usec = 250 * 1000; /* 250 ms */
for (c = 0; c < sizeof(listeners) / sizeof(connection *); c++) {
conn = listeners[c];
if (conn == NULL)
if (listeners[c] == NULL)
break;
FD_SET(conn->sock, &fds);
if (conn->sock > maxfd)
maxfd = conn->sock;
ufds[c].fd = listeners[c]->sock;
ufds[c].events = POLLIN;
}
if (select(maxfd + 1, &fds, NULL, NULL, &tv) > 0) {
for (c = 0; c < sizeof(listeners) / sizeof(connection *); c++) {
conn = listeners[c];
if (conn == NULL)
break;
if (FD_ISSET(conn->sock, &fds)) {
if (poll(ufds, c, 1000) > 0) {
for (--c; c >= 0; c--) {
if (ufds[c].revents & POLLIN) {
int client;
struct sockaddr addr;
socklen_t addrlen = sizeof(addr);
if ((client = accept(conn->sock, &addr, &addrlen)) < 0)
if ((client = accept(ufds[c].fd, &addr, &addrlen)) < 0)
{
logerr("dispatch: failed to "
"accept() new connection: %s\n",
......@@ -549,6 +564,7 @@ dispatch_runner(void *arg)
self->routes = self->pending_routes;
self->pending_routes = NULL;
self->route_refresh_pending = 0;
self->hold = 0;
}
pthread_rwlock_rdlock(&connectionslock);
......@@ -557,6 +573,10 @@ dispatch_runner(void *arg)
/* atomically try to "claim" this connection */
if (!__sync_bool_compare_and_swap(&(conn->takenby), 0, self->id))
continue;
if (self->hold && !conn->isaggr) {
conn->takenby = 0;
continue;
}
self->state = RUNNING;
work += dispatch_connection(conn, self);
}
......@@ -591,6 +611,7 @@ dispatch_new(char id, enum conntype type, route *routes, char *allowed_chars)
ret->keep_running = 1;
ret->routes = routes;
ret->route_refresh_pending = 0;
ret->hold = 0;
ret->allowed_chars = allowed_chars;
if (pthread_create(&ret->tid, NULL, dispatch_runner, ret) != 0) {
free(ret);
......@@ -645,6 +666,19 @@ dispatch_shutdown(dispatcher *d)
free(d);
}
/**
* Requests this dispatcher to stop processing connections. As soon as
* schedulereload finishes reloading the routes, this dispatcher will
* un-hold and continue processing connections.
* Returns when the dispatcher is no longer doing work.
*/
inline void
dispatch_hold(dispatcher *d)
{
d->hold = 1;
}
/**
* Schedules routes r to be put in place for the current routes. The
* replacement is performed at the next cycle of the dispatcher.
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -29,6 +29,7 @@ int dispatch_addlistener(int sock);
int dispatch_addlistener_udp(int sock);
void dispatch_removelistener(int sock);
int dispatch_addconnection(int sock);
int dispatch_addconnection_aggr(int sock);
dispatcher *dispatch_new_listener(void);
dispatcher *dispatch_new_connection(route *routes, char *allowed_chars);
void dispatch_stop(dispatcher *d);
......@@ -42,6 +43,7 @@ size_t dispatch_get_blackholes_sub(dispatcher *self);
char dispatch_busy(dispatcher *self);
size_t dispatch_get_accepted_connections(void);
size_t dispatch_get_closed_connections(void);
void dispatch_hold(dispatcher *d);
void dispatch_schedulereload(dispatcher *d, route *r);
char dispatch_reloadcomplete(dispatcher *d);
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
cluster test
file
test.out
;
aggregate
^test1\.(.*)
every 10 seconds
expire after 30 seconds
compute sum write to
aggregates.sum.test
compute max write to
aggregates.max.test
send to test
stop
;
aggregate
^test2\.
every 10 seconds
expire after 30 seconds
compute sum write to
aggregates.sum.test
compute max write to
aggregates.max.test
send to test
stop
;
match *
send to test
stop
;
# The # in POS# is seen as comment marker, so end of this line is not
# seen by the parser. I will not fix this for the time being until it
# becomes a real issue, would need quoting or something in that case.
rewrite env\.app\.POS.\.yadda\.count into env.app.POS#.yadda.count;
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -46,7 +46,7 @@ int
bindlisten(
int ret_stream[], int *retlen_stream,
int ret_dgram[], int *retlen_dgram,
const char *interface, unsigned short port)
const char *interface, unsigned short port, unsigned int backlog)
{
int sock;
int optval;
......@@ -115,7 +115,7 @@ bindlisten(
}
if (resw->ai_protocol == IPPROTO_TCP) {
if (listen(sock, 3) < 0) { /* backlog of 3, enough? */
if (listen(sock, backlog) < 0) {
close(sock);
continue;
}
......@@ -161,7 +161,7 @@ bindlisten(
break;
}
if (listen(sock, 3) < 0) { /* backlog of 3, enough? */
if (listen(sock, backlog) < 0) {
close(sock);
break;
}
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,7 +18,7 @@
#ifndef RECEPTOR_H
#define RECEPTOR_H 1
int bindlisten(int ret_stream[], int *retlen_stream, int ret_dgram[], int *retlen_dgram, const char *interface, unsigned short port);
int bindlisten(int ret_stream[], int *retlen_stream, int ret_dgram[], int *retlen_dgram, const char *interface, unsigned short port, unsigned int backlog);
void destroy_usock(unsigned short port);
......
This diff is collapsed.
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -18,11 +18,11 @@
#ifndef HAVE_RELAY_H
#define HAVE_RELAY_H 1
#define VERSION "1.3"
#define VERSION "1.7"
#define METRIC_BUFSIZ 8192
enum rmode { NORMAL, DEBUG, SUBMISSION, TEST, DEBUGTEST };
enum rmode { NORMAL, DEBUG, SUBMISSION, DEBUGSUBMISSION, TEST, DEBUGTEST };
typedef enum { CON_TCP, CON_UDP, CON_PIPE, CON_FILE } serv_ctype;
......
/*
* Copyright 2013-2015 Fabian Groffen
* Copyright 2013-2016 Fabian Groffen
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
......@@ -223,7 +223,7 @@ determine_if_regex(route *r, char *pat, int flags)
* (ip:port[=instance] [proto (tcp | udp)] ...)
* ;
* cluster (name)
* log [ip]
* file [ip]
* (/path/to/file ...)
* ;
* match
......@@ -262,7 +262,7 @@ determine_if_regex(route *r, char *pat, int flags)
* stop;
*/