Commit 9323f07f authored by Julien Danjou's avatar Julien Danjou

Remove deprecated storage drivers

Change-Id: I6b262dd440a72f25662b64d938ab9e5328709a97
parent 22138b59
- job:
name: ceilometer-dsvm-functional-mongodb
parent: legacy-dsvm-base
run: playbooks/legacy/ceilometer-dsvm-functional-mongodb/run
post-run: playbooks/legacy/ceilometer-dsvm-functional-mongodb/post
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- openstack/ceilometer
- job:
name: ceilometer-dsvm-functional-mysql
parent: legacy-dsvm-base
run: playbooks/legacy/ceilometer-dsvm-functional-mysql/run
post-run: playbooks/legacy/ceilometer-dsvm-functional-mysql/post
timeout: 7800
required-projects:
- openstack-infra/devstack-gate
- openstack/ceilometer
- job:
name: ceilometer-tox-py27-mongodb
parent: legacy-base
run: playbooks/legacy/ceilometer-tox-py27-mongodb/run
post-run: playbooks/legacy/ceilometer-tox-py27-mongodb/post
timeout: 2400
required-projects:
- openstack/requirements
- job:
name: ceilometer-tox-py27-mysql
parent: legacy-base
run: playbooks/legacy/ceilometer-tox-py27-mysql/run
post-run: playbooks/legacy/ceilometer-tox-py27-mysql/post
timeout: 2400
required-projects:
- openstack/requirements
- job:
name: ceilometer-tox-py27-postgresql
parent: legacy-base
run: playbooks/legacy/ceilometer-tox-py27-postgresql/run
post-run: playbooks/legacy/ceilometer-tox-py27-postgresql/post
timeout: 2400
required-projects:
- openstack/requirements
- job: - job:
name: grenade-dsvm-ceilometer name: grenade-dsvm-ceilometer
parent: legacy-dsvm-base parent: legacy-dsvm-base
...@@ -76,16 +29,6 @@ ...@@ -76,16 +29,6 @@
name: openstack/ceilometer name: openstack/ceilometer
check: check:
jobs: jobs:
- ceilometer-dsvm-functional-mongodb:
branches: ^stable/newton$
- ceilometer-dsvm-functional-mysql:
branches: ^stable/newton$
- ceilometer-tox-py27-mongodb:
branches: ^(?!stable/newton)
- ceilometer-tox-py27-mysql:
branches: ^(?!stable/newton)
- ceilometer-tox-py27-postgresql:
branches: ^(?!stable/newton)
- grenade-dsvm-ceilometer: - grenade-dsvm-ceilometer:
branches: ^(?!stable/newton).*$ branches: ^(?!stable/newton).*$
irrelevant-files: irrelevant-files:
...@@ -94,16 +37,6 @@ ...@@ -94,16 +37,6 @@
- telemetry-dsvm-integration-ceilometer - telemetry-dsvm-integration-ceilometer
gate: gate:
jobs: jobs:
- ceilometer-dsvm-functional-mongodb:
branches: ^stable/newton$
- ceilometer-dsvm-functional-mysql:
branches: ^stable/newton$
- ceilometer-tox-py27-mongodb:
branches: ^(?!stable/newton)
- ceilometer-tox-py27-mysql:
branches: ^(?!stable/newton)
- ceilometer-tox-py27-postgresql:
branches: ^(?!stable/newton)
- grenade-dsvm-ceilometer: - grenade-dsvm-ceilometer:
branches: ^(?!stable/newton).*$ branches: ^(?!stable/newton).*$
irrelevant-files: irrelevant-files:
......
libpq-dev [platform:dpkg]
libxml2-dev [platform:dpkg test] libxml2-dev [platform:dpkg test]
libxslt-devel [platform:rpm test] libxslt-devel [platform:rpm test]
libxslt1-dev [platform:dpkg test] libxslt1-dev [platform:dpkg test]
postgresql [platform:dpkg]
mysql-client [platform:dpkg]
mysql-server [platform:dpkg]
build-essential [platform:dpkg] build-essential [platform:dpkg]
libffi-dev [platform:dpkg] libffi-dev [platform:dpkg]
mongodb [platform:dpkg]
gettext [platform:dpkg] gettext [platform:dpkg]
...@@ -16,13 +16,9 @@ ...@@ -16,13 +16,9 @@
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log from oslo_log import log
from six import moves
import six.moves.urllib.parse as urlparse
import sqlalchemy as sa
import tenacity import tenacity
from ceilometer import service from ceilometer import service
from ceilometer import storage
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
...@@ -30,9 +26,6 @@ LOG = log.getLogger(__name__) ...@@ -30,9 +26,6 @@ LOG = log.getLogger(__name__)
def upgrade(): def upgrade():
conf = cfg.ConfigOpts() conf = cfg.ConfigOpts()
conf.register_cli_opts([ conf.register_cli_opts([
cfg.BoolOpt('skip-metering-database',
help='Skip metering database upgrade.',
default=False),
cfg.BoolOpt('skip-gnocchi-resource-types', cfg.BoolOpt('skip-gnocchi-resource-types',
help='Skip gnocchi resource-types upgrade.', help='Skip gnocchi resource-types upgrade.',
default=False), default=False),
...@@ -43,19 +36,6 @@ def upgrade(): ...@@ -43,19 +36,6 @@ def upgrade():
]) ])
service.prepare_service(conf=conf) service.prepare_service(conf=conf)
if conf.skip_metering_database:
LOG.info("Skipping metering database upgrade")
else:
url = (getattr(conf.database, 'metering_connection') or
conf.database.connection)
if url:
LOG.debug("Upgrading metering database")
storage.get_connection(conf, url).upgrade()
else:
LOG.info("Skipping metering database upgrade, "
"legacy database backend not configured.")
if conf.skip_gnocchi_resource_types: if conf.skip_gnocchi_resource_types:
LOG.info("Skipping Gnocchi resource types upgrade") LOG.info("Skipping Gnocchi resource types upgrade")
else: else:
...@@ -75,96 +55,3 @@ def upgrade(): ...@@ -75,96 +55,3 @@ def upgrade():
exceptions.SSLError, exceptions.SSLError,
)) ))
)(gnocchi_client.upgrade_resource_types, conf) )(gnocchi_client.upgrade_resource_types, conf)
def expirer():
    """Purge metering samples whose time-to-live has elapsed.

    Reads ``[database] metering_time_to_live`` from the service
    configuration; when it is zero or negative there is nothing to purge.
    """
    conf = service.prepare_service()
    ttl = conf.database.metering_time_to_live
    if ttl <= 0:
        LOG.info("Nothing to clean, database metering time to live "
                 "is disabled")
        return
    LOG.debug("Clearing expired metering data")
    storage.get_connection_from_config(conf).clear_expired_metering_data(ttl)
