Commit 9975e063 authored by Scott Moser's avatar Scott Moser

Add initial reporting module and events

parents 328cc7fb 95bfe5d5
......@@ -59,6 +59,7 @@
- _read_dmi_syspath: fix bad log message causing unintended exception
- rsyslog: add additional configuration mode (LP: #1478103)
- status_wrapper in main: fix use of print_exc when handling exception
- reporting: add reporting module for web hook or logging of events.
0.7.6:
- open 0.7.6
- Enable vendordata on CloudSigma datasource (LP: #1303986)
......
......@@ -46,6 +46,7 @@ from cloudinit import sources
from cloudinit import stages
from cloudinit import templater
from cloudinit import util
from cloudinit import reporting
from cloudinit import version
from cloudinit.settings import (PER_INSTANCE, PER_ALWAYS, PER_ONCE,
......@@ -136,6 +137,11 @@ def run_module_section(mods, action_name, section):
return failures
def apply_reporting_cfg(cfg):
if cfg.get('reporting'):
reporting.update_configuration(cfg.get('reporting'))
def main_init(name, args):
deps = [sources.DEP_FILESYSTEM, sources.DEP_NETWORK]
if args.local:
......@@ -171,7 +177,7 @@ def main_init(name, args):
w_msg = welcome_format(name)
else:
w_msg = welcome_format("%s-local" % (name))
init = stages.Init(deps)
init = stages.Init(ds_deps=deps, reporter=args.reporter)
# Stage 1
init.read_cfg(extract_fns(args))
# Stage 2
......@@ -190,6 +196,7 @@ def main_init(name, args):
" longer be active shortly"))
logging.resetLogging()
logging.setupLogging(init.cfg)
apply_reporting_cfg(init.cfg)
# Any log usage prior to setupLogging above did not have local user log
# config applied. We send the welcome message now, as stderr/out have
......@@ -282,8 +289,10 @@ def main_init(name, args):
util.logexc(LOG, "Consuming user data failed!")
return (init.datasource, ["Consuming user data failed!"])
apply_reporting_cfg(init.cfg)
# Stage 8 - re-read and apply relevant cloud-config to include user-data
mods = stages.Modules(init, extract_fns(args))
mods = stages.Modules(init, extract_fns(args), reporter=args.reporter)
# Stage 9
try:
outfmt_orig = outfmt
......@@ -313,7 +322,7 @@ def main_modules(action_name, args):
# 5. Run the modules for the given stage name
# 6. Done!
w_msg = welcome_format("%s:%s" % (action_name, name))
init = stages.Init(ds_deps=[])
init = stages.Init(ds_deps=[], reporter=args.reporter)
# Stage 1
init.read_cfg(extract_fns(args))
# Stage 2
......@@ -328,7 +337,7 @@ def main_modules(action_name, args):
if not args.force:
return [(msg)]
# Stage 3
mods = stages.Modules(init, extract_fns(args))
mods = stages.Modules(init, extract_fns(args), reporter=args.reporter)
# Stage 4
try:
LOG.debug("Closing stdin")
......@@ -342,6 +351,7 @@ def main_modules(action_name, args):
" longer be active shortly"))
logging.resetLogging()
logging.setupLogging(mods.cfg)
apply_reporting_cfg(init.cfg)
# now that logging is setup and stdout redirected, send welcome
welcome(name, msg=w_msg)
......@@ -366,7 +376,7 @@ def main_single(name, args):
# 6. Done!
mod_name = args.name
w_msg = welcome_format(name)
init = stages.Init(ds_deps=[])
init = stages.Init(ds_deps=[], reporter=args.reporter)
# Stage 1
init.read_cfg(extract_fns(args))
# Stage 2
......@@ -383,7 +393,7 @@ def main_single(name, args):
if not args.force:
return 1
# Stage 3
mods = stages.Modules(init, extract_fns(args))
mods = stages.Modules(init, extract_fns(args), reporter=args.reporter)
mod_args = args.module_args
if mod_args:
LOG.debug("Using passed in arguments %s", mod_args)
......@@ -404,6 +414,7 @@ def main_single(name, args):
" longer be active shortly"))
logging.resetLogging()
logging.setupLogging(mods.cfg)
apply_reporting_cfg(init.cfg)
# now that logging is setup and stdout redirected, send welcome
welcome(name, msg=w_msg)
......@@ -549,6 +560,8 @@ def main():
' found (use at your own risk)'),
dest='force',
default=False)
parser.set_defaults(reporter=None)
subparsers = parser.add_subparsers()
# Each action and its sub-options (if any)
......@@ -595,6 +608,9 @@ def main():
help=("frequency of the module"),
required=False,
choices=list(FREQ_SHORT_NAMES.keys()))
parser_single.add_argument("--report", action="store_true",
help="enable reporting",
required=False)
parser_single.add_argument("module_args", nargs="*",
metavar='argument',
help=('any additional arguments to'
......@@ -617,8 +633,27 @@ def main():
if name in ("modules", "init"):
functor = status_wrapper
return util.log_time(logfunc=LOG.debug, msg="cloud-init mode '%s'" % name,
get_uptime=True, func=functor, args=(name, args))
report_on = True
if name == "init":
if args.local:
rname, rdesc = ("init-local", "searching for local datasources")
else:
rname, rdesc = ("init-network",
"searching for network datasources")
elif name == "modules":
rname, rdesc = ("modules-%s" % args.mode,
"running modules for %s" % args.mode)
elif name == "single":
rname, rdesc = ("single/%s" % args.name,
"running single module %s" % args.name)
report_on = args.report
args.reporter = reporting.ReportEventStack(
rname, rdesc, reporting_enabled=report_on)
with args.reporter:
return util.log_time(
logfunc=LOG.debug, msg="cloud-init mode '%s'" % name,
get_uptime=True, func=functor, args=(name, args))
if __name__ == '__main__':
......
......@@ -24,6 +24,7 @@ import copy
import os
from cloudinit import log as logging
from cloudinit import reporting
LOG = logging.getLogger(__name__)
......@@ -40,12 +41,18 @@ LOG = logging.getLogger(__name__)
class Cloud(object):
def __init__(self, datasource, paths, cfg, distro, runners):
def __init__(self, datasource, paths, cfg, distro, runners, reporter=None):
self.datasource = datasource
self.paths = paths
self.distro = distro
self._cfg = cfg
self._runners = runners
if reporter is None:
reporter = reporting.ReportEventStack(
name="unnamed-cloud-reporter",
description="unnamed-cloud-reporter",
reporting_enabled=False)
self.reporter = reporter
# If a 'user' manipulates logging or logging services
# it is typically useful to cause the logging to be
......
# Copyright 2015 Canonical Ltd.
# This file is part of cloud-init. See LICENCE file for license information.
#
# vi: ts=4 expandtab
import copy
class DictRegistry(object):
"""A simple registry for a mapping of objects."""
def __init__(self):
self.reset()
def reset(self):
self._items = {}
def register_item(self, key, item):
"""Add item to the registry."""
if key in self._items:
raise ValueError(
'Item already registered with key {0}'.format(key))
self._items[key] = item
def unregister_item(self, key, force=True):
"""Remove item from the registry."""
if key in self._items:
del self._items[key]
elif not force:
raise KeyError("%s: key not present to unregister" % key)
@property
def registered_items(self):
"""All the items that have been registered.
This cannot be used to modify the contents of the registry.
"""
return copy.copy(self._items)
# Copyright 2015 Canonical Ltd.
# This file is part of cloud-init. See LICENCE file for license information.
#
# vi: ts=4 expandtab
"""
cloud-init reporting framework
The reporting framework is intended to allow all parts of cloud-init to
report events in a structured manner.
"""
from ..registry import DictRegistry
from ..reporting.handlers import available_handlers
FINISH_EVENT_TYPE = 'finish'
START_EVENT_TYPE = 'start'
DEFAULT_CONFIG = {
'logging': {'type': 'log'},
}
class _nameset(set):
def __getattr__(self, name):
if name in self:
return name
raise AttributeError("%s not a valid value" % name)
status = _nameset(("SUCCESS", "WARN", "FAIL"))
class ReportingEvent(object):
"""Encapsulation of event formatting."""
def __init__(self, event_type, name, description):
self.event_type = event_type
self.name = name
self.description = description
def as_string(self):
"""The event represented as a string."""
return '{0}: {1}: {2}'.format(
self.event_type, self.name, self.description)
def as_dict(self):
"""The event represented as a dictionary."""
return {'name': self.name, 'description': self.description,
'event_type': self.event_type}
class FinishReportingEvent(ReportingEvent):
def __init__(self, name, description, result=status.SUCCESS):
super(FinishReportingEvent, self).__init__(
FINISH_EVENT_TYPE, name, description)
self.result = result
if result not in status:
raise ValueError("Invalid result: %s" % result)
def as_string(self):
return '{0}: {1}: {2}: {3}'.format(
self.event_type, self.name, self.result, self.description)
def as_dict(self):
"""The event represented as json friendly."""
data = super(FinishReportingEvent, self).as_dict()
data['result'] = self.result
return data
def update_configuration(config):
"""Update the instanciated_handler_registry.
:param config:
The dictionary containing changes to apply. If a key is given
with a False-ish value, the registered handler matching that name
will be unregistered.
"""
for handler_name, handler_config in config.items():
if not handler_config:
instantiated_handler_registry.unregister_item(
handler_name, force=True)
continue
registered = instantiated_handler_registry.registered_items
handler_config = handler_config.copy()
cls = available_handlers.registered_items[handler_config.pop('type')]
instantiated_handler_registry.unregister_item(handler_name)
instance = cls(**handler_config)
instantiated_handler_registry.register_item(handler_name, instance)
def report_event(event):
"""Report an event to all registered event handlers.
This should generally be called via one of the other functions in
the reporting module.
:param event_type:
The type of the event; this should be a constant from the
reporting module.
"""
for _, handler in instantiated_handler_registry.registered_items.items():
handler.publish_event(event)
def report_finish_event(event_name, event_description,
result=status.SUCCESS):
"""Report a "finish" event.
See :py:func:`.report_event` for parameter details.
"""
event = FinishReportingEvent(event_name, event_description, result)
return report_event(event)
def report_start_event(event_name, event_description):
"""Report a "start" event.
:param event_name:
The name of the event; this should be a topic which events would
share (e.g. it will be the same for start and finish events).
:param event_description:
A human-readable description of the event that has occurred.
"""
event = ReportingEvent(START_EVENT_TYPE, event_name, event_description)
return report_event(event)
class ReportEventStack(object):
"""Context Manager for using :py:func:`report_event`
This enables calling :py:func:`report_start_event` and
:py:func:`report_finish_event` through a context manager.
:param name:
the name of the event
:param description:
the event's description, passed on to :py:func:`report_start_event`
:param message:
the description to use for the finish event. defaults to
:param:description.
:param parent:
:type parent: :py:class:ReportEventStack or None
The parent of this event. The parent is populated with
results of all its children. The name used in reporting
is <parent.name>/<name>
:param reporting_enabled:
Indicates if reporting events should be generated.
If not provided, defaults to the parent's value, or True if no parent
is provided.
:param result_on_exception:
The result value to set if an exception is caught. default
value is FAIL.
"""
def __init__(self, name, description, message=None, parent=None,
reporting_enabled=None, result_on_exception=status.FAIL):
self.parent = parent
self.name = name
self.description = description
self.message = message
self.result_on_exception = result_on_exception
self.result = status.SUCCESS
# use parents reporting value if not provided
if reporting_enabled is None:
if parent:
reporting_enabled = parent.reporting_enabled
else:
reporting_enabled = True
self.reporting_enabled = reporting_enabled
if parent:
self.fullname = '/'.join((parent.fullname, name,))
else:
self.fullname = self.name
self.children = {}
def __repr__(self):
return ("ReportEventStack(%s, %s, reporting_enabled=%s)" %
(self.name, self.description, self.reporting_enabled))
def __enter__(self):
self.result = status.SUCCESS
if self.reporting_enabled:
report_start_event(self.fullname, self.description)
if self.parent:
self.parent.children[self.name] = (None, None)
return self
def _childrens_finish_info(self):
for cand_result in (status.FAIL, status.WARN):
for name, (value, msg) in self.children.items():
if value == cand_result:
return (value, self.message)
return (self.result, self.message)
@property
def result(self):
return self._result
@result.setter
def result(self, value):
if value not in status:
raise ValueError("'%s' not a valid result" % value)
self._result = value
@property
def message(self):
if self._message is not None:
return self._message
return self.description
@message.setter
def message(self, value):
self._message = value
def _finish_info(self, exc):
# return tuple of description, and value
if exc:
return (self.result_on_exception, self.message)
return self._childrens_finish_info()
def __exit__(self, exc_type, exc_value, traceback):
(result, msg) = self._finish_info(exc_value)
if self.parent:
self.parent.children[self.name] = (result, msg)
if self.reporting_enabled:
report_finish_event(self.fullname, msg, result)
instantiated_handler_registry = DictRegistry()
update_configuration(DEFAULT_CONFIG)
# vi: ts=4 expandtab
import abc
import oauthlib.oauth1 as oauth1
import six
from ..registry import DictRegistry
from .. import (url_helper, util)
from .. import log as logging
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class ReportingHandler(object):
"""Base class for report handlers.
Implement :meth:`~publish_event` for controlling what
the handler does with an event.
"""
@abc.abstractmethod
def publish_event(self, event):
"""Publish an event to the ``INFO`` log level."""
class LogHandler(ReportingHandler):
"""Publishes events to the cloud-init log at the ``INFO`` log level."""
def __init__(self, level="DEBUG"):
super(LogHandler, self).__init__()
if isinstance(level, int):
pass
else:
input_level = level
try:
level = gettattr(logging, level.upper())
except:
LOG.warn("invalid level '%s', using WARN", input_level)
level = logging.WARN
self.level = level
def publish_event(self, event):
"""Publish an event to the ``INFO`` log level."""
logger = logging.getLogger(
'.'.join(['cloudinit', 'reporting', event.event_type, event.name]))
logger.log(self.level, event.as_string())
class PrintHandler(ReportingHandler):
def publish_event(self, event):
"""Publish an event to the ``INFO`` log level."""
class WebHookHandler(ReportingHandler):
def __init__(self, endpoint, consumer_key=None, token_key=None,
token_secret=None, consumer_secret=None, timeout=None,
retries=None):
super(WebHookHandler, self).__init__()
if any([consumer_key, token_key, token_secret, consumer_secret]):
self.oauth_helper = url_helper.OauthUrlHelper(
consumer_key=consumer_key, token_key=token_key,
token_secret=token_secret, consumer_secret=consumer_secret)
else:
self.oauth_helper = None
self.endpoint = endpoint
self.timeout = timeout
self.retries = retries
self.ssl_details = util.fetch_ssl_details()
def publish_event(self, event):
if self.oauth_helper:
readurl = self.oauth_helper.readurl
else:
readurl = url_helper.readurl
try:
return readurl(
self.endpoint, data=event.as_dict(),
timeout=self.timeout,
retries=self.retries, ssl_details=self.ssl_details)
except:
LOG.warn("failed posting event: %s" % event.as_string())
available_handlers = DictRegistry()
available_handlers.register_item('log', LogHandler)
available_handlers.register_item('print', PrintHandler)
available_handlers.register_item('webhook', WebHookHandler)
......@@ -52,7 +52,20 @@ class DataSourceMAAS(sources.DataSource):
sources.DataSource.__init__(self, sys_cfg, distro, paths)
self.base_url = None
self.seed_dir = os.path.join(paths.seed_dir, 'maas')
self.oauth_clockskew = None
self.oauth_helper = self._get_helper()
def _get_helper(self):
mcfg = self.ds_cfg
# If we are missing token_key, token_secret or consumer_key
# then just do non-authed requests
for required in ('token_key', 'token_secret', 'consumer_key'):
if required not in mcfg:
return url_helper.OauthUrlHelper()
return url_helper.OauthHelper(
consumer_key=mcfg['consumer_key'], token_key=mcfg['token_key'],
token_secret=mcfg['token_secret'],
consumer_secret=mcfg.get('consumer_secret'))
def __str__(self):
root = sources.DataSource.__str__(self)
......@@ -84,9 +97,9 @@ class DataSourceMAAS(sources.DataSource):
self.base_url = url
(userdata, metadata) = read_maas_seed_url(self.base_url,
self._md_headers,
paths=self.paths)
(userdata, metadata) = read_maas_seed_url(
self.base_url, self.oauth_helper.md_headers,
paths=self.paths)
self.userdata_raw = userdata
self.metadata = metadata
return True
......@@ -94,31 +107,8 @@ class DataSourceMAAS(sources.DataSource):
util.logexc(LOG, "Failed fetching metadata from url %s", url)
return False
def _md_headers(self, url):
mcfg = self.ds_cfg
# If we are missing token_key, token_secret or consumer_key
# then just do non-authed requests
for required in ('token_key', 'token_secret', 'consumer_key'):
if required not in mcfg:
return {}
consumer_secret = mcfg.get('consumer_secret', "")
timestamp = None
if self.oauth_clockskew:
timestamp = int(time.time()) + self.oauth_clockskew
return oauth_headers(url=url,
consumer_key=mcfg['consumer_key'],
token_key=mcfg['token_key'],
token_secret=mcfg['token_secret'],
consumer_secret=consumer_secret,
timestamp=timestamp)
def wait_for_metadata_service(self, url):
mcfg = self.ds_cfg
max_wait = 120
try:
max_wait = int(mcfg.get("max_wait", max_wait))
......@@ -138,10 +128,8 @@ class DataSourceMAAS(sources.DataSource):
starttime = time.time()
check_url = "%s/%s/meta-data/instance-id" % (url, MD_VERSION)
urls = [check_url]
url = url_helper.wait_for_url(urls=urls, max_wait=max_wait,
timeout=timeout,
exception_cb=self._except_cb,
headers_cb=self._md_headers)
url = self.oauth_helper.wait_for_url(
urls=urls, max_wait=max_wait, timeout=timeout)
if url:
LOG.debug("Using metadata source: '%s'", url)
......@@ -151,26 +139,6 @@ class DataSourceMAAS(sources.DataSource):
return bool(url)
def _except_cb(self, msg, exception):
if not (isinstance(exception, url_helper.UrlError) and
(exception.code == 403 or exception.code == 401)):
return
if 'date' not in exception.headers:
LOG.warn("Missing header 'date' in %s response", exception.code)
return
date = exception.headers['date']
try:
ret_time = time.mktime(parsedate(date))
except Exception as e:
LOG.warn("Failed to convert datetime '%s': %s", date, e)
return
self.oauth_clockskew = int(ret_time - time.time())
LOG.warn("Setting oauth clockskew to %d", self.oauth_clockskew)
return
def read_maas_seed_dir(seed_d):
"""
......@@ -196,12 +164,12 @@ def read_maas_seed_dir(seed_d):
return check_seed_contents(md, seed_d)
def read_maas_seed_url(seed_url, header_cb=None, timeout=None,
def read_maas_seed_url(seed_url, read_file_or_url=None, timeout=None,
version=MD_VERSION, paths=None):
"""
Read the maas datasource at seed_url.
- header_cb is a method that should return a headers dictionary for
a given url
read_file_or_url is a method that should provide an interface
like util.read_file_or_url
Expected format of seed_url is are the following files:
* <seed_url>/<version>/meta-data/instance-id
......@@ -222,14 +190,12 @@ def read_maas_seed_url(seed_url, header_cb=None, timeout=None,
'user-data': "%s/%s" % (base_url, 'user-data'),
}
if read_file_or_url is None:
read_file_or_url = util.read_file_or_url
md = {}
for name in file_order:
url = files.get(name)
if not header_cb:
def _cb(url):
return {}
header_cb = _cb
if name == 'user-data':
retries = 0
else:
......@@ -237,10 +203,8 @@ def read_maas_seed_url(seed_url, header_cb=None, timeout=None,
try:
ssl_details = util.fetch_ssl_details(paths)
resp = util.read_file_or_url(url, retries=retries,
headers_cb=header_cb,
timeout=timeout,
ssl_details=ssl_details)
resp = read_file_or_url(url, retries=retries,
timeout=timeout, ssl_details=ssl_details)
if resp.ok():
if name in BINARY_FIELDS:
md[name] = resp.contents
......@@ -280,24 +244,6 @@ def check_seed_contents(content, seed):
return (userdata, md)
def oauth_headers(url, consumer_key, token_key, token_secret, consumer_secret,
timestamp=None):
if timestamp:
timestamp = str(timestamp)
else:
timestamp = None
client = oauth1.Client(
consumer_key,
client_secret=consumer_secret,
resource_owner_key=token_key,
resource_owner_secret=token_secret,
signature_method=oauth1.SIGNATURE_PLAINTEXT,
timestamp=timestamp)
uri, signed_headers, body = client.sign(url)
return signed_headers
class MAASSeedDirNone(Exception):
pass
......@@ -361,47 +307,39 @@ if __name__ == "__main__":
if key in cfg and creds[key] is None:
creds[key] = cfg[key]
def geturl(url, headers_cb):