Commit 932e6f10 authored by Enrico Zini's avatar Enrico Zini
Browse files

Implemented key download, and storing key material in the database

parent d78ebfbd
......@@ -14,9 +14,14 @@ import os.path
import subprocess
from collections import namedtuple
from backend.utils import StreamStdoutKeepStderr, require_dir
from backend.models import FingerprintField
import time
import re
import datetime
import shutil
import tempfile
from six.moves.urllib.parse import urlencode
import requests
import logging
log = logging.getLogger(__name__)
......@@ -35,6 +40,47 @@ KEYRING_MAINT_GIT_REPO = getattr(settings, "KEYRING_MAINT_GIT_REPO", "data/keyri
Uid = namedtuple("Uid", ("name", "email", "comment"))
class KeyManager(models.Manager):
def download(self, fpr):
"""
Download key material by fingerprint, returning its ASCII-armored version.
It passes the result to GPG to validate at least that there is key
material with the right fingerprint.
"""
url = "http://{server}/pks/lookup?{query}".format(
server=KEYSERVER,
query=urlencode({
"op": "get",
"search": "0x" + fpr,
"exact": "on",
"options": "mr",
}))
res = requests.get(url)
text = res.text.splitlines()
if not text: raise RuntimeError("empty response from key server")
if text[0] != "-----BEGIN PGP PUBLIC KEY BLOCK-----": raise RuntimeError("downloaded key material has invalid begin line")
if text[-1] != "-----END PGP PUBLIC KEY BLOCK-----": raise RuntimeError("downloaded key material has invalid end line")
homedir = tempfile.mkdtemp(dir=KEYRINGS_TMPDIR)
try:
gpg = GPG(homedir=homedir)
gpg.run_checked(["--import"], input=res.text)
return gpg.run_checked(["--export", "-a", fpr])
finally:
shutil.rmtree(homedir)
class Key(models.Model):
fpr = FingerprintField(verbose_name="OpenPGP key fingerprint", max_length=40, unique=True)
key = models.TextField(verbose_name="ASCII armored key material")
key_updated = models.DateTimeField(verbose_name="Datetime when the key material was downloaded")
keycheck = models.TextField(verbose_name="keycheck results (JSON)", blank=True)
keycheck_update = models.DateTimeField(verbose_name="Datetime when the keycheck data was computed", null=True)
objects = KeyManager()
class GPG(object):
def __init__(self, homedir=None):
self.homedir = homedir
......@@ -102,6 +148,17 @@ class GPG(object):
result = proc.wait()
return stdout, stderr, result
def run_checked(self, args, input=None):
"""
Run gpg with the given command, waiting for its completion, returning stdout.
In case gpg returns nonzero, raises a RuntimeError that includes stderr.
"""
stdout, stderr, result = self.run_cmd(self.cmd(*args), input)
if result != 0:
raise RuntimeError("gpg exited with status %d: %s" % (result, stderr.strip()))
return stdout
def pipe_cmd(self, cmd):
"""
Run gpg with the given command, returning a couple
......
......@@ -29,6 +29,10 @@ class LookupTest(TestCase):
self.assertFalse(kmodels.is_dd_nu(fpr))
class TestKeycheck(TestCase):
def test_download(self):
encoded = kmodels.Key.objects.download("1793D6AB75663E6BF104953A634F4BD1E7AD5568")
self.assertTrue(encoded.startswith("-----BEGIN PGP PUBLIC KEY BLOCK-----"))
def test_keycheck(self):
c = Client()
response = c.get(reverse("keyring_keycheck", kwargs={"fpr": "1793D6AB75663E6BF104953A634F4BD1E7AD5568"}))
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment