Commit 40a9877e authored by Clément Schreiner's avatar Clément Schreiner
Browse files

Merge branch 'gpg-rewrite'

Conflicts:
	debexpo/controllers/my.py
	debexpo/lib/gnupg.py
parents bd18af74 5fd73540
......@@ -42,7 +42,7 @@ import tempfile
from debexpo.lib.base import *
from debexpo.lib import constants, form
from debexpo.lib.schemas import DetailsForm, GpgForm, PasswordForm, OtherDetailsForm, MetricsForm, DmupForm
from debexpo.lib.gnupg import GnuPG
from debexpo.lib.utils import get_gnupg
from debexpo.model import meta
from debexpo.model.users import User
......@@ -68,7 +68,7 @@ class MyController(BaseController):
"""
c.config = config
self.user = None
self.gnupg = GnuPG()
self.gnupg = get_gnupg()
def _details(self):
"""
......@@ -96,14 +96,13 @@ class MyController(BaseController):
Handles a user submitting the GPG form.
"""
log.debug('GPG form validated successfully')
self.gpg = GnuPG()
# Should the key be deleted?
if self.form_result['delete_gpg'] and self.user.gpg is not None:
keyid = self.gnupg.extract_key_id(self.user.gpg_id)
keyid = self.gnupg.string2key(self.user.gpg_id).id
log.debug('Deleting current GPG key %s' % (keyid))
(out, err) = self.gnupg.remove_signature(keyid)
if err != 0:
result = self.gnupg.remove_signature(keyid)
if result.code != 0:
log.error("gpg failed to delete keyring: %s" % (out))
abort(500)
self.user.gpg = None
......@@ -113,17 +112,16 @@ class MyController(BaseController):
if 'gpg' in self.form_result and self.form_result['gpg'] is not None:
log.debug('Setting a new GPG key')
self.user.gpg = self.form_result['gpg'].value
(self.user.gpg_id, _) = self.gnupg.parse_key_id(self.user.gpg)
temp = tempfile.NamedTemporaryFile(delete=True)
temp.write(self.user.gpg)
temp.flush()
(out, err) = self.gpg.add_signature(temp.name)
temp.close()
if err != 0:
log.error("gpg failed to import keyring: %s" % (out))
(key, uids) = self.gnupg.parse_key_block(self.user.gpg)
self.user.gpg_id = key.id
result = self.gnupg.add_signature(data=self.user.gpg)
log.debug(result.out)
if result.code != 0:
log.error("gpg failed to import keyring: %s" % (result.err))
abort(500)
log.debug(out)
......@@ -223,13 +221,13 @@ class MyController(BaseController):
# set the new value to the 'dmup' boolean in the User object
self.user.dmup = True
meta.session.commit()
log.debug('Changed DMUP acceptance status and redirecting')
redirect(url('my'))
def index(self, get=False):
"""
......@@ -341,7 +339,6 @@ class MyController(BaseController):
user= meta.session.query(User).get(session['user_id'])
data = """I, %s, agree to the the Debian Machine Usage Policies as stated on http://www.debian.org/devel/dmup
""" % user.name # this should be somewhere else
log.debug('Serving DMUP agreement file')
return data
# -*- coding: utf-8 -*-
#
# utils.py — Debexpo utility functions
# gnupg.py — GnuPG wrapper
#
# This file is part of debexpo - https://alioth.debian.org/projects/debexpo/
#
# Copyright © 2008 Serafeim Zanikolas <serzan@hellug.gr>
# 2011 Arno Töll <debian@toell.net>
# © 2011 Arno Töll <debian@toell.net>
# © 2012 Clément Schreiner <clement@mux.me>
#
# Permission is hereby granted, free of charge, to any person
# obtaining a copy of this software and associated documentation
......@@ -32,216 +33,258 @@
Wrapper for a subset of GnuPG functionality.
"""
__author__ = 'Serafeim Zanikolas, Arno Töll'
__copyright__ = 'Copyright © 2008 Serafeim Zanikolas, 2011 Arno Töll'
__author__ = 'Serafeim Zanikolas, Arno Töll, Clément Schreiner'
__copyright__ = ','.join(['Copyright © 2008 Serafeim Zanikolas',
'2011 Arno Töll',
'2012 Clément Schreiner',
])
__license__ = 'MIT'
import logging
import os
import subprocess
import re
import pylons
from collections import namedtuple
log = logging.getLogger(__name__)
class GnuPG(object):
GPG_PATH_NOT_INITIALISED = -1
INVALID_GNUPG_RUN_INVOCATION = -2
#
# Regular expressions for parsing gnupg's output
#
GPG_SIGNATURE_PATTERN = r"^.*Signature made.*using (?P<key_type>\S+) key ID (?P<key_id>\w+)$"
GPG_ADDR_PATTERN = r"^(pub\s+(?P<key_id>\S+)\s+(?P<key_date>\S+)\s|uid\s+)(?P<uid_name>.+)\s+<(?P<uid_email>.+?)>$"
def __init__(self):
"""
Wrapper for certain GPG operations.
Meant to be instantiated only once.
"""
self.gpg_path = pylons.config['debexpo.gpg_path']
#
# Result objects
#
GpgFileSignature = namedtuple('GpgFileSignature',
['is_valid', # boolean: signature status
'key_id',
'key_type',
'data', # plaintext
])
GpgKey = namedtuple('GpgKey', ['id', 'type', 'strength'])
GpgKeyBlock = namedtuple('GpgKeyBlock', ['key', 'user_ids'])
GpgUserId = namedtuple('GpgUserId', ['user', 'email'])
# generic object for other results
GpgResult = namedtuple('GpgResult', ['code', 'out', 'err'])
#
# Exceptions
#
class GpgPathNotInitialised(Exception):
""" GnuPG has not been initialised properly """
class MissingPublicKeyring(Exception):
""" No public keyring has been provided """
class InvalidGnupgRunInvocation(Exception):
""" GnuPG has not been run properly """
class GpgVerifyNoData(Exception):
""" No data has been given to gnupg --decrypt """
class GpgVerifyInvalidData(Exception):
""" Invalid data given to gnupg --decrypt """
class GpgFailure(Exception):
""" Generic exception for errors while running gnupg """
class GpgMissingData(Exception):
""" Some data is missing for the gpg command. """
#
# Main class
#
class GnuPG(object):
""" Wrapper for some GnuPG operations """
def __init__(self, gpg_path=None, default_keyring=None):
self.gpg_path = gpg_path
self.default_keyring = default_keyring
if self.gpg_path is None:
log.error('debexpo.gpg_path is not set in configuration file' +
' (or is set to a blank value)')
print "No gpg"
elif not os.path.isfile(self.gpg_path):
log.error('debexpo.gpg_path refers to a non-existent file')
self.gpg_path = None
elif not os.access(self.gpg_path, os.X_OK):
log.error('debexpo.gpg_path refers to a non-executable file')
self.gpg_path = None
self.default_keyring = pylons.config['debexpo.gpg_keyring']
if self.default_keyring is None:
log.warning('debexpo.gpg_keyring is not set in configuration file' +
' (or is set to a blank value)')
def is_unusable(self):
"""Returns true if the gpg binary is not installed or not executable."""
return self.gpg_path is None
if self.gpg_path is None or self.default_keyring is None:
self.unusable = True
def extract_key_data(self,key,attribute):
@staticmethod
def string2key(s):
"""
Returns the attribute of a given GPG public key.
Attribute can be one of "keyid" or "keystrength"
for example '4096R/8123F27C'
4096 -> key strength
R -> key type
8123F27C -> key id
Returns a GpgKey object.
"""
try:
if attribute == "keyid":
r = key.split("/")[1]
elif attribute == "keystrength":
r = int(key.split("/")[0][:-1])
else:
raise AttributeError
if not r:
raise AttributeError
return r
except (AttributeError, IndexError):
log.error("Failed to extract key data from gpg output: '%s'"
% key)
(tmp, key_id) = s.split('/', 1)
key_strength = int(tmp [:-1])
key_type = tmp[-1]
key = GpgKey(key_id, key_type, key_strength)
return key
def extract_key_id(self, key):
@staticmethod
def key2string(k):
"""
Returns the key id only of a given GPG public key, e.g.:
Reverse function for string2key"
"""
s = "{}{}/{}".format(k.strength,
k.type,
k.id)
return s
1024D/355304E4 -> 355304E4
@property
def is_unusable(self):
"""Returns true if the gpg binary is not installed or not executable."""
return self.gpg_path is None
``key``
A public key output as given by gpg(1)
def verify_file(self, path=None, file_object=None):
"""
return self.extract_key_data(key,"keyid")
def extract_key_strength(self, key):
Check the status of the given's file signature.
If ``path`` is not None, pass it as an argument to gnupg.
Else, if ``file_object`` is not None, pass its content to
gnupg's stdin.
"""
Returns the key strength only of a given GPG public key, e.g.:
1024D/355304E4 -> 1024
# cmd: --decrypt
args = ['--decrypt']
keywords_args = {'pubring': None}
``key``
A public key output as given by gpg(1)
if path is not None and os.path.isfile(path):
args.append(path)
elif file_object is not None:
if file_object.is_closed:
raise GpgVerifyInvalidData
else:
data = file_object.read()
keywords_args['stdin'] = data
else:
raise GpgVerifyNoData
(out, err, code) = self._run(args=args,
**keywords_args)
return self._parse_verify_result(out, err, code)
def _parse_verify_result(self, out, err, code):
if code != 0:
GpgFileSignature(False, None, None, None)
line_err = err.split('\n')[0]
m = re.search(GPG_SIGNATURE_PATTERN, line_err)
if m is not None:
is_valid = True
key_id = m.group('key_id')
key_type = m.group('key_type')
data = out
return GpgFileSignature(is_valid,
key_id,
key_type,
data)
else:
return GpgFileSignature(False, None, None, None)
def parse_key_block(self, data=None, path=None):
"""
return self.extract_key_data(key,"keystrength")
def parse_key_id(self, key, email = None):
Parse a PGP public key block
"""
Returns the key id of the given GPG public key along with a list of user ids.
``key``
ASCII armored GPG public key.
Sample output to be parsed:
stdin = None
args = []
pub 1024D/355304E4 2005-09-13 Serafeim Zanikolas <serzan@hellug.gr>
sub 1024g/C082E9B7 2005-09-13 [expires: 2008-09-12]
if data is not None:
stdin = data
elif path is not None:
args.append(path)
"""
try:
(output, _) = self._run(stdin=key)
output = unicode(output, errors='replace')
lines = (output.split('\n'))
key_id = None
user_ids = []
gpg_addr_pattern = re.compile('(pub\s+\S+\s+\S+\s+|uid\s+)'
'(?P<name>.+)'
'\s+'
'<(?P<email>.+?)>'
'$')
for line in lines:
if not key_id and line.startswith('pub'):
# get only the 2nd column of the 1st matching line
key_id = line.split()[1]
addr_matcher = gpg_addr_pattern.search(line)
if addr_matcher is not None:
user_ids.append( (addr_matcher.group('name'), addr_matcher.group('email')) )
if line.startswith('sub'):
break
return (key_id, user_ids)
except (AttributeError, IndexError):
log.error("Failed to extract key id from gpg output: '%s'"
% output)
def verify_sig(self, signed_file, pubring=None):
"""
Does the same as verify_sig_full() but is meant as compatibility
function which returns a boolean only
else:
raise GpgMissingData
"""
(_, _, status) = self.verify_sig_full(signed_file, pubring)
return status == 0
(out, err, code) = self._run(stdin, args)
return self._parse_key_block_result(out, err, code)
def verify_sig_full(self, signed_file, pubring=None):
"""
Returns a tuple (file output, return code) if the given GPG-signed file
can be verified.
def _parse_key_block_result(self, out, err, code):
if code != 0:
return GpgKeyBlock(None, None)
``signed_file``
path to signed file
``pubring``
path to public key ring (when not specified, the default GPG
setting will be used (~/.gnupg/pubring.gpg))
"""
args = ('--verify', signed_file)
(raw_out, return_code) = self._run(args=args, pubring=pubring)
gpg_addr_pattern = re.compile('"'
'(?P<name>.+)'
'\s+'
'<(?P<email>.+?)>'
'"')
gpg_key_pattern = re.compile(".*Signature made.*using (?P<key_type>\S+) key ID (?P<key_id>\w+)$")
out = {}
out['raw'] = raw_out
# FIXME: use the system's encoding instead of utf-8
out = unicode(out, encoding='utf-8', errors='replace')
lines = (out.split('\n'))
key = None
user_ids = []
for line in raw_out.split("\n"):
# key information
gpg_key_matcher = gpg_key_pattern.search(line)
if gpg_key_matcher is not None:
out['key_type'] = gpg_key_matcher.group('key_type')
out['key_id'] = gpg_key_matcher.group('key_id')
# user names and email address
addr_matcher = gpg_addr_pattern.search(line)
if addr_matcher is not None:
user_ids.append( (addr_matcher.group('name'), addr_matcher.group('email')) )
return (out, user_ids, return_code)
def add_signature(self, signature_file, pubring=None):
for line in lines:
m = re.match(GPG_ADDR_PATTERN, line)
if m is not None:
if (key is None
and m.group('key_id') is not None):
key = self.string2key(m.group('key_id'))
if (m.group('uid_name') is not None
and m.group('uid_email') is not None):
uid_name = m.group('uid_name')
uid_email = m.group('uid_email')
user_id = GpgUserId(uid_name, uid_email)
user_ids.append(user_id)
if key is not None:
return GpgKeyBlock(key, user_ids)
else:
return GpgKeyBlock(None, None)
def add_signature(self, data=None, path=None, pubring=None):
"""
Add the signature(s) within the provided file to the supplied keyring
```signature_file```
A file name containing valid PGP public key data suitable for keyrings
```pubring```
A file name pointing to a keyring. May be empty.
Returns a tuple (file output, return code)
Adds a key's signature to the public keyring.
Returns the triple GpgResult(code, stdout, stderr).
"""
args = ('--import-options', 'import-minimal', '--import', signature_file)
return self._run(args=args, pubring=pubring)
args = ('--import-options', 'import-minimal', '--import')
stdin = None
if data is not None:
stdin = data
elif path is not None:
args.append(path)
else:
raise GpgMissingData
(out, err, code) = self._run(stdin=stdin, args=args, pubring=pubring)
return GpgResult(code, out, err)
def remove_signature(self, keyid, pubring=None):
"""
Remove the signature matching the provided keyid from the supplied keyring
```keyid```
The GnuPG keyid to be removed
```pubring```
A file name pointing to a keyring. May be empty.
Returns a tuple (file output, return code)
Removes a signature from the public keyring
Returns the triple GpgResult(code, stdout, stderr).
"""
args = ('--yes', '--delete-key', keyid)
return self._run(args=args, pubring=pubring)
(out, err, code) = self._run(args=args, pubring=pubring)
return GpgResult(code, out, err)
def _run(self, stdin=None, args=None, pubring=None):
"""
Run gpg with the given stdin and arguments and return the output and
exit status.
Run gpg with the given stdin and arguments and return the output
(stdout and stderr) and exit status.
``stdin``
Feed gpg with this input to stdin
......@@ -253,7 +296,7 @@ class GnuPG(object):
"""
if self.gpg_path is None:
return (None, GnuPG.GPG_PATH_NOT_INITIALISED)
raise GpgPathNotInitialisedException
if pubring is None:
pubring = self.default_keyring
......@@ -266,29 +309,13 @@ class GnuPG(object):
'--secret-keyring', pubring + ".secret",
'--keyring', pubring,
]
if not args is None:
cmd.extend(args)
if args is not None:
cmd.extend(args)
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
output = "\n".join(process.communicate(input=stdin))
process = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
(output, outerr) = process.communicate(input=stdin)
status = process.returncode
return (output, status)
def is_signed(self, signed_file):
"""
Returns true if the given file appears to be GPG signed
``signed_file``
path to a file
"""
try:
f = open(signed_file, 'r')
contents = f.read()
f.close()
except:
log.critical('Could not open %s; continuing' % signed_file)
return False
if contents.startswith('-----BEGIN PGP SIGNED MESSAGE-----'):
return True
return False
return (output, outerr, status)
......@@ -41,6 +41,8 @@ import os
from pylons import config
from debexpo.lib import gnupg
log = logging.getLogger(__name__)
def parse_section(section):
......@@ -104,3 +106,7 @@ def hash_it(s):
if type(s) == unicode:
s = s.encode('utf-8')
return hashlib.md5(s).hexdigest()
def get_gnupg():
return gnupg.GnuPG(config['debexpo.gpg_path'],
config['debexpo.gpg_keyring'])
......@@ -40,7 +40,7 @@ import logging
import tempfile
from debexpo.lib.base import *
from debexpo.lib.gnupg import GnuPG
from debexpo.lib.utils import get_gnupg
from debexpo.model import meta
from debexpo.model.users import User
......@@ -58,7 +58,7 @@ class GpgKey(formencode.validators.FieldStorageUploadConverter):
def __init__(self):
self.gpg_id = None
self.gnupg = GnuPG()
self.gnupg = get_gnupg()
def _to_python(self, value, c):
"""
......@@ -69,18 +69,23 @@ class GpgKey(formencode.validators.FieldStorageUploadConverter):
``c``
"""
if not value.value.startswith('-----BEGIN PGP PUBLIC KEY BLOCK-----'):
log.error('GPG key does not start with BEGIN PGP PUBLIC KEY BLOCK')
raise formencode.Invalid(_('Invalid GPG key'), value, c)
if self.gnupg.is_unusable():
key_block_raw = value.value
key_block_parsed = self.gnupg.parse_key_block(key_block_raw)
if self.gnupg.is_unusable:
log.error('Unable to validate GPG key because gpg is unusable.')
raise formencode.Invalid(_('Internal error: debexpo is not ' +
'properly configured to handle' +
'GPG keys'), value, c)
if key_block_parsed is None:
log.error('Given data is not a valid GPG key')
raise formencode.Invalid(_('Invalid GPG key'), value, c)
(self.gpg_id, user_ids) = self.gnupg.parse_key_id(value.value)
(self.gpg_id, user_ids) = key_block_parsed
if self.gpg_id is None:
log.error("Failed to parse GPG key")
raise formencode.Invalid(_('Invalid GPG key'), value, c)
......@@ -91,19 +96,20 @@ class GpgKey(formencode.validators.FieldStorageUploadConverter):
"""
user = meta.session.query(User).get(session['user_id'])
for (uid, email) in user_ids:
if user.email == email:
for uid in user_ids:
if user.email == uid.email:
break
else:
log.debug("No user id in key %s does match the email address the user configured")
raise formencode.Invalid(_('None of your user IDs in key %s does match your profile mail address' % (self.gpg_id)), value, c)
log.debug("No user id in key %s does match the email address the user configured" % self.gpg_id.id)
raise formencode.Invalid(_('None of your user IDs in key %s does match your profile mail address' % (self.gpg_id.id)), value, c)
"""
Minimum Key Strength Check.
"""
requiredkeystrength = int(config['debexpo.gpg_minkeystrength'])
keystrength = self.gnupg.extract_key_strength(self.key_id())
keystrength = key_block_parsed.key.strength
if keystrength < requiredkeystrength:
log.debug("Key strength unacceptable in Debian Keyring")
raise formencode.Invalid(_('Key strength unacceptable in Debian Keyring. The minimum required key strength is %s bits.' % str(requiredkeystrength)), value, c)
......
......@@ -31,16 +31,17 @@
Test cases for debexpo.lib.gnupg.
"""
__author__ = 'Serafeim Zanikolas'
__author__ = 'Serafeim Zanikolas, Clément Schreiner'
__copyright__ = 'Copyright © 2008 Serafeim Zanikolas'
__license__ = 'MIT'
from unittest import TestCase
import os
from pylons import config
import pylons.test
from debexpo.lib.gnupg import GnuPG
from debexpo.lib.gnupg import GnuPG, GpgUserId
test_gpg_key = \
"""-----BEGIN PGP PUBLIC KEY BLOCK-----
......@@ -85,56 +86,82 @@ test_gpg_key_id = '1024D/355304E4'
class TestGnuPGController(TestCase):
def _get_gnupg(self, gpg_path='/usr/bin/gpg'):
config['debexpo.gpg_path'] = gpg_path
gnupg = GnuPG() # instantiate with new debexpo.gpg_path setting
default_keyring = pylons.test.pylonsapp.config.get('debexpo.gpg_keyring', None)
gnupg = GnuPG(gpg_path, default_keyring)
return gnupg
def _get_data_file(self, name):
gpg_data_dir = os.path.join(os.path.dirname(__file__), 'gpg')
return os.path.join(gpg_data_dir, name)
def testGnuPGfailure1(self):
"""
Test for debexpo.gpg_path being uninitialised.
"""
gnupg = self._get_gnupg(None)
self.assertTrue(gnupg.is_unusable())
self.assertTrue(gnupg.is_unusable)
def testGnuPGfailure2(self):
"""
Test for debexpo.gpg_path pointing to a non-existent file.
"""
gnupg = self._get_gnupg('/non/existent')
self.assertTrue(gnupg.is_unusable())
self.assertTrue(gnupg.is_unusable)
def testGnuPGfailure3(self):
"""
Test for debexpo.gpg_path pointing to a non-executable file.
"""
gnupg = self._get_gnupg('/etc/passwd')
self.assertTrue(gnupg.is_unusable())
self.assertTrue(gnupg.is_unusable)
def testParseKeyID(self):
"""
Test the extraction of key id from a given GPG key.
"""
gnupg = self._get_gnupg()
self.assertFalse(gnupg.is_unusable())
self.assertEqual(gnupg.parse_key_id(test_gpg_key), test_gpg_key_id)
self.assertFalse(gnupg.is_unusable)
parsed_key_block = gnupg.parse_key_block(test_gpg_key)
key_string = gnupg.key2string(parsed_key_block.key)
self.assertEqual(key_string, test_gpg_key_id)
def testParseUserID(self):
"""
Test the extraction of user ids from a given GPG key.
"""
gnupg = self._get_gnupg()
self.assertFalse(gnupg.is_unusable)
parsed_key_block = gnupg.parse_key_block(test_gpg_key)
(k, u) = parsed_key_block
self.assertEqual(u, [GpgUserId('Serafeim Zanikolas',
'serzan@hellug.gr')])
def testParseInvalidKeyBlock(self):
gnupg = self._get_gnupg()
self.assertFalse(gnupg.is_unusable)
invalid_block = self._get_data_file('invalid_key_block')
kb = gnupg.parse_key_block(path=invalid_block)
assert kb.key == None
def testSignatureVerification(self):
"""
Verify the signature in the file debexpo/tests/gpg/signed_by_355304E4.
Verify the signature in the file
debexpo/tests/gpg/signed_by_355304E4.gpg
"""
gnupg = self._get_gnupg()
self.assertFalse(gnupg.is_unusable())
gpg_data_dir = os.path.join(os.path.dirname(__file__), 'gpg')
signed_file = os.path.join(gpg_data_dir, 'signed_by_355304E4.gpg')
pubring = os.path.join(gpg_data_dir, 'pubring_with_355304E4.gpg')
self.assertFalse(gnupg.is_unusable)
signed_file = self._get_data_file('signed_by_355304E4.gpg')
pubring = self._get_data_file('pubring_with_355304E4.gpg')
assert os.path.exists(signed_file)
assert os.path.exists(pubring)
self.assertTrue(gnupg.verify_sig(signed_file, pubring))
verif = gnupg.verify_file(path=signed_file)
self.assertTrue(verif.is_valid)
def testInvalidSignature(self):
"""
Test that verify_sig() fails for an unsigned file.
"""
gnupg = self._get_gnupg()
self.assertFalse(gnupg.is_unusable())
self.assertFalse(gnupg.verify_sig('/etc/passwd'))
self.assertFalse(gnupg.is_unusable)
verif = gnupg.verify_file(path='/etc/passwd')
self.assertFalse(verif.is_valid)
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment