Commit 50114306 authored by Dan Travis's avatar Dan Travis

Adds timestamp option to Aggregation transformer

Adds an argument to the Aggregation transformer constructor
that lets a user choose whether an aggregated sample takes its
timestamp from the first or the last sample received for that
aggregate.

This addresses an issue with transformer chaining where incorrect
values will sometimes be produced by the Rate of Change
transformer when chaining the Aggregation transformer with the
Rate of Change transformer.

Change-Id: Ib163a80a7e6ddaf58d7cc555fb4f4d87d570b1a1
Closes-Bug: #1539163
parent 9880862e
......@@ -18,10 +18,12 @@
# under the License.
import abc
import copy
import datetime
import traceback
import mock
from oslo_context import context
from oslo_utils import timeutils
from oslotest import base
from oslotest import mockpatch
......@@ -1660,6 +1662,65 @@ class BasePipelineTestCase(base.BaseTestCase):
self.assertEqual("test_resource", getattr(publisher.samples[0],
'resource_id'))
def test_aggregator_to_rate_of_change_transformer_two_resources(self):
    """Chain an Aggregator (timestamp="last") into RateOfChange.

    Regression test for bug #1539163: when the aggregated sample kept
    the *first* sample's timestamp, the downstream rate-of-change
    computation used a wrong time delta.  With timestamp="last" the
    aggregate carries the last sample's timestamp and the rate comes
    out correct.
    """
    resource_id = ['1ca738a1-c49c-4401-8346-5c60ebdb03f4',
                   '5dd418a6-c6a9-49c9-9cef-b357d72c71dd']
    aggregator = conversions.AggregatorTransformer(size="2",
                                                   timestamp="last")
    rate_of_change_transformer = conversions.RateOfChangeTransformer()

    counter_time = timeutils.parse_isotime('2016-01-01T12:00:00+00:00')
    # Round 1: two cumulative samples for the first resource, one
    # second apart (volume 0 then 1).
    for offset in range(2):
        counter = copy.copy(self.test_counter)
        counter.timestamp = timeutils.isotime(counter_time)
        counter.resource_id = resource_id[0]
        counter.volume = offset
        counter.type = sample.TYPE_CUMULATIVE
        counter.unit = 'ns'
        aggregator.handle_sample(context.get_admin_context(), counter)
        if offset == 1:
            # Remember the timestamp of the LAST sample fed in; the
            # aggregate is expected to carry exactly this timestamp.
            test_time = counter_time
        counter_time = counter_time + datetime.timedelta(0, 1)
    aggregated_counters = aggregator.flush(context.get_admin_context())
    self.assertEqual(len(aggregated_counters), 1)
    self.assertEqual(aggregated_counters[0].timestamp,
                     timeutils.isotime(test_time))

    # Prime the rate-of-change transformer with the first aggregate so
    # the next sample for this resource yields a rate.
    rate_of_change_transformer.handle_sample(context.get_admin_context(),
                                             aggregated_counters[0])

    # Round 2: one sample per resource (volume 2).  For resource 0 the
    # relevant timestamp is the FIRST (offset == 0) sample of this
    # round, one second after the last sample of round 1.
    for offset in range(2):
        counter = copy.copy(self.test_counter)
        counter.timestamp = timeutils.isotime(counter_time)
        counter.resource_id = resource_id[offset]
        counter.volume = 2
        counter.type = sample.TYPE_CUMULATIVE
        counter.unit = 'ns'
        aggregator.handle_sample(context.get_admin_context(), counter)
        if offset == 0:
            test_time = counter_time
        counter_time = counter_time + datetime.timedelta(0, 1)
    aggregated_counters = aggregator.flush(context.get_admin_context())
    self.assertEqual(len(aggregated_counters), 2)

    for counter in aggregated_counters:
        if counter.resource_id == resource_id[0]:
            rateOfChange = rate_of_change_transformer.handle_sample(
                context.get_admin_context(), counter)
            self.assertEqual(counter.timestamp,
                             timeutils.isotime(test_time))
            # Volume grew 1 -> 2 over one second: rate must be 1.
            self.assertEqual(rateOfChange.volume, 1)
def _do_test_arithmetic_expr_parse(self, expr, expected):
    """Parse *expr* with ArithmeticTransformer and compare to *expected*."""
    parsed = arithmetic.ArithmeticTransformer.parse_expr(expr)
    self.assertEqual(expected, parsed)
......
......@@ -63,6 +63,32 @@ class AggregatorTransformerTestCase(base.BaseTestCase):
self.assertRaises(ValueError, conversions.AggregatorTransformer,
"2", "abc", None, None, None)
def test_init_no_timestamp(self):
    """Omitting the timestamp argument entirely defaults to "first"."""
    xformer = conversions.AggregatorTransformer("1", "1", None,
                                                None, None)
    self.assertEqual("first", xformer.timestamp)
def test_init_timestamp_none(self):
    """Passing timestamp=None falls back to the "first" default."""
    xformer = conversions.AggregatorTransformer("1", "1", None,
                                                None, None, None)
    self.assertEqual("first", xformer.timestamp)
def test_init_timestamp_first(self):
    """An explicit timestamp="first" is accepted and stored as-is."""
    xformer = conversions.AggregatorTransformer("1", "1", None,
                                                None, None, "first")
    self.assertEqual("first", xformer.timestamp)
def test_init_timestamp_last(self):
    """An explicit timestamp="last" is accepted and stored as-is."""
    xformer = conversions.AggregatorTransformer("1", "1", None,
                                                None, None, "last")
    self.assertEqual("last", xformer.timestamp)
def test_init_timestamp_invalid(self):
    """Any unrecognized timestamp value silently falls back to "first"."""
    xformer = conversions.AggregatorTransformer("1", "1", None,
                                                None, None,
                                                "invalid_option")
    self.assertEqual("first", xformer.timestamp)
def test_size_unbounded(self):
aggregator = conversions.AggregatorTransformer(size="0",
retention_time="300")
......
......@@ -236,11 +236,17 @@ class AggregatorTransformer(ScalingTransformer):
AggregatorTransformer(size=15, user_id='first',
resource_metadata='drop')
To keep the timestamp of the last received sample rather
than the first:
AggregatorTransformer(timestamp="last")
"""
def __init__(self, size=1, retention_time=None,
project_id=None, user_id=None, resource_metadata="last",
**kwargs):
timestamp="first", **kwargs):
super(AggregatorTransformer, self).__init__(**kwargs)
self.samples = {}
self.counts = collections.defaultdict(int)
......@@ -249,6 +255,11 @@ class AggregatorTransformer(ScalingTransformer):
if not (self.size or self.retention_time):
self.size = 1
if timestamp in ["first", "last"]:
self.timestamp = timestamp
else:
self.timestamp = "first"
self.initial_timestamp = None
self.aggregated_samples = 0
......@@ -295,6 +306,8 @@ class AggregatorTransformer(ScalingTransformer):
'resource_metadata'] == 'drop':
self.samples[key].resource_metadata = {}
else:
if self.timestamp == "last":
self.samples[key].timestamp = sample_.timestamp
if sample_.type == sample.TYPE_CUMULATIVE:
self.samples[key].volume = self._scale(sample_)
else:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment