Commit 9e7e008e authored by Thomas Goirand

New upstream version 1.1.4

parent 1e79b413
Metadata-Version: 1.1
Name: carbon
Version: 1.0.2
Version: 1.1.4
Summary: Backend data caching and persistence daemon for Graphite
Home-page: http://graphiteapp.org/
Author: Chris Davis
@@ -14,4 +14,10 @@ Classifier: License :: OSI Approved :: Apache Software License
Classifier: Programming Language :: Python
Classifier: Programming Language :: Python :: 2
Classifier: Programming Language :: Python :: 2.7
Classifier: Programming Language :: Python :: 2 :: Only
Classifier: Programming Language :: Python :: 3
Classifier: Programming Language :: Python :: 3.4
Classifier: Programming Language :: Python :: 3.5
Classifier: Programming Language :: Python :: 3.6
Classifier: Programming Language :: Python :: 3.7
Classifier: Programming Language :: Python :: Implementation :: CPython
Classifier: Programming Language :: Python :: Implementation :: PyPy
#!/usr/bin/env python
"""Copyright 2009 Chris Davis
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License."""
import sys
import os.path
# Figure out where we're installed
BIN_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(BIN_DIR)
# Make sure that carbon's 'lib' dir is in the $PYTHONPATH if we're running from
# source.
LIB_DIR = os.path.join(ROOT_DIR, "lib")
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin # noqa
from carbon.exceptions import CarbonConfigException # noqa
try:
run_twistd_plugin(__file__)
except CarbonConfigException as exc:
raise SystemExit(str(exc))
@@ -25,10 +25,10 @@ ROOT_DIR = os.path.dirname(BIN_DIR)
LIB_DIR = os.path.join(ROOT_DIR, "lib")
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
from carbon.exceptions import CarbonConfigException
from carbon.util import run_twistd_plugin # noqa
from carbon.exceptions import CarbonConfigException # noqa
try:
run_twistd_plugin(__file__)
except CarbonConfigException, exc:
except CarbonConfigException as exc:
raise SystemExit(str(exc))
@@ -25,10 +25,10 @@ ROOT_DIR = os.path.dirname(BIN_DIR)
LIB_DIR = os.path.join(ROOT_DIR, "lib")
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
from carbon.exceptions import CarbonConfigException
from carbon.util import run_twistd_plugin # noqa
from carbon.exceptions import CarbonConfigException # noqa
try:
run_twistd_plugin(__file__)
except CarbonConfigException, exc:
except CarbonConfigException as exc:
raise SystemExit(str(exc))
@@ -14,7 +14,6 @@ See the License for the specific language governing permissions and
limitations under the License."""
import sys
import imp
from os.path import dirname, join, abspath, exists
from optparse import OptionParser
@@ -35,34 +34,36 @@ try:
except ImportError:
pass
from twisted.internet import stdio, reactor, defer
from twisted.protocols.basic import LineReceiver
from carbon.routers import ConsistentHashingRouter, RelayRulesRouter
from carbon.client import CarbonClientManager
from carbon import log, events
from twisted.internet import stdio, reactor, defer # noqa
from twisted.protocols.basic import LineReceiver # noqa
from carbon.routers import ConsistentHashingRouter, RelayRulesRouter # noqa
from carbon.client import CarbonClientManager # noqa
from carbon import log, events # noqa
option_parser = OptionParser(usage="%prog [options] <host:port:instance> <host:port:instance> ...")
option_parser.add_option('--debug', action='store_true', help="Log debug info to stdout")
option_parser.add_option('--keyfunc', help="Use a custom key function (path/to/module.py:myFunc)")
option_parser.add_option('--replication', type='int', default=1, help='Replication factor')
option_parser.add_option('--routing', default='consistent-hashing',
option_parser.add_option(
'--routing', default='consistent-hashing',
help='Routing method: "consistent-hashing" (default) or "relay"')
option_parser.add_option('--diverse-replicas', action='store_true', help="Spread replicas across diff. servers")
option_parser.add_option('--relayrules', default=default_relayrules,
help='relay-rules.conf file to use for relay routing')
option_parser.add_option(
'--diverse-replicas', action='store_true', help="Spread replicas across diff. servers")
option_parser.add_option(
'--relayrules', default=default_relayrules, help='relay-rules.conf file to use for relay routing')
options, args = option_parser.parse_args()
if not args:
print 'At least one host:port destination required\n'
print('At least one host:port destination required\n')
option_parser.print_usage()
raise SystemExit(1)
if options.routing not in ('consistent-hashing', 'relay'):
print "Invalid --routing value, must be one of:"
print " consistent-hashing"
print " relay"
print("Invalid --routing value, must be one of:")
print(" consistent-hashing")
print(" relay")
raise SystemExit(1)
destinations = []
@@ -74,7 +75,7 @@ for arg in args:
instance = parts[2]
else:
instance = None
destinations.append( (host, port, instance) )
destinations.append((host, port, instance))
if options.debug:
log.logToStdout()
@@ -87,7 +88,7 @@ elif options.routing == 'relay':
if exists(options.relayrules):
router = RelayRulesRouter(options.relayrules)
else:
print "relay rules file %s does not exist" % options.relayrules
print("relay rules file %s does not exist" % options.relayrules)
raise SystemExit(1)
client_manager = CarbonClientManager(router)
@@ -104,26 +105,31 @@ class StdinMetricsReader(LineReceiver):
delimiter = '\n'
def lineReceived(self, line):
#log.msg("[DEBUG] lineReceived(): %s" % line)
# log.msg("[DEBUG] lineReceived(): %s" % line)
try:
(metric, value, timestamp) = line.split()
datapoint = (float(timestamp), float(value))
assert datapoint[1] == datapoint[1]  # filter out NaNs (NaN != NaN)
client_manager.sendDatapoint(metric, datapoint)
except ValueError:
log.err(None, 'Dropping invalid line: %s' % line)
def connectionLost(self, reason):
log.msg('stdin disconnected')
def startShutdown(results):
log.msg("startShutdown(%s)" % str(results))
allStopped = client_manager.stopAllClients()
allStopped.addCallback(shutdown)
firstConnectsAttempted.addCallback(startShutdown)
stdio.StandardIO( StdinMetricsReader() )
stdio.StandardIO(StdinMetricsReader())
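For context, each line read from stdin is expected to be `metric value timestamp`, which is what the line.split() in lineReceived() above parses; a quick illustration (the metric name is hypothetical):

    line = 'servers.web01.load 1.5 1510000000'
    metric, value, timestamp = line.split()
    datapoint = (float(timestamp), float(value))  # (1510000000.0, 1.5)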
exitCode = 0
def shutdown(results):
global exitCode
for success, result in results:
@@ -133,5 +139,6 @@ def shutdown(results):
if reactor.running:
reactor.stop()
reactor.run()
raise SystemExit(exitCode)
@@ -25,10 +25,10 @@ ROOT_DIR = os.path.dirname(BIN_DIR)
LIB_DIR = os.path.join(ROOT_DIR, "lib")
sys.path.insert(0, LIB_DIR)
from carbon.util import run_twistd_plugin
from carbon.exceptions import CarbonConfigException
from carbon.util import run_twistd_plugin # noqa
from carbon.exceptions import CarbonConfigException # noqa
try:
run_twistd_plugin(__file__)
except CarbonConfigException, exc:
except CarbonConfigException as exc:
raise SystemExit(str(exc))
@@ -15,15 +15,19 @@ limitations under the License."""
import sys
import whisper
from os.path import dirname, exists, join, realpath
from ConfigParser import ConfigParser
from os.path import dirname, join, realpath
try:
from ConfigParser import ConfigParser
except ImportError:
from configparser import ConfigParser
if len(sys.argv) == 2:
SCHEMAS_FILE = sys.argv[1]
print "Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE
print("Loading storage-schemas configuration from: '%s'" % SCHEMAS_FILE)
else:
SCHEMAS_FILE = realpath(join(dirname(__file__), '..', 'conf', 'storage-schemas.conf'))
print "Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE
print("Loading storage-schemas configuration from default location at: '%s'" % SCHEMAS_FILE)
config_parser = ConfigParser()
if not config_parser.read(SCHEMAS_FILE):
@@ -32,7 +36,7 @@ if not config_parser.read(SCHEMAS_FILE):
errors_found = 0
for section in config_parser.sections():
print "Section '%s':" % section
print("Section '%s':" % section)
options = dict(config_parser.items(section))
retentions = options['retentions'].split(',')
@@ -41,26 +45,30 @@ for section in config_parser.sections():
for retention in retentions:
try:
archives.append(whisper.parseRetentionDef(retention))
except ValueError, e:
print " - Error: Section '%s' contains an invalid item in its retention definition ('%s')" % \
except ValueError as e:
print(
" - Error: Section '%s' contains an invalid item in its retention definition ('%s')" %
(section, retention)
print " %s" % e.message
)
print(" %s" % e)
section_failed = True
if not section_failed:
try:
whisper.validateArchiveList(archives)
except whisper.InvalidConfiguration, e:
print " - Error: Section '%s' contains an invalid retention definition ('%s')" % \
except whisper.InvalidConfiguration as e:
print(
" - Error: Section '%s' contains an invalid retention definition ('%s')" %
(section, ','.join(retentions))
print " %s" % e.message
)
print(" %s" % e)
if section_failed:
errors_found += 1
else:
print " OK"
print(" OK")
if errors_found:
raise SystemExit( "Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE)
raise SystemExit("Storage-schemas configuration '%s' failed validation" % SCHEMAS_FILE)
print "Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE
print("Storage-schemas configuration '%s' is valid" % SCHEMAS_FILE)
@@ -85,6 +85,12 @@ MAX_CREATES_PER_MINUTE = 50
# second.
MIN_TIMESTAMP_RESOLUTION = 1
# Set the minimum lag in seconds for a point to be written to the database
# in order to optimize batching. This means that each point will wait at least
# the duration of this lag before being written. Setting this to 0 disables the feature.
# This currently only works when using the timesorted write strategy.
# MIN_TIMESTAMP_LAG = 0
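As an aside, a minimal sketch of the hold-back behaviour described above (illustrative only; the function name is hypothetical and this is not carbon's actual write path):

    import time

    def points_ready_to_write(points, min_lag):
        # points is a list of (timestamp, value) pairs; holding young points
        # back lets several of them accumulate into a single batched write.
        if min_lag <= 0:  # 0 disables the feature
            return list(points)
        now = time.time()
        return [p for p in points if now - p[0] >= min_lag]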
# Set the interface and port for the line (plain text) listener. Setting the
# interface to 0.0.0.0 listens on all interfaces. Port can be set to 0 to
# disable this listener if it is not required.
@@ -131,7 +137,7 @@ CACHE_QUERY_PORT = 7002
USE_FLOW_CONTROL = True
# If enabled, this setting is used to time out a metric client connection if no
# metrics have been sent in the specified time in seconds
#METRIC_CLIENT_IDLE_TIMEOUT = None
# By default, carbon-cache will log every whisper update and cache hit.
@@ -201,7 +207,7 @@ WHISPER_FALLOCATE_CREATE = True
# On systems with a large number of metrics, Whisper write(2) page writeback
# can sometimes cause disk thrashing due to memory shortage, so that abnormal
# disk reads occur. Enabling this option makes it possible to decrease useless
# page cache memory by posix_fadvise(2) with POSIX_FADVISE_RANDOM option.
# WHISPER_FADVISE_RANDOM = False
@@ -296,6 +302,30 @@ WHISPER_FALLOCATE_CREATE = True
# Example: store everything
# BIND_PATTERNS = #
# URL of graphite-web instance, this is used to add incoming series to the tag database
# GRAPHITE_URL = http://127.0.0.1:80
# Tag support, when enabled carbon will make HTTP calls to graphite-web to update the tag index
# ENABLE_TAGS = True
# Tag update interval, this specifies how frequently updates to existing series will trigger
# an update to the tag index, the default setting is once every 100 updates
# TAG_UPDATE_INTERVAL = 100
# Tag hash filenames, this specifies whether tagged metric filenames should use the hash of the metric name
# or a human-readable name, using hashed names avoids issues with path length when using a large number of tags
# TAG_HASH_FILENAMES = True
# Tag batch size, this specifies the maximum number of series to be sent to graphite-web in a single batch
# TAG_BATCH_SIZE = 100
# Tag queue size, this specifies the maximum number of series to be queued for sending to graphite-web
# There are separate queues for new series and for updates to existing series
# TAG_QUEUE_SIZE = 10000
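The two queue settings above describe plain producer-side batching; a hedged sketch of that pattern (names are hypothetical, this is not carbon's actual tag writer):

    from collections import deque

    TAG_QUEUE_SIZE = 10000
    TAG_BATCH_SIZE = 100

    tag_queue = deque(maxlen=TAG_QUEUE_SIZE)  # oldest entries drop off when full

    def flush_tag_queue(send_batch):
        # send_batch is whatever callable performs the HTTP call to graphite-web.
        while tag_queue:
            batch = [tag_queue.popleft()
                     for _ in range(min(TAG_BATCH_SIZE, len(tag_queue)))]
            send_batch(batch)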
# Set to enable Sentry.io exception monitoring.
# RAVEN_DSN='YOUR_DSN_HERE'.
# To configure special settings for the carbon-cache instance 'b', uncomment this:
#[cache:b]
#LINE_RECEIVER_PORT = 2103
@@ -341,7 +371,7 @@ REPLICATION_FACTOR = 1
# For REPLICATION_FACTOR >=2, set DIVERSE_REPLICAS to True to guarantee replicas
# across distributed hosts. With this setting disabled, it's possible that replicas
# may be sent to different caches on the same host. This has been the default
# behavior since introduction of 'consistent-hashing' relay method.
# Note that enabling this on an existing pre-0.9.14 cluster will require rebalancing
# your metrics across the cluster nodes using a tool like Carbonate.
@@ -370,6 +400,36 @@ DESTINATIONS = 127.0.0.1:2004
# extended with CarbonClientFactory plugins and defaults to "pickle".
# DESTINATION_PROTOCOL = pickle
# This defines the wire transport, either none or ssl.
# If SSL is used any TCP connection will be upgraded to TLS1. The system's
# trust authority will be used unless DESTINATION_SSL_CA is specified in
# which case an alternative certificate authority chain will be used for
# verifying the remote certificate.
# To use SSL you'll need the cryptography, service_identity, and twisted >= 14
# DESTINATION_TRANSPORT = none
# DESTINATION_SSL_CA=/path/to/private-ca.crt
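For reference, a minimal Twisted client-TLS sketch matching what these options describe (illustrative only; carbon's real client wiring lives in carbon/client.py and is not shown in this hunk):

    from twisted.internet.ssl import optionsForClientTLS, Certificate

    # With DESTINATION_SSL_CA set, verify against the private CA instead of
    # the system trust store.
    with open('/path/to/private-ca.crt') as f:
        trust_root = Certificate.loadPEM(f.read())
    tls_options = optionsForClientTLS(u'destination.example.com',
                                      trustRoot=trust_root)
    # ...then connect with reactor.connectSSL(host, port, factory, tls_options)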
# This allows multiple connections per destination; it will
# pool all the replicas of a single host in the same queue and distribute
# points across these replicas instead of replicating them.
# The following example will balance the load between :0 and :1.
## DESTINATIONS = foo:2001:0, foo:2001:1
## RELAY_METHOD = rules
# Note: this is currently incompatible with USE_RATIO_RESET which gets
# disabled if this option is enabled.
# DESTINATIONS_POOL_REPLICAS = False
# When using consistent hashing it sometimes makes sense to make
# the ring dynamic when you don't want to lose points when a
# single destination is down. Replication is an answer to that
# but it can be quite expensive.
# DYNAMIC_ROUTER = False
# Controls the number of connection attempts before marking a
# destination as down. We usually do one connection attempt per
# second.
# DYNAMIC_ROUTER_MAX_RETRIES = 5
# This is the maximum number of datapoints that can be queued up
# for a single destination. Once this limit is hit, we will
# stop accepting new data if USE_FLOW_CONTROL is True, otherwise
@@ -413,7 +473,7 @@ TIME_TO_DEFER_SENDING = 0.0001
USE_FLOW_CONTROL = True
# If enabled, this setting is used to time out a metric client connection if no
# metrics have been sent in the specified time in seconds
#METRIC_CLIENT_IDLE_TIMEOUT = None
# Set this to True to enable whitelisting and blacklisting of metrics in
@@ -461,6 +521,21 @@ MIN_RESET_RATIO=0.9
# reset connections for no good reason.
MIN_RESET_INTERVAL=121
# Enable TCP Keep Alive (http://tldp.org/HOWTO/TCP-Keepalive-HOWTO/overview.html).
# Default settings will send a probe every 30s. Default is False.
# TCP_KEEPALIVE=True
# The interval between the last data packet sent (simple ACKs are not
# considered data) and the first keepalive probe; after the connection is marked
# to need keepalive, this counter is not used any further.
# TCP_KEEPIDLE=10
# The interval between subsequent keepalive probes, regardless of what
# the connection has exchanged in the meantime.
# TCP_KEEPINTVL=30
# The number of unacknowledged probes to send before considering the connection
# dead and notifying the application layer.
# TCP_KEEPCNT=2
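These three knobs correspond to the Linux TCP socket options of the same names; a short sketch (assumes Linux, where the TCP_KEEP* constants are exposed by Python's socket module):

    import socket

    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)    # TCP_KEEPALIVE=True
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE, 10)  # TCP_KEEPIDLE=10
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPINTVL, 30) # TCP_KEEPINTVL=30
    sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPCNT, 2)    # TCP_KEEPCNT=2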
[aggregator]
LINE_RECEIVER_INTERFACE = 0.0.0.0
LINE_RECEIVER_PORT = 2023
@@ -511,7 +586,7 @@ MAX_QUEUE_SIZE = 10000
USE_FLOW_CONTROL = True
# If enabled, this setting is used to time out a metric client connection if no
# metrics have been sent in the specified time in seconds
#METRIC_CLIENT_IDLE_TIMEOUT = None
# This defines the maximum "message size" between carbon daemons.
@@ -559,3 +634,12 @@ MAX_AGGREGATION_INTERVALS = 5
# Specify the user to drop privileges to
# If this is blank carbon-aggregator runs as the user that invokes it
# USER =
# Parts of the code, particularly aggregator rules, need
# to cache metric names. To avoid leaking too much memory you
# can tweak the size of this cache. The default allows for 1M
# different metrics per rule (~200MiB).
# CACHE_METRIC_NAMES_MAX=1000000
# You can optionally set a TTL on this cache.
# CACHE_METRIC_NAMES_TTL=600
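The get_cache() helper added to lib/carbon/aggregator/rules.py later in this commit maps these settings onto cachetools; in brief:

    from cachetools import TTLCache, LRUCache

    # Both settings > 0: entries are bounded in number and expire after the TTL.
    cache = TTLCache(maxsize=1000000, ttl=600)
    # Only CACHE_METRIC_NAMES_MAX set: plain LRU eviction, no expiry.
    # cache = LRUCache(maxsize=1000000)
    cache['some.metric.path'] = 'aggregated.metric.path'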
@@ -24,7 +24,7 @@ aggregationMethod = max
[sum]
pattern = \.count$
xFilesFactor = 0
aggregationMethod = sum
aggregationMethod = max
[default_average]
pattern = .*
......
@@ -14,6 +14,9 @@
# Valid: 60s:7d,300s:30d (300/60 = 5)
# Invalid: 180s:7d,300s:30d (300/180 = 3.333)
#
# This retention is set at the time the first metric is sent.
# Changing this file will not affect already-created .wsp files.
# Use whisper-resize.py to change existing data files.
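bin/validate-storage-schemas.py earlier in this commit checks each definition with whisper.parseRetentionDef(); a quick illustration (the (seconds_per_point, points) return shape reflects whisper's current behaviour):

    import whisper

    # '60s:7d' keeps one point per minute for seven days:
    print(whisper.parseRetentionDef('60s:7d'))  # (60, 10080), since 7d / 60s = 10080 points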
# Carbon's internal metrics. This entry should match what is specified in
# CARBON_METRIC_PREFIX and CARBON_METRIC_INTERVAL settings
......
@@ -4,7 +4,7 @@ from carbon.conf import settings
from carbon import log
class BufferManager:
class _BufferManager:
def __init__(self):
self.buffers = {}
@@ -13,7 +13,7 @@ class BufferManager:
def get_buffer(self, metric_path):
if metric_path not in self.buffers:
log.aggregator("Allocating new metric buffer for %s" % metric_path)
log.debug("Allocating new metric buffer for %s" % metric_path)
self.buffers[metric_path] = MetricBuffer(metric_path)
return self.buffers[metric_path]
@@ -51,16 +51,20 @@ class MetricBuffer:
self.aggregation_frequency = int(frequency)
self.aggregation_func = func
self.compute_task = LoopingCall(self.compute_value)
compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency) or frequency
if settings['WRITE_BACK_FREQUENCY'] is not None:
compute_frequency = min(settings['WRITE_BACK_FREQUENCY'], frequency)
else:
compute_frequency = frequency
self.compute_task.start(compute_frequency, now=False)
self.configured = True
def compute_value(self):
now = int( time.time() )
now = int(time.time())
current_interval = now - (now % self.aggregation_frequency)
age_threshold = current_interval - (settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
age_threshold = current_interval - (
settings['MAX_AGGREGATION_INTERVALS'] * self.aggregation_frequency)
for buffer in self.interval_buffers.values():
for buffer in list(self.interval_buffers.values()):
if buffer.active:
value = self.aggregation_func(buffer.values)
datapoint = (buffer.interval, value)
@@ -93,7 +97,7 @@ class IntervalBuffer:
self.active = True
def input(self, datapoint):
self.values.append( datapoint[1] )
self.values.append(datapoint[1])
self.active = True
def mark_inactive(self):
@@ -101,7 +105,7 @@
# Shared importable singleton
BufferManager = BufferManager()
BufferManager = _BufferManager()
# Avoid import circularity
from carbon import state
from carbon import state # NOQA
@@ -2,7 +2,6 @@ from carbon.aggregator.rules import RuleManager
from carbon.aggregator.buffers import BufferManager
from carbon.instrumentation import increment
from carbon.pipeline import Processor
from carbon.rewrite import PRE, POST, RewriteRuleManager
from carbon.conf import settings
from carbon import log
@@ -13,9 +12,6 @@ class AggregationProcessor(Processor):
def process(self, metric, datapoint):
increment('datapointsReceived')
for rule in RewriteRuleManager.rules(PRE):
metric = rule.apply(metric)
aggregate_metrics = set()
for rule in RuleManager.rules:
@@ -33,10 +29,8 @@
values_buffer.input(datapoint)
for rule in RewriteRuleManager.rules(POST):
metric = rule.apply(metric)
if settings.FORWARD_ALL and metric not in aggregate_metrics:
if settings.LOG_AGGREGATOR_MISSES and len(aggregate_metrics) == 0:
log.msg("Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
log.msg(
"Couldn't match metric %s with any aggregation rule. Passing on un-aggregated." % metric)
yield (metric, datapoint)
import time
import re
from math import floor, ceil
from os.path import exists, getmtime
from twisted.internet.task import LoopingCall
from cachetools import TTLCache, LRUCache
from carbon import log
from carbon.conf import settings
from carbon.aggregator.buffers import BufferManager
class RuleManager:
def get_cache():
ttl = settings.CACHE_METRIC_NAMES_TTL
size = settings.CACHE_METRIC_NAMES_MAX
if ttl > 0 and size > 0:
return TTLCache(size, ttl)
elif size > 0:
return LRUCache(size)
else:
return dict()
class RuleManager(object):
def __init__(self):
self.rules = []
self.rules_file = None
@@ -56,7 +72,7 @@ class RuleManager:
left_side, right_side = line.split('=', 1)
output_pattern, frequency = left_side.split()
method, input_pattern = right_side.split()
frequency = int( frequency.lstrip('(').rstrip(')') )
frequency = int(frequency.lstrip('(').rstrip(')'))
return AggregationRule(input_pattern, output_pattern, method, frequency)
except ValueError:
@@ -64,7 +80,7 @@
raise
class AggregationRule:
class AggregationRule(object):
def __init__(self, input_pattern, output_pattern, method, frequency):
self.input_pattern = input_pattern
self.output_pattern = output_pattern
@@ -77,7 +93,7 @@ class AggregationRule:
self.aggregation_func = AGGREGATION_METHODS[method]
self.build_regex()
self.build_template()
self.cache = {}
self.cache = get_cache()
def get_aggregate_metric(self, metric_path):
if metric_path in self.cache:
@@ -95,10 +111,10 @@
try:
result = self.output_template % extracted_fields
except TypeError:
log.err("Failed to interpolate template %s with fields %s" % (self.output_template, extracted_fields))
log.err("Failed to interpolate template %s with fields %s" % (
self.output_template, extracted_fields))
if result:
self.cache[metric_path] = result
return result
def build_regex(self):
@@ -110,8 +126,8 @@
i = input_part.find('<<')
j = input_part.find('>>')
pre = input_part[:i]
post = input_part[j+2:]
field_name = input_part[i+2:j]
post = input_part[j + 2:]
field_name = input_part[i + 2:j]
regex_part = '%s(?P<%s>.+)%s' % (pre, field_name, post)
else:
@@ -119,8 +135,8 @@
j = input_part.find('>')
if i > -1 and j > i:
pre = input_part[:i]
post = input_part[j+1:]
field_name = input_part[i+1:j]
post = input_part[j + 1:]
field_name = input_part[i + 1:j]
regex_part = '%s(?P<%s>[^.]+)%s' % (pre, field_name, post)
elif input_part == '*':
regex_part = '[^.]+'
@@ -138,18 +154,43 @@
def avg(values):
if values:
return float( sum(values) ) / len(values)
return float(sum(values)) / len(values)
def count(values):
if values:
return len(values)
def percentile(factor):
def func(values):
if values:
values = sorted(values)
rank = factor * (len(values) - 1)
rank_left = int(floor(rank))
rank_right = int(ceil(rank))
if rank_left == rank_right:
return values[rank_left]
else:
return values[rank_left] * (rank_right - rank) + values[rank_right] * (rank - rank_left)
return func
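The new percentile() factory interpolates linearly between the two nearest ranks; a worked example of the closure it returns:

    p50 = percentile(0.50)
    # sorted values are [1, 2, 3, 4]; rank = 0.5 * 3 = 1.5, i.e. halfway
    # between indexes 1 and 2, so the result is 2 * 0.5 + 3 * 0.5:
    print(p50([4, 1, 3, 2]))  # 2.5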
AGGREGATION_METHODS = {
'sum' : sum,
'avg' : avg,
'min' : min,
'max' : max,
'count' : count
'sum': sum,
'avg': avg,
'min': min,
'max': max,
'p50': percentile(0.50),
'p75': percentile(0.75),
'p80': percentile(0.80),
'p90': percentile(0.90),
'p95': percentile(0.95),
'p99': percentile(0.99),
'p999': percentile(0.999),
'count': count,
}
# Importable singleton
......
@@ -31,6 +31,7 @@ This program can be started standalone for testing or using carbon-cache.py
(see example config file provided)
"""
import sys
import os
import socket
from optparse import OptionParser
@@ -39,9 +40,14 @@ from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.internet.protocol import ReconnectingClientFactory
from twisted.application.internet import TCPClient
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
# txamqp is currently not ported to py3
try:
from txamqp.protocol import AMQClient
from txamqp.client import TwistedDelegate
import txamqp.spec
except ImportError:
raise  # re-raise with the original traceback instead of a bare ImportError
try:
import carbon
@@ -50,10 +56,10 @@ except ImportError:
LIB_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, LIB_DIR)
import carbon.protocols #satisfy import order requirements
import carbon.protocols # NOQA satisfy import order requirements
from carbon.protocols import CarbonServerProtocol
from carbon.conf import settings
from carbon import log, events, instrumentation