def db_clean_legacy():
    """Drop the legacy alarm and event tables from the metering backend.

    Prompts for interactive confirmation unless ``--confirm-drop-table``
    is supplied on the command line, then drops the legacy alarm/event
    tables for the SQL, HBase or MongoDB backends.
    """
    conf = cfg.ConfigOpts()
    conf.register_cli_opts([
        # BUG FIX: was ``cfg.strOpt`` which does not exist in oslo.config
        # (AttributeError at runtime); the option class is ``cfg.StrOpt``.
        cfg.StrOpt('confirm-drop-table',
                   short='n',
                   help='confirm to drop the legacy tables')])
    # NOTE(review): this flag is read before service.prepare_service()
    # parses the CLI, so it still holds its default (None) here and the
    # prompt is always shown -- confirm whether that is intended.
    if not conf.confirm_drop_table:
        confirm = moves.input("Do you really want to drop the legacy "
                              "alarm and event tables? This will destroy "
                              "data definitively if it exist. Please type "
                              "'YES' to confirm: ")
        if confirm != 'YES':
            print("DB legacy cleanup aborted!")
            return

    service.prepare_service(conf=conf)
    url = (getattr(conf.database, "metering_connection") or
           conf.database.connection)
    parsed = urlparse.urlparse(url)

    # Mask the password before the connection URL is logged.
    if parsed.password:
        masked_netloc = '****'.join(parsed.netloc.rsplit(parsed.password))
        masked_url = parsed._replace(netloc=masked_netloc)
        masked_url = urlparse.urlunparse(masked_url)
    else:
        masked_url = url
    LOG.info('Starting to drop event, alarm and alarm history tables in '
             'backend: %s', masked_url)

    connection_scheme = parsed.scheme
    conn = storage.get_connection_from_config(conf)
    if connection_scheme in ('mysql', 'mysql+pymysql', 'postgresql',
                             'sqlite'):
        # SQL backends: reflect each legacy table and drop it if present.
        engine = conn._engine_facade.get_engine()
        meta = sa.MetaData(bind=engine)
        for table_name in ('alarm', 'alarm_history',
                           'trait_text', 'trait_int',
                           'trait_float', 'trait_datetime',
                           'event', 'event_type'):
            if engine.has_table(table_name):
                table = sa.Table(table_name, meta, autoload=True)
                table.drop()
                LOG.info("Legacy %s table of SQL backend has been "
                         "dropped.", table_name)
            else:
                LOG.info('%s table does not exist.', table_name)
    elif connection_scheme == 'hbase':
        with conn.conn_pool.connection() as h_conn:
            tables = h_conn.tables()
            table_name_mapping = {'alarm': 'alarm',
                                  'alarm_h': 'alarm history',
                                  'event': 'event'}
            for table_name in ('alarm', 'alarm_h', 'event'):
                try:
                    if table_name in tables:
                        # HBase requires disabling a table before deletion.
                        h_conn.disable_table(table_name)
                        h_conn.delete_table(table_name)
                        LOG.info("Legacy %s table of Hbase backend "
                                 "has been dropped.",
                                 table_name_mapping[table_name])
                    else:
                        LOG.info('%s table does not exist.',
                                 table_name_mapping[table_name])
                except Exception as e:
                    # Best-effort: keep going with the remaining tables.
                    LOG.error('Error occurred while dropping alarm '
                              'tables of Hbase, %s', e)
    elif connection_scheme == 'mongodb':
        for table_name in ('alarm', 'alarm_history', 'event'):
            if table_name in conn.db.conn.collection_names():
                conn.db.conn.drop_collection(table_name)
                LOG.info("Legacy %s table of Mongodb backend has been "
                         "dropped.", table_name)
            else:
                LOG.info('%s table does not exist.', table_name)
    LOG.info('Legacy alarm and event tables cleanup done.')
#
# Copyright 2013 IBM
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from oslo_config import cfg
from oslo_log import log
import six
from stevedore import named
LOG = log.getLogger(__name__)
# Configuration options for the dispatcher mechanism. Both options are
# deprecated together with the collector service they belong to.
OPTS = [
    cfg.MultiStrOpt('meter_dispatchers',
                    deprecated_name='dispatcher',
                    default=[],
                    deprecated_for_removal=True,
                    deprecated_reason='This option only be used in collector '
                                      'service, the collector service has '
                                      'been deprecated and will be removed '
                                      'in the future, this should also be '
                                      'deprecated for removal with collector '
                                      'service.',
                    help='Dispatchers to process metering data.'),
    cfg.MultiStrOpt('event_dispatchers',
                    default=[],
                    deprecated_name='dispatcher',
                    deprecated_for_removal=True,
                    deprecated_reason='This option only be used in collector '
                                      'service, the collector service has '
                                      'been deprecated and will be removed '
                                      'in the future, this should also be '
                                      'deprecated for removal with collector '
                                      'service.',
                    help='Dispatchers to process event data.'),
]
def _load_dispatcher_manager(conf, dispatcher_type):
    """Load the configured dispatchers of one type ('meter' or 'event').

    Returns the stevedore NamedExtensionManager; a warning is logged when
    no extension could be loaded for the namespace.
    """
    entry_point_ns = 'ceilometer.dispatcher.%s' % dispatcher_type
    option_name = '%s_dispatchers' % dispatcher_type
    LOG.debug('loading dispatchers from %s', entry_point_ns)
    # propagate_map_exceptions=True makes stevedore re-raise extension
    # errors instead of swallowing them.
    manager = named.NamedExtensionManager(
        namespace=entry_point_ns,
        names=getattr(conf, option_name),
        invoke_on_load=True,
        invoke_args=[conf],
        propagate_map_exceptions=True)
    if not list(manager):
        LOG.warning('Failed to load any dispatchers for %s',
                    entry_point_ns)
    return manager
