Commit 50d801c1 authored by Nicolas Dandrimont's avatar Nicolas Dandrimont 🤔
Browse files

Merge branch 'plugin-api' into tentative-merge

Conflicts:
	debexpo/lib/utils.py
	debexpo/plugins/qa/distribution.py
parents 24fab0f6 83327073
......@@ -47,6 +47,7 @@ from debexpo.lib.utils import get_package_dir
from debexpo.lib.email import Email
from debexpo.lib.filesystem import CheckFiles
from debexpo.lib.schemas import PackageSubscribeForm, PackageCommentForm
from debexpo.lib.plugins import Plugins
from debexpo.model import meta
from debexpo.model.packages import Package
......@@ -89,6 +90,7 @@ class PackageController(BaseController):
``packagename``
Package name to look at.
"""
package = self._get_package(packagename)
c.session = session
......@@ -99,11 +101,21 @@ class PackageController(BaseController):
(constants.PACKAGE_COMMENT_OUTCOME_PERFECT, _('Perfect'))
]
c.plugins = Plugins
c.log = log
if 'user_id' in session:
c.user = meta.session.query(User).filter_by(id=session['user_id']).one()
else:
c.user = None
c.plugins = dict()
for package_version in package.package_versions:
pv_plugins = Plugins('qa', package_version)
pv_plugins.load_plugins()
pv_plugins.load_results()
c.plugins[package_version.id] = pv_plugins
log.debug('Rendering page')
return render('/package/index.mako')
......
......@@ -312,7 +312,7 @@ class Importer(object):
if not os.path.isfile(self.changes_file):
self._fail('Cannot find changes file')
def _create_db_entries(self, qa):
def _create_db_entries(self):
"""
Create entries in the Database for the package upload.
"""
......@@ -351,22 +351,17 @@ class Importer(object):
except KeyError:
closes = None
# TODO: fix these magic numbers
if qa.stop():
qa_status = 1
else:
qa_status = 0
maintainer_matches = re.compile(r'(.*) <(.*)>').match(self.changes['Changed-By'])
maintainer = maintainer_matches.group(2)
package_version = PackageVersion(package=package, version=self.changes['Version'],
section=section, distribution=self.changes['Distribution'], qa_status=qa_status,
# FIXME: qa_status (might be removed from the model)
self.package_version = PackageVersion(package=package, version=self.changes['Version'],
section=section, distribution=self.changes['Distribution'], qa_status=1,
component=component, priority=self.changes.get_priority(), closes=closes,
uploaded=datetime.now(), maintainer=maintainer)
meta.session.add(package_version)
meta.session.add(self.package_version)
source_package = SourcePackage(package_version=package_version)
source_package = SourcePackage(package_version=self.package_version)
meta.session.add(source_package)
binary_package = None
......@@ -387,7 +382,7 @@ class Importer(object):
if file.endswith('.deb'):
# Only create a BinaryPackage if there actually binary package files
if binary_package is None:
binary_package = BinaryPackage(package_version=package_version, arch=file[:-4].split('_')[-1])
binary_package = BinaryPackage(package_version=self.package_version, arch=file[:-4].split('_')[-1])
meta.session.add(binary_package)
meta.session.add(PackageFile(filename=filename, binary_package=binary_package, size=size, md5sum=sum))
......@@ -397,11 +392,6 @@ class Importer(object):
meta.session.commit()
log.warning("Finished adding PackageFile objects.")
# Add PackageInfo objects to the database for the package_version
for result in qa.result:
meta.session.add(PackageInfo(package_version=package_version, from_plugin=result.from_plugin,
outcome=result.outcome, rich_data=result.data, severity=result.severity))
# Commit all changes to the database
meta.session.commit()
log.debug('Committed package data to the database')
......@@ -455,6 +445,16 @@ class Importer(object):
return None
def _run_plugins(self, stage):
plugins = Plugins(stage,
self.package_version,
self.changes,
self.changes_file,
user_id=self.user_id)
log.debug('Running plugins of type: %s' % stage)
plugins.load_plugins()
plugins.run_plugins()
def main(self):
"""
Actually start the import of the package.
......@@ -588,12 +588,12 @@ class Importer(object):
# Run post-upload plugins.
post_upload = Plugins('post-upload', self.changes, self.changes_file,
user_id=self.user_id)
if post_upload.stop():
log.critical('post-upload plugins failed')
self._remove_changes()
sys.exit(1)
#post_upload = Plugins('post-upload', self.changes, self.changes_file,
# user_id=self.user_id)
#if post_upload.stop():
# log.critical('post-upload plugins failed')
# self._remove_changes()
# sys.exit(1)
# Check whether a post-upload plugin has got the orig tarball from somewhere.
if not orig_file_found and not filecheck.is_native_package(self.changes):
......@@ -625,10 +625,6 @@ class Importer(object):
if not os.access(pylons.config['debexpo.repository'], os.W_OK):
self._fail('debexpo.repository is not writeable')
qa = Plugins('qa', self.changes, self.changes_file, user_id=self.user_id)
if qa.stop():
self._reject('QA plugins failed the package')
# Loop through parent directories in the target installation directory to make sure they
# all exist. If not, create them.
for dir in self.changes.get_pool_path().split('/'):
......@@ -645,24 +641,33 @@ class Importer(object):
#git job is done, cleaning
shutil.rmtree(os.path.join(pylons.config['debexpo.repository'], 'git', 'last', ''))
self._remove_temporary_files()
# Create the database rows
self._create_db_entries(qa)
self._create_db_entries()
# Refresh the Sources/Packages files.
log.debug('Updating Sources and Packages files')
r = Repository(pylons.config['debexpo.repository'])
r.update()
# all the useful data is now there
os.chdir(pylons.config['debexpo.repository'])
# Run QA plugins
self._run_plugins('qa')
self._remove_temporary_files()
# Execute I'm happy to have post-successful-upload plugins
f = open(self.changes_file)
changes_contents = f.read()
f.close()
Plugins('post-successful-upload', self.changes, self.changes_file,
changes_contents=changes_contents)
#Plugins('post-successful-upload', self.changes, self.changes_file,
# changes_contents=changes_contents)
# Remove the changes file
self._remove_changes()
# Refresh the Sources/Packages files.
log.debug('Updating Sources and Packages files')
r = Repository(pylons.config['debexpo.repository'])
r.update()
log.debug('Done')
return 0
......@@ -6,6 +6,7 @@
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2010 Jan Dittberner <jandd@debian.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -33,48 +34,50 @@ Holds the plugin loader.
"""
__author__ = 'Jonny Lamb'
__copyright__ = 'Copyright © 2008 Jonny Lamb, Copyright © 2010 Jan Dittberner'
__copyright__ = ', '.join([
'Copyright © 2008 Jonny Lamb',
'Copyright © 2010 Jan Dittberner',
'Copyright © 2012 Clément Schreiner',
])
__license__ = 'MIT'
import logging
import os
import shutil
import sys
import tempfile
import traceback
from collections import namedtuple
from debian import deb822
import pylons
from debexpo.model import meta
log = logging.getLogger(__name__)
# Different plugin stages and their options.
plugin_stages = {
'post-upload' : {
'extract' : False,
},
'qa' : {
'extract' : True,
},
'post-upload-to-debian' : {
'extract' : False,
},
'post-successful-upload' : {
'extract' : False,
},
}
PluginModule = namedtuple('PluginModule', ['name', 'stage', 'plugin', 'models'])
class Plugins(object):
"""
Plugin loader.
"""
def __init__(self, type, changes, changes_file, **kw):
# FIXME: remove changes and changes_file arguments
# (once they're accessible from the git storage backend, that is)
def __init__(self, type, package_version, changes=None, changes_file=None, **kw):
"""
Class constructor. Sets class attributes and then runs the plugins.
``type``
Type of plugins to run. (E.g. "post-upload")
Type of plugins to import (for example 'qa'.)
``package_version``
PackageVersion instance of the package we load plugins for
``changes``
Changes class for the package to test.
Changes class for the package to test. Needed for the importer
``changes_file``
Name of the changes file.
......@@ -86,17 +89,20 @@ class Plugins(object):
self.type = type.replace('-', '_')
self.changes = changes
self.changes_file = changes_file
self.result = None
self.result_objects = []
self.tempdir = None
self.kw = kw
# Run the plugins.
if type in plugin_stages:
self.conf = plugin_stages[type]
log.debug('Running plugins of type: %s' % type)
self.result = self._run_plugins()
self.package_version = package_version
self.modules = self.import_plugins(self.type)
def _import_plugin(self, name):
# the plugin instances
self.plugins = {}
@classmethod
def _import_plugin(cls, name):
"""
Imports a module and returns it.
......@@ -125,107 +131,132 @@ class Plugins(object):
log.warn('Import of module "%s" failed with error: %s', name, e)
return None
def _extract(self):
"""
Copy the files to a temporary directory and run dpkg-source -x on the dsc file
to extract them.
"""
log.debug('Copying files to a temp directory to run dpkg-source -x on the dsc file')
self.tempdir = tempfile.mkdtemp()
log.debug('Temp dir is: %s', self.tempdir)
for filename in self.changes.get_files():
log.debug('Copying: %s', filename)
shutil.copy(os.path.join(self.config['debexpo.upload.incoming'], filename), self.tempdir)
# If the original tarball was pulled from Debian or from the repository, that
# also needs to be copied into this directory.
dsc = deb822.Dsc(file(self.changes.get_dsc()))
for item in dsc['Files']:
if item['name'] not in self.changes.get_files():
src_file = os.path.join(self.config['debexpo.upload.incoming'], item['name'])
repository_src_file = os.path.join(self.config['debexpo.repository'], self.changes.get_pool_path(), item['name'])
if os.path.exists(src_file):
shutil.copy(src_file, self.tempdir)
elif os.path.exists(repository_src_file):
shutil.copy(repository_src_file, self.tempdir)
else:
log.critical("Trying to copy non-existing file %s" % (src_file))
shutil.copy(os.path.join(self.config['debexpo.upload.incoming'], self.changes_file), self.tempdir)
@classmethod
def _what_plugins(cls, stage):
"""
List of the plugins that must be run.
"""
self.oldcurdir = os.path.abspath(os.path.curdir)
os.chdir(self.tempdir)
config = pylons.config
os.system('dpkg-source -x %s extracted' % self.changes.get_dsc())
key = 'debexpo.plugins.' + stage
log.debug("Getting plugins with key (repr): %s", repr(key))
plugins = config.get(key)
def _cleanup(self):
"""
Remove the previously-created temporary directory and chdir back to where the importer
was.
"""
if self.tempdir is not None:
shutil.rmtree(self.tempdir)
os.chdir(self.oldcurdir)
return plugins.split(' ')
def _run_plugins(self):
@classmethod
def import_plugins(cls, stage):
"""
Look in the config file and run the plugins.
Import all plugins for given stage and return them as a list.
"""
key = 'debexpo.plugins.' + self.type
log.debug("Getting plugins with key (repr): %s", repr(key))
plugins = self.config.get(key)
log.debug("Using these plugins: %s", plugins)
result = []
if not plugins:
log.debug("Returning result: %s", result)
return result
modules = {}
config = pylons.config
# Look at whether the plugins need extracting.
if 'extract' in self.conf and self.conf['extract']:
log.debug('Extracting package for plugins')
self._extract()
plugins = cls._what_plugins(stage)
log.debug("Importing these plugins: %s", plugins)
# Run each plugin.
for plugin in plugins.split(' '):
log.debug('Running %s plugin' % plugin)
module = None
if self.config.get('debexpo.plugindir') != '':
for plugin_name in plugins:
if config.get('debexpo.plugindir') != '':
# First try in the user-defined plugindir
sys.path.append(self.config['debexpo.plugindir'])
module = self._import_plugin(plugin)
sys.path.append(config['debexpo.plugindir'])
module = cls._import_plugin(plugin_name)
if module is not None:
log.debug('Found plugin in debexpo.plugin_dir')
log.debug('Found module in debexpo.plugin_dir')
if module is None:
else:
# Try in debexpo.plugins
name = 'debexpo.plugins.%s' % plugin
module = self._import_plugin(name)
name = 'debexpo.plugins.%s' % plugin_name
module = cls._import_plugin(name)
if module is None or not hasattr(module, 'plugin'):
log.debug('No plugin found in module %s' % plugin_name)
continue
models = getattr(module, 'models', [])
# The 'plugin' object points to the class containing the actual plugin/test
if hasattr(module, 'plugin'):
p = getattr(module, 'plugin')(name=plugin, changes=self.changes, \
changes_file=self.changes_file, tempdir=self.tempdir)
modules[plugin_name] = PluginModule(name=plugin_name,
plugin=getattr(module, 'plugin'),
models=models,
stage=stage)
for item in self.kw:
setattr(p, item, self.kw[item])
return modules
def load_plugins(self):
"""
Instantiate all plugins and register them in the 'plugins'
dictionary attribute.
e.g.: self.plugins['native'] -> <NativePlugin 42>
"""
for module in self.modules.itervalues():
try:
result.extend(p.run())
self.plugins[module.name] = module.plugin(module.name,
self.package_version,
models=module.models,
changes=self.changes,
kw=self.kw)
except Exception:
log.debug("Something wrong happened while running the plugin '%s': %s" % (plugin, traceback.format_exc()))
log.debug('Something went wrong while loading the plugin {}:'
'{}'.format(module.name, traceback.format_exc()))
if self.conf['extract']:
self._cleanup()
def run_plugins(self):
"""
Run all imported plugins.
"""
return result
if len(self) == 0:
log.debug('No plugin loaded.')
def stop(self):
# Run each plugin.
for name, plugin in self.iteritems():
log.debug('Running plugin: %s' % name)
try:
plugin.run()
except Exception:
log.debug("Something wrong happened while running the plugin '%s': %s"
% (name, traceback.format_exc()))
else:
plugin.save()
def load_results(self):
"""
Returns whether the importer should stop.
Calls ``load`` method on all loaded plugin instances.
"""
for result in self.result:
if result.stop():
return True
return False
for name, plugin in self.iteritems():
plugin.load()
#
# magic methods from accessing the ``plugins`` attribute
#
# FIXME: do this properly with collections.abc? (not sure)
def __getitem__(self, key):
"""
Returns a plugin instance for the given plugin name.
plugins['native'] -> NativePlugin instance
"""
return self.plugins[key]
def get(self, key, default):
return self.plugins.get(key, default)
def iteritems(self):
"""Iter the items in the ``plugins`` dictionary attribute."""
return self.plugins.iteritems()
def __len__(self):
"""Number of plugins that have been loaded."""
return len(self.plugins)
def __iter__(self):
""" Iter over plugin instances' names """
for name in self.plugins:
yield name
def __contains__(self, key):
return key in self.plugins
......@@ -110,3 +110,23 @@ def hash_it(s):
def get_gnupg():
return gnupg.GnuPG(config['debexpo.gpg_path'],
config['debexpo.gpg_keyring'])
def uncamel(s):
"""
Remove uppercase characters in a string and add an underscore just before.
e.g.: 'BuildsystemTest' -> 'buildsystem_test'
'Plugin' -> 'plugin'
'SomeClass_' -> some_class -- yes, this
'_SomeOther_' -> some_other -- sucks.
"""
# FIXME: this is a ugly (was intended as a quick proof-of-concept)
# FIXME: won't work when a string has several capitalized characters in a row
# e.g.: 'DebianQA'-> debian_q_a -- yuck
# a regexp might be better.
# see this link for an example:
# http://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-camel-case
uncameled_string = ''.join(('_' if x.isupper() else '') + x.lower()
for x in s).strip('_')
return uncameled_string
......@@ -62,7 +62,7 @@ def import_all_models():
from debexpo.model import binary_packages, package_files, packages, source_packages, \
user_metrics, package_comments, package_info, package_versions, user_countries, \
users, package_subscriptions, user_upload_key, password_reset, sponsor_metrics, \
data_store
data_store, plugin_results
class OrmObject(object):
"""
......
......@@ -37,7 +37,9 @@ __license__ = 'MIT'
from sqlalchemy import MetaData
__all__ = ['engine', 'metadata', 'session']
from sqlalchemy.ext.declarative import declarative_base
__all__ = ['engine', 'metadata', 'session', 'Base']
# SQLAlchemy database engine. Updated by model.init_model().
engine = None
......@@ -48,3 +50,6 @@ session = None
# Global metadata. If you have multiple databases with overlapping table
# names, you'll need a metadata for each database.
metadata = MetaData()
Base = declarative_base(metadata=metadata)
# -*- coding: utf-8 -*-
#
# plugin_results.py — plugin results table model
#
# This file is part of debexpo - https://alioth.debian.org/projects/debexpo/
#
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""
Holds plugin_results table model.
"""
from sqlalchemy import orm
import sqlalchemy as sa
from sqlalchemy.orm.collections import attribute_mapped_collection
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import (declared_attr, declarative_base,
has_inherited_table)
from debexpo.model import meta
from debexpo.model.package_versions import PackageVersion
from debexpo.lib.utils import uncamel
class PluginResult(meta.Base):
__tablename__ = 'plugin_results'
# plugin that created the result: must be defined in subclasses
plugin = None
#
# Table columns
#
id = sa.Column(sa.types.Integer, primary_key=True)
entity = sa.Column(sa.types.String(200))
package_version_id = sa.Column(sa.types.Integer,
sa.ForeignKey('package_versions.id'),
nullable=False)
package_version = orm.relationship(PackageVersion, backref='plugin_results')
_data = association_proxy(
'_result_data', 'value',
creator = lambda k, v: PluginResultData(key = k, value = v))
#
# Polymorphism configuration
#
@declared_attr
def __mapper_args__(cls):
if not has_inherited_table(cls):
return {'polymorphic_on': 'entity'}
else:
d = {
'inherits': PluginResult,
'polymorphic_identity': uncamel(cls.__name__)
}
return d
#
# Quacking like a dict (but not really)
#
# FIXME: inherit MutableMapping or something and provide a complete
# dictionary-like interface
def __getitem__(self, key):
return self._data[key]
def get(self, key, default):
return self._data.get(key, default)
def __setitem__(self, key, value):
self._data[key] = value
def __delitem__(self, key):
del self._data[key]
def as_dict(self):
return self._data
#
# Direct access to some attributes
#
@property
def severity(self):
return int(self.get('severity', 0))
class PluginResultData(meta.Base):
__tablename__ = 'plugin_result_data'
plugin_result_id = sa.Column(sa.ForeignKey('plugin_results.id'),
primary_key=True)
key = sa.Column(sa.types.String(200), primary_key=True)
value = sa.Column(sa.types.Text())
plugin_result = orm.relationship(PluginResult,
backref = orm.backref('_result_data',
collection_class = attribute_mapped_collection('key'),
cascade="all, delete-orphan"))
# -*- coding: utf-8 -*-
#
# __init__.py — Helpful classes for plugins
#
# This file is part of debexpo - https://alioth.debian.org/projects/debexpo/
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""
Holds some helpful classes for plugins to use or extend.
"""
__author__ = 'Jonny Lamb'
__copyright__ = 'Copyright © 2008 Jonny Lamb'
__license__ = 'MIT'
from debexpo.lib import constants
class BasePlugin(object):
"""
The class all other plugins should extend.
"""
def __init__(self, **kw):
"""
Class constructor. Sets class attributes depending on the arguments of the
constructor.
``kw``
Values to assign to class attributes.
"""
for key in kw:
setattr(self, key, kw[key])
def run(self):
"""
Runs all the tests in the self.tests list.
"""
self.result = []
for method in dir(self):
if method.startswith('test'):
getattr(self, method)()
return self.result
def passed(self, outcome, data, severity):
"""
Adds a PluginResult for a passed test to the result list.
``outcome``
Outcome tag of the test.
``data``
Resulting data from the plugin, like more details about the process.
``severity``
Severity of the result.
"""
self.result.append(PluginResult(from_plugin=self.name, outcome=outcome,
data=data, severity=severity))
def failed(self, outcome, data, severity):
"""
Adds a PluginResult for a failed test to the result list.
``outcome``
Outcome tag of the test.
``data``
Resulting data from the plugin, like more details about the process.
``severity``
Severity of the result.
"""
self.result.append(PluginResult(from_plugin=self.name, outcome=outcome,
data=data, severity=severity))
def info(self, outcome, data):
"""
Adds a PluginResult for an info test to the result list.
``outcome``
Outcome tag of the test.
``data``
Resulting data from the plugin, like more detail about the process.
"""
self.result.append(PluginResult(from_plugin=self.name, outcome=outcome,
data=data, severity=constants.PLUGIN_SEVERITY_INFO))
class PluginResult(object):
"""
The class tests should return to provide details about a test.
"""
def __init__(self, from_plugin, outcome, data, severity):
"""
Class constructor. Sets important fields.
``from_plugin``
Name of the plugin the test was carried out in.
``outcome``
Outcome of the test.
``data``
More details of the test.
``severity``
Severity of the result.
"""
self.from_plugin = from_plugin
self.outcome = outcome
self.data = data
self.severity = severity
def failed(self):
"""
Returns whether the test failed.
"""
return self.severity > constants.PLUGIN_SEVERITY_INFO
def stop(self):
"""
Returns whether the process should stop after the test.
"""
return self.severity >= constants.PLUGIN_SEVERITY_CRITICAL
# -*- coding: utf-8 -*-
#
# __init__.py — Helpful classes for plugins
#
# This file is part of debexpo - https://alioth.debian.org/projects/debexpo/
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
# files (the "Software"), to deal in the Software without
# restriction, including without limitation the rights to use,
# copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following
# conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
# OTHER DEALINGS IN THE SOFTWARE.
"""
Holds some helpful classes and functions for plugins to extend or use.
"""
__author__ = 'Jonny Lamb'
__copyright__ = ', '.join(('Copyright © 2008 Jonny Lamb',
'Copyright © 2012 Clément Schreiner'))
__license__ = 'MIT'
__all__ = ['importercmd',
'test_result',
'BasePlugin',
'QAPlugin',
'PluginResult',
'bool_field',
'int_field',
'string_field']
import os.path
import logging
from inspect import getmembers, ismethod
import operator
from pylons import config
# template rendering
from mako.lookup import TemplateLookup
from mako.exceptions import TopLevelLookupException
from debexpo.lib import constants, helpers
from debexpo.model.meta import session
from debexpo.model.plugin_results import PluginResult
log = logging.getLogger(__name__)
# FIXME: there is probably a better way to define this
PLUGINS_TEMPLATE_DIRS = [os.path.join(path, "plugins")
for path in config["pylons.paths"]["templates"]]
#
# Decorators simplifying the writing of plugins
#
def importercmd(func):
"""
Makes a plugin method an importer command.
(i.e. the importer will call the method)
"""
func.importercmd = True
return func
def test_result(cls):
"""
Makes a plugin result model the result of a QA test.
"""
cls.test_result = True
return cls
#
# Property factories for PluginResult subclasses
#
# FIXME: move this somewhere else
# FIXME: some of that could be abstracted
#
def _fdel(name):
"""
Returns a function for deleting the dictionary's item with the
given key. For use in properties below.
"""
def fdel(instance):
del instance[name]
return fdel
def bool_field(name):
"""
Property exposing an item ('true' or 'false') from the plugin
result's dictionary as a bool.
If the dictionary does not have the requested item yet, the getter
will return False.
"""
def fget(instance):
return True if instance.get(name, 'false') == 'true' else False
def fset(instance, value):
if not isinstance(value, bool):
raise ValueError('Expected bool')
instance[name] = u'true' if value else u'false'
return property(fget, fset, _fdel(name),
"Read-write property exposing a string ('true' or 'false')"
" as a bool.")
def int_field(name):
"""
Property exposing a numeric item from the plugin result's dictionary as
an int.
If the dictionary does not have the requested item yet, the getter
will return 0.
"""
def fget(instance):
# FIXME: better error handling
return int(instance.get(name, 0))
def fset(instance, value):
if not isinstance(value, int):
raise ValueError('Expected int')
instance[name] = unicode(value)
return property(fget, fset, _fdel(name),
'Read-write property exposing a string as an int')
def string_field(name):
"""
Property for accessing a string item from the plugin result's dictionary.
"""
def fset(instance, value):
instance[name] = value
return property(operator.itemgetter(name), fset, _fdel(name))
#
# Bases classes for plugins
#
class BasePlugin(object):
"""
Base class for importer plugins.
"""
def __init__(self, name, package_version, models=None, **kw):
"""
Constructor for a plugin.
"""
self.name = name
# PackageVersion object for the package we're looking at
self.package_version = package_version
self.models = []
if models is not None:
self._load_models(models)
# for storing results retrieved from the database
self.results = {}
# other argumnets
for key in kw:
setattr(self, key, kw[key])
# sqlalchemy session
self.session = session
def run(self):
"""
Import data from the package.
"""
for name, method in getmembers(self, ismethod):
if getattr(method, 'importercmd', False):
method()
def load(self):
"""
Load all data previously imported into the database by this
plugin.
"""
for model in self.models:
q = self.session.query(model)
q = q.filter_by(package_version=self.package_version)
for result in q.all():
self._load_result(result)
return self.results
def _load_result(self, result):
self.results.setdefault(result.entity, list()).append(result)
def save(self):
""" Save DB changes """
self.session.commit()
log.debug('Added results to the database for {}'.format(self.name))
def new_result(self, result_cls, **data):
"""
Add a new result to the list of db objects. Returns the
instance of the result.
"""
result = result_cls(package_version=self.package_version)
for k, v in data.iteritems():
result[k] = v
self.session.add(result)
return result
@property
def nb_results(self):
"""
Number of results that have been retrieved from the DB.
"""
return sum(len(l) for l in self.results.itervalues())
def _find_template(self, name, render_format):
# Files to try out for plugin data rendering
try_files = [
"%s/%s.mako" % (name, render_format),
"%s/text.mako" % (name),
"default/%s.mako" % render_format,
"default/text.mako",
]
lookup = TemplateLookup(
directories=PLUGINS_TEMPLATE_DIRS,
input_encoding='utf-8',
output_encoding='utf-8',
)
for basefile in try_files:
try:
template = lookup.get_template(basefile)
except TopLevelLookupException:
continue
else:
break
else:
# No template file found, something weird happened
return "(!! no template found for %s plugin data)" % name
return template
def _load_model(self, model):
"""
Adds the given model to the ``models`` list attribute.
This is separated from the ``_load_models`` method to simplify
overriding in derived plugin classes, which might handle
models classes differently.
For example, QAPlugin instances have a ``test_result``
attribute.
"""
self.models.append(model)
def _load_models(self, models):
"""
Calls ``_load_model`` on all PluginResult subclasses found in
the given ``models`` sequence.
"""
for model in models:
if issubclass(model, PluginResult):
self._load_model(model)
def render(self, render_format, **render_args):
"""Render the plugin data to the given format"""
template = self._find_template(self.name, render_format)
return template.render_unicode(results=self.results,
nb_results=self.nb_results,
h=helpers,
**render_args)
class QAPlugin(BasePlugin):
"""
Class for implementing QA plugins.
QA plugins have the concept of a test that can be passed or
failed.
Also, their templates look alike and could be abstracted
(i.e. having a standard 'simple test template' that work sith any
SimpleTestPlugin's results. 'tests results' can be considered the
main result, with other results giving complementary information.
The ``test_entity`` class attribute must be set the
PluginResult-class which represents this test result.
"""
# the PluginResult-derived class representing the result of the
# test
test_model = None
test_result = None # instance of that class, set by ``load``
@property
def test_severity(self):
"""
Returns the severity of the test result.
"""
if self.test_result is None:
log.debug("'%s' plugin's data has not been loaded, "
"or this plugin does not define a test" % self.name)
return None
# FIXME: ugly
if self.test_result.severity == 0:
return 1
return self.test_result.severity
def new_test_result(self, **data):
"""
Returns an instance of the sqlalchemy object for the result of
the test.
"""
return self.new_result(self.test_model, **data)
def render(self, render_format, **render_args):
"""
Render the QA test's template.
"""
return super(QAPlugin, self).render(render_format,
test_result=self.test_result,
**render_args)
def _load_model(self, model):
"""
Overridding BasePlugin's ``_load_model`` to set the
``test_model`` attribute if found.
"""
if (self.test_result is None and
getattr(model, 'test_result', False)):
self.test_model = model
super(QAPlugin, self)._load_model(model)
def _load_result(self, result):
"""
Overridding BasePlugin's ``_load_result`` to set the
``test_result`` attribute if found.
"""
log.debug('Loading %s' % result)
if isinstance(result, self.test_model):
self.test_result = result
super(QAPlugin, self)._load_result(result)
......@@ -7,6 +7,7 @@
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2010 Jan Dittberner <jandd@debian.org>
# Copyright © 2012 Nicolas Dandrimont <nicolas.dandrimont@crans.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -38,62 +39,83 @@ __copyright__ = ', '.join([
'Copyright © 2008 Jonny Lamb',
'Copyright © 2010 Jan Dittberner',
'Copyright © 2012 Nicolas Dandrimont',
'Copyright © 2012 Clément Schreiner',
])
__license__ = 'MIT'
from debexpo.lib import constants
from debexpo.plugins.api import *
import logging
import os
from debian import deb822
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
log = logging.getLogger(__name__)
class BuildSystemPlugin(BasePlugin):
@test_result
class BuildsystemTest(PluginResult):
"""
Result of the 'buildsystem' plugin.
"""
@property
def known(self):
return self['buildsystem'] != 'unknown'
def __str__(self):
if self.known:
msg = 'Package uses %s' % self['buildsystem']
else:
msg = 'Unknown'
return 'Build system: %s' % msg
class BuildSystemPlugin(QAPlugin):
@importercmd
def test_build_system(self):
"""
Finds the build system of the package.
"""
log.debug('Finding the package\'s build system')
dsc = deb822.Dsc(file(self.changes.get_dsc()))
log.debug("Finding the package's build system")
data = {}
severity = constants.PLUGIN_SEVERITY_INFO
log.debug('Opening dsc file in %s' % os.getcwd())
dsc_path = os.path.join(self.changes.get_pool_path(),
self.changes.get_dsc())
with open(dsc_path, 'r') as dsc_file:
dsc = deb822.Dsc(dsc_file)
build_depends = dsc.get('Build-Depends', '')
result = self.new_test_result()
if 'cdbs' in build_depends:
outcome = "Package uses CDBS"
data["build-system"] = "cdbs"
result['buildsystem'] = 'cdbs'
elif 'debhelper' in build_depends:
data["build-system"] = "debhelper"
result['buildsystem'] = 'debhelper'
# Retrieve the debhelper compat level
if hasattr(self, 'tempdir'):
compatpath = os.path.join(self.tempdir, "extracted/debian/compat")
try:
with open(compatpath, "rb") as f:
compat_level = int(f.read().strip())
except IOError:
compat_level = None
compat_level = 'unknown'
else:
compat_level = 'unknown'
data["compat-level"] = compat_level
result['compat_level'] = compat_level
# Warn on old compatibility levels
if compat_level is None or compat_level <= 4:
outcome = "Package uses debhelper with an old compatibility level"
severity = constants.PLUGIN_SEVERITY_WARNING
else:
outcome = "Package uses debhelper"
result['severity'] = constants.PLUGIN_SEVERITY_WARNING
else:
outcome = "Package uses an unknown build system"
data["build-system"] = "unknown"
severity = constants.PLUGIN_SEVERITY_WARNING
self.failed(outcome, data, severity)
result['buildsystem'] = 'unknown'
result['severity'] = constants.PLUGIN_SEVERITY_WARNING
plugin = BuildSystemPlugin
models = [
BuildsystemTest,
]
......@@ -39,16 +39,85 @@ __license__ = 'MIT'
from collections import defaultdict
import logging
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
import SOAPpy
from debexpo.lib import constants
from debexpo.plugins.api import *
from debexpo.model import meta
log = logging.getLogger(__name__)
class ClosedBugsPlugin(BasePlugin):
@test_result
class ClosedbugsTest(PluginResult):
"""
Represents the result of the 'closed bugs' test.
"""
nb_closed = int_field('nb_closed')
nb_errors = int_field('nb_errors')
closes_wnpp = bool_field('wnpp')
def get_bugs(self):
q = meta.session.query(ClosedBug)
q = q.filter_by(package_version_id = self.package_version.id)
# exists = True)
d = {}
for bug in q.all():
d.setdefault(bug.package, list()).append(bug)
return d
def __str__(self):
if (self.closes_wnpp and self.nb_closed == 1 and
self.nb_errors == 0):
return 'Package closes a WNPP bug'
strings = []
strings.append('closes {} bug{plural}'.format(
self.nb_closed if self.nb_closed else 'no',
plural='s' if self.nb_closed > 1 else ''))
if self.nb_errors:
strings.append('wrongfully claims to be closing {} bug{plural}'.format(
self.nb_errors,
plural='s' if self.nb_closed > 1 else ''))
return 'Package ' + ' and '.join(strings)
class ClosedBug(PluginResult):
"""
Represents a bug closed by the package.
"""
exists = bool_field('exists')
belongs = bool_field('exists')
number = int_field('number')
is_error = bool_field('is_error')
@property
def package(self):
return self['package']
@property
def is_wnpp(self):
return self.package == 'wnpp'
def __str__(self):
if self.exists:
string = '{package} ({severity}: {subject}'.format(
package=self['package'],
severity=self['severity'],
subject=self['subject'])
else:
string = 'Bug #{} does not exist for'.format(self.number)
return string
class ClosedBugsPlugin(QAPlugin):
URL = "http://bugs.debian.org/cgi-bin/soap.cgi"
NS = "Debbugs/SOAP"
@importercmd
def test_closed_bugs(self):
"""
Check to make sure the bugs closed belong to the package.
......@@ -58,84 +127,92 @@ class ClosedBugsPlugin(BasePlugin):
log.debug('Package does not close any bugs')
return
log.debug('Checking whether the bugs closed in the package belong to the package')
log.debug('Checking whether the bugs closed in the package belongs'
' to the package')
bugs = [int(x) for x in self.changes['Closes'].split()]
test_result = self.new_test_result(nb_closed=0,
nb_errors=0,
wnpp='false')
if not bugs:
log.debug('Package does not close any bugs')
return
if bugs:
log.debug('Creating SOAP proxy to bugs.debian.org')
try:
server = SOAPpy.SOAPProxy(self.URL, self.NS, simplify_objects = 1)
bugs_retrieved = server.get_status( *bugs )
bugs_retrieved = server.get_status(*bugs)
except Exception as e:
log.critical('An error occurred when creating the SOAP proxy at "%s"'
' (ns: "%s"): %s' % (self.URL, self.NS, e))
return
if 'item' in bugs_retrieved:
bugs_retrieved = bugs_retrieved['item']
else:
bugs_retrieved = []
# Force argument to be a list, SOAPpy returns a dictionary instead of a dictionary list
# if only one bug was found
# Force argument to be a list, SOAPpy returns a
# dictionary instead of a dictionary list if only one
# bug was found
if not isinstance(bugs_retrieved, list):
bugs_retrieved = [bugs_retrieved]
except Exception as e:
log.critical('An error occurred when creating the SOAP proxy at "%s" (ns: "%s"): %s'
% (self.URL, self.NS, e))
return
data = {
'buglist': bugs,
'raw': {},
'errors': [],
'bugs': defaultdict(list),
}
raw_bugs = {}
# Index bugs retrieved
for bug in bugs_retrieved:
if 'key' in bug and 'value' in bug:
data["raw"][int(bug['key'])] = bug['value']
raw_bugs[int(bug['key'])] = bug['value']
else:
continue
severity = constants.PLUGIN_SEVERITY_INFO
for bug in bugs:
if not bug in data['raw']:
data["errors"].append('Bug #%s does not exist' % bug)
severity = max(severity, constants.PLUGIN_SEVERITY_ERROR)
name = data["raw"][bug]['package']
data["bugs"][name].append((bug, data["raw"][bug]["subject"], data["raw"][bug]["severity"]))
if not (self.changes["Source"] in data["raw"][bug]['source'].split(', ') or name == "wnpp"):
data["errors"].append('Bug #%s does not belong to this package' % bug)
severity = max(severity, constants.PLUGIN_SEVERITY_ERROR)
bug_result = self.new_result(ClosedBug, number=bug)
if not bug in raw_bugs:
log.debug('{} does not exist'.format(bug))
bug_result.exists = False
bug_result.belongs = False
bug_result.is_error = True
test_result.nb_errors += 1
test_result['severity'] = max(severity, constants.PLUGIN_SEVERITY_ERROR)
continue
if severity != constants.PLUGIN_SEVERITY_INFO:
outcome = "Package closes bugs in a wrong way"
elif "wnpp" in data["bugs"] and len(data["bugs"]) == 1:
outcome = "Package closes a WNPP bug"
else:
outcome = "Package closes bug%s" % ("s" if len(bugs) > 1 else "")
bug_result.exists = True
log.debug('Found bug {}'.format(bug))
self.failed(outcome, data, severity)
package = raw_bugs[bug]['package']
bug_result['package'] = package
bug_result['subject'] = raw_bugs[bug]['subject']
bug_result['severity'] = raw_bugs[bug]['severity']
else:
log.debug('Package does not close any bugs')
source = raw_bugs[bug]['source'].split(', ')
def _package_in_descriptions(self, name, list):
"""
Finds out whether a binary package is in a source package by looking at the Description
field of the changes file for the binary package name.
if package == 'wnpp':
log.debug('Package closes a wnpp bug')
test_result.nb_closed += 1
test_result.closes_wnpp = True
bug_result.belongs = True
``name``
Name of the binary package.
elif self.changes['Source'] in source:
test_result.nb_closed += 1
bug_result.belongs = True
``list``
List of Description fields split by '\n'.
"""
for item in list:
if item.startswith(name + ' '):
return True
else:
bug_result.belongs = False
test_result.nb_errors += 1
test_result['severity'] = max(severity,
constants.PLUGIN_SEVERITY_ERROR)
return False
plugin = ClosedBugsPlugin
models = [
ClosedbugsTest,
ClosedBug,
]
......@@ -45,19 +45,49 @@ from debian import deb822
import logging
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
from debexpo.model import meta
log = logging.getLogger(__name__)
fields = ['Homepage', 'Vcs-Browser', 'Vcs-Git', 'Vcs-Svn', 'Vcs-Bzr', 'Vcs-Hg']
class ControlFieldsPlugin(BasePlugin):
@test_result
class ControlfieldTest(PluginResult):
""" Result of the control fields plugin """
homepage = bool_field('homepage')
vcs = bool_field('vcs')
def get_fields(self):
q = meta.session.query(ControlField)
q = q.filter_by(package_version=self.package_version)
return dict((f.field, f.value) for f in q.all())
def __str__(self):
if self.homepage:
return 'Homepage {vcs} control field{s} present'.format(
vcs='and VCS' if self.vcs else '',
s='s' if self.vcs else '')
class ControlField(PluginResult):
""" Additional debian/control field """
field = string_field('field')
value = string_field('data')
class ControlFieldsPlugin(QAPlugin):
@importercmd
def test_control_fields(self):
"""
Checks whether additional debian/control fields are present.
"""
log.debug('Checking whether additional debian/control fields are present')
log.debug('Checking whether additional debian/control'
'fields are present')
try:
dsc = deb822.Dsc(file(self.changes.get_dsc()))
......@@ -65,21 +95,26 @@ class ControlFieldsPlugin(BasePlugin):
log.critical('Could not open dsc file; skipping plugin')
return
data = {}
severity = constants.PLUGIN_SEVERITY_WARNING
outcome = "No Homepage field present"
found_homepage = False
for item in fields:
if item in dsc:
data[item] = dsc[item]
if item == 'Homepage':
found_homepage = True
self.new_result(ControlField, field=item,
value=dsc[item])
if "Homepage" in data:
severity = constants.PLUGIN_SEVERITY_INFO
if len(data) > 1:
outcome = "Homepage and VCS control fields present"
else:
outcome = "Homepage control field present"
if found_homepage:
test_result = self.new_test_result(
severity=constants.PLUGIN_SEVERITY_INFO)
test_result.homepage = True
self.failed(outcome, data, severity)
if len(self.results) > 1:
test_result.vcs = True
plugin = ControlFieldsPlugin
models = [
ControlfieldTest,
ControlField,
]
......@@ -46,20 +46,36 @@ import urllib2
from debexpo.model import meta
from debexpo.model.users import User
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
log = logging.getLogger(__name__)
class DebianPlugin(BasePlugin):
@test_result
class DebianqaTest(PluginResult):
in_debian = bool_field('in_debian')
is_nmu = bool_field('is_nmu')
is_debian_maintainer = bool_field('is_debian_maintainer')
def __str__(self):
# FIXME: this variable name sucks.
s = 'already' if self.in_debian else 'not'
return 'Package is %s in debian' % s
class DebianQAPlugin(QAPlugin):
""" Plugin for Debian QA tests """
@property
def _in_debian(self):
try:
self.qa_page = urllib2.urlopen('http://packages.qa.debian.org/%s' % self.changes['Source'])
self.qa_page = urllib2.urlopen('http://packages.qa.debian.org/%s'
% self.changes['Source'])
except urllib2.HTTPError:
self.in_debian = False
return False
else:
self.in_debian = True
self.parsed_qa = lxml.etree.fromstring(self.qa_page.read())
return True
def _qa_xpath(self, query, item = None):
"""Perform the xpath query on the given item"""
......@@ -75,13 +91,7 @@ class DebianPlugin(BasePlugin):
Finds whether the package is in Debian.
"""
log.debug('Testing whether the package is in Debian already')
if self.in_debian:
self.outcome = "Package is already in Debian"
else:
self.outcome = "Package is not in Debian"
self.data["in-debian"] = self.in_debian
self.test_result.in_debian=self._in_debian
def _test_last_upload(self):
"""
......@@ -94,7 +104,7 @@ class DebianPlugin(BasePlugin):
if 'Accepted' in self._qa_xpath('xhtml:a/child::text()', item):
last_change = item.text[1:11]
log.debug('Last upload on %s' % last_change)
self.data["latest-upload"] = last_change
self.test_result["latest_upload"] = last_change
return
log.warning('Couldn\'t find last upload date')
......@@ -113,7 +123,7 @@ class DebianPlugin(BasePlugin):
changes = str(self.changes["Changes"]).lower().translate(None, delete_chars).splitlines()
self.data["nmu"] = (
self.test_result.is_nmu = (
any(change.startswith('nonmaintainerupload') for change in changes) or
any(change.startswith('nmu') for change in changes) or
'nmu' in self.changes["Version"]
......@@ -129,8 +139,9 @@ class DebianPlugin(BasePlugin):
self.user_name = ""
self.user_email = ""
if self.user_id is not None:
user = meta.session.query(User).get(self.user_id)
user_id = self.kw.get('user_id', None)
if user_id is not None:
user = meta.session.query(User).get(user_id)
if user is not None:
self.user_name = user.name
......@@ -143,7 +154,7 @@ class DebianPlugin(BasePlugin):
log.debug('Finding out whether the package Maintainer is the Debian Maintainer')
self.data["is-debian-maintainer"] = self.user_name in self.debian_maintainers
self.test_result.is_debian_maintainer = self.user_name in self.debian_maintainers
def _test_has_new_maintainer(self):
......@@ -162,16 +173,14 @@ class DebianPlugin(BasePlugin):
# TODO
@importercmd
def test_qa(self):
"""Run the Debian QA tests"""
self._in_debian()
self.outcome = ""
self.data = {}
self.test_result = self.new_test_result()
self._test_package_in_debian()
if self.in_debian:
if self._in_debian:
self._test_last_upload()
self._test_is_nmu()
self._get_debian_maintainer_data()
......@@ -179,7 +188,10 @@ class DebianPlugin(BasePlugin):
self._test_has_new_maintainer()
self._test_previous_sponsors()
self.info(self.outcome, self.data)
plugin = DebianPlugin
plugin = DebianQAPlugin
models = [
DebianqaTest,
]
template = 'debianqa'
......@@ -43,12 +43,31 @@ import subprocess
import logging
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
log = logging.getLogger(__name__)
@test_result
class DiffCleanTest(PluginResult):
""" Result of the diffclean QA test """
dirty = bool_field('dirty')
diff_file = bool_field('diff_file')
severity = int_field('severity')
def __str__(self):
outcome = 'Diff file is {} clean'.format('not' if self.dirty else '')
return outcome
class DiffFile(PluginResult):
filename = string_field('filename')
stats = string_field('stats')
class DiffCleanPlugin(BasePlugin):
@importercmd
def test_diff_clean(self):
"""
Check to make sure the diff.gz is clean.
......@@ -58,28 +77,30 @@ class DiffCleanPlugin(BasePlugin):
difffile = self.changes.get_diff()
if difffile is None or not difffile.endswith('.diff.gz'):
log.warning('Package has no diff.gz file; native or format 3.0 package?')
log.warning('Package has no diff.gz file;'
'native or format 3.0 package?')
return
diffstat = subprocess.Popen(["diffstat", "-p1", difffile], stdout=subprocess.PIPE).communicate()[0]
diffstat = subprocess.Popen(["diffstat", "-p1", difffile],
stdout=subprocess.PIPE).communicate()[0]
data = {
"dirty": False,
"modified-files": [],
}
test_result = self.new_test_result()
# Last line is the summary line
for item in diffstat.splitlines()[:-1]:
filename, stats = [i.strip() for i in item.split("|")]
if not filename.startswith('debian/'):
data["dirty"] = True
data["modified-files"].append((filename, stats))
test_resut.dirty = True
self.new_result(DiffFile, filename=filename, stats=stats)
if not data["dirty"]:
if not test_result.dirty:
log.debug('Diff file %s is clean' % difffile)
self.passed("The package's .diff.gz does not modify files outside of debian/", data, constants.PLUGIN_SEVERITY_INFO)
else:
log.error('Diff file %s is not clean' % difffile)
self.failed("The package's .diff.gz modifies files outside of debian/", data, constants.PLUGIN_SEVERITY_WARNING)
test_result.severity = constants.PLUGIN_SEVERITY_WARNING
plugin = DiffCleanPlugin
models = [
DiffCleanTest,
DiffFile,
]
......@@ -41,7 +41,17 @@ import logging
import os
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
@test_result
class DistributionTest(PluginResult):
is_unreleased = bool_field('unreleased')
def __str__(self):
if is_unreleased:
return 'Package uploaded for the unreleased distribution'
else:
return None
class DistributionPlugin(BasePlugin):
......@@ -50,14 +60,13 @@ class DistributionPlugin(BasePlugin):
Checks whether the package is for the UNRELEASED distribution
"""
data = {
"is-unreleased": False,
}
distribution = self.changes["Distribution"]
if distribution.lower() == "unreleased":
data["is-unreleased"] = True
self.failed("Package uploaded for the UNRELEASED distribution", data, constants.PLUGIN_SEVERITY_ERROR)
self.new_test_result(is_unreleased=True,
severity=constants.PLUGIN_SEVERITY_ERROR)
plugin = DistributionPlugin
models = [
DistributionTest
]
......@@ -6,6 +6,7 @@
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2012 Nicolas Dandrimont <Nicolas.Dandrimont@crans.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -36,20 +37,92 @@ __author__ = 'Jonny Lamb'
__copyright__ = ', '.join([
'Copyright © 2008 Jonny Lamb',
'Copyright © 2012 Nicolas Dandrimont',
'Copyright © 2012 Clément Schreiner',
])
__license__ = 'MIT'
from collections import defaultdict
import subprocess
import logging
from collections import defaultdict, namedtuple
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
from debexpo.model import meta
log = logging.getLogger(__name__)
class LintianPlugin(BasePlugin):
LintianSeverity = namedtuple('LintianSeverity',
['str', 'int', 'plugin_severity'])
severities = {
'E': LintianSeverity('Package has lintian errors', 5,
constants.PLUGIN_SEVERITY_ERROR),
'W': LintianSeverity('Package has lintian warnings', 4,
constants.PLUGIN_SEVERITY_WARNING),
'I': LintianSeverity('Package has lintian informational warnings', 3,
constants.PLUGIN_SEVERITY_INFO),
'O': LintianSeverity('Package has overridden lintian tags', 2,
constants.PLUGIN_SEVERITY_INFO),
'P': LintianSeverity('Package has lintian pedantic tags', 1,
constants.PLUGIN_SEVERITY_INFO),
'X': LintianSeverity('Package has lintian experimental tags', 0,
constants.PLUGIN_SEVERITY_INFO),
'': LintianSeverity('Package is lintian clean', -1,
constants.PLUGIN_SEVERITY_INFO),
}
@test_result
class LintianTest(PluginResult):
""" Summary of the lintian results """
# -1 -> package is clean
max_lintian_severity = string_field('max_lintian_severity')
severity = int_field('severity')
def get_tags(self):
"""
Returns the LintianTag objects for this package, as a
dictionary.
"""
# Yes, three levels of defaultdict and one of list...
# FIXME: how exactly is this new API better? :/
def defaultdict_defaultdict_list():
def defaultdict_list():
return defaultdict(list)
return defaultdict(defaultdict_list)
lintian_warnings = defaultdict(defaultdict_defaultdict_list)
q = meta.session.query(LintianWarning)
q = q.filter_by(package_version_id=self.package_version.id)
for w in q.all():
lintian_warnings[w.package][w.severity][w.tag].append(w.data)
return lintian_warnings
def __str__(self):
return severities[self.max_lintian_severity].str
class LintianWarning(PluginResult):
""" A lintian warning found for the package """
package = string_field('package')
severity = string_field('severity')
tag = string_field('tag')
data = string_field('data')
def __str__(self):
return 'Lintian warning ({severity}): [{tag}] {data}'.format(
severity=self.severity,
tag=self.tag,
data=self.data)
class LintianPlugin(QAPlugin):
""" Runs lintian tests on the package's binaries """
@importercmd
def test_lintian(self):
"""
Method to run lintian on the package.
......@@ -61,18 +134,11 @@ class LintianPlugin(BasePlugin):
"-I",
"--pedantic",
"--show-overrides",
self.changes_file], stdout=subprocess.PIPE).communicate()[0]
self.changes_file],
stdout=subprocess.PIPE).communicate()[0]
items = output.split('\n')
# Yes, three levels of defaultdict and one of list...
def defaultdict_defaultdict_list():
def defaultdict_list():
return defaultdict(list)
return defaultdict(defaultdict_list)
lintian_warnings = defaultdict(defaultdict_defaultdict_list)
lintian_severities = set()
override_comments = []
......@@ -81,7 +147,9 @@ class LintianPlugin(BasePlugin):
if not item:
continue
# lintian output is of the form """SEVERITY: package: lintian_tag [lintian tag arguments]""" or """N: Override comment"""
# lintian output is of the form:
# " SEVERITY: package: lintian_tag [lintian tag arguments] "
# or " N: Override comment "
if item.startswith("N: "):
override_comments.append(item[3:].strip())
continue
......@@ -91,26 +159,27 @@ class LintianPlugin(BasePlugin):
lintian_tag = lintian_tag_data[0]
lintian_data = lintian_tag_data[1:]
if override_comments:
lintian_data.append("(override comment: " + " ".join(override_comments) + ")")
lintian_data.append("(override comment: "
+ " ".join(override_comments) + ")")
override_comments = []
lintian_warnings[package][severity][lintian_tag].append(lintian_data)
severity = constants.PLUGIN_SEVERITY_INFO
if 'E' in lintian_severities:
severity = constants.PLUGIN_SEVERITY_ERROR
outcome = 'Package has lintian errors'
elif 'W' in lintian_severities:
severity = constants.PLUGIN_SEVERITY_WARNING
outcome = 'Package has lintian warnings'
elif 'I' in lintian_severities:
outcome = 'Package has lintian informational warnings'
elif 'O' in lintian_severities:
outcome = 'Package has overridden lintian tags'
elif 'P' in lintian_severities or 'X' in lintian_severities:
outcome = 'Package has lintian pedantic/experimental warnings'
for line in lintian_data:
self.new_result(LintianWarning, package=package,
severity=severity, tag=lintian_tag,
data=line)
if not lintian_severities:
max_lintian_severity = ''
else:
outcome = 'Package is lintian clean'
max_lintian_severity = max(lintian_severities,
key=lambda s: severities[s].int)
severity = severities[max_lintian_severity].plugin_severity
self.failed(outcome, lintian_warnings, severity)
self.new_test_result(severity=severity,
max_lintian_severity=max_lintian_severity)
plugin = LintianPlugin
models = [
LintianTest,
LintianWarning,
]
......@@ -6,6 +6,7 @@
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2012 Nicolas Dandrimont <Nicolas.Dandrimont@crans.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -36,6 +37,7 @@ __author__ = 'Jonny Lamb'
__copyright__ = ', '.join([
'Copyright © 2008 Jonny Lamb',
'Copyright © 2012 Nicolas Dandrimont',
'Copyright © 2012 Clément Schreiner',
])
__license__ = 'MIT'
......@@ -46,55 +48,90 @@ import re
from debian import deb822
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.model import meta
from debexpo.plugins.api import *
from debexpo.model.users import User
log = logging.getLogger(__name__)
class MaintainerEmailPlugin(BasePlugin):
@test_result
class MaintainerEmailTest(PluginResult):
""" Result of the maintaineremail QA test. """
user_is_maintainer = bool_field('user_is_maintainer')
user_in_uploaders = bool_field('user_in_uploaders')
def __str__(self):
if self.user_is_maintainer:
return '"Maintainer" email is the same as the uploader'
elif self.user_in_uploaders:
return 'The uploader is in the package\'s "Uploaders" field'
else:
return 'The uploader is not in the package\'s "Maintainer"' \
' or "Uploaders" fields'
class UploaderEmail(PluginResult):
""" Email address of one of the package's co-maintainers """
def __str__(self):
return self['email']
class MaintainerEmailPlugin(QAPlugin):
@importercmd
def test_maintainer_email(self):
"""
Tests whether the maintainer email is the same as the uploader email.
"""
if self.user_id is not None:
user_id = self.kw.get('user_id', None)
if user_id is None:
log.warning('Could not get the uploader\'s user details from the database')
return
log.debug('Checking whether the maintainer email is the same as the uploader email')
user = meta.session.query(User).get(self.user_id)
user = self.session.query(User).get(user_id)
log.debug('Checking whether the maintainer email is the same as'
' the uploader email')
user = self.package_version.package.user
if user is not None:
maintainer_name, maintainer_email = email.utils.parseaddr(self.changes['Maintainer'])
maintainer_name, maintainer_email = email.utils.parseaddr(
self.changes['Maintainer'])
uploader_emails = []
dsc = deb822.Dsc(file(self.changes.get_dsc()))
if 'Uploaders' in dsc:
for uploader_name, uploader_email in email.utils.getaddresses([dsc['Uploaders']]):
for uploader_name, uploader_email in email.utils.getaddresses(
[dsc['Uploaders']]):
uploader_emails.append(uploader_email)
severity = constants.PLUGIN_SEVERITY_INFO
user_is_maintainer = True
if user.email == maintainer_email:
log.debug('"Maintainer" email is the same as the uploader')
outcome = '"Maintainer" email is the same as the uploader'
elif user.email in uploader_emails:
user_in_uploaders = True
log.debug('The uploader is in the package\'s "Uploaders" field')
outcome = 'The uploader is in the package\'s "Uploaders" field'
else:
log.warning('%s != %s' % (user.email, maintainer_email))
outcome = 'The uploader is not in the package\'s "Maintainer" or "Uploaders" fields'
severity = constants.PLUGIN_SEVERITY_WARNING
user_is_maintainer = False
data = {
'user-is-maintainer': (severity == constants.PLUGIN_SEVERITY_INFO),
'user-email': user.email,
'maintainer-email': maintainer_email,
'uploader-emails': uploader_emails,
}
result = self.new_test_result(severity=severity,
user_is_maintainer=user_is_maintainer,
user_email=user.email,
maintainer_email=maintainer_email)
for uploader in uploaders_emails:
self.new_result(UploaderEmail, email='uploader')
self.failed(outcome, data, severity)
else:
log.warning('Could not get the uploader\'s user details from the database')
plugin = MaintainerEmailPlugin
models = [
MaintainerEmailTest,
UploaderEmail,
]
......@@ -6,6 +6,7 @@
#
# Copyright © 2008 Jonny Lamb <jonny@debian.org>
# Copyright © 2012 Nicolas Dandrimont <Nicolas.Dandrimont@crans.org>
# Copyright © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -36,18 +37,36 @@ __author__ = 'Jonny Lamb'
__copyright__ = ", ".join([
'Copyright © 2008 Jonny Lamb',
'Copyright © 2012 Nicolas Dandrimont',
'Copyright © 2012 Clément Schreiner',
])
__license__ = 'MIT'
import logging
from debexpo.lib import constants, filesystem
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
log = logging.getLogger(__name__)
class NativePlugin(BasePlugin):
@test_result
class NativeTest(PluginResult):
"""
Result of the 'native' plugin for a package.
"""
is_native = bool_field('native')
def __str__(self):
return 'Package is %s native' % ('' if self.is_native else 'not')
class NativePlugin(QAPlugin):
"""
Plugin checking whether a package is a native package.
"""
@importercmd
def test_native(self):
"""
Test to see whether the package is a native package.
......@@ -55,15 +74,22 @@ class NativePlugin(BasePlugin):
log.debug('Checking whether the package is native or not')
filecheck = filesystem.CheckFiles()
native = filecheck.is_native_package(self.changes)
is_native = filecheck.is_native_package(self.changes)
if native:
result = self.new_test_result()
if is_native:
# Most uploads will not be native, and especially on mentors, a native
# package is almost probably in error.
log.warning('Package is native')
self.failed('Package is native', {"native": True}, constants.PLUGIN_SEVERITY_WARNING)
result['severity'] = constants.PLUGIN_SEVERITY_WARNING
else:
log.debug('Package is not native')
self.passed('Package is not native', {"native": False}, constants.PLUGIN_SEVERITY_INFO)
result['severity'] = constants.PLUGIN_SEVERITY_INFO
result.is_native = is_native
plugin = NativePlugin
models = [
NativeTest,
]
......@@ -44,11 +44,34 @@ import logging
import os
from debexpo.lib import constants
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
log = logging.getLogger(__name__)
class WatchFilePlugin(BasePlugin):
@test_result
class WatchfileTest(PluginResult):
""" Result of the watchfile QA test """
watch_file_present = bool_field('watch_file_present')
watch_file_works = bool_field('watch_file_workds')
uscan_output = string_field('uscan_output')
latest_upstream = string_field('latest_upstream')
def __str__(self):
if not self.watch_file_present:
outcome = 'Watch file is not present'
elif not self.watch_file_works:
outcome = "A watch file is present but doesn't work"
else:
outcome = 'Package is {} the latest upstream version'.format(
'' if self.latest_upstream else 'not')
return outcome
class WatchFilePlugin(QAPlugin):
def _watch_file_present(self):
return os.path.isfile(os.path.join('extracted', 'debian', 'watch'))
......@@ -56,7 +79,8 @@ class WatchFilePlugin(BasePlugin):
def _run_uscan(self):
if not hasattr(self, 'status') and not hasattr(self, 'output'):
os.chdir('extracted')
call = subprocess.Popen(["uscan", "--verbose", '--report'], stdout=subprocess.PIPE)
call = subprocess.Popen(["uscan", "--verbose", '--report'],
stdout=subprocess.PIPE)
(self.output, _) = call.communicate()
self.status = call.returncode
os.chdir('..')
......@@ -65,46 +89,41 @@ class WatchFilePlugin(BasePlugin):
self._run_uscan()
return (self.output.find('Newest version on remote site is') != -1)
@importercmd
def test_uscan(self):
"""
Run the watch file-related checks in the package
"""
data = {
"watch-file-present": False,
}
log.debug('Checking to see whether there is a watch file in the package')
log.debug('Checking to see whether there is'
'a watch file in the package')
test_result = self.new_test_result()
if self._watch_file_present():
log.debug('Watch file present')
data["watch-file-present"] = True
test_result.watch_file_present = True
else:
log.warning('Watch file not present')
self.failed('Watch file is not present', data, constants.PLUGIN_SEVERITY_WARNING)
test_result.watch_file_present = False
return
if self._watch_file_works():
log.debug('Watch file works')
data["watch-file-works"] = True
data["uscan-output"] = self.output
else:
log.warning('Watch file does not work')
data["watch-file-works"] = False
data["uscan-output"] = self.output
self.failed("A watch file is present but doesn't work", data, constants.PLUGIN_SEVERITY_WARNING)
test_result.watch_file_works = self._watch_file_works()
test_result.uscan_output = self.output
if not test_result.watch_file_works:
log.debug('Watch file does not work')
test_result.severity = constants.PLUGIN_SEVERITY_WARNING
return
log.debug('Looking whether there is a new upstream version')
if self.status == 1:
test_result.latest_upstream = self.status == 1
if test_result.latest_upstream:
log.debug('Package is the latest upstream version')
data["latest-upstream"] = True
self.passed('Package is the latest upstream version', data, constants.PLUGIN_SEVERITY_INFO)
else:
log.warning('Package is not the latest upstream version')
data["latest-upstream"] = False
self.failed('Package is not the latest upstream version', data, constants.PLUGIN_SEVERITY_WARNING)
test_result.severity = constants.PLUGIN_SEVERITY_WARNING
return
plugin = WatchFilePlugin
plugin = WatchFilePlugin
models = [WatchfileTest]
......@@ -37,13 +37,24 @@ __license__ = 'MIT'
import logging
from debexpo.plugins import BasePlugin
from debexpo.plugins.api import *
from debexpo.lib import constants
log = logging.getLogger(__name__)
class UbuntuVersionPlugin(BasePlugin):
@test_result
class UbuntuVersionTest(PluginResult):
in_ubuntu = bool_field('in_ubuntu')
def __str__(self):
return 'The uploaded package {verb} "ubuntu" in the version'.format(
'has' if self.in_ubuntu else 'does not have')
class UbuntuVersionPlugin(QAPlugin):
@importercmd
def test_ubuntu_version(self):
"""
Checks whether the word "ubuntu" exists in the package name.
......@@ -51,10 +62,13 @@ class UbuntuVersionPlugin(BasePlugin):
log.debug('Checking whether the package version contains the word "ubuntu"')
if 'ubuntu' in self.changes['Version']:
log.error('Package has ubuntu in the version')
self._new_result(Test, in_ubuntu=True,
severity=constants.PLUGIN_SEVERITY_CRITICAL)
else:
log.debug('Package does not have ubuntu in the version')
# This isn't even worth setting an outcome.
else:
log.error('Package has ubuntu in the version')
self.failed('The uploaded package has "ubuntu" in the version', None, constants.PLUGIN_SEVERITY_CRITICAL)
plugin = UbuntuVersionPlugin
models = [UbuntuVersionTest]
template = 'default'
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment