Commit 1816fc82 authored by Jonas Genannt

Imported Upstream version 0.9.10

recursive-include conf/ *
recursive-include distro/ *
exclude conf/*.conf
include LICENSE
include lib/carbon/amqp0-8.xml
include MANIFEST.in
Metadata-Version: 1.0
Name: carbon
Version: 0.9.10
Summary: Backend data caching and persistence daemon for Graphite
Home-page: https://launchpad.net/graphite
Author: Chris Davis
Author-email: chrismd@gmail.com
License: Apache Software License 2.0
Description: UNKNOWN
Platform: UNKNOWN
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
from os.path import dirname, join, abspath
# Figure out where we're installed
BIN_DIR = dirname(abspath(__file__))
ROOT_DIR = dirname(BIN_DIR)
# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
# source.
LIB_DIR = join(ROOT_DIR, 'lib')
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
run_twistd_plugin(__file__)
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
from os.path import dirname, join, abspath
# Figure out where we're installed
BIN_DIR = dirname(abspath(__file__))
ROOT_DIR = dirname(BIN_DIR)
# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
# source.
LIB_DIR = join(ROOT_DIR, 'lib')
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
run_twistd_plugin(__file__)
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
import imp
from os.path import dirname, join, abspath, exists
from optparse import OptionParser
# Figure out where we're installed
BIN_DIR = dirname(abspath(__file__))
ROOT_DIR = dirname(BIN_DIR)
CONF_DIR = join(ROOT_DIR, 'conf')
default_relayrules = join(CONF_DIR, 'relay-rules.conf')
# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
# source.
LIB_DIR = join(ROOT_DIR, 'lib')
sys.path.insert(0, LIB_DIR)
try:
  from twisted.internet import epollreactor
  epollreactor.install()
except ImportError:
  pass
from twisted.internet import stdio, reactor, defer
from twisted.protocols.basic import LineReceiver
from carbon.routers import ConsistentHashingRouter, RelayRulesRouter
from carbon.client import CarbonClientManager
from carbon import log, events
option_parser = OptionParser(usage="%prog [options] <host:port:instance> <host:port:instance> ...")
option_parser.add_option('--debug', action='store_true', help="Log debug info to stdout")
option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/module.py:myFunc)")
option_parser.add_option('--replication', type='int', default=1, help='Replication factor')
option_parser.add_option('--routing', default='consistent-hashing',
  help='Routing method: "consistent-hashing" (default) or "relay"')
option_parser.add_option('--relayrules', default=default_relayrules,
  help='relay-rules.conf file to use for relay routing')
options, args = option_parser.parse_args()
if not args:
  print 'At least one host:port destination required\n'
  option_parser.print_usage()
  raise SystemExit(1)

if options.routing not in ('consistent-hashing', 'relay'):
  print "Invalid --routing value, must be one of:"
  print " consistent-hashing"
  print " relay"
  raise SystemExit(1)

destinations = []
for arg in args:
  parts = arg.split(':', 2)
  host = parts[0]
  port = int(parts[1])
  if len(parts) > 2:
    instance = parts[2]
  else:
    instance = None
  destinations.append( (host, port, instance) )

if options.debug:
  log.logToStdout()
  log.setDebugEnabled(True)
  defer.setDebugging(True)

if options.routing == 'consistent-hashing':
  router = ConsistentHashingRouter(options.replication)
elif options.routing == 'relay':
  if exists(options.relayrules):
    router = RelayRulesRouter(options.relayrules)
  else:
    print "relay rules file %s does not exist" % options.relayrules
    raise SystemExit(1)

client_manager = CarbonClientManager(router)
reactor.callWhenRunning(client_manager.startService)

if options.keyfunc:
  router.setKeyFunctionFromModule(options.keyfunc)

firstConnectAttempts = [client_manager.startClient(dest) for dest in destinations]
firstConnectsAttempted = defer.DeferredList(firstConnectAttempts)
class StdinMetricsReader(LineReceiver):
  delimiter = '\n'

  def lineReceived(self, line):
    #log.msg("[DEBUG] lineReceived(): %s" % line)
    try:
      (metric, value, timestamp) = line.split()
      datapoint = (float(timestamp), float(value))
      assert datapoint[1] == datapoint[1] # filter out NaNs
      client_manager.sendDatapoint(metric, datapoint)
    except:
      log.err(None, 'Dropping invalid line: %s' % line)

  def connectionLost(self, reason):
    log.msg('stdin disconnected')
    def startShutdown(results):
      log.msg("startShutdown(%s)" % str(results))
      allStopped = client_manager.stopAllClients()
      allStopped.addCallback(shutdown)
    firstConnectsAttempted.addCallback(startShutdown)

stdio.StandardIO( StdinMetricsReader() )
exitCode = 0
def shutdown(results):
  global exitCode
  for success, result in results:
    if not success:
      exitCode = 1
      break
  if reactor.running:
    reactor.stop()
reactor.run()
raise SystemExit(exitCode)
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
from os.path import dirname, join, abspath
# Figure out where we're installed
BIN_DIR = dirname(abspath(__file__))
ROOT_DIR = dirname(BIN_DIR)
# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
# source.
LIB_DIR = join(ROOT_DIR, 'lib')
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
run_twistd_plugin(__file__)
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
import whisper
from os.path import dirname, exists, join, realpath
from ConfigParser import ConfigParser
if len(sys.argv) == 2:
  SCHEMAS_FILE = sys.argv[1]
  print "Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE
else:
  SCHEMAS_FILE = realpath(join(dirname(__file__), '..', 'conf', 'storage-schemas.conf'))
  print "Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE

config_parser = ConfigParser()
if not config_parser.read(SCHEMAS_FILE):
  print "Error: Couldn't read config file: %s" % SCHEMAS_FILE
  sys.exit(1)
errors_found = 0
for section in config_parser.sections():
  print "Section '%s':" % section
  options = dict(config_parser.items(section))
  retentions = options['retentions'].split(',')

  archives = []
  section_failed = False
  for retention in retentions:
    try:
      archives.append(whisper.parseRetentionDef(retention))
    except ValueError, e:
      print " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" % \
        (section, retention)
      print "   %s" % e.message
      section_failed = True

  if not section_failed:
    try:
      whisper.validateArchiveList(archives)
    except whisper.InvalidConfiguration, e:
      print " - Error: Section '%s' contains an invalid retention definition ('%s')" % \
        (section, ','.join(retentions))
      print "   %s" % e.message

  if section_failed:
    errors_found += 1
  else:
    print " OK"
if errors_found:
  print
  print "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE
  sys.exit(1)
print
print "Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE
# The form of each line in this file should be as follows:
#
# output_template (frequency) = method input_pattern
#
# This will capture any received metrics that match 'input_pattern'
# for calculating an aggregate metric. The calculation will occur
# every 'frequency' seconds and the 'method' can specify 'sum' or
# 'avg'. The name of the aggregate metric will be derived from
# 'output_template' filling in any captured fields from 'input_pattern'.
#
# For example, if your metric naming scheme is:
#
# <env>.applications.<app>.<server>.<metric>
#
# You could configure some aggregations like so:
#
# <env>.applications.<app>.all.requests (60) = sum <env>.applications.<app>.*.requests
# <env>.applications.<app>.all.latency (60) = avg <env>.applications.<app>.*.latency
#
# As an example, if the following metrics are received:
#
# prod.applications.apache.www01.requests
# prod.applications.apache.www01.requests
#
# They would all go into the same aggregation buffer and after 60 seconds the
# aggregate metric 'prod.applications.apache.all.requests' would be calculated
# by summing their values.
#
# Note that any time this file is modified, it will be re-read automatically.
# This file takes a single regular expression per line
# If USE_WHITELIST is set to True in carbon.conf, any metrics received which
# match one of these expressions will be dropped
# This file is reloaded automatically when changes are made
^some\.noisy\.metric\.prefix\..*
# This is a configuration file with AMQP enabled
[cache]
LOCAL_DATA_DIR =
# Specify the user to drop privileges to
# If this is blank carbon runs as the user that invokes it
# This user must have write access to the local data directory
USER =
# Limit the size of the cache to avoid swapping or becoming CPU bound.
# Sorting and serving cache queries get more expensive as the cache grows.
# Use the value "inf" (infinity) for an unlimited cache size.
MAX_CACHE_SIZE = inf
# Limits the number of whisper update_many() calls per second, which effectively
# means the number of write requests sent to the disk. This is intended to
# prevent over-utilizing the disk and thus starving the rest of the system.
# When the rate of required updates exceeds this, then carbon's caching will
# take effect and increase the overall throughput accordingly.
MAX_UPDATES_PER_SECOND = 1000
# Softly limits the number of whisper files that get created each minute.
# Setting this value low (like at 50) is a good way to ensure your graphite
# system will not be adversely impacted when a bunch of new metrics are
# sent to it. The trade-off is that it will take much longer for those metrics'
# database files to all get created and thus longer until the data becomes usable.
# Setting this value high (like "inf" for infinity) will cause graphite to create
# the files quickly but at the risk of slowing I/O down considerably for a while.
MAX_CREATES_PER_MINUTE = inf
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2003
UDP_RECEIVER_INTERFACE = 0.0.0.0
UDP_RECEIVER_PORT = 2003
PICKLE_RECEIVER_INTERFACE = 0.0.0.0
PICKLE_RECEIVER_PORT = 2004
CACHE_QUERY_INTERFACE = 0.0.0.0
CACHE_QUERY_PORT = 7002
# Enable AMQP if you want to receive metrics using your AMQP broker
ENABLE_AMQP = True
# Verbose means a line will be logged for every metric received,
# which is useful for testing
AMQP_VERBOSE = True
# your credentials for the amqp server
# AMQP_USER = guest
# AMQP_PASSWORD = guest
# the network settings for the amqp server
# AMQP_HOST = localhost
# AMQP_PORT = 5672
# if you want to include the metric name as part of the message body
# instead of as the routing key, set this to True
# AMQP_METRIC_NAME_IN_BODY = False
# NOTE: you cannot run both a cache and a relay on the same server
# with the default configuration; you have to specify distinct
# interfaces and ports for the listeners.
[relay]
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2003
PICKLE_RECEIVER_INTERFACE = 0.0.0.0
PICKLE_RECEIVER_PORT = 2004
CACHE_SERVERS = server1, server2, server3
MAX_QUEUE_SIZE = 10000
# Relay destination rules for carbon-relay. Entries are scanned in order,
# and the first rule whose pattern matches a metric wins: processing stops
# after sending to that rule's destinations unless `continue` is set to true.
#
# [name]
# pattern = <regex>
# destinations = <list of destination addresses>
# continue = <boolean> # default: False
#
# name: Arbitrary unique name to identify the rule
# pattern: Regex pattern to match against the metric name
# destinations: Comma-separated list of destinations.
# ex: 127.0.0.1, 10.1.2.3:2004, 10.1.2.4:2004:a, myserver.mydomain.com
# continue: Continue processing rules if this rule matches (default: False)
# You must have exactly one section with 'default = true'
# Note that all destinations listed must also exist in carbon.conf
# in the DESTINATIONS setting in the [relay] section
[default]
default = true
destinations = 127.0.0.1:2004:a, 127.0.0.1:2104:b
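# A hedged example of a pattern-based rule (the name and addresses are
# hypothetical; any destination used here must also appear in DESTINATIONS in
# carbon.conf). It sends carbon's own self-metrics to a dedicated cache
# instance and, because continue defaults to False, stops processing there:
#
# [carbon_self_metrics]
# pattern = ^carbon\.
# destinations = 127.0.0.1:2004:a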
# This file defines regular expression patterns that can be used to
# rewrite metric names in a search & replace fashion. It consists of two
# sections, [pre] and [post]. The rules in the pre section are applied to
# metric names as soon as they are received. The post rules are applied
# after aggregation has taken place.
#
# The general form of each rule is as follows:
#
# regex-pattern = replacement-text
#
# For example:
#
# [post]
# _sum$ =
# _avg$ =
#
# These rules would strip off a suffix of _sum or _avg from any metric names
# after aggregation.
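# A hedged illustration of a [pre] rule (the pattern and replacement are
# hypothetical): collapsing per-request UUIDs into a fixed node before the
# metrics reach aggregation.
#
# [pre]
# [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} = request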
# Aggregation methods for whisper files. Entries are scanned in order,
# and first match wins. This file is scanned for changes every 60 seconds
#
# [name]
# pattern = <regex>
# xFilesFactor = <float between 0 and 1>
# aggregationMethod = <average|sum|last|max|min>
#
# name: Arbitrary unique name for the rule
# pattern: Regex pattern to match against the metric name
# xFilesFactor: Ratio of valid data points required for aggregation to the next retention to occur
# aggregationMethod: function to apply to data points for aggregation
#
[min]
pattern = \.min$
xFilesFactor = 0.1
aggregationMethod = min
[max]
pattern = \.max$
xFilesFactor = 0.1
aggregationMethod = max
[sum]
pattern = \.count$
xFilesFactor = 0
aggregationMethod = sum
[default_average]
pattern = .*
xFilesFactor = 0.5
aggregationMethod = average
# Schema definitions for Whisper files. Entries are scanned in order,
# and first match wins. This file is scanned for changes every 60 seconds.
#
# [name]
# pattern = regex
# retentions = timePerPoint:timeToStore, timePerPoint:timeToStore, ...
# Carbon's internal metrics. This entry should match what is specified in
# CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings
[carbon]
pattern = ^carbon\.
retentions = 60:90d
[default_1min_for_1day]
pattern = .*
retentions = 60s:1d
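# A hedged example of a multi-archive schema (the section name and pattern are
# hypothetical): 10-second points for 6 hours, then 1-minute points for 7 days,
# then 10-minute points for 5 years.
#
# [hypothetical_frontend]
# pattern = ^servers\.www
# retentions = 10s:6h,1m:7d,10m:5y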
# This file takes a single regular expression per line
# If USE_WHITELIST is set to True in carbon.conf, only metrics received which
# match one of these expressions will be persisted. If this file is empty or
# missing, all metrics will pass through.
# This file is reloaded automatically when changes are made
.*
import time
from twisted.internet.task import LoopingCall
from carbon.conf import settings
from carbon import log
class BufferManager:
  def __init__(self):
    self.buffers = {}

  def __len__(self):
    return len(self.buffers)

  def get_buffer(self, metric_path):
    if metric_path not in self.buffers:
      log.aggregator("Allocating new metric buffer for %s" % metric_path)
      self.buffers[metric_path] = MetricBuffer(metric_path)
    return self.buffers[metric_path]

  def clear(self):
    for buffer in self.buffers.values():
      buffer.close()
    self.buffers.clear()
class MetricBuffer:
  __slots__ = ('metric_path', 'interval_buffers', 'compute_task', 'configured',
               'aggregation_frequency', 'aggregation_func')

  def __init__(self, metric_path):
    self.metric_path = metric_path
    self.interval_buffers = {}
    self.compute_task = None
    self.configured = False
    self.aggregation_frequency = None
    self.aggregation_func = None

  def input(self, datapoint):
    (timestamp, value) = datapoint
    interval = timestamp - (timestamp % self.aggregation_frequency)
    if interval in self.interval_buffers:
      buffer = self.interval_buffers[interval]
    else:
      buffer = self.interval_buffers[interval] = IntervalBuffer(interval)
    buffer.input(datapoint)

  def configure_aggregation(self, frequency, func):
    self.aggregation_frequency = int(frequency)
    self.aggregation_func = func
    self.compute_task = LoopingCall(self.compute_value)
    self.compute_task.start(frequency, now=False)
    self.configured = True

  def compute_value(self):
    now = int( time.time() )
    current_interval = now - (now % self.aggregation_frequency)
    age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
    for buffer in self.interval_buffers.values():
      if buffer.active:
        value = self.aggregation_func(buffer.values)
        datapoint = (buffer.interval, value)
        state.events.metricGenerated(self.metric_path, datapoint)
        state.instrumentation.increment('aggregateDatapointsSent')
        buffer.mark_inactive()

      if buffer.interval < age_threshold:
        del self.interval_buffers[buffer.interval]

  def close(self):
    if self.compute_task and self.compute_task.running:
      self.compute_task.stop()

  @property
  def size(self):
    return sum([len(buf.values) for buf in self.interval_buffers.values()])
class IntervalBuffer:
  __slots__ = ('interval', 'values', 'active')

  def __init__(self, interval):
    self.interval = interval
    self.values = []
    self.active = True

  def input(self, datapoint):
    self.values.append( datapoint[1] )
    self.active = True

  def mark_inactive(self):
    self.active = False
# Shared importable singleton
BufferManager = BufferManager()
# Avoid import circularity
from carbon import state
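# Illustrative usage sketch (values are hypothetical; inside carbon these calls
# are driven by carbon.aggregator.processor rather than made directly):
#
#   buf = BufferManager.get_buffer('prod.applications.apache.all.requests')
#   if not buf.configured:
#     buf.configure_aggregation(60, sum)    # aggregate with sum() every 60 seconds
#   buf.input((1234567890, 1.0))            # (timestamp, value) datapoint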
from carbon.instrumentation import increment
from carbon.aggregator.rules import RuleManager
from carbon.aggregator.buffers import BufferManager
from carbon.rewrite import RewriteRuleManager
from carbon import events
def process(metric, datapoint):
  increment('datapointsReceived')

  for rule in RewriteRuleManager.preRules:
    metric = rule.apply(metric)

  aggregate_metrics = []

  for rule in RuleManager.rules:
    aggregate_metric = rule.get_aggregate_metric(metric)

    if aggregate_metric is None:
      continue
    else:
      aggregate_metrics.append(aggregate_metric)

    buffer = BufferManager.get_buffer(aggregate_metric)

    if not buffer.configured:
      buffer.configure_aggregation(rule.frequency, rule.aggregation_func)

    buffer.input(datapoint)

  for rule in RewriteRuleManager.postRules:
    metric = rule.apply(metric)

  if metric not in aggregate_metrics:
    events.metricGenerated(metric, datapoint)
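# Hedged usage sketch (the metric name and datapoint are hypothetical; in carbon
# the aggregator service calls process() for each received datapoint once the
# aggregation and rewrite rules have been loaded, e.g. via RuleManager.read_from):
#
#   process('prod.applications.apache.www01.requests', (1234567890, 1.0))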
import time
import re
from os.path import exists, getmtime
from twisted.internet.task import LoopingCall
from carbon import log
from carbon.aggregator.buffers import BufferManager
class RuleManager:
  def __init__(self):
    self.rules = []
    self.rules_file = None
    self.read_task = LoopingCall(self.read_rules)
    self.rules_last_read = 0.0

  def clear(self):
    self.rules = []

  def read_from(self, rules_file):
    self.rules_file = rules_file
    self.read_rules()
    self.read_task.start(10, now=False)

  def read_rules(self):
    if not exists(self.rules_file):
      self.clear()
      return

    # Only read if the rules file has been modified
    try:
      mtime = getmtime(self.rules_file)
    except:
      log.err("Failed to get mtime of %s" % self.rules_file)
      return
    if mtime <= self.rules_last_read:
      return

    # Read new rules
    log.aggregator("reading new aggregation rules from %s" % self.rules_file)
    new_rules = []
    for line in open(self.rules_file):
      line = line.strip()
      if line.startswith('#') or not line:
        continue

      rule = self.parse_definition(line)
      new_rules.append(rule)

    log.aggregator("clearing aggregation buffers")
    BufferManager.clear()