Commit c44759fc authored by Zuul's avatar Zuul Committed by Gerrit Code Review

Merge "utils: move kill_listeners to ceilometer.notification"

parents 575049e6 8197bb21
......@@ -235,8 +235,7 @@ class NotificationService(cotyledon.Service):
targets.append(oslo_messaging.Target(topic=topic))
if self.pipeline_listener:
self.pipeline_listener.stop()
self.pipeline_listener.wait()
self.kill_listeners([self.pipeline_listener])
self.pipeline_listener = messaging.get_batch_notification_listener(
self.transport, targets, endpoints,
......@@ -248,6 +247,15 @@ class NotificationService(cotyledon.Service):
else self.conf.max_parallel_requests)
self.pipeline_listener.start(override_pool_size=batch)
@staticmethod
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()
def terminate(self):
self.shutdown = True
if self.periodic:
......@@ -257,7 +265,7 @@ class NotificationService(cotyledon.Service):
self.partition_coordinator.stop()
with self.coord_lock:
if self.pipeline_listener:
utils.kill_listeners([self.pipeline_listener])
utils.kill_listeners(self.listeners)
self.kill_listeners([self.pipeline_listener])
self.kill_listeners(self.listeners)
super(NotificationService, self).terminate()
......@@ -258,7 +258,6 @@ class TestRealNotificationHA(BaseRealNotification):
fake_publisher_cls.return_value = self.publisher
self._check_notification_service()
@mock.patch("ceilometer.utils.kill_listeners", mock.MagicMock())
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'stop')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'wait')
@mock.patch.object(oslo_messaging.MessageHandlingServer, 'start')
......
......@@ -54,15 +54,6 @@ def hash_of_set(s):
return str(hash(frozenset(s)))
def kill_listeners(listeners):
# NOTE(gordc): correct usage of oslo.messaging listener is to stop(),
# which stops new messages, and wait(), which processes remaining
# messages and closes connection
for listener in listeners:
listener.stop()
listener.wait()
def spawn_thread(target, *args, **kwargs):
t = threading.Thread(target=target, args=args, kwargs=kwargs)
t.daemon = True
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment