#   gnupg.py — Debexpo gnupg functions
#
#   This file is part of debexpo -
#   https://salsa.debian.org/mentors.debian.net-team/debexpo
#
#   Copyright © 2008 Serafeim Zanikolas <serzan@hellug.gr>
#               2011 Arno Töll <debian@toell.net>
#               2019 Baptiste Beauplat <lyknode@cilg.org>
#
#   Permission is hereby granted, free of charge, to any person
#   obtaining a copy of this software and associated documentation
#   files (the "Software"), to deal in the Software without
#   restriction, including without limitation the rights to use,
#   copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the
#   Software is furnished to do so, subject to the following
#   conditions:
#
#   The above copyright notice and this permission notice shall be
#   included in all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
#   OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
#   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#   HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
#   WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#   FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
#   OTHER DEALINGS IN THE SOFTWARE.

"""
Wrapper for a subset of GnuPG functionality.
"""

import logging
import os
37
import subprocess
38
import re
39
import tempfile
40
from os.path import basename
41
42
43

from django.conf import settings
from django.utils.translation import gettext_lazy as _
44
from debexpo.tools.proc import debexpo_exec
45
46
47

log = logging.getLogger(__name__)

Baptiste Beauplat's avatar
Baptiste Beauplat committed
48

49
class ExceptionGnuPG(Exception):
    """
    Base class of the GnuPG wrapper exceptions defined in this module.

    Also raised directly for gpg failures that have no dedicated subclass.
    """


class ExceptionGnuPGMultipleKeys(ExceptionGnuPG):
    """
    GnuPG wrapper error that is not raised anywhere in this module.

    NOTE(review): presumably signals that several keys were found where a
    single one was expected -- confirm against the callers.
    """
55
56


57
58
59
60
61
62
63
64
65
class ExceptionGnuPGPathNotInitialized(ExceptionGnuPG):
    """Raised when gpg is about to be run but no usable gpg path is set."""


class ExceptionGnuPGNotSignedFile(ExceptionGnuPG):
    """Raised when gpg reports NODATA while verifying a file."""


class ExceptionGnuPGNoPubKey(ExceptionGnuPG):
    """
    Raised when a signature cannot be checked because its key is unknown.

    ``filename``
        path of the file whose signature was being verified

    ``fingerprint``
        fingerprint of the signing key, or ``None`` when it is not known

    ``long_id``
        long key id of the signing key, used when no fingerprint is
        available
    """

    def __init__(self, filename, fingerprint, long_id):
        # Forward the arguments to Exception so that ``args`` is populated;
        # without it the exception cannot be rebuilt by pickle/copy.
        super().__init__(filename, fingerprint, long_id)
        self.filename = filename
        self.fingerprint = fingerprint
        self.long_id = long_id

    def __str__(self):
        # Prefer the fingerprint, fall back to the long key id.
        key = self.fingerprint if self.fingerprint else self.long_id

        return 'Unable to verify file {}. No public key found for key {}' \
               .format(basename(self.filename), key)
77
78


79
class GnuPG():
    """
    Wrapper running the gpg binary for a subset of operations.

    Every command is run with GNUPGHOME pointing at a temporary directory
    created when the wrapper is instantiated.
    """

    # Patterns for the machine readable lines gpg writes to the status fd
    # (redirected to stdout by _run).
    _ERR_SIG_RE = re.compile(r'\[GNUPG:\] ERRSIG (?P<long_id>\w+)'
                             r' .* (?P<fingerprint>[\w-]+)$')
    _NO_DATA_RE = re.compile(r'\[GNUPG:\] NODATA')
    _VALID_SIG_RE = re.compile(r'\[GNUPG:\] VALIDSIG .*'
                               r' (?P<fingerprint>\w+)$')

    def __init__(self):
        """
        Wrapper for certain GPG operations.

        Meant to be instantiated only once.

        Reads the gpg binary location from ``settings.GPG_PATH``. When the
        path is unset, missing or not executable, an error is logged and
        ``gpg_path`` ends up being ``None`` (see ``is_unusable``).
        """
        self.gpg_path = settings.GPG_PATH
        self.gpg_home = tempfile.TemporaryDirectory()

        if self.gpg_path is None:
            log.error('debexpo.gpg_path is not set in configuration file' +
                      ' (or is set to a blank value)')
        elif not os.path.isfile(self.gpg_path):
            log.error('debexpo.gpg_path refers to a non-existent file')
            self.gpg_path = None
        elif not os.access(self.gpg_path, os.X_OK):
            log.error('debexpo.gpg_path refers to a non-executable file')
            self.gpg_path = None

    def is_unusable(self):
        """Returns true if the gpg binary is not installed or not executable."""
        return self.gpg_path is None

    def get_keys_data(self):
        """
        Returns the keys present in the keyring of this wrapper.

        Runs ``gpg --list-keys`` and parses its output with
        ``KeyData.read_from_gpg``.

        Returns a list of ``KeyData`` objects, or ``None`` (after logging an
        error) when the gpg output could not be parsed.
        """
        # Kept outside of the try block: should it fail, ``output`` would
        # not exist for the error message below.
        (output, status) = self._run(['--list-keys'])

        try:
            keys = KeyData.read_from_gpg(output.splitlines())
        except (AttributeError, IndexError):  # pragma: no cover
            log.error("Failed to extract key id from gpg output: '%s'",
                      output)
            return None

        return list(keys.values())

    def verify_sig(self, signed_file):
        """
        Returns the fingerprint that signed the file if the signature is valid.
        Otherwise, throws a ExceptionGnuPG exception.

        ``signed_file``
             path to signed file

        Raises ``ExceptionGnuPGNoPubKey`` on an ERRSIG status line,
        ``ExceptionGnuPGNotSignedFile`` on a NODATA status line and
        ``ExceptionGnuPG`` when no VALIDSIG status line is found.
        """
        (output, status) = self._run(['--verify', signed_file])
        lines = output.splitlines()

        err_sig = self._first_match(self._ERR_SIG_RE, lines)

        if err_sig:
            fingerprint = err_sig.group('fingerprint')
            long_id = err_sig.group('long_id')

            # gpg prints '-' in place of a fingerprint it does not know.
            if fingerprint == '-':
                fingerprint = None

            raise ExceptionGnuPGNoPubKey(signed_file, fingerprint, long_id)

        if self._first_match(self._NO_DATA_RE, lines):
            raise ExceptionGnuPGNotSignedFile(
                f'{os.path.basename(signed_file)}: not a GPG signed file')

        valid_sig = self._first_match(self._VALID_SIG_RE, lines)

        if not valid_sig:
            raise ExceptionGnuPG('Unknown GPG error. Output was:'
                                 ' {}'.format(lines))

        return valid_sig.group('fingerprint')

    def import_key(self, data):
        """
        Import the given public key(s) into the keyring of this wrapper.

        ```data```
            valid PGP public key data suitable for
            keyrings

        Returns a ``(output, status)`` tuple: the gpg output and its exit
        status.

        Raises ``ExceptionGnuPG`` with the first line of the gpg output when
        gpg exits with a non-zero status and printed something.
        """
        args = ['--import-options', 'import-minimal', '--import']

        (output, status) = self._run(args, stdin=data)

        lines = output.splitlines() if output else []

        if status and lines:
            raise ExceptionGnuPG(_('Cannot add key:'
                                 ' {}').format(lines[0]))

        return (output, status)

    @staticmethod
    def _first_match(pattern, lines):
        """Return the first match of ``pattern`` over ``lines``, or None."""
        return next(filter(None, map(pattern.match, lines)), None)

    def _run(self, args, stdin=None):
        """
        Run gpg with the given stdin and arguments and return the output and
        exit status.

        ``stdin``
            Feed gpg with this input to stdin

        ``args``
            a list of strings to be passed as argument(s) to gpg

        Returns a ``(output, status)`` tuple. stderr is merged into the
        output. On timeout, the output is ``'gpg: timeout'`` and the status
        is ``-1``.

        Raises ``ExceptionGnuPGPathNotInitialized`` when no usable gpg
        binary is configured.
        """
        if self.gpg_path is None:
            raise ExceptionGnuPGPathNotInitialized()

        env = os.environ.copy()
        env['GNUPGHOME'] = self.gpg_home.name

        cmd = [
            '--batch',
            '--no-auto-check-trustdb',
            '--no-options',
            '--no-permission-warning',
            '--status-fd',
            '1',
            '--no-tty',
            '--quiet',
            '--trust-model', 'always',
            '--with-colons',
            '--with-fingerprint'
            ] + args

        # str(None) would feed the literal text "None" to gpg: send an empty
        # stdin when the caller has nothing to provide.
        data = '' if stdin is None else str(stdin)

        try:
            output = debexpo_exec(self.gpg_path, cmd, env=env,
                                  stderr=subprocess.STDOUT,
                                  input=data)
        except subprocess.CalledProcessError as e:
            return (e.output, e.returncode)
        except subprocess.TimeoutExpired:
            log.warning('gpg: timeout')
            return ('gpg: timeout', -1)

        return (output, 0)
