Commit 3f9a9d4e authored by Bernd Zeimetz

Imported Upstream version 0.40

parent 92bc19a2
......@@ -14,6 +14,8 @@
CFLAGS ?= -O2 -Wall
# if your compiler doesn't support OpenMP, comment out this line
CC += -fopenmp
GIT_VERSION := $(shell git describe --abbrev=6 --dirty --always || date +%F)
GVCFLAGS += -DGIT_VERSION=\"$(GIT_VERSION)\"
......
......@@ -3,7 +3,8 @@ carbon-c-relay
Carbon-like graphite line mode relay.
This project aims to be a replacement of the original [Carbon relay](http://graphite.readthedocs.org/en/1.0/carbon-daemons.html#carbon-relay-py)
This project aims to be a fast replacement of the original [Carbon
relay](http://graphite.readthedocs.org/en/1.0/carbon-daemons.html#carbon-relay-py)
The main reason to build a replacement is performance and
configurability. Carbon is single threaded, and sending metrics to
......@@ -139,6 +140,10 @@ of the writer to make sure the metrics are clean. If this is an issue
for routing, one can consider to have a rewrite-only instance that
forwards all metrics to another instance that will do the routing.
Obviously the second instance will cleanse the metrics as they come in.
The backreference notation allows lowercasing and uppercasing the
replacement string with the use of the underscore (`_`) and caret
(`^`) symbols directly following the backslash. For example,
`role.\_1.` as substitution will lowercase the contents of `\1`.
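As an illustration (the metric names here are hypothetical), a rule
using this notation to lowercase the first captured group could be
sketched as:

    rewrite ^servers\.([A-Z]+)\.(.+)
        into servers.\_1.\2
        ;

With such a rule a metric like `servers.AMS.cpu` would become
`servers.ams.cpu`; using `\^1` instead would uppercase the group.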
The aggregations defined take one or more input metrics expressed by one
or more regular expressions, similar to the match rules. Incoming
......@@ -163,16 +168,16 @@ Carbon-c-relay evolved over time, growing features on demand as the tool
proved to be stable and fitting the job well. Below follow some
annotated examples of constructs that can be used with the relay.
Clusters are defined as much as are necessary. They receive data from
match rules, then their type defines which members of the cluster get
the metric data. The simplest cluster form is a `forward` cluster:
As many clusters as necessary can be defined. They receive data from
match rules, and their type defines which members of the cluster finally
get the metric data. The simplest cluster form is a `forward` cluster:
cluster send-through
forward
10.1.0.1
;
Any metric send the `send-through` cluster would simply be forwarded to
Any metric sent to the `send-through` cluster would simply be forwarded to
the server at IPv4 address `10.1.0.1`. If we define multiple servers,
all of those servers would get the same metric, thus:
......@@ -190,13 +195,19 @@ of the members. The same example with such cluster would be:
cluster send-to-any-one
any_of 10.1.0.1:2010 10.1.0.1:2011;
This would implement a fail-over scenario, where two servers are used,
the load between them is spreaded, but should any of them fail, all
metrics are send to the remaining one. This typically works well for
This would implement a multipath scenario, where two servers are used,
the load between them is spread, but should any of them fail, all
metrics are sent to the remaining one. This typically works well for
upstream relays, or for balancing carbon-cache processes running on the
same machine. Should any member become unavailable, for instance due to
rolling restart, the other members receive the traffic. This is
different from the two consistent hash cluster types:
a rolling restart, the other members receive the traffic. If it is
necessary to have true fail-over, where the secondary server is only
used if the first is down, the following would implement that:
cluster try-first-then-second
failover 10.1.0.1:2010 10.1.0.1:2011;
These types are different from the two consistent hash cluster types:
cluster graphite
carbon_ch
......@@ -206,14 +217,15 @@ different from the two consistent hash cluster types:
;
If a member in this example fails, all metrics that would go to that
members are kept in the queue, waiting for the member to return. This
is useful for carbon-cache machines where it is desirable that the same
member are kept in the queue, waiting for the member to return. This
is useful for clusters of carbon-cache machines where it is desirable
that the same
metric ends up on the same server always. The `carbon_ch` cluster type
is compatible with carbon-relay consistent hash, and can be used for
existing clusters populated by carbon-relay. For new clusters, however,
it is better to use the `fnv1a_ch` cluster type, for it is faster, and
allows balancing over the same address but different ports without an
instance number, unlike `carbon_ch`.
instance number, in contrast to `carbon_ch`.
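As a sketch of the difference (addresses and names are made up), an
`fnv1a_ch` cluster can simply balance over one address with several
ports:

    cluster graphite-fnv
        fnv1a_ch
            10.1.0.2:2003
            10.1.0.2:2004
            10.1.0.2:2005
        ;

With `carbon_ch`, members on the same address would additionally need a
distinguishing instance number.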
Because we can use multiple clusters, we can also replicate without the
use of the `forward` cluster type, in a more intelligent way:
......@@ -311,10 +323,10 @@ to cleanup metrics from applications, or provide a migration path. In
its simplest form a rewrite rule looks like this:
rewrite ^server\.(.+)\.(.+)\.([a-zA-Z]+)([0-9]+)
into server.\1.\2.\3.\3\4
into server.\_1.\2.\3.\3\4
;
In this example a metric like `server.dc.role.name123` would be
In this example a metric like `server.DC.role.name123` would be
transformed into `server.dc.role.name.name123`.
For rewrite rules the same holds as for matches: their order
matters. Hence, to build on top of the old/new cluster example done
......@@ -402,6 +414,43 @@ produced. Obviously, the input metrics define which hosts and clusters
are produced.
Performance
-----------
The original argument for building carbon-c-relay was speed, with
configurability following closely. To date, its performance exceeds
that of the original carbon-relay.py by orders of magnitude, but the
actual speed depends heavily on the scenario. What follows below are
some rough numbers about the environment at Booking.com where
carbon-c-relay is used extensively in production.
carbon-c-relay runs on all of our machines as a local submission relay.
Its config is simply a match-all rule to an `any_of` cluster with a
number of upstream relays to try and send the metrics to. These relays
run with 4 workers, and receive a minimal amount of metrics per minute,
typically between 50 and 200. These instances typically take around
19MiB of RAM and consume at peak 0.8% of a 2.4GHz core. The minimal
footprint of the relay is a desired property for running on all of our
machines.
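A minimal sketch of such a submission config (hostnames are
hypothetical) could look roughly like:

    cluster upstream
        any_of
            relay-1.example.com:2003
            relay-2.example.com:2003
        ;

    match *
        send to upstream
        stop
        ;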
The main relays we run have roughly 20 clusters defined with the
`fnv1a_ch` hash, with an average cluster size of around 10 members. On
top of that, 30 match rules are defined. For a mildly loaded relay
receiving 1M metrics per minute, the relay consumes 750MiB of RAM and
needs around 40% of a 2.4GHz core. A relay with more load but the same
configuration, 3M metrics per minute, needs almost 2GiB of RAM, and
some 45% CPU of a 2.4GHz core. The memory usage is mainly in the
buffers for writing to the server stores.
On the stores, we run relays with a simple config: a match-all rule to
an `any_of` cluster pointing to 13 locally running carbon-cache.py
instances. These relays receive up to 1.7M metrics per minute, and
require some 110MiB RAM for that. The CPU usage is around 15% of a
2.4GHz core.
For aggregations we don't have much traffic (55K metrics per minute) on
a couple of aggregations that expand to about a thousand metrics. In
our setup this takes 30MiB of RAM and some 30% CPU.
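For reference, one such aggregation could be sketched roughly as
follows (metric names are hypothetical, and keyword details may differ
between relay versions):

    aggregate ^sys\.dc[0-9]+\.web-[0-9]+\.requests
        every 60 seconds
        expire after 75 seconds
        compute sum write to
            sys.web.requests.total
        compute max write to
            sys.web.requests.max
        ;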
Author
------
Fabian Groffen
......
......@@ -108,7 +108,7 @@ aggregator_add_compute(
ac->type = act;
ac->metric = strdup(metric);
ac->invocations = NULL;
memset(ac->invocations_ht, 0, sizeof(ac->invocations_ht));
ac->next = NULL;
return 0;
......@@ -118,7 +118,7 @@ aggregator_add_compute(
* Adds a new metric to aggregator s. The value from the metric is put
* in the bucket matching the epoch contained in the metric. In cases
* where the contained epoch is too old or too new, the metric is
* rejected.
* dropped.
*/
void
aggregator_putmetric(
......@@ -138,6 +138,9 @@ aggregator_putmetric(
char *newfirstspace = NULL;
size_t len;
const char *ometric;
const char *omp;
unsigned int omhash;
unsigned int omhtbucket;
struct _aggr_computes *compute;
struct _aggr_invocations *invocation;
......@@ -172,8 +175,17 @@ aggregator_putmetric(
ometric = newmetric;
}
for (invocation = compute->invocations; invocation != NULL; invocation = invocation->next)
if (strcmp(ometric, invocation->metric) == 0) /* match */
omhash = 2166136261UL; /* FNV1a */
for (omp = ometric; *omp != '\0'; omp++)
omhash = (omhash ^ (unsigned int)*omp) * 16777619;
omhtbucket =
((omhash >> AGGR_HT_POW_SIZE) ^ omhash) &
(((unsigned int)1 << AGGR_HT_POW_SIZE) - 1);
invocation = compute->invocations_ht[omhtbucket];
for (; invocation != NULL; invocation = invocation->next)
if (invocation->hash == omhash &&
strcmp(ometric, invocation->metric) == 0) /* match */
break;
if (invocation == NULL) { /* no match, add */
int i;
......@@ -190,6 +202,7 @@ aggregator_putmetric(
free(invocation);
continue;
}
invocation->hash = omhash;
/* Start buckets in the past with a splay, but before expiry
* the splay is necessary to avoid a thundering herd of
......@@ -197,7 +210,7 @@ aggregator_putmetric(
* e.g. right after startup when other relays flush their
* queues. */
time(&now);
now -= s->expire + 1 + (rand() % s->interval);
now -= s->expire - 1 - (rand() % s->interval);
/* allocate enough buckets to hold the past + future */
invocation->buckets =
......@@ -214,8 +227,8 @@ aggregator_putmetric(
invocation->buckets[i].cnt = 0;
}
invocation->next = compute->invocations;
compute->invocations = invocation;
invocation->next = compute->invocations_ht[omhtbucket];
compute->invocations_ht[omhtbucket] = invocation;
}
/* finally, try to do the maths */
......@@ -261,7 +274,8 @@ aggregator_putmetric(
/**
* Checks if the oldest bucket should be expired, if so, sends out
* computed aggregate metrics and moves the bucket to the end of the
* list.
* list. When no buckets are in use for an invocation, it is removed to
* clean up resources.
*/
static void *
aggregator_expire(void *sub)
......@@ -270,10 +284,13 @@ aggregator_expire(void *sub)
aggregator *s;
struct _bucket *b;
struct _aggr_computes *c;
struct _aggr_invocations *i;
struct _aggr_invocations *inv;
struct _aggr_invocations *lastinv;
int i;
int work;
server *submission = (server *)sub;
char metric[METRIC_BUFSIZ];
char isempty;
while (1) {
work = 0;
......@@ -282,60 +299,100 @@ aggregator_expire(void *sub)
/* send metrics for buckets that are completely past the
* expiry time, unless we are shutting down, then send
* metrics for all buckets that have completed */
now = time(NULL) + (keep_running ? 0 : s->expire);
now = time(NULL) + (keep_running ? 0 : s->expire - s->interval);
for (c = s->computes; c != NULL; c = c->next) {
for (i = c->invocations; i != NULL; i = i->next) {
while (i->buckets[0].start + s->interval < now - s->expire) {
/* yay, let's produce something cool */
b = &i->buckets[0];
if (b->cnt > 0) { /* avoid emitting empty/uninitialised data */
switch (c->type) {
case SUM:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
i->metric, b->sum,
b->start + s->interval);
break;
case CNT:
snprintf(metric, sizeof(metric),
"%s %zd %lld\n",
i->metric, b->cnt,
b->start + s->interval);
break;
case MAX:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
i->metric, b->max,
b->start + s->interval);
break;
case MIN:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
i->metric, b->min,
b->start + s->interval);
break;
case AVG:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
i->metric,
b->sum / (double)b->cnt,
b->start + s->interval);
for (i = 0; i < (1 << AGGR_HT_POW_SIZE); i++) {
lastinv = NULL;
isempty = 0;
for (inv = c->invocations_ht[i]; inv != NULL; ) {
while (inv->buckets[0].start + s->expire < now) {
/* yay, let's produce something cool */
b = &inv->buckets[0];
/* avoid emitting empty/uninitialised data */
isempty = b->cnt == 0;
if (!isempty) {
switch (c->type) {
case SUM:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->sum,
b->start + s->interval);
break;
case CNT:
snprintf(metric, sizeof(metric),
"%s %zd %lld\n",
inv->metric, b->cnt,
b->start + s->interval);
break;
case MAX:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->max,
b->start + s->interval);
break;
case MIN:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric, b->min,
b->start + s->interval);
break;
case AVG:
snprintf(metric, sizeof(metric),
"%s %f %lld\n",
inv->metric,
b->sum / (double)b->cnt,
b->start + s->interval);
break;
}
server_send(submission, strdup(metric), 1);
s->sent++;
}
/* move the bucket to the end, to make room for
* new ones */
pthread_mutex_lock(&s->bucketlock);
memmove(&inv->buckets[0], &inv->buckets[1],
sizeof(*b) * (s->bucketcnt - 1));
b = &inv->buckets[s->bucketcnt - 1];
b->cnt = 0;
b->start =
inv->buckets[s->bucketcnt - 2].start +
s->interval;
pthread_mutex_unlock(&s->bucketlock);
work++;
}
if (isempty) {
int j;
/* see if the remaining buckets are empty too */
pthread_mutex_lock(&s->bucketlock);
for (j = 0; j < s->bucketcnt; j++) {
if (inv->buckets[j].cnt != 0) {
isempty = 0;
pthread_mutex_unlock(&s->bucketlock);
break;
}
}
}
if (isempty) {
/* free and unlink */
free(inv->metric);
free(inv->buckets);
if (lastinv != NULL) {
lastinv->next = inv->next;
free(inv);
inv = lastinv->next;
} else {
c->invocations_ht[i] = inv->next;
free(inv);
inv = c->invocations_ht[i];
}
server_send(submission, strdup(metric), 1);
pthread_mutex_unlock(&s->bucketlock);
} else {
lastinv = inv;
inv = inv->next;
}
pthread_mutex_lock(&s->bucketlock);
if (b->cnt > 0)
s->sent++;
/* move the bucket to the end, to make room for new ones */
memmove(&i->buckets[0], &i->buckets[1],
sizeof(*b) * (s->bucketcnt - 1));
b = &i->buckets[s->bucketcnt - 1];
b->cnt = 0;
b->start = i->buckets[s->bucketcnt - 2].start + s->interval;
pthread_mutex_unlock(&s->bucketlock);
work++;
}
}
}
......
......@@ -23,6 +23,7 @@
#include "server.h"
#define AGGR_HT_POW_SIZE 12 /* 4096: too big? issue #60 */
typedef struct _aggregator {
unsigned short interval; /* when to perform the aggregation */
unsigned short expire; /* when incoming metrics are no longer valid */
......@@ -34,7 +35,8 @@ typedef struct _aggregator {
enum _aggr_compute_type { SUM, CNT, MAX, MIN, AVG } type;
const char *metric; /* name template of metric to produce */
struct _aggr_invocations {
char *metric; /* actual name to emit */
char *metric; /* actual name to emit */
unsigned int hash; /* to speed up matching */
struct _bucket {
long long int start;
size_t cnt;
......@@ -43,7 +45,7 @@ typedef struct _aggregator {
double min;
} *buckets;
struct _aggr_invocations *next;
} *invocations;
} *invocations_ht[1 << AGGR_HT_POW_SIZE];
struct _aggr_computes *next;
} *computes;
pthread_mutex_t bucketlock;
......
......@@ -5,34 +5,34 @@
# relay Startup script for the carbon-c-relay metrics aggregation daemon
# Packaged for the BBC by Matthew Hollick <matthew@mayan-it.co.uk>
#
# description: Carbon-like graphite line mode relay.\n
#\n
#This project aims to be a replacement of the original Carbon relay\n
#\n
#The main reason to build a replacement is performance and configurability.\n
#Carbon is single threaded, and sending metrics to multiple consistent-hash\n
#clusters requires chaining of relays. This project provides a multithreaded\n
#relay which can address multiple targets and clusters for each and every\n
#metric based on pattern matches.\n
# description: Carbon-like graphite line mode relay.
#
#This project aims to be a replacement of the original Carbon relay
#
#The main reason to build a replacement is performance and configurability.
#Carbon is single threaded, and sending metrics to multiple consistent-hash
#clusters requires chaining of relays. This project provides a multithreaded
#relay which can address multiple targets and clusters for each and every
#metric based on pattern matches.
#
# chkconfig: 2345 80 80
#
# config: /etc/relay.conf
# pidfile: /var/run/carbon/relay.pid
# config: /etc/%%NAME%%.conf
# pidfile: /var/run/%%NAME%%/%%NAME%%.pid
# Source function library.
. /etc/init.d/functions
RETVAL=0
PROG="relay"
PROG="%%NAME%%"
DAEMON_CONFIG=/etc/${PROG}.conf
DAEMON_SYSCONFIG=/etc/sysconfig/${PROG}
DAEMON=/usr/bin/${PROG}
PID_FILE=/var/run/carbon/${PROG}.pid
PID_FILE=/var/run/${PROG}/${PROG}.pid
LOCK_FILE=/var/lock/subsys/${PROG}
LOG_FILE=/var/log/carbon/relay.log
DAEMON_USER="relay"
LOG_FILE=/var/log/${PROG}/${PROG}.log
DAEMON_USER="%%NAME%%"
FQDN=$(hostname --long)
. ${DAEMON_SYSCONFIG}
......
# not installed by default as logrotate is used to manage all carbon log files.
/var/log/carbon/relay.log
/var/log/%%NAME%%/%%NAME%%.log
{
sharedscripts
missingok
......@@ -7,6 +7,6 @@
rotate 30
compress
postrotate
[ ! -f /var/run/carbon/relay.pid ] || /etc/init.d/relay restart
[ ! -f /var/run/%%NAME%%/%%NAME%%.pid ] || /etc/init.d/%%NAME%% restart
endscript
}
# Monit script to ensure carbon c relay is always running
check process relay with pidfile /var/run/relay/relay.pid
start program = "/etc/init.d/relay start"
stop program = "/etc/init.d/relay stop"
check process %%NAME%% with pidfile /var/run/%%NAME%%/%%NAME%%.pid
start program = "/etc/init.d/%%NAME%% start"
stop program = "/etc/init.d/%%NAME%% stop"
if failed port 2003 type tcp then restart
if 5 restarts within 5 cycles then timeout
Name: relay
Version: 0.32
Release: 1%{?dist}
Summary: A C implementation of the Graphite carbon relay daemon packaged for mdr.
Group: System Environment/Daemons
License: See the LICENSE file at github.
URL: https://github.com/grobian/carbon-c-relay
Source0: %{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-root
Name: cc_relay
Version: 0.39
Release: 1%{?dist}
Summary: A C implementation of the Graphite carbon relay daemon packaged for MDR
Group: System Environment/Daemons
License: See the LICENSE file at github.
URL: https://github.com/grobian/carbon-c-relay
Source0: %{name}-%{version}.tar.gz
BuildRoot: %{_tmppath}/%{name}-%{version}-root
BuildRequires: openssl-devel
Requires(pre): /usr/sbin/useradd
Requires: daemonize
AutoReqProv: No
Requires(post): chkconfig
AutoReqProv: No
%description
......@@ -19,52 +20,79 @@ This project aims to be a replacement of the original Carbon relay.
Carbon C Relay has been packaged as part of twiki for the BBC.
%prep
%setup -q
%setup -q -n carbon-c-relay
%{__sed} -i s/%%%%NAME%%%%/%{name}/g contrib/relay.init
%{__sed} -i s/%%%%NAME%%%%/%{name}/g contrib/relay.logrotate
%{__sed} -i s/%%%%NAME%%%%/%{name}/g contrib/relay.monit
%{__sed} -i s/%%%%NAME%%%%/%{name}/g contrib/relay.sysconfig
%build
make %{?_smp_mflags}
%install
mkdir -vp $RPM_BUILD_ROOT/var/log/carbon/
mkdir -vp $RPM_BUILD_ROOT/etc/monit.d/
mkdir -vp $RPM_BUILD_ROOT/var/run/carbon
mkdir -vp $RPM_BUILD_ROOT/var/lib/carbon
mkdir -vp $RPM_BUILD_ROOT/usr/bin
mkdir -vp $RPM_BUILD_ROOT/etc/init.d
mkdir -vp $RPM_BUILD_ROOT/etc/sysconfig
install -m 755 relay $RPM_BUILD_ROOT/usr/bin/relay
install -m 644 contrib/relay.conf $RPM_BUILD_ROOT/etc/relay.conf
install -m 755 contrib/relay.init $RPM_BUILD_ROOT/etc/init.d/relay
install -m 644 contrib/relay.sysconfig $RPM_BUILD_ROOT/etc/sysconfig/relay
install -m 644 contrib/relay.monit $RPM_BUILD_ROOT/etc/monit.d/relay.conf
mkdir -vp %{buildroot}%{_localstatedir}/log/%{name}
mkdir -vp %{buildroot}%{_sysconfdir}/monit.d/
mkdir -vp %{buildroot}%{_sysconfdir}/logrotate.d/
mkdir -vp %{buildroot}%{_localstatedir}/run/%{name}
mkdir -vp %{buildroot}%{_localstatedir}/lib/%{name}
mkdir -vp %{buildroot}%{_bindir}
mkdir -vp %{buildroot}%{_sysconfdir}/init.d
mkdir -vp %{buildroot}%{_sysconfdir}/sysconfig
%{__install} -m 755 relay %{buildroot}%{_bindir}/%{name}
%{__install} -m 644 contrib/relay.conf %{buildroot}%{_sysconfdir}/%{name}.conf
%{__install} -m 755 contrib/relay.init %{buildroot}%{_sysconfdir}/init.d/%{name}
%{__install} -m 644 contrib/relay.logrotate %{buildroot}%{_sysconfdir}/logrotate.d/%{name}
%{__install} -m 644 contrib/relay.monit %{buildroot}%{_sysconfdir}/monit.d/%{name}.conf
%{__install} -m 644 contrib/relay.sysconfig %{buildroot}%{_sysconfdir}/sysconfig/%{name}
%clean
make clean
%pre
getent group carbon >/dev/null || groupadd -r carbon
getent passwd carbon >/dev/null || \
useradd -r -g carbon -s /sbin/nologin \
-d $RPM_BUILD_ROOT/var/lib/carbon/ -c "Carbon Daemons" carbon
getent group %{name} >/dev/null || groupadd -r %{name}
getent passwd %{name} >/dev/null || \
useradd -r -g %{name} -s /sbin/nologin \
-d %{_localstatedir}/lib/%{name}/ -c "Carbon C Relay Daemon" %{name}
exit 0
%post
chgrp carbon /var/run/carbon
chmod 774 /var/run/relay
chown carbon:carbon /var/log/carbon
chmod 744 /var/log/carbon
chgrp %{name} %{_localstatedir}/run/%{name}
chmod 774 %{_localstatedir}/run/%{name}
chown %{name}:%{name} %{_localstatedir}/log/%{name}
chmod 744 %{_localstatedir}/log/%{name}
/sbin/chkconfig --add %{name}
%preun
/sbin/chkconfig --del %{name}
%files
%defattr(-,root,root,-)
/usr/bin/relay
%config(noreplace) /etc/relay.conf
/etc/init.d/relay
%config(noreplace) /etc/sysconfig/relay
%config(noreplace) /etc/monit.d/relay.conf
#/var/run/carbon
#/var/log/carbon
%{_bindir}/%{name}
%{_sysconfdir}/init.d/%{name}
%config(noreplace) %{_sysconfdir}/%{name}.conf
%config(noreplace) %{_sysconfdir}/sysconfig/%{name}
%config(noreplace) %{_sysconfdir}/logrotate.d/%{name}
%config(noreplace) %{_sysconfdir}/monit.d/%{name}.conf
%defattr(655, %{name}, %{name}, 755)
%{_localstatedir}/run/%{name}
%{_localstatedir}/log/%{name}
%changelog
* Tue Mar 24 2015 Andy "Bob" Brockhurst <andy.brockhurst@b3cft.com>
- updated spec file to use rpm macros where possible
- updated %setup to use carbon-c-relay as tar extracted location
- changed user/group to %{name}
- added placeholder %%NAME%% in init, logrotate, monit, sysconfig
-- added sed command to replace %%NAME%% in above at %prep stage
- added creation of /var/run/%{name} and /var/log/%{name}
- added chkconfig --add to %post
- added logrotate file to spec
- bump to version 0.39
* Mon Sep 8 2014 Matthew Hollick <matthew@mayan-it.co.uk>
- tidy up for github
- reverted site specific changes
......@@ -74,7 +102,7 @@ chmod 744 /var/log/carbon
* Tue Jul 1 2014 Matthew Hollick <matthew@mayan-it.co.uk>
- packaged as part of mdr
- binary renamed from 'relay' to 'cc_relay'
- binary renamed from 'relay' to '%{name}'
- package renamed to reflect function rather than component
- user / group named by function
......
#Usage: relay [-vdst] -f <config> [-p <port>] [-w <workers>] [-b <size>] [-q <size>]
#Usage: %%NAME%% [-vdst] -f <config> [-p <port>] [-w <workers>] [-b <size>] [-q <size>]
#
#Options:
# -v print version and exit
......
......@@ -255,7 +255,6 @@ dispatch_process_dests(connection *conn, dispatcher *self)
/* finally "complete" this metric */
conn->destlen = 0;
conn->wait = 0;
self->metrics++;
}
}
......@@ -264,7 +263,24 @@ dispatch_process_dests(connection *conn, dispatcher *self)
#define IDLE_DISCONNECT_TIME (10 * 60) /* 10 minutes */
/**
* Look at conn and see if work needs to be done. If so, do it.
* Look at conn and see if work needs to be done. If so, do it. This
* function operates on an (exclusive) lock on the connection it serves.
* Schematically, what this function does is like this:
*
* read (partial) data <----
* | |
* v |
* split and clean metrics |
* | |
* v |
* route metrics | feedback loop
* | | (stall client)
* v |
* send 1st attempt |
* \ |
* v* | * this is optional, but if a server's
* retry send (<1s) -------- queue is full, the client is stalled
* block reads
*/
static int
dispatch_connection(connection *conn, dispatcher *self)
......@@ -318,6 +334,7 @@ dispatch_connection(connection *conn, dispatcher *self)
continue;
}
self->metrics++;
*q++ = '\n';
*q = '\0'; /* can do this because we subtract one from buf */
......@@ -403,13 +420,22 @@ dispatch_connection(connection *conn, dispatcher *self)
* size argument is 0) -> this is good, because we can't do much
* with such a client */
closedconnections++;
close(conn->sock);
if (conn->noexpire) {
/* reset buffer only (UDP) and move on */
conn->needmore = 1;
conn->buflen = 0;
conn->takenby = 0;
return 0;
} else {
closedconnections++;
close(conn->sock);
/* flag this connection as no longer in use */
conn->takenby = -1;
return 0;
}
}
/* "release" this connection again */
......@@ -542,7 +568,7 @@ static char globalid = 0;
dispatcher *
dispatch_new_listener(void)
{
char id = __sync_fetch_and_add(&globalid, 1);
char id = globalid++;
return dispatch_new(id, LISTENER, NULL);
}
......@@ -553,7 +579,7 @@ dispatch_new_listener(void)
dispatcher *
dispatch_new_connection(route *routes)
{
char id = __sync_fetch_and_add(&globalid, 1);
char id = globalid++;
return dispatch_new(id, CONNECTION, routes);
}
......
......@@ -19,14 +19,16 @@
#include <string.h>
#include <pthread.h>
typedef struct _queue {
#include "queue.h"
struct _queue {
const char **queue;
size_t end;
size_t write;
size_t read;
size_t len;
pthread_mutex_t lock;
} queue;
};
/**
......
......@@ -24,6 +24,9 @@
#include <time.h>
#include <errno.h>
#include <assert.h>
#if defined(_OPENMP)
# include <omp.h>
#endif
#include "relay.h"
#include "consistent-hash.h"
......@@ -191,7 +194,17 @@ hup_handler(int sig)
logout("SIGHUP handler complete\n");