Commit 24443997 authored by Julien Danjou's avatar Julien Danjou Committed by Pradeep Kilambi

collector: never allow to lose data

The current default makes it very easy to lose data: if the dispatcher
fails to send data to the backend (e.g. Gnocchi is down), then the
dispatcher raises an exception and the data are lost forever. This is
completely unacceptable, and nobody should be able to configure Ceilometer
in that way.

This patch entirely removes those options, and switches the behavior to
something sane.

Change-Id: I45cb3da84eb2a785f46b3ec676c1a052ce999206
(cherry picked from commit 40684daf)
parent acd1fafe
......@@ -37,15 +37,6 @@ OPTS = [
cfg.PortOpt('udp_port',
default=4952,
help='Port to which the UDP socket is bound.'),
cfg.BoolOpt('requeue_sample_on_dispatcher_error',
default=False,
help='Requeue the sample on the collector sample queue '
'when the collector fails to dispatch it. This is only valid '
'if the sample come from the notifier publisher.'),
cfg.BoolOpt('requeue_event_on_dispatcher_error',
default=False,
help='Requeue the event on the collector event queue '
'when the collector fails to dispatch it.'),
cfg.IntOpt('batch_size',
default=1,
help='Number of notification messages to wait before '
......@@ -91,8 +82,7 @@ class CollectorService(os_service.Service):
messaging.get_batch_notification_listener(
transport, [sample_target],
[SampleEndpoint(self.meter_manager)],
allow_requeue=(cfg.CONF.collector.
requeue_sample_on_dispatcher_error),
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
self.sample_listener.start()
......@@ -104,8 +94,7 @@ class CollectorService(os_service.Service):
messaging.get_batch_notification_listener(
transport, [event_target],
[EventEndpoint(self.event_manager)],
allow_requeue=(cfg.CONF.collector.
requeue_event_on_dispatcher_error),
allow_requeue=True,
batch_size=cfg.CONF.collector.batch_size,
batch_timeout=cfg.CONF.collector.batch_timeout))
self.event_listener.start()
......@@ -158,9 +147,8 @@ class CollectorService(os_service.Service):
class CollectorEndpoint(object):
def __init__(self, dispatcher_manager, requeue_on_error):
def __init__(self, dispatcher_manager):
self.dispatcher_manager = dispatcher_manager
self.requeue_on_error = requeue_on_error
def sample(self, messages):
"""RPC endpoint for notification messages
......@@ -172,28 +160,16 @@ class CollectorEndpoint(object):
try:
self.dispatcher_manager.map_method(self.method, samples)
except Exception:
if self.requeue_on_error:
LOG.exception(_LE("Dispatcher failed to handle the %s, "
"requeue it."), self.ep_type)
return oslo_messaging.NotificationResult.REQUEUE
raise
LOG.exception(_LE("Dispatcher failed to handle the %s, "
"requeue it."), self.ep_type)
return oslo_messaging.NotificationResult.REQUEUE
class SampleEndpoint(CollectorEndpoint):
method = 'record_metering_data'
ep_type = 'sample'
def __init__(self, dispatcher_manager):
super(SampleEndpoint, self).__init__(
dispatcher_manager,
cfg.CONF.collector.requeue_sample_on_dispatcher_error)
class EventEndpoint(CollectorEndpoint):
method = 'record_events'
ep_type = 'event'
def __init__(self, dispatcher_manager):
super(EventEndpoint, self).__init__(
dispatcher_manager,
cfg.CONF.collector.requeue_event_on_dispatcher_error)
......@@ -236,46 +236,11 @@ class TestCollector(tests_base.BaseTestCase):
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_sample_requeue(self):
self.CONF.set_override('requeue_sample_on_dispatcher_error', True,
group='collector')
self._test_collector_requeue('sample_listener')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_event_requeue(self):
self.CONF.set_override('requeue_event_on_dispatcher_error', True,
group='collector')
self.CONF.set_override('store_events', True, group='notification')
self._test_collector_requeue('event_listener')
def _test_collector_no_requeue(self, listener):
mock_dispatcher = self._setup_fake_dispatcher()
self.srv.dispatcher_manager = dispatcher.load_dispatcher_manager()
mock_dispatcher.record_metering_data.side_effect = (FakeException
('boom'))
mock_dispatcher.record_events.side_effect = (FakeException
('boom'))
self.srv.start()
endp = getattr(self.srv, listener).dispatcher.endpoints[0]
self.assertRaises(FakeException, endp.sample, [
{'ctxt': {}, 'publisher_id': 'pub_id', 'event_type': 'event',
'payload': {}, 'metadata': {}}])
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_sample_no_requeue(self):
self.CONF.set_override('requeue_sample_on_dispatcher_error', False,
group='collector')
self._test_collector_no_requeue('sample_listener')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start',
mock.Mock())
@mock.patch.object(collector.CollectorService, 'start_udp', mock.Mock())
def test_collector_event_no_requeue(self):
self.CONF.set_override('requeue_event_on_dispatcher_error', False,
group='collector')
self.CONF.set_override('store_events', True, group='notification')
self._test_collector_no_requeue('event_listener')
---
critical:
- >
The previous defaults for the configuration options
`requeue_sample_on_dispatcher_error' and
`requeue_event_on_dispatcher_error' made it very easy to lose data: if
the dispatcher failed to send data to the backend (e.g. Gnocchi is down),
then the dispatcher raised an exception and the data were lost forever.
This was completely unacceptable, and nobody should be able to configure
Ceilometer in that way.
upgrade:
- >
The options `requeue_event_on_dispatcher_error' and
`requeue_sample_on_dispatcher_error' have been removed; the requeue
behavior they controlled is now always enabled.
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment