Commit 4c33e551 authored by Clément Schreiner's avatar Clément Schreiner
Browse files

Simplified and improved the new gpg wrapper.

 - only one class for using gnupg, as in the original wrapper
 - use exceptions instead of integer constants for errors
 - return values are now namedtuples
parent fcf07611
......@@ -44,113 +44,63 @@ import logging
import os
import subprocess
import re
from collections import namedtuple
log = logging.getLogger(__name__)
#
# Regular expressions for parsing gnupg's output
#
GPG_SIGNATURE_PATTERN = r"^.*Signature made.*using (?P<key_type>\S+) key ID (?P<key_id>\w+)$"
GPG_ADDR_PATTERN = r"^(pub\s+(?P<key_id>\S+)\s+(?P<key_date>\S+)\s|uid\s+)(?P<uid_name>.+)\s+<(?P<uid_email>.+?)>$"
class GpgFile(object):
GPG_ARGS = ['--decrypt']
def __init__(self, gpg, data=None, filename=None):
""" Loads a file and verifies its PGP signature """
self.gpg = gpg
self.key_id = None
self.key_type = None
self.data = None
self.is_signed = False
self.valid = False
self.loaded = False
if data is not None:
self._load_data(data)
elif filename is not None:
self._load_file(filename)
def _load_data(self, data):
(out, err, code) = self.gpg.run(args=GpgFile.GPG_ARGS,
stdin=data,
pubring=None)
self._parse_gpg_result(out, err, code)
self.loaded = True
def _load_file(self, filename):
args = GpgFile.GPG_ARGS
args.append(filename)
(out, err, code) = self.gpg.run(args=args,
pubring=None)
self._parse_gpg_result(out, err, code)
self.loaded = True
def _parse_gpg_result(self, out, err, code):
if code != 0:
return
line_err = err.split('\n')[0]
m = re.search(GPG_SIGNATURE_PATTERN, line_err)
if m is not None:
self.is_signed = True
self.key_id = m.group('key_id')
self.key_type = m.group('key_type')
self.valid = True
self.data = out
#
# Result objects
#
class GpgKey(object):
def __init__(self, gpg, data=None):
""" Loads a PGP public key block """
GpgFileSignature = namedtuple('GpgFileSignature', ['is_valid', # boolean: signature status
'key_id',
'key_type',
'data', # plaintext
])
self.gpg = gpg
GpgKey = namedtuple('GpgKey', ['id', 'type', 'strength'])
self.key_strength = None
self.key_type = None
self.key_id = None
GpgKeyBlock = namedtuple('GpgKeyBlock', ['key', 'user_ids'])
self.key_date = None
self.user_ids = []
self.loaded = False
self.valid = False
GpgUserId = namedtuple('GpgUserId', ['user', 'email'])
if data is not None:
self._load_data(data)
# generic object for other results
GpgResult = namedtuple('GpgResult', ['code', 'out', 'err'])
def _load_data(self, data):
(out, err, code) = self.gpg.run(stdin=data)
self._parse_gpg_result(out, err, code)
self.loaded = True
def _parse_gpg_result(self, out, err, code):
if code != 0:
return
out = unicode(out, encoding='utf-8', errors='replace')
lines = (out.split('\n'))
for line in lines:
m = re.match(GPG_ADDR_PATTERN, line)
if m is not None:
if (self.key_id is None
and m.group('key_id') is not None):
self.parse_key_string(m.group('key_id'))
self.user_ids.append((m.group('uid_name'),
m.group('uid_email')))
#
# Exceptions
#
self.valid = True
class GpgPathNotInitialised(Exception):
""" GnuPG has not been initialised properly """
def parse_key_string(self, s):
"""
for example '4096R/8123F27C'
4096 -> key strength
R -> key type
8123F27C -> key id
"""
(tmp, self.key_id) = s.split('/', 1)
self.key_strength = int(tmp [:-1])
self.key_type = tmp[-1]
class InvalidGnupgRunInvocation(Exception):
""" GnuPG has not been run properly """
class GpgVerifyNoData(Exception):
""" No data has been given to gnupg --decrypt """
class GpgVerityInvalidData(Exception):
""" Invalid data given to gnupg --decrypt """
#
# Main class
#
class GnuPG(object):
GPG_PATH_NOT_INITIALISED = -1
INVALID_GNUPG_RUN_INVOCATION = -2
""" Wrapper for some GnuPG operations """
def __init__(self, gpg_path, default_keyring):
self.gpg_path = gpg_path
......@@ -168,19 +118,127 @@ class GnuPG(object):
if self.default_keyring is None:
print "No keyring"
def add_signature(self, filename, pubring=None):
args = ('--import-options', 'import-minimal', '--import', signature_file)
return self.run(args=args, pubring=pubring)
@property
def is_unusable(self):
"""Returns true if the gpg binary is not installed or not executable."""
return self.gpg_path is None
def verify_file(self, path=None, file_object=None):
"""
Check the status of the given's file signature.
If ``path`` is not None, pass it as an argument to gnupg.
Else, if ``file_object`` is not None, pass its content to
gnupg's stdin.
"""
# cmd: --decrypt
args = ['--decrypt']
keywords_args = {'pubring': None}
if path is not None and os.path.isfile(path):
args.append(path)
elif file_object is not None:
if file_object.is_closed:
raise GpgVerifyInvalidData
else:
data = file_object.read()
keywords_args['stdin'] = data
else:
raise GpgVerifyNoData
(out, err, code) = self._run(args=args,
**keywords_args)
return self._parse_verify_result(out, err, code)
def _parse_verify_result(self, out, err, code):
if code != 0:
return
line_err = err.split('\n')[0]
m = re.search(GPG_SIGNATURE_PATTERN, line_err)
if m is not None:
is_valid = True
key_id = m.group('key_id')
key_type = m.group('key_type')
data = out
return GpgFileSignature(is_valid,
key_id,
key_type,
data)
else:
return GpgFileSignature(False, None, None, None)
def parse_key(self, data):
"""
Parse a PGP public key block
"""
(out, err, code) = self._run(stdin=data)
return self._parse_key_result(out, err, code)
def _parse_key_result(self, out, err, code):
if code != 0:
return GpgKey
# FIXME: use the system's encoding instead of utf-8
out = unicode(out, encoding='utf-8', errors='replace')
lines = (out.split('\n'))
key = None
for line in lines:
m = re.match(GPG_ADDR_PATTERN, line)
if m is not None:
user_ids = []
if (key is None
and m.group('key_id') is not None):
key = self.parse_key_string(m.group('key_id'))
uid_name = m.group('uid_name')
uid_email = m.group('uid_email')
user_id = GpgUserId(uid_name, uid_email)
user_ids.append(GpgUserId)
if key is not None,
return GpgKey(key, user_ids)
else:
return GpgKey(None, None)
def add_signature(self, filename pubring=None):
"""
Adds a key's signature to the public keyring.
Returns the triple (stdout, stderr, return code).
"""
args = ('--import-options', 'import-minimal', '--import', filename)
(out, err, code) = self._run(args=args, pubring=pubring)
return GpgResult(code, out, err)
def remove_signature(self, keyid, pubring=None):
"""
Removes a signature from the public keyring
Returns the triple (stdout, stderr, return code).
"""
args = ('--yes', '--delete-key', keyid)
return self.run(args=args, pubring=pubring)
(out, err, code) = self._run(args=args, pubring=pubring)
return GpgResult(code, out, err)
def is_unusable(self):
"""Returns true if the gpg binary is not installed or not executable."""
return self.gpg_path is None
@staticmethod
def parse_key_string(self, s):
"""
for example '4096R/8123F27C'
4096 -> key strength
R -> key type
8123F27C -> key id
Returns a GpgKey object.
"""
(tmp, key_id) = s.split('/', 1)
key_strength = int(tmp [:-1])
key_type = tmp[-1]
key = GpgKey(key_id, key_type, key_strength)
return key
def run(self, stdin=None, args=None, pubring=None):
def _run(self, stdin=None, args=None, pubring=None):
"""
Run gpg with the given stdin and arguments and return the output
(stdout and stderr) and exit status.
......@@ -195,7 +253,7 @@ class GnuPG(object):
"""
if self.gpg_path is None:
return (None, None, GnuPG.GPG_PATH_NOT_INITIALISED)
raise GpgPathNotInitialisedException
if pubring is None:
pubring = self.default_keyring
......@@ -208,7 +266,7 @@ class GnuPG(object):
'--secret-keyring', pubring + ".secret",
'--keyring', pubring,
]
if not args is None:
if args is not None:
cmd.extend(args)
process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment