diff --git a/debexpo/lib/gnupg.py b/debexpo/lib/gnupg.py index 5088c8ee629343889e2f28b07b6c96c3a4a2d965..0aba4a96097f2e5df30d3bd61862ab0259cdf1e5 100644 --- a/debexpo/lib/gnupg.py +++ b/debexpo/lib/gnupg.py @@ -41,6 +41,7 @@ import os import subprocess import pylons +import re log = logging.getLogger(__name__) @@ -108,11 +109,9 @@ class GnuPG(object): try: (output, _) = self._run(stdin=key) output = unicode(output, errors='replace') - lines = (output.split('\n')) - for line in lines: - if line.startswith('pub'): - # get only the 2nd column of the 1st matching line - return line.split()[1] + keys = KeyData.read_from_gpg(output.splitlines()) + for key in keys.values(): + return key.get_id() except (AttributeError, IndexError): log.error("Failed to extract key id from gpg output: '%s'" % output) @@ -194,16 +193,26 @@ class GnuPG(object): cmd = [ self.gpg_path, - '--no-options', '--batch', + '--keyring', pubring, + '--no-auto-check-trustdb', '--no-default-keyring', + '--no-options', + '--no-permission-warning', + '--no-tty', + '--quiet', '--secret-keyring', pubring + ".secret", - '--keyring', pubring, + '--trust-model', 'always', + '--with-colons', + '--with-fingerprint' ] if not args is None: cmd.extend(args) - process = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE, stdout=subprocess.PIPE) + process = subprocess.Popen(cmd, + stdin=subprocess.PIPE, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE) output = "\n".join(process.communicate(input=stdin)) status = process.returncode @@ -226,3 +235,162 @@ class GnuPG(object): if contents.startswith('-----BEGIN PGP SIGNED MESSAGE-----'): return True return False + + +class GPGAlgo(object): + """ + Static data about GPG PubKey Algo. Extracted from GnuPG source. + + Refs: + Short names: https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=g10/keyid.c;hb=HEAD#l53 + Long names: https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=g10/keyid.c;hb=HEAD#l104 + Ids: https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob;f=common/openpgpdefs.h;hb=HEAD#l157 + """ + gpg_algo = { + '1': {'short': 'R', 'long': 'rsa', 'with_size': True}, + '2': {'short': 'r', 'long': 'rsa', 'with_size': True}, + '3': {'short': 's', 'long': 'rsa', 'with_size': True}, + '16': {'short': 'g', 'long': 'elg', 'with_size': True}, + '17': {'short': 'D', 'long': 'dsa', 'with_size': True}, + '18': {'short': 'e', 'long': 'rsa', 'with_size': True}, + '19': {'short': 'E', 'long': 'cv25519', 'with_size': False}, + '20': {'short': 'G', 'long': 'xxx', 'with_size': True}, + '22': {'short': 'E', 'long': 'ed25519', 'with_size': False}, + } + + @classmethod + def to_short(cls, algo, keysize): + if algo not in cls.gpg_algo: + return None + return '{}{}'.format(keysize, cls.gpg_algo[algo]['short']) + + @classmethod + def to_long(cls, algo, keysize): + if algo not in cls.gpg_algo: + return None + algo_str = '{}'.format(cls.gpg_algo[algo]['long']) + if cls.gpg_algo[algo]['with_size']: + algo_str.append('{}'.format(keysize)) + + +class KeyData(object): + """ + Collects data about a key, parsed from gpg --with-colons --fixed-list-mode + """ + def __init__(self, fpr, pub): + self.pub = pub + self.fpr = fpr + self.uids = {} + self.subkeys = {} + + def get_uid(self, uid): + uidfpr = uid[7] + res = self.uids.get(uidfpr, None) + if res is None: + self.uids[uidfpr] = res = Uid(self, uid) + return res + + def add_sub(self, sub): + subfpr = tuple(sub[3:6]) + self.subkeys[subfpr] = sub + + def format_short_fpr(self, fpr): + return fpr[-8:] + + # Extract an ID of the pub key in its former gpgv1 form: + # Ex. 1024D/355304E4 + def get_id(self): + short_id = GPGAlgo.to_short(self.pub[3], self.pub[2]) + short_fpr = self.format_short_fpr(self.pub[4]) + return "{}/{}".format(short_id, short_fpr) + + def keycheck(self): + return KeycheckKeyResult(self) + + @classmethod + def read_from_gpg(cls, lines): + """ + Run the given gpg command and read key and signature data from its + output + """ + keys = {} + pub = None + sub = None + cur_key = None + cur_uid = None + for lineno, line in enumerate(lines, start=1): + if line.startswith("pub:"): + # Keep track of this pub record, to correlate with the following + # fpr record + pub = line.split(":") + sub = None + cur_key = None + cur_uid = None + elif line.startswith("fpr:"): + # Correlate fpr with the previous pub record, and start + # gathering information for a new key + if pub is None: + if sub is not None: + # Skip fingerprints for subkeys + continue + else: + raise Exception("gpg:{}: found fpr line with no" + + " previous pub line".format(lineno)) + fpr = line.split(":")[9] + cur_key = keys.get(fpr, None) + if cur_key is None: + keys[fpr] = cur_key = cls(fpr, pub) + pub = None + cur_uid = None + elif line.startswith("uid:"): + if cur_key is None: + raise Exception("gpg:{}: found uid line with no previous" + + " pub+fpr lines".format(lineno)) + cur_uid = cur_key.get_uid(line.split(":")) + elif line.startswith("sig:"): + sig = line.split(":") + if sig[1] == "%": + log.debug("gpg:%s: Invalid signature: <%s>", + lineno, + line.strip()) + continue + if cur_uid is None: + raise Exception("gpg:{}: found sig line with no previous" + + " uid line".format(lineno)) + cur_uid.add_sig(sig) + elif line.startswith("sub:"): + if cur_key is None: + raise Exception("gpg:{}: found sub line with no previous" + + " pub+fpr lines".format(lineno)) + sub = line.split(":") + cur_key.add_sub(sub) + + return keys + + +class Uid(object): + """ + Collects data about a key uid, parsed from gpg --with-colons --fixed-list-mode + """ + re_uid = re.compile(r"^(?P.+?)\s*(?:\((?P.+)\))?\s*(?:<(?P.+)>)?$") + + def __init__(self, key, uid): + self.key = key + self.uid = uid + self.name = uid[9] + self.sigs = {} + + def add_sig(self, sig): + # FIXME: missing a full fingerprint, we try to index with as much + # identifying data as possible + k = tuple(sig[3:6]) + self.sigs[k] = sig + + def split(self): + mo = self.re_uid.match(self.name) + if not mo: return None + return { + "name": mo.group("name"), + "email": mo.group("email"), + "comment": mo.group("comment"), + }