def load_dispatcher_manager(conf):
    """Return a (meter_manager, event_manager) pair of dispatcher managers."""
    meter_manager = _load_dispatcher_manager(conf, 'meter')
    event_manager = _load_dispatcher_manager(conf, 'event')
    return meter_manager, event_manager
class Base(object):
    """Common dispatcher base; keeps a reference to the configuration."""

    def __init__(self, conf):
        # Stored for subclasses that need the oslo.config object.
        self.conf = conf
@six.add_metaclass(abc.ABCMeta)
class MeterDispatcherBase(Base):
    """Abstract interface for dispatchers that record metering data."""

    @abc.abstractmethod
    def record_metering_data(self, data):
        """Recording metering data interface."""
@six.add_metaclass(abc.ABCMeta)
class EventDispatcherBase(Base):
    """Abstract interface for dispatchers that record event data."""

    @abc.abstractmethod
    def record_events(self, events):
        """Record events."""
#
# Copyright 2013 IBM Corp
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_utils import timeutils
from ceilometer import dispatcher
from ceilometer import storage
LOG = log.getLogger(__name__)
class MeterDatabaseDispatcher(dispatcher.MeterDispatcherBase):
    """Record metering data into the configured metering database.

    Enable this dispatcher by listing ``database`` under
    ``meter_dispatchers`` in the ``[DEFAULT]`` section of ceilometer.conf.
    """

    @property
    def conn(self):
        # Open the storage connection lazily on first use and cache it.
        try:
            return self._conn
        except AttributeError:
            self._conn = storage.get_connection_from_config(self.conf)
            return self._conn

    def record_metering_data(self, data):
        """Normalize timestamps and write a batch of meters to storage."""
        # A single counter may arrive on the wire instead of a list.
        if not data:
            return
        if not isinstance(data, list):
            data = [data]
        for meter in data:
            LOG.debug(
                'metering data %(counter_name)s '
                'for %(resource_id)s @ %(timestamp)s: %(counter_volume)s',
                {'counter_name': meter['counter_name'],
                 'resource_id': meter['resource_id'],
                 'timestamp': meter.get('timestamp', 'NO TIMESTAMP'),
                 'counter_volume': meter['counter_volume']})
            # Storage engines expect datetime instances, not ISO strings;
            # normalize in place before handing the batch over.
            raw_ts = meter.get('timestamp')
            if raw_ts:
                meter['timestamp'] = timeutils.normalize_time(
                    timeutils.parse_isotime(raw_ts))
        try:
            self.conn.record_metering_data_batch(data)
        except Exception as err:
            LOG.error('Failed to record %(len)s: %(err)s.',
                      {'len': len(data), 'err': err})
            raise
...@@ -15,15 +15,39 @@ ...@@ -15,15 +15,39 @@
from oslo_utils import timeutils from oslo_utils import timeutils
import six import six
from ceilometer.storage import base
def serialize_dt(value): def serialize_dt(value):
"""Serializes parameter if it is datetime.""" """Serializes parameter if it is datetime."""
return value.isoformat() if hasattr(value, 'isoformat') else value return value.isoformat() if hasattr(value, 'isoformat') else value
class Event(base.Model): class Model(object):
"""Base class for storage API models."""
def __init__(self, **kwds):
self.fields = list(kwds)
for k, v in six.iteritems(kwds):
setattr(self, k, v)
def as_dict(self):
d = {}
for f in self.fields:
v = getattr(self, f)
if isinstance(v, Model):
v = v.as_dict()
elif isinstance(v, list) and v and isinstance(v[0], Model):
v = [sub.as_dict() for sub in v]
d[f] = v
return d
def __eq__(self, other):
return self.as_dict() == other.as_dict()
def __ne__(self, other):
return not self.__eq__(other)
class Event(Model):
"""A raw event from the source system. Events have Traits. """A raw event from the source system. Events have Traits.
Metrics will be derived from one or more Events. Metrics will be derived from one or more Events.
...@@ -45,8 +69,8 @@ class Event(base.Model): ...@@ -45,8 +69,8 @@ class Event(base.Model):
:param traits: list of Traits on this Event. :param traits: list of Traits on this Event.
:param raw: Unindexed raw notification details. :param raw: Unindexed raw notification details.
""" """
base.Model.__init__(self, message_id=message_id, event_type=event_type, Model.__init__(self, message_id=message_id, event_type=event_type,
generated=generated, traits=traits, raw=raw) generated=generated, traits=traits, raw=raw)
def append_trait(self, trait_model): def append_trait(self, trait_model):
self.traits.append(trait_model) self.traits.append(trait_model)
...@@ -67,7 +91,7 @@ class Event(base.Model): ...@@ -67,7 +91,7 @@ class Event(base.Model):
'raw': self.raw} 'raw': self.raw}
class Trait(base.Model): class Trait(Model):
"""A Trait is a key/value pair of data on an Event. """A Trait is a key/value pair of data on an Event.
The value is variant record of basic data types (int, date, float, etc). The value is variant record of basic data types (int, date, float, etc).
...@@ -90,7 +114,7 @@ class Trait(base.Model): ...@@ -90,7 +114,7 @@ class Trait(base.Model):
def __init__(self, name, dtype, value): def __init__(self, name, dtype, value):
if not dtype: if not dtype:
dtype = Trait.NONE_TYPE dtype = Trait.NONE_TYPE
base.Model.__init__(self, name=name, dtype=dtype, value=value) Model.__init__(self, name=name, dtype=dtype, value=value)
def __repr__(self): def __repr__(self):
return "<Trait: %s %d %s>" % (self.name, self.dtype, self.value) return "<Trait: %s %d %s>" % (self.name, self.dtype, self.value)
......
...@@ -23,7 +23,6 @@ import ceilometer.compute.virt.inspector ...@@ -23,7 +23,6 @@ import ceilometer.compute.virt.inspector
import ceilometer.compute.virt.libvirt.utils import ceilometer.compute.virt.libvirt.utils
import ceilometer.compute.virt.vmware.inspector import ceilometer.compute.virt.vmware.inspector
import ceilometer.compute.virt.xenapi.inspector import ceilometer.compute.virt.xenapi.inspector
import ceilometer.dispatcher
import ceilometer.event.converter import ceilometer.event.converter
import ceilometer.hardware.discovery import ceilometer.hardware.discovery
import ceilometer.hardware.pollsters.generic import ceilometer.hardware.pollsters.generic
...@@ -42,7 +41,6 @@ import ceilometer.pipeline ...@@ -42,7 +41,6 @@ import ceilometer.pipeline
import ceilometer.publisher.messaging import ceilometer.publisher.messaging
import ceilometer.publisher.utils import ceilometer.publisher.utils
import ceilometer.sample import ceilometer.sample
import ceilometer.storage
import ceilometer.utils import ceilometer.utils
import ceilometer.volume.discovery import ceilometer.volume.discovery
...@@ -75,7 +73,6 @@ def list_opts(): ...@@ -75,7 +73,6 @@ def list_opts():
itertools.chain(ceilometer.agent.manager.OPTS, itertools.chain(ceilometer.agent.manager.OPTS,
ceilometer.compute.virt.inspector.OPTS, ceilometer.compute.virt.inspector.OPTS,
ceilometer.compute.virt.libvirt.utils.OPTS, ceilometer.compute.virt.libvirt.utils.OPTS,
ceilometer.dispatcher.OPTS,
ceilometer.objectstore.swift.OPTS, ceilometer.objectstore.swift.OPTS,
ceilometer.pipeline.OPTS, ceilometer.pipeline.OPTS,
ceilometer.sample.OPTS, ceilometer.sample.OPTS,
...@@ -96,7 +93,6 @@ def list_opts(): ...@@ -96,7 +93,6 @@ def list_opts():
help='Number of seconds between checks to see if group ' help='Number of seconds between checks to see if group '
'membership has changed'), 'membership has changed'),
]), ]),
('database', ceilometer.storage.OPTS),
('dispatcher_gnocchi', ( ('dispatcher_gnocchi', (
cfg.StrOpt( cfg.StrOpt(
'filter_project', 'filter_project',
......
#
# Copyright 2015 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
import six.moves.urllib.parse as urlparse
from stevedore import driver
import stevedore.exception
from ceilometer import publisher
from ceilometer.publisher import utils
LOG = log.getLogger(__name__)
class DirectPublisher(publisher.ConfigPublisherBase):
    """A publisher that allows saving directly from the pipeline.

    Samples are saved to a configured dispatcher. This is useful
    where it is desirable to limit the number of external services that
    are required.

    By default, the database dispatcher is used to select another one we
    can use direct://?dispatcher=name_of_dispatcher, ...
    """
    def __init__(self, conf, parsed_url):
        super(DirectPublisher, self).__init__(conf, parsed_url)
        default_dispatcher = parsed_url.scheme
        # Bare 'direct' scheme falls back to the database dispatcher.
        if default_dispatcher == 'direct':
            LOG.warning('Direct publisher is deprecated for removal. Use '
                        'an explicit publisher instead, e.g. '
                        '"database", "file", ...')
            default_dispatcher = 'database'
        options = urlparse.parse_qs(parsed_url.query)
        # Use the last ?dispatcher= value from the URL query string,
        # falling back to the scheme-derived default.
        self.dispatcher_name = options.get('dispatcher',
                                           [default_dispatcher])[-1]
        self._sample_dispatcher = None
        self._event_dispatcher = None
        # Resolve both driver classes eagerly; a missing entry point
        # leaves the corresponding driver as None, and publishing of
        # that data kind is later refused with an error log.
        try:
            self.sample_driver = driver.DriverManager(
                'ceilometer.dispatcher.meter', self.dispatcher_name).driver
        except stevedore.exception.NoMatches:
            self.sample_driver = None
        try:
            self.event_driver = driver.DriverManager(
                'ceilometer.dispatcher.event', self.dispatcher_name).driver
        except stevedore.exception.NoMatches:
            self.event_driver = None

    def get_sample_dispatcher(self):
        # Instantiate the sample dispatcher on first use and cache it.
        if not self._sample_dispatcher:
            self._sample_dispatcher = self.sample_driver(self.conf)
        return self._sample_dispatcher

    def get_event_dispatcher(self):
        # Share a single instance when meter and event resolve to the
        # same driver class; otherwise build a separate event dispatcher.
        if not self._event_dispatcher:
            if self.event_driver != self.sample_driver:
                self._event_dispatcher = self.event_driver(self.conf)
            else:
                self._event_dispatcher = self.get_sample_dispatcher()
        return self._event_dispatcher

    def publish_samples(self, samples):
        """Hand samples to the meter dispatcher (unsigned)."""
        if not self.sample_driver:
            LOG.error("Can't publish samples to a non-existing dispatcher "
                      "'%s'", self.dispatcher_name)
            return
        if not isinstance(samples, list):
            samples = [samples]
        # not published externally; skip signing
        self.get_sample_dispatcher().record_metering_data([
            utils.meter_message_from_counter(sample, secret=None)
            for sample in samples])

    def publish_events(self, events):
        """Hand events to the event dispatcher (unsigned)."""
        if not self.event_driver:
            LOG.error("Can't publish events to a non-existing dispatcher "
                      "'%s'", self.dispatcher_name)
            return
        if not isinstance(events, list):
            events = [events]
        # not published externally; skip signing
        self.get_event_dispatcher().record_events([
            utils.message_from_event(event, secret=None) for event in events])
...@@ -15,7 +15,6 @@ ...@@ -15,7 +15,6 @@
import sys import sys