230
231


232
class KeyData():
    """
    Collects data about a key, parsed from gpg --with-colons --fixed-list-mode
    """
    def __init__(self, fpr, pub):
        # Colon-separated fields of the "pub" record of the key
        self.pub = pub
        # Fingerprint taken from the "fpr" record following the "pub" one
        self.fpr = fpr
        # Uid objects, indexed by field 7 of their "uid" record
        self.uids = {}
        # Fields of the "sub" records, indexed by subkey fingerprint
        self.subkeys = {}

    def get_uid(self, uid):
        """
        Return the Uid object for the given "uid" record fields, creating
        and registering it on first use.
        """
        uidfpr = uid[7]
        res = self.uids.get(uidfpr)
        if res is None:
            self.uids[uidfpr] = res = Uid(self, uid)
        return res

    def get_algo(self):
        """Return field 3 of the "pub" record."""
        return self.pub[3]

    def get_size(self):
        """Return field 2 of the "pub" record."""
        return self.pub[2]

    def add_sub(self, sub, subfpr):
        """Register the "sub" record fields under the subkey fingerprint."""
        self.subkeys[subfpr] = sub

    def get_all_uid(self):
        """
        Return a list of ``(name, email)`` tuples, one per user id.

        User ids whose text cannot be parsed (``split()`` returning None)
        are skipped instead of raising a TypeError.
        """
        user_ids = []
        for uid_object in self.uids.values():
            uid = uid_object.split()
            if uid is None:
                continue
            user_ids.append((uid['name'], uid['email']))
        return user_ids

    @classmethod
    def read_from_gpg(cls, lines):
        """
        Parse the lines of a gpg colon listing and read key data from it.

        ``lines``
            iterable of output lines, as printed by gpg --with-colons

        Returns a dict of KeyData objects indexed by primary key
        fingerprint.

        Raises ``Exception`` when a fpr, uid or sub record shows up without
        the records expected before it.
        """
        keys = {}
        pub = None
        sub = None
        cur_key = None
        pub_fpr = None

        for lineno, line in enumerate(lines, start=1):
            if line.startswith("pub:"):
                # Keep track of this pub record, to correlate with the following
                # fpr record
                pub = line.split(":")
                sub = None
                cur_key = None
                pub_fpr = None

            elif line.startswith("fpr:"):
                fpr = line.split(":")[9]
                cur_key = keys.get(pub_fpr)

                # Correlate fpr with the previous pub record, and start
                # gathering information for a new key
                if pub:
                    if cur_key is None:
                        keys[fpr] = cur_key = cls(fpr, pub)
                    pub_fpr = fpr
                    pub = None

                # Otherwise the fpr belongs to the pending sub record
                elif sub and cur_key:
                    cur_key.add_sub(sub, fpr)
                    sub = None

                # Internal parsing of GPG. No tests case covering
                # failure for that.
                else:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found fpr line with no"
                                    " previous pub line")

            elif line.startswith("uid:"):
                if cur_key is None:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found uid line with no "
                                    "previous pub+fpr lines")
                cur_key.get_uid(line.split(":"))

            elif line.startswith("sub:"):
                if cur_key is None:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found sub line with no "
                                    "previous pub+fpr lines")
                sub = line.split(":")

        return keys


322
class Uid():
    """
    Collects data about a key uid, parsed from gpg --with-colons
    --fixed-list-mode
    """
    # Splits "Name (comment) <email>"; comment and email are both optional.
    re_uid = re.compile(r"^(?P<name>.+?)"
                        r"\s*(?:\((?P<comment>.+)\))?"
                        r"\s*(?:<(?P<email>.+)>)?$")

    def __init__(self, key, uid):
        """
        ``key``
            the object owning this uid (KeyData.get_uid passes itself)

        ``uid``
            colon-separated fields of the "uid" record; field 9 holds the
            user id text
        """
        self.key = key
        self.uid = uid
        self.name = uid[9]

    def split(self):
        """
        Split the user id text into its components.

        Returns a dict with the ``name``, ``email`` and ``comment`` keys
        (missing parts are None), or None when the text cannot be parsed.
        """
        mo = self.re_uid.match(self.name)
        if not mo:
            return None  # pragma: no cover

        return {
                "name": mo.group("name"),
                "email": mo.group("email"),
                "comment": mo.group("comment"),
        }