Commit 8a3b5268 authored by Jonas Genannt

Imported Upstream version 0.9.14

parent 3182976d
Metadata-Version: 1.0
Name: carbon
-Version: 0.9.13
+Version: 0.9.14
Summary: Backend data caching and persistence daemon for Graphite
Home-page: http://graphite-project.github.com
Author: Chris Davis
......
......@@ -48,6 +48,7 @@ option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/m
option_parser.add_option('--replication', type='int', default=1, help='Replication factor')
option_parser.add_option('--routing', default='consistent-hashing',
  help='Routing method: "consistent-hashing" (default) or "relay"')
+option_parser.add_option('--diverse-replicas', action='store_true', help="Spread replicas across diff. servers")
option_parser.add_option('--relayrules', default=default_relayrules,
  help='relay-rules.conf file to use for relay routing')
......@@ -81,7 +82,7 @@ if options.debug:
  defer.setDebugging(True)

if options.routing == 'consistent-hashing':
-  router = ConsistentHashingRouter(options.replication)
+  router = ConsistentHashingRouter(options.replication, diverse_replicas=options.diverse_replicas)
elif options.routing == 'relay':
  if exists(options.relayrules):
    router = RelayRulesRouter(options.relayrules)
......
......@@ -8,7 +8,7 @@
# Defaults to ../
# GRAPHITE_CONF_DIR - Configuration directory (where this file lives).
# Defaults to $GRAPHITE_ROOT/conf/
-# GRAPHITE_STORAGE_DIR - Storage directory for whipser/rrd/log/pid files.
+# GRAPHITE_STORAGE_DIR - Storage directory for whisper/rrd/log/pid files.
# Defaults to $GRAPHITE_ROOT/storage/
#
# To change other directory paths, add settings to this file. The following
......@@ -61,17 +61,25 @@ MAX_UPDATES_PER_SECOND = 500
# MAX_UPDATES_PER_SECOND_ON_SHUTDOWN = 1000
# Softly limits the number of whisper files that get created each minute.
-# Setting this value low (like at 50) is a good way to ensure your graphite
+# Setting this value low (e.g. 50) is a good way to ensure that your carbon
# system will not be adversely impacted when a bunch of new metrics are
-# sent to it. The trade off is that it will take much longer for those metrics'
-# database files to all get created and thus longer until the data becomes usable.
-# Setting this value high (like "inf" for infinity) will cause graphite to create
-# the files quickly but at the risk of slowing I/O down considerably for a while.
+# sent to it. The trade off is that any metrics received in excess of this
+# value will be silently dropped, and the whisper file will not be created
+# until such point as a subsequent metric is received and fits within the
+# defined rate limit. Setting this value high (like "inf" for infinity) will
+# cause carbon to create the files quickly but at the risk of increased I/O.
MAX_CREATES_PER_MINUTE = 50
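To make the drop semantics concrete, here is a minimal Python sketch of a per-minute create limiter; it models the behavior documented above, not carbon's actual implementation:

    import time

    class CreateRateLimiter(object):
        """Toy model of MAX_CREATES_PER_MINUTE: excess creates are dropped, not queued."""
        def __init__(self, max_creates_per_minute):
            self.max_creates = max_creates_per_minute
            self.window_start = time.time()
            self.creates_this_window = 0

        def may_create(self):
            now = time.time()
            if now - self.window_start >= 60:
                # A new minute begins; the counter resets and creates flow again.
                self.window_start = now
                self.creates_this_window = 0
            if self.creates_this_window < self.max_creates:
                self.creates_this_window += 1
                return True
            return False  # over the limit: the datapoint is dropped, no file is created

    limiter = CreateRateLimiter(50)
    for metric in ('a.b.c', 'd.e.f'):
        if limiter.may_create():
            print('would create whisper file for %s' % metric)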
# Set the interface and port for the line (plain text) listener. Setting the
# interface to 0.0.0.0 listens on all interfaces. Port can be set to 0 to
# disable this listener if it is not required.
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2003
+# Set the TCP backlog for the listen socket created by the line receiver. You
+# shouldn't change this unless you know what you're doing.
+# LINE_RECEIVER_BACKLOG = 1024
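The line listener accepts one "metric value timestamp" triple per line. A minimal Python sender, assuming a carbon daemon is listening on 127.0.0.1:2003:

    import socket
    import time

    # One datapoint over the plain-text protocol: "path value unix_timestamp\n"
    sock = socket.create_connection(('127.0.0.1', 2003))
    sock.sendall(('example.metric 42 %d\n' % int(time.time())).encode('utf-8'))
    sock.close()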
# Set this to True to enable the UDP listener. By default this is off
# because it is very common to run multiple carbon daemons and managing
# another (rarely used) port for every carbon instance is not fun.
......@@ -79,9 +87,16 @@ ENABLE_UDP_LISTENER = False
UDP_RECEIVER_INTERFACE = 0.0.0.0
UDP_RECEIVER_PORT = 2003
# Set the interface and port for the pickle listener. Setting the interface to
# 0.0.0.0 listens on all interfaces. Port can be set to 0 to disable this
# listener if it is not required.
PICKLE_RECEIVER_INTERFACE = 0.0.0.0
PICKLE_RECEIVER_PORT = 2004
+# Set the TCP backlog for the listen socket created by the pickle receiver. You
+# shouldn't change this unless you know what you're doing.
+# PICKLE_RECEIVER_BACKLOG = 1024
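The pickle listener expects a 4-byte big-endian length header followed by a pickled list of (path, (timestamp, value)) tuples. A minimal Python sender, assuming a listener on 127.0.0.1:2004:

    import pickle
    import socket
    import struct
    import time

    datapoints = [('example.metric', (int(time.time()), 42.0))]
    payload = pickle.dumps(datapoints, protocol=2)
    message = struct.pack('!L', len(payload)) + payload  # length-prefixed frame

    sock = socket.create_connection(('127.0.0.1', 2004))
    sock.sendall(message)
    sock.close()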
# Set to false to disable logging of successful connections
LOG_LISTENER_CONNECTIONS = True
......@@ -93,6 +108,10 @@ USE_INSECURE_UNPICKLER = False
CACHE_QUERY_INTERFACE = 0.0.0.0
CACHE_QUERY_PORT = 7002
+# Set the TCP backlog for the listen socket created by the cache query
+# listener. You shouldn't change this unless you know what you're doing.
+# CACHE_QUERY_BACKLOG = 1024
# Set this to False to drop datapoints received after the cache
# reaches MAX_CACHE_SIZE. If this is True (the default) then sockets
# over which metrics are received will temporarily stop accepting
......@@ -244,6 +263,14 @@ RELAY_METHOD = rules
# datapoint to more than one machine.
REPLICATION_FACTOR = 1
+# For REPLICATION_FACTOR >= 2, set DIVERSE_REPLICAS to True to guarantee that
+# replicas are spread across distinct hosts. With this setting disabled, it is
+# possible for replicas to be sent to different caches on the same host. That
+# has been the default behavior since the introduction of the
+# 'consistent-hashing' relay method.
+# Note that enabling this on an existing pre-0.9.14 cluster will require
+# rebalancing your metrics across the cluster nodes using a tool like Carbonate.
+# DIVERSE_REPLICAS = False
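For example (hypothetical values), a relay that keeps two copies of every datapoint on two distinct hosts would combine:

    RELAY_METHOD = consistent-hashing
    REPLICATION_FACTOR = 2
    DIVERSE_REPLICAS = True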
# This is a list of carbon daemons we will send any relayed or
# generated metrics to. The default provided would send to a single
# carbon-cache instance on the default port. However if you
......
......@@ -10,7 +10,7 @@
# name: Arbitrary unique name to identify the rule
# pattern: Regex pattern to match against the metric name
# destinations: Comma-separated list of destinations.
-# ex: 127.0.0.1, 10.1.2.3:2004, 10.1.2.4:2004:a, myserver.mydomain.com
+# ex: 127.0.0.1:2004:a, 10.1.2.4:2004, myserver.mydomain.com:2004
# continue: Continue processing rules if this rule matches (default: False)
# You must have exactly one section with 'default = true'
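A hypothetical pair of rules in this format (names, patterns and addresses are illustrative only):

    [collectd]
    pattern = ^collectd\.
    destinations = 10.1.2.3:2004:a, 10.1.2.4:2004:a

    [default]
    default = true
    destinations = 127.0.0.1:2004:a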
......
#!/bin/bash
# chkconfig: - 25 75
# description: carbon-aggregator
# processname: carbon-aggregator

# Source function library.
if [ -e /etc/rc.d/init.d/functions ]; then
  . /etc/rc.d/init.d/functions;
fi;

CARBON_DAEMON="aggregator"
GRAPHITE_DIR="/opt/graphite"

# Export PYTHONPATH only after GRAPHITE_DIR is defined, so the expansion
# below is not empty.
export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"

INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`

function die {
  echo "$1"
  exit 1
}

start(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

stop(){
  cd $GRAPHITE_DIR

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop

    if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
      echo "Carbon has not stopped yet. Sleeping longer, then force killing it...";
      sleep 20;
      /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
    fi;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

status(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  status)
    status
    ;;
  restart|reload)
    stop
    start
    ;;
  *)
    echo $"Usage: $0 {start|stop|restart|status}"
    exit 1
esac
#!/bin/bash
# chkconfig: - 25 75
# description: carbon-cache
# processname: carbon-cache

# Source function library.
if [ -e /etc/rc.d/init.d/functions ]; then
  . /etc/rc.d/init.d/functions;
fi;

CARBON_DAEMON="cache"
GRAPHITE_DIR="/opt/graphite"

# Export PYTHONPATH only after GRAPHITE_DIR is defined, so the expansion
# below is not empty.
export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"

INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`

function die {
  echo "$1"
  exit 1
}

start(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

stop(){
  cd $GRAPHITE_DIR

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop

    if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
      echo "Carbon has not stopped yet. Sleeping longer, then force killing it...";
      sleep 20;
      /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
    fi;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

status(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  status)
    status
    ;;
  restart|reload)
    stop
    start
    ;;
  *)
    echo $"Usage: $0 {start|stop|restart|status}"
    exit 1
esac
#!/bin/bash
# chkconfig: - 25 75
# description: carbon-relay
# processname: carbon-relay

# Source function library.
if [ -e /etc/rc.d/init.d/functions ]; then
  . /etc/rc.d/init.d/functions;
fi;

CARBON_DAEMON="relay"
GRAPHITE_DIR="/opt/graphite"

# Export PYTHONPATH only after GRAPHITE_DIR is defined, so the expansion
# below is not empty.
export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"

INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`

function die {
  echo "$1"
  exit 1
}

start(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Starting carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} start;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

stop(){
  cd $GRAPHITE_DIR

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    echo "Stopping carbon-${CARBON_DAEMON}:${INSTANCE}..."
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} stop

    if [ `sleep 3; /usr/bin/pgrep -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}" | /usr/bin/wc -l` -gt 0 ]; then
      echo "Carbon has not stopped yet. Sleeping longer, then force killing it...";
      sleep 20;
      /usr/bin/pkill -9 -f "carbon-${CARBON_DAEMON}.py --instance=${INSTANCE}";
    fi;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

status(){
  cd $GRAPHITE_DIR;

  for INSTANCE in ${INSTANCES}; do
    if [ "${INSTANCE}" == "${CARBON_DAEMON}" ]; then
      INSTANCE="a";
    fi;
    bin/carbon-${CARBON_DAEMON}.py --instance=${INSTANCE} status;

    if [ $? -eq 0 ]; then
      echo_success
    else
      echo_failure
    fi;
    echo ""
  done;
}

case "$1" in
  start)
    start
    ;;
  stop)
    stop
    ;;
  status)
    status
    ;;
  restart|reload)
    stop
    start
    ;;
  *)
    echo $"Usage: $0 {start|stop|restart|status}"
    exit 1
esac
......@@ -63,8 +63,8 @@ class MetricBuffer:
      if buffer.active:
        value = self.aggregation_func(buffer.values)
        datapoint = (buffer.interval, value)
-        state.events.metricGenerated(self.metric_path, datapoint)
-        state.instrumentation.increment('aggregateDatapointsSent')
+        events.metricGenerated(self.metric_path, datapoint)
+        instrumentation.increment('aggregateDatapointsSent')
        buffer.mark_inactive()

      if buffer.interval < age_threshold:
......@@ -103,4 +103,4 @@ class IntervalBuffer:
BufferManager = BufferManager()
# Avoid import circularity
-from carbon import state
+from carbon import events, state, instrumentation
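The bottom-of-module import above is carbon's recurring trick for breaking import cycles: by the time the import statement runs, every name this module exports already exists. A two-file sketch of the pattern (module names are illustrative, not carbon's):

    # file: alpha.py
    def handle(value):
        # beta is resolved at call time, after both modules have loaded
        return beta.double(value)

    # Bottom-of-module import: alpha's definitions above already exist when
    # this runs, so beta can safely "import alpha" during its own import.
    import beta

    # file: beta.py
    import alpha

    def double(value):
        return value * 2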
......@@ -93,7 +93,8 @@ class AggregationRule:
    except TypeError:
      log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields))

-    self.cache[metric_path] = result
+    if result:
+      self.cache[metric_path] = result
    return result

  def build_regex(self):
......
......@@ -36,7 +36,7 @@ class _MetricCache(defaultdict):
    t = time.time()
    queue = sorted(self.counts, key=lambda x: x[1])
    if settings.LOG_CACHE_QUEUE_SORTS:
-      log.debug("Sorted %d cache queues in %.6f seconds" % (len(queue), time.time() - t))
+      log.msg("Sorted %d cache queues in %.6f seconds" % (len(queue), time.time() - t))
    while queue:
      yield queue.pop()[0]
......@@ -45,7 +45,7 @@ class _MetricCache(defaultdict):
    self[metric].append(datapoint)
    if self.isFull():
      log.msg("MetricCache is full: self.size=%d" % self.size)
-      state.events.cacheFull()
+      events.cacheFull()

  def isFull(self):
    # Short circuit this test if there is no max cache size, then we don't need
......@@ -55,7 +55,10 @@ class _MetricCache(defaultdict):
  def pop(self, metric=None):
    if not self:
      raise KeyError(metric)
    elif metric:
      datapoints = (metric, super(_MetricCache, self).pop(metric))
+    elif not metric and self.method == "max":
+      # TODO: [jssjr 2015-04-24] This is O(n^2) and suuuuuper slow.
+      metric = max(self.items(), key=lambda x: len(x[1]))[0]
+      datapoints = (metric, super(_MetricCache, self).pop(metric))
    elif not metric and self.method == "naive":
......@@ -66,6 +69,10 @@ class _MetricCache(defaultdict):
      popped = super(_MetricCache, self).pop(metric)
      ordered = sorted(dict(popped).items(), key=lambda x: x[0])
      datapoints = (metric, deque(ordered))
+      # Adjust size counter if we've dropped multiple identical timestamps
+      dropped = len(popped) - len(datapoints[1])
+      if dropped > 0:
+        self.size -= dropped
    self.size -= len(datapoints[1])
    return datapoints
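The "max" write strategy shown above always pops the metric holding the most buffered datapoints, maximizing the number of points written per whisper update at the cost of a full scan on every pop (the quadratic behavior the TODO complains about). A standalone sketch of that selection on a plain dict:

    from collections import defaultdict

    cache = defaultdict(list)
    cache['a.b'].extend([(1, 1.0), (2, 2.0), (3, 3.0)])
    cache['c.d'].append((1, 9.0))

    def pop_max(cache):
        # Pick the metric with the most datapoints, as the "max" strategy does.
        metric = max(cache.items(), key=lambda item: len(item[1]))[0]
        return metric, cache.pop(metric)

    print(pop_max(cache))  # ('a.b', [(1, 1.0), (2, 2.0), (3, 3.0)])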
......@@ -80,4 +87,4 @@ MetricCache = _MetricCache(method=settings.CACHE_WRITE_STRATEGY)
# Avoid import circularities
-from carbon import log, state
+from carbon import log, state, events
......@@ -5,7 +5,7 @@ from twisted.internet.protocol import ReconnectingClientFactory
from twisted.protocols.basic import Int32StringReceiver
from carbon.conf import settings
from carbon.util import pickle
-from carbon import log, state, instrumentation
+from carbon import log, events, state, instrumentation
try:
import signal
......@@ -76,7 +76,8 @@ class CarbonClientProtocol(Int32StringReceiver):
    queueSize = self.factory.queueSize
    if (self.factory.queueFull.called and
        queueSize < SEND_QUEUE_LOW_WATERMARK):
-      self.factory.queueHasSpace.callback(queueSize)
+      if not self.factory.queueHasSpace.called:
+        self.factory.queueHasSpace.callback(queueSize)

  def __str__(self):
    return 'CarbonClientProtocol(%s:%d:%s)' % (self.factory.destination)
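The added .called guard matters because firing a Twisted Deferred twice raises AlreadyCalledError; without it, a second crossing of the low watermark before the Deferred was re-created would crash the send path. A small demonstration, assuming Twisted is installed:

    from twisted.internet.defer import AlreadyCalledError, Deferred

    d = Deferred()
    d.addCallback(lambda queue_size: queue_size)
    d.callback(100)

    try:
        d.callback(100)  # second fire without re-creating the Deferred
    except AlreadyCalledError:
        print('already called: hence the queueHasSpace.called guard above')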
......@@ -110,7 +111,7 @@ class CarbonClientFactory(ReconnectingClientFactory):
    self.relayMaxQueueLength = 'destinations.%s.relayMaxQueueLength' % self.destinationName

  def queueFullCallback(self, result):
-    state.events.cacheFull()
+    events.cacheFull()
    log.clients('%s send queue is full (%d datapoints)' % (self, result))

  def queueSpaceCallback(self, result):
......@@ -118,7 +119,7 @@ class CarbonClientFactory(ReconnectingClientFactory):
      log.clients('%s send queue has space available' % self.connectedProtocol)
      self.queueFull = Deferred()
      self.queueFull.addCallback(self.queueFullCallback)
-      state.events.cacheSpaceAvailable()
+      events.cacheSpaceAvailable()
    self.queueHasSpace = Deferred()
    self.queueHasSpace.addCallback(self.queueSpaceCallback)
......
......@@ -32,16 +32,20 @@ defaults = dict(
USER="",
MAX_CACHE_SIZE=float('inf'),
MAX_UPDATES_PER_SECOND=500,
MAX_UPDATES_PER_SECOND_ON_SHUTDOWN=1000,
MAX_CREATES_PER_MINUTE=float('inf'),
LINE_RECEIVER_INTERFACE='0.0.0.0',
LINE_RECEIVER_PORT=2003,
LINE_RECEIVER_BACKLOG=1024,
ENABLE_UDP_LISTENER=False,
UDP_RECEIVER_INTERFACE='0.0.0.0',
UDP_RECEIVER_PORT=2003,
PICKLE_RECEIVER_INTERFACE='0.0.0.0',
PICKLE_RECEIVER_PORT=2004,
PICKLE_RECEIVER_BACKLOG=1024,
CACHE_QUERY_INTERFACE='0.0.0.0',
CACHE_QUERY_PORT=7002,
CACHE_QUERY_BACKLOG=1024,
LOG_UPDATES=True,
LOG_CACHE_HITS=True,
LOG_CACHE_QUEUE_SORTS=True,
......@@ -64,6 +68,7 @@ defaults = dict(
MANHOLE_PUBLIC_KEY="",
RELAY_METHOD='rules',
REPLICATION_FACTOR=1,
DIVERSE_REPLICAS=False,
DESTINATIONS=[],
USE_FLOW_CONTROL=True,
USE_INSECURE_UNPICKLER=False,
......@@ -413,6 +418,9 @@ def get_default_parser(usage="%prog [options] <start|stop|status>"):
  parser.add_option(
    "--profile",
    help="Record performance profile data to the given file")
+  parser.add_option(
+    "--profiler",
+    help="Specify the profiler to use")
  parser.add_option(
    "--pidfile", default=None,
    help="Write pid to the given file")
......
......@@ -29,10 +29,7 @@ cacheSpaceAvailable = Event('cacheSpaceAvailable')
pauseReceivingMetrics = Event('pauseReceivingMetrics')
resumeReceivingMetrics = Event('resumeReceivingMetrics')
# Default handlers
-metricReceived.addHandler(lambda metric, datapoint: state.instrumentation.increment('metricsReceived'))
-cacheFull.addHandler(lambda: state.instrumentation.increment('cache.overflow'))
+cacheFull.addHandler(lambda: carbon.instrumentation.increment('cache.overflow'))
cacheFull.addHandler(lambda: setattr(state, 'cacheTooFull', True))
cacheSpaceAvailable.addHandler(lambda: setattr(state, 'cacheTooFull', False))
......@@ -41,4 +38,4 @@ resumeReceivingMetrics.addHandler(lambda: setattr(state, 'metricReceiversPaused'
# Avoid import circularities
-from carbon import log, state
+from carbon import log
......@@ -116,9 +116,9 @@ def recordMetrics():
  # aggregator metrics
  elif settings.program == 'carbon-aggregator':
    record = aggregator_record
-    record('allocatedBuffers', len(BufferManager))
+    record('allocatedBuffers', len(carbon.aggregator.buffers.BufferManager))
    record('bufferedDatapoints',
-      sum([b.size for b in BufferManager.buffers.values()]))
+      sum([b.size for b in carbon.aggregator.buffers.BufferManager.buffers.values()]))
    record('aggregateDatapointsSent', myStats.get('aggregateDatapointsSent', 0))
# relay metrics
......@@ -171,6 +171,8 @@ def aggregator_record(metric, value):
class InstrumentationService(Service):
  def __init__(self):
    self.record_task = LoopingCall(recordMetrics)
+    # Default handlers
+    events.metricReceived.addHandler(lambda metric, datapoint: increment('metricsReceived'))

  def startService(self):
    if settings.CARBON_METRIC_INTERVAL > 0:
......@@ -185,4 +187,3 @@ class InstrumentationService(Service):
# Avoid import circularities
from carbon import state, events, cache
-from carbon.aggregator.buffers import BufferManager
import os
import time
from os.path import exists
from sys import stdout, stderr
from zope.interface import implements
from twisted.python.log import startLoggingWithObserver, textFromEventDict, msg, err, ILogObserver
......@@ -15,6 +15,26 @@ class CarbonLogFile(DailyLogFile):
    from carbon.conf import settings
    self.enableRotation = settings.ENABLE_LOGROTATION

+  def _openFile(self):
+    """
+    Fix Umask Issue https://twistedmatrix.com/trac/ticket/7026
+    """
+    openMode = self.defaultMode or 0777
+    self._file = os.fdopen(os.open(
+      self.path, os.O_CREAT|os.O_RDWR, openMode), 'r+', 1)
+    self.closed = False
+    # Try our best to update permissions for files which already exist.
+    if self.defaultMode:
+      try:
+        os.chmod(self.path, self.defaultMode)
+      except OSError:
+        pass
+    # Seek is needed for uniformity of stream positioning
+    # for read and write between Linux and BSD systems due
+    # to differences in fopen() between operating systems.
+    self._file.seek(0, os.SEEK_END)
+    self.lastDate = self.toDate(os.stat(self.path)[8])

  def shouldRotate(self):
    if self.enableRotation:
      return DailyLogFile.shouldRotate(self)
......@@ -23,7 +43,7 @@ class CarbonLogFile(DailyLogFile):
  def write(self, data):
    if not self.enableRotation:
-      if not exists(self.path):
+      if not os.path.exists(self.path):
        self.reopen()
    DailyLogFile.write(self, data)
......
......@@ -72,7 +72,7 @@ class MetricLineReceiver(MetricReceiver, LineOnlyReceiver):
      metric, value, timestamp = line.strip().split()
      datapoint = (float(timestamp), float(value))
    except ValueError:
-      log.listener('invalid line received from client %s, ignoring' % self.peerName)
+      log.listener('invalid line (%s) received from client %s, ignoring' % (line, self.peerName))
      return

    self.metricReceived(metric, datapoint)
......@@ -87,7 +87,7 @@ class MetricDatagramReceiver(MetricReceiver, DatagramProtocol):
        self.metricReceived(metric, datapoint)
      except ValueError:
-        log.listener('invalid line received from %s, ignoring' % host)
+        log.listener('invalid line (%s) received from %s, ignoring' % (line, host))
class MetricPickleReceiver(MetricReceiver, Int32StringReceiver):
......
......@@ -41,8 +41,9 @@ class RelayRulesRouter(DatapointRouter):
class ConsistentHashingRouter(DatapointRouter):
-  def __init__(self, replication_factor=1):
+  def __init__(self, replication_factor=1, diverse_replicas=False):
    self.replication_factor = int(replication_factor)
+    self.diverse_replicas = diverse_replicas
    self.instance_ports = {} # { (server, instance) : port }
    self.ring = ConsistentHashRing([])
......@@ -62,13 +63,24 @@ class ConsistentHashingRouter(DatapointRouter):
  def getDestinations(self, metric):
    key = self.getKey(metric)

-    for count,node in enumerate(self.ring.get_nodes(key)):
-      if count == self.replication_factor:
-        return
-      (server, instance) = node
-      port = self.instance_ports[ (server, instance) ]
-      yield (server, port, instance)
+    if self.diverse_replicas:
+      used_servers = set()
+      for (server, instance) in self.ring.get_nodes(key):
+        if server in used_servers:
+          continue
+        else:
+          used_servers.add(server)
+          port = self.instance_ports[(server, instance)]
+          yield (server, port, instance)
+        if len(used_servers) >= self.replication_factor:
+          return
+    else:
+      for (count, node) in enumerate(self.ring.get_nodes(key)):
+        if count == self.replication_factor:
+          return
+        (server, instance) = node
+        port = self.instance_ports[(server, instance)]
+        yield (server, port, instance)
  def getKey(self, metric):
    return metric
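A self-contained sketch of the two selection modes in getDestinations, using a made-up, pre-hashed node order in place of a real ConsistentHashRing (server and instance names are illustrative):

    def pick_destinations(nodes, replication_factor, diverse_replicas):
        # nodes: (server, instance) pairs in ring order for some key
        if diverse_replicas:
            used_servers = set()
            for server, instance in nodes:
                if server in used_servers:
                    continue
                used_servers.add(server)
                yield (server, instance)
                if len(used_servers) >= replication_factor:
                    return
        else:
            for count, node in enumerate(nodes):
                if count == replication_factor:
                    return
                yield node

    ring_order = [('host1', 'a'), ('host1', 'b'), ('host2', 'a')]
    print(list(pick_destinations(ring_order, 2, False)))  # both replicas land on host1
    print(list(pick_destinations(ring_order, 2, True)))   # replicas on host1 and host2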
......@@ -85,8 +97,8 @@ class ConsistentHashingRouter(DatapointRouter):
self.setKeyFunction(keyfunc)
class AggregatedConsistentHashingRouter(DatapointRouter):
-  def __init__(self, agg_rules_manager, replication_factor=1):
-    self.hash_router = ConsistentHashingRouter(replication_factor)
+  def __init__(self, agg_rules_manager, replication_factor=1, diverse_replicas=False):
+    self.hash_router = ConsistentHashingRouter(replication_factor, diverse_replicas=diverse_replicas)
    self.agg_rules_manager = agg_rules_manager

  def addDestination(self, destination):
......
......@@ -24,9 +24,6 @@ from carbon import state, util, events
from carbon.log import carbonLogObserver
from carbon.exceptions import CarbonConfigException