Commit 1e79b413 authored by Jonas Genannt's avatar Jonas Genannt

Imported Upstream version 1.0.2

parent 915dd774
Metadata-Version: 1.0
Metadata-Version: 1.1
Name: carbon
Version: 0.9.15
Version: 1.0.2
Summary: Backend data caching and persistence daemon for Graphite
Home-page: http://graphite-project.github.com
Home-page: http://graphiteapp.org/
Author: Chris Davis
Author-email: chrismd@gmail.com
License: Apache Software License 2.0
Description: UNKNOWN
Description: Backend data caching and persistence daemon for Graphite
Platform: UNKNOWN
Classifier: Intended Audience :: Developers
Classifier: Natural Language :: English
Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 2 :: Only
......@@ -3,3 +3,10 @@
# match one of these expressions will be dropped
# This file is reloaded automatically when changes are made
^some\.noisy\.metric\.prefix\..*
# Reject metrics with multiple or surrounding dots, since they lead to
# counter-intuitive behavior when read (they can be read from disk but not
# from carbon-cache, at least with whisper data back-end)
\.\.
^\.
\.$
......@@ -41,7 +41,7 @@ PICKLE_RECEIVER_PORT = 2004
CACHE_QUERY_INTERFACE = 0.0.0.0
CACHE_QUERY_PORT = 7002
# Enable AMQP if you want to receive metrics using your amqp broker
# Enable AMQP if you want to receive metrics using your amqp broker
ENABLE_AMQP = True
# Verbose means a line will be logged for every metric received
......
This diff is collapsed.
# Schema definitions for Whisper files. Entries are scanned in order,
# and first match wins. This file is scanned for changes every 60 seconds.
#
# [name]
# pattern = regex
# retentions = timePerPoint:timeToStore, timePerPoint:timeToStore, ...
# Definition Syntax:
#
# [name]
# pattern = regex
# retentions = timePerPoint:timeToStore, timePerPoint:timeToStore, ...
#
# Remember: To support accurate aggregation from higher to lower resolution
# archives, the precision of a longer retention archive must be
# cleanly divisible by precision of next lower retention archive.
#
# Valid: 60s:7d,300s:30d (300/60 = 5)
# Invalid: 180s:7d,300s:30d (300/180 = 3.333)
#
# Carbon's internal metrics. This entry should match what is specified in
# CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings
......
......@@ -3,8 +3,6 @@
# description: carbon-cache
# processname: carbon-cache
export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"
# Source function library.
if [ -e /etc/rc.d/init.d/functions ]; then
. /etc/rc.d/init.d/functions;
......@@ -14,6 +12,8 @@ CARBON_DAEMON="cache"
GRAPHITE_DIR="/opt/graphite"
INSTANCES=`grep "^\[${CARBON_DAEMON}" ${GRAPHITE_DIR}/conf/carbon.conf | cut -d \[ -f 2 | cut -d \] -f 1 | cut -d : -f 2`
export PYTHONPATH="$GRAPHITE_DIR/lib:$PYTHONPATH"
function die {
echo $1
exit 1
......
......@@ -51,7 +51,8 @@ class MetricBuffer:
self.aggregation_frequency = int(frequency)
self.aggregation_func = func
self.compute_task = LoopingCall(self.compute_value)
self.compute_task.start(settings['WRITE_BACK_FREQUENCY'] or frequency, now=False)
compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency) or frequency
self.compute_task.start(compute_frequency, now=False)
self.configured = True
def compute_value(self):
......@@ -63,8 +64,8 @@ class MetricBuffer:
if buffer.active:
value = self.aggregation_func(buffer.values)
datapoint = (buffer.interval, value)
events.metricGenerated(self.metric_path, datapoint)
instrumentation.increment('aggregateDatapointsSent')
state.events.metricGenerated(self.metric_path, datapoint)
state.instrumentation.increment('aggregateDatapointsSent')
buffer.mark_inactive()
if buffer.interval < age_threshold:
......@@ -103,4 +104,4 @@ class IntervalBuffer:
BufferManager = BufferManager()
# Avoid import circularity
from carbon import events, state, instrumentation
from carbon import state
from carbon.aggregator.rules import RuleManager
from carbon.aggregator.buffers import BufferManager
from carbon.instrumentation import increment
from carbon.pipeline import Processor
from carbon.rewrite import PRE, POST, RewriteRuleManager
from carbon.conf import settings
from carbon import log
class AggregationProcessor(Processor):
  """Pipeline processor that feeds datapoints into aggregation buffers.

  Applies PRE rewrite rules, routes the datapoint into a buffer for every
  matching aggregation rule, applies POST rewrite rules, and finally yields
  the (possibly rewritten) metric unchanged when FORWARD_ALL is enabled and
  the metric is not itself one of the aggregates it produced.
  """
  plugin_name = 'aggregate'

  def process(self, metric, datapoint):
    increment('datapointsReceived')

    for pre_rule in RewriteRuleManager.rules(PRE):
      metric = pre_rule.apply(metric)

    matched_aggregates = set()
    for agg_rule in RuleManager.rules:
      target = agg_rule.get_aggregate_metric(metric)
      if target is None:
        continue
      matched_aggregates.add(target)
      values_buffer = BufferManager.get_buffer(target)
      # Lazily configure the buffer the first time this aggregate is seen.
      if not values_buffer.configured:
        values_buffer.configure_aggregation(agg_rule.frequency, agg_rule.aggregation_func)
      values_buffer.input(datapoint)

    for post_rule in RewriteRuleManager.rules(POST):
      metric = post_rule.apply(metric)

    if settings.FORWARD_ALL and metric not in matched_aggregates:
      if settings.LOG_AGGREGATOR_MISSES and len(matched_aggregates) == 0:
        log.msg("Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
      yield (metric, datapoint)
from carbon.instrumentation import increment
from carbon.aggregator.rules import RuleManager
from carbon.aggregator.buffers import BufferManager
from carbon.conf import settings
from carbon.rewrite import RewriteRuleManager
from carbon import events
def process(metric, datapoint):
  """Feed one datapoint through the rewrite and aggregation rules.

  Applies preRules, buffers the datapoint for every matching aggregation
  rule, applies postRules, then forwards the (possibly rewritten) metric
  unchanged when FORWARD_ALL is set and it is not itself an aggregate.
  """
  increment('datapointsReceived')

  for pre_rule in RewriteRuleManager.preRules:
    metric = pre_rule.apply(metric)

  matched_aggregates = []
  for agg_rule in RuleManager.rules:
    target = agg_rule.get_aggregate_metric(metric)
    if target is None:
      continue
    matched_aggregates.append(target)
    # `values_buffer` avoids shadowing the `buffer` builtin.
    values_buffer = BufferManager.get_buffer(target)
    if not values_buffer.configured:
      values_buffer.configure_aggregation(agg_rule.frequency, agg_rule.aggregation_func)
    values_buffer.input(datapoint)

  for post_rule in RewriteRuleManager.postRules:
    metric = post_rule.apply(metric)

  if settings['FORWARD_ALL'] and metric not in matched_aggregates:
    events.metricGenerated(metric, datapoint)
......@@ -60,7 +60,7 @@ class RuleManager:
return AggregationRule(input_pattern, output_pattern, method, frequency)
except ValueError:
log.err("Failed to parse line: %s" % line)
log.err("Failed to parse rule in %s, line: %s" % (self.rules_file, line))
raise
......@@ -81,7 +81,11 @@ class AggregationRule:
def get_aggregate_metric(self, metric_path):
if metric_path in self.cache:
return self.cache[metric_path]
try:
return self.cache[metric_path]
except KeyError:
# The value can expire at any time, so we need to catch this.
pass
match = self.regex.match(metric_path)
result = None
......@@ -125,7 +129,7 @@ class AggregationRule:
regex_pattern_parts.append(regex_part)
regex_pattern = '\\.'.join(regex_pattern_parts)
regex_pattern = '\\.'.join(regex_pattern_parts) + '$'
self.regex = re.compile(regex_pattern)
def build_template(self):
......@@ -136,12 +140,16 @@ def avg(values):
if values:
return float( sum(values) ) / len(values)
def count(values):
  """Aggregation function: number of values received in the interval.

  Returns None for an empty sequence, matching avg()'s behavior so that
  empty buffers produce no datapoint.
  """
  return len(values) if values else None
AGGREGATION_METHODS = {
'sum' : sum,
'avg' : avg,
'min' : min,
'max' : max,
'sum' : sum,
'avg' : avg,
'min' : min,
'max' : max,
'count' : count
}
# Importable singleton
......
......@@ -35,10 +35,10 @@ import os
import socket
from optparse import OptionParser
from twisted.python.failure import Failure
from twisted.internet.defer import deferredGenerator, waitForDeferred
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.application.internet import TCPClient
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
......@@ -51,6 +51,7 @@ except ImportError:
sys.path.insert(0, LIB_DIR)
import carbon.protocols #satisfy import order requirements
from carbon.protocols import CarbonServerProtocol
from carbon.conf import settings
from carbon import log, events, instrumentation
......@@ -58,70 +59,77 @@ from carbon import log, events, instrumentation
HOSTNAME = socket.gethostname().split('.')[0]
class AMQPProtocol(CarbonServerProtocol):
plugin_name = "amqp"
@classmethod
def build(cls, root_service):
if not settings.ENABLE_AMQP:
return
amqp_host = settings.AMQP_HOST
amqp_port = settings.AMQP_PORT
amqp_user = settings.AMQP_USER
amqp_password = settings.AMQP_PASSWORD
amqp_verbose = settings.AMQP_VERBOSE
amqp_vhost = settings.AMQP_VHOST
amqp_spec = settings.AMQP_SPEC
amqp_exchange_name = settings.AMQP_EXCHANGE
factory = createAMQPListener(
amqp_user,
amqp_password,
vhost=amqp_vhost,
spec=amqp_spec,
exchange_name=amqp_exchange_name,
verbose=amqp_verbose)
service = TCPClient(amqp_host, amqp_port, factory)
service.setServiceParent(root_service)
class AMQPGraphiteProtocol(AMQClient):
"""This is the protocol instance that will receive and post metrics."""
consumer_tag = "graphite_consumer"
@deferredGenerator
@inlineCallbacks
def connectionMade(self):
AMQClient.connectionMade(self)
yield AMQClient.connectionMade(self)
log.listener("New AMQP connection made")
self.setup()
wfd = waitForDeferred(self.receive_loop())
yield wfd
yield self.setup()
yield self.receive_loop()
@deferredGenerator
@inlineCallbacks
def setup(self):
exchange = self.factory.exchange_name
d = self.authenticate(self.factory.username, self.factory.password)
wfd = waitForDeferred(d)
yield wfd
wfd = waitForDeferred(self.channel(1))
yield wfd
chan = wfd.getResult()
wfd = waitForDeferred(chan.channel_open())
yield wfd
yield self.authenticate(self.factory.username, self.factory.password)
chan = yield self.channel(1)
yield chan.channel_open()
# declare the exchange and queue
d = chan.exchange_declare(exchange=exchange, type="topic",
durable=True, auto_delete=False)
wfd = waitForDeferred(d)
yield wfd
yield chan.exchange_declare(exchange=exchange, type="topic",
durable=True, auto_delete=False)
# we use a private queue to avoid conflicting with existing bindings
wfd = waitForDeferred(chan.queue_declare(exclusive=True))
yield wfd
reply = wfd.getResult()
reply = yield chan.queue_declare(exclusive=True)
my_queue = reply.queue
# bind each configured metric pattern
for bind_pattern in settings.BIND_PATTERNS:
log.listener("binding exchange '%s' to queue '%s' with pattern %s" \
% (exchange, my_queue, bind_pattern))
d = chan.queue_bind(exchange=exchange, queue=my_queue,
routing_key=bind_pattern)
wfd = waitForDeferred(d)
yield wfd
d = chan.basic_consume(queue=my_queue, no_ack=True,
consumer_tag=self.consumer_tag)
wfd = waitForDeferred(d)
yield wfd
yield chan.queue_bind(exchange=exchange, queue=my_queue,
routing_key=bind_pattern)
@deferredGenerator
yield chan.basic_consume(queue=my_queue, no_ack=True,
consumer_tag=self.consumer_tag)
@inlineCallbacks
def receive_loop(self):
wfd = waitForDeferred(self.queue(self.consumer_tag))
yield wfd
queue = wfd.getResult()
queue = yield self.queue(self.consumer_tag)
while True:
wfd = waitForDeferred(queue.get())
yield wfd
msg = wfd.getResult()
msg = yield queue.get()
self.processMessage(msg)
def processMessage(self, message):
......@@ -175,6 +183,7 @@ class AMQPReconnectingFactory(ReconnectingClientFactory):
self.verbose = verbose
def buildProtocol(self, addr):
self.resetDelay()
p = self.protocol(self.delegate, self.vhost, self.spec)
p.factory = self
return p
......
......@@ -20,8 +20,7 @@ import os
import time
from optparse import OptionParser
from twisted.python.failure import Failure
from twisted.internet.defer import deferredGenerator, waitForDeferred
from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor, task
from twisted.internet.protocol import ClientCreator
from txamqp.protocol import AMQClient
......@@ -29,7 +28,8 @@ from txamqp.client import TwistedDelegate
from txamqp.content import Content
import txamqp.spec
@deferredGenerator
@inlineCallbacks
def writeMetric(metric_path, value, timestamp, host, port, username, password,
vhost, exchange, spec=None, channel_number=1, ssl=False):
......@@ -43,38 +43,23 @@ def writeMetric(metric_path, value, timestamp, host, port, username, password,
vhost=vhost, spec=spec)
if ssl:
from twisted.internet.ssl import ClientContextFactory
wfd = waitForDeferred(connector.connectSSL(host, port,
ClientContextFactory()))
yield wfd
conn = wfd.getResult()
conn = yield connector.connectSSL(host, port, ClientContextFactory())
else:
wfd = waitForDeferred(connector.connectTCP(host, port))
yield wfd
conn = wfd.getResult()
wfd = waitForDeferred(conn.authenticate(username, password))
yield wfd
conn = yield connector.connectTCP(host, port)
wfd = waitForDeferred(conn.channel(channel_number))
yield wfd
channel = wfd.getResult()
yield conn.authenticate(username, password)
channel = yield conn.channel(channel_number)
yield channel.channel_open()
wfd = waitForDeferred(channel.channel_open())
yield wfd
wfd = waitForDeferred(channel.exchange_declare(exchange=exchange,
type="topic",
durable=True,
auto_delete=False))
yield wfd
yield channel.exchange_declare(exchange=exchange, type="topic",
durable=True, auto_delete=False)
message = Content( "%f %d" % (value, timestamp) )
message["delivery mode"] = 2
channel.basic_publish(exchange=exchange, content=message,
routing_key=metric_path)
wfd = waitForDeferred(channel.channel_close())
yield wfd
channel.basic_publish(exchange=exchange, content=message, routing_key=metric_path)
yield channel.channel_close()
def main():
parser = OptionParser(usage="%prog [options] <metric> <value> [timestamp]")
......
......@@ -13,78 +13,223 @@ See the License for the specific language governing permissions and
limitations under the License."""
import time
from collections import deque
import threading
from operator import itemgetter
from random import choice
from collections import defaultdict
from carbon.conf import settings
try:
from collections import defaultdict
except ImportError:
from util import defaultdict
from carbon import events, log
from carbon.pipeline import Processor
def by_timestamp(datapoint):  # useful sort key function
  """Sort key for a (timestamp, value) datapoint: order by timestamp.

  Rewritten without tuple-parameter unpacking (Python 2 only, removed by
  PEP 3113) so the helper also parses on Python 3; callers still pass a
  single (timestamp, value) tuple.
  """
  return datapoint[0]
class CacheFeedingProcessor(Processor):
  """Terminal pipeline processor: stores every datapoint in the metric
  cache and forwards nothing further down the pipeline."""
  plugin_name = 'write'

  def __init__(self, *args, **kwargs):
    # Bug fix: the original called super(Processor, self).__init__, which
    # starts the MRO lookup *above* Processor and silently skips
    # Processor's own __init__. The first argument must be this class.
    super(CacheFeedingProcessor, self).__init__(*args, **kwargs)
    self.cache = MetricCache()

  def process(self, metric, datapoint):
    self.cache.store(metric, datapoint)
    return Processor.NO_OUTPUT
class DrainStrategy(object):
  """Implements the strategy for writing metrics.

  The strategy chooses what order (if any) metrics will be popped from
  the backing cache. Subclasses must override choose_item().
  """
  def __init__(self, cache):
    # The _MetricCache instance this strategy draws metric names from.
    self.cache = cache

  def choose_item(self):
    # Bug fix: `raise NotImplemented` is wrong -- NotImplemented is a
    # sentinel value, not an exception class, so raising it produces a
    # TypeError. NotImplementedError is the abstract-method convention.
    raise NotImplementedError
class NaiveStrategy(DrainStrategy):
  """Pop points in an unordered fashion."""
  def __init__(self, cache):
    super(NaiveStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: snapshot the current metric names and yield
      # them until exhausted, then take a fresh snapshot.
      while True:
        # list() keeps this working on Python 3, where dict.keys()
        # returns a view without a pop() method (no-op on Python 2).
        metric_names = list(self.cache.keys())
        while metric_names:
          yield metric_names.pop()

    self.queue = _generate_queue()

  def choose_item(self):
    # next() builtin instead of the Python 2-only .next() method.
    return next(self.queue)
class MaxStrategy(DrainStrategy):
  """Always pop the metric with the greatest number of points stored.
  This method leads to less variance in pointsPerUpdate but may mean
  that infrequently or irregularly updated metrics may not be written
  until shutdown """
  def choose_item(self):
    # Scan every cache entry and pick the metric whose datapoint list is
    # longest; ties resolve to the first such entry encountered, exactly
    # as max() with a key function always does.
    largest_metric, _points = max(self.cache.items(), key=lambda entry: len(entry[1]))
    return largest_metric
class RandomStrategy(DrainStrategy):
  """Pop points randomly"""
  def choose_item(self):
    # random.choice requires a sequence; wrapping in list() keeps this
    # working on Python 3, where dict.keys() returns a non-indexable
    # view (identical behavior on Python 2, where keys() is a list).
    return choice(list(self.cache.keys()))
class SortedStrategy(DrainStrategy):
  """ The default strategy which prefers metrics with a greater number
  of cached points but guarantees every point gets written exactly once during
  a loop of the cache """
  def __init__(self, cache):
    super(SortedStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: sort a snapshot of per-metric counts (fewest
      # points first), then drain it from the largest end via pop().
      while True:
        t = time.time()
        metric_counts = sorted(self.cache.counts, key=lambda x: x[1])
        size = len(metric_counts)
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t))
        while metric_counts:
          yield itemgetter(0)(metric_counts.pop())
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

    self.queue = _generate_queue()

  def choose_item(self):
    # next() builtin instead of the Python 2-only .next() method.
    return next(self.queue)
class TimeSortedStrategy(DrainStrategy):
  """ This strategy prefers metrics which are lagging behind;
  guarantees every point gets written exactly once during
  a loop of the cache """
  def __init__(self, cache):
    super(TimeSortedStrategy, self).__init__(cache)

    def _generate_queue():
      # Endless generator: sort a snapshot of per-metric watermarks so
      # that the oldest (most lagging) metrics are popped first.
      while True:
        t = time.time()
        metric_lw = sorted(self.cache.watermarks, key=lambda x: x[1], reverse=True)
        size = len(metric_lw)
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Sorted %d cache queues in %.6f seconds" % (size, time.time() - t))
        while metric_lw:
          yield itemgetter(0)(metric_lw.pop())
        if settings.LOG_CACHE_QUEUE_SORTS and size:
          log.msg("Queue consumed in %.6f seconds" % (time.time() - t))

    self.queue = _generate_queue()

  def choose_item(self):
    # next() builtin instead of the Python 2-only .next() method.
    return next(self.queue)
class _MetricCache(defaultdict):
def __init__(self, defaultfactory=deque, method="sorted"):
"""A Singleton dictionary of metric names and lists of their datapoints"""
def __init__(self, strategy=None):
self.lock = threading.Lock()
self.size = 0
self.method = method
if self.method == "sorted":
self.queue = self.gen_queue()
else:
self.queue = False
super(_MetricCache, self).__init__(defaultfactory)
def gen_queue(self):
while True:
t = time.time()
queue = sorted(self.counts, key=lambda x: x[1])
if settings.LOG_CACHE_QUEUE_SORTS:
log.msg("Sorted %d cache queues in %.6f seconds" % (len(queue), time.time() - t))
while queue:
yield queue.pop()[0]
def store(self, metric, datapoint):
self.size += 1
self[metric].append(datapoint)
if self.isFull():
log.msg("MetricCache is full: self.size=%d" % self.size)
events.cacheFull()
def isFull(self):
# Short circuit this test if there is no max cache size, then we don't need
# to do the somewhat expensive work of calculating the current size.
return settings.MAX_CACHE_SIZE != float('inf') and self.size >= settings.MAX_CACHE_SIZE
def pop(self, metric=None):
if not self:
raise KeyError(metric)
elif metric:
datapoints = (metric, super(_MetricCache, self).pop(metric))
elif not metric and self.method == "max":
# TODO: [jssjr 2015-04-24] This is O(n^2) and suuuuuper slow.
metric = max(self.items(), key=lambda x: len(x[1]))[0]
datapoints = (metric, super(_MetricCache, self).pop(metric))
elif not metric and self.method == "naive":
datapoints = self.popitem()
elif not metric and self.method == "sorted":
metric = self.queue.next()
# Save only last value for each timestamp
popped = super(_MetricCache, self).pop(metric)
ordered = sorted(dict(popped).items(), key=lambda x: x[0])
datapoints = (metric, deque(ordered))
# Adjust size counter if we've dropped multiple identical timestamps
dropped = len(popped) - len(datapoints[1])
if dropped > 0:
self.size -= dropped
self.size -= len(datapoints[1])
return datapoints
self.strategy = None
if strategy:
self.strategy = strategy(self)
super(_MetricCache, self).__init__(dict)
@property
def counts(self):
  """List of (metric, number_of_cached_points) pairs, one per metric."""
  return [(name, len(points)) for name, points in self.items()]
@property
def watermarks(self):
  """List of (metric, oldest_timestamp, newest_timestamp) triples for
  every metric that currently has at least one datapoint cached."""
  marks = []
  for name, points in self.items():
    if points:
      marks.append((name, min(points.keys()), max(points.keys())))
  return marks
@property
def is_full(self):
  """True when MAX_CACHE_SIZE is finite and the cache has reached it."""
  # Short-circuits on an unbounded cache so self.size is never compared.
  return settings.MAX_CACHE_SIZE != float('inf') \
      and self.size >= settings.MAX_CACHE_SIZE
def _check_available_space(self):
  # Fire cacheSpaceAvailable once the cache has drained below the low
  # watermark after previously having been flagged as too full.
  # NOTE(review): `state` is not among this file's visible imports --
  # confirm it is imported elsewhere in the module.
  if state.cacheTooFull and self.size < settings.CACHE_SIZE_LOW_WATERMARK:
    log.msg("MetricCache below watermark: self.size=%d" % self.size)
    events.cacheSpaceAvailable()
def drain_metric(self):
  """Returns a metric and its datapoints in order determined by the
  `DrainStrategy`_

  Returns (None, []) when the cache is empty.
  """
  if not self:
    return (None, [])
  if self.strategy:
    metric = self.strategy.choose_item()
  else:
    # Avoid .keys() as it dumps the whole list; next(iter(...)) replaces
    # the Python 2-only .iterkeys().next() and lazily grabs an arbitrary
    # key (iterating a dict yields its keys).
    metric = next(iter(self))
  return (metric, self.pop(metric))
def get_datapoints(self, metric):
  """Return a list of currently cached datapoints sorted by timestamp"""
  # .get() with a default avoids creating an empty entry via __missing__
  # for metrics that are not cached.
  cached = self.get(metric, {})
  return sorted(cached.items(), key=by_timestamp)
def pop(self, metric):
  """Remove and return all cached datapoints for `metric` as a list of
  (timestamp, value) pairs sorted by timestamp.

  Raises KeyError if the metric is not cached.
  """
  with self.lock:
    # Call defaultdict.pop directly so a missing metric raises KeyError
    # instead of inserting an empty entry via __missing__.
    datapoint_index = defaultdict.pop(self, metric)
    self.size -= len(datapoint_index)
  # May emit cacheSpaceAvailable now that the cache has shrunk.
  self._check_available_space()

  return sorted(datapoint_index.items(), key=by_timestamp)
def store(self, metric, datapoint):
  """Insert a single (timestamp, value) datapoint for `metric`.

  A duplicate timestamp overwrites the previous value without changing
  the cache size; a new timestamp is dropped (after logging and firing
  cacheFull) when the cache is full, otherwise stored under the lock.
  """
  timestamp, value = datapoint
  if timestamp not in self[metric]:
    # Not a duplicate, hence process if cache is not full
    if self.is_full:
      log.msg("MetricCache is full: self.size=%d" % self.size)
      events.cacheFull()
    else:
      with self.lock:
        self.size += 1
        self[metric][timestamp] = value
  else:
    # Updating a duplicate does not increase the cache size
    self[metric][timestamp] = value
_Cache = None
def MetricCache():
global _Cache
if _Cache is not None:
return _Cache