#   gnupg.py — Debexpo gnupg functions
#
#   This file is part of debexpo -
#   https://salsa.debian.org/mentors.debian.net-team/debexpo
#
#   Copyright © 2008 Serafeim Zanikolas <serzan@hellug.gr>
#               2011 Arno Töll <debian@toell.net>
#               2019 Baptiste Beauplat <lyknode@cilg.org>
#
#   Permission is hereby granted, free of charge, to any person
#   obtaining a copy of this software and associated documentation
#   files (the "Software"), to deal in the Software without
#   restriction, including without limitation the rights to use,
#   copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the
#   Software is furnished to do so, subject to the following
#   conditions:
#
#   The above copyright notice and this permission notice shall be
#   included in all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
#   OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
#   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#   HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
#   WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#   FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
#   OTHER DEALINGS IN THE SOFTWARE.

"""
Wrapper for a subset of GnuPG functionality.
"""

import logging
import os
37
import subprocess
38
import re
39
import tempfile
40
from os.path import basename
41
42
43

from django.conf import settings
from django.utils.translation import gettext_lazy as _
44
from debexpo.tools.proc import debexpo_exec
45
46
47

log = logging.getLogger(__name__)

class ExceptionGnuPG(Exception):
    """
    Base class of every exception raised by the GnuPG wrapper in this module.

    Also raised directly for failures that have no dedicated subclass (for
    instance a failed key import or an unrecognized gpg verification output).
    """


class ExceptionGnuPGMultipleKeys(ExceptionGnuPG):
    """
    GnuPG error related to multiple keys.

    NOTE(review): nothing in this module raises it; presumably used by callers
    when more than one key is found where exactly one is expected — confirm.
    """
55
56


57
58
59
60
61
62
63
64
65
class ExceptionGnuPGPathNotInitialized(ExceptionGnuPG):
    """
    Raised when gpg is about to be run but no usable gpg binary path is set.
    """


class ExceptionGnuPGNotSignedFile(ExceptionGnuPG):
    """
    Raised when signature verification is attempted on a file in which gpg
    finds no signature data (``NODATA`` status line).
    """


class ExceptionGnuPGNoPubKey(ExceptionGnuPG):
    """
    Raised when a signature cannot be checked because the signing key is not
    available (gpg emitted an ``ERRSIG`` status line).

    ``filename``
        path of the file whose signature was being verified

    ``fingerprint``
        fingerprint of the signing key, or ``None`` when gpg did not report one

    ``long_id``
        long key id of the signing key
    """

    def __init__(self, filename, fingerprint, long_id):
        self.filename = filename
        self.fingerprint = fingerprint
        self.long_id = long_id

    def __str__(self):
        # Prefer the full fingerprint; fall back on the long key id when gpg
        # did not provide a fingerprint.
        key = self.fingerprint or self.long_id

        return 'Unable to verify file {}. No public key found for key {}' \
               .format(basename(self.filename), key)
77
78


79
class GnuPG():
    """
    Wrapper for a subset of the ``gpg`` command line tool.

    Each instance creates its own temporary directory, which is passed to gpg
    as ``GNUPGHOME`` on every invocation.
    """

    # Status lines (gpg is run with ``--status-fd 1``) looked up by
    # verify_sig(). Compiled once instead of on every call.
    _ERR_SIG_RE = re.compile(r'\[GNUPG:\] ERRSIG (?P<long_id>\w+)'
                             r' .* (?P<fingerprint>[\w-]+)$')
    _NO_DATA_RE = re.compile(r'\[GNUPG:\] NODATA')
    _VALID_SIG_RE = re.compile(r'\[GNUPG:\] VALIDSIG .*'
                               r' (?P<fingerprint>\w+)$')

    def __init__(self):
        """
        Wrapper for certain GPG operations.

        Meant to be instantiated only once.

        Reads the gpg binary path from ``settings.GPG_PATH``. When the path is
        unset, blank, not a file or not executable, an error is logged and the
        instance is marked unusable (see ``is_unusable``).
        """
        self.gpg_path = settings.GPG_PATH
        self.gpg_home = tempfile.TemporaryDirectory()

        if not self.gpg_path:
            # Covers both None and a blank value, as the message states.
            log.error('debexpo.gpg_path is not set in configuration file' +
                      ' (or is set to a blank value)')
            self.gpg_path = None
        elif not os.path.isfile(self.gpg_path):
            log.error('debexpo.gpg_path refers to a non-existent file')
            self.gpg_path = None
        elif not os.access(self.gpg_path, os.X_OK):
            log.error('debexpo.gpg_path refers to a non-executable file')
            self.gpg_path = None

    def is_unusable(self):
        """Returns true if the gpg binary is not installed or not executable."""
        return self.gpg_path is None

    @staticmethod
    def _first_match(pattern, lines):
        """Return the first match of ``pattern`` among ``lines``, or None."""
        return next((mo for mo in map(pattern.match, lines) if mo), None)

    def get_keys_data(self):
        """
        Returns the keys known to this instance, as reported by
        ``gpg --list-keys``.

        Returns a list of ``KeyData`` objects, or ``None`` (after logging an
        error) when the gpg output could not be parsed.
        """
        output, _status = self._run(['--list-keys'])

        # Only the parsing is guarded, so ``output`` is always bound when the
        # error is logged.
        try:
            keys = KeyData.read_from_gpg(output.splitlines())
        except (AttributeError, IndexError):  # pragma: no cover
            log.error("Failed to extract key id from gpg output: '%s'",
                      output)
            return None

        return list(keys.values())

    def verify_sig(self, signed_file):
        """
        Returns the fingerprint that signed the file if the signature is valid.
        Otherwise, throws a ExceptionGnuPG exception.

        ``signed_file``
             path to signed file

        Raises ``ExceptionGnuPGNoPubKey`` when gpg reports ``ERRSIG``,
        ``ExceptionGnuPGNotSignedFile`` when it reports ``NODATA`` and
        ``ExceptionGnuPG`` when no ``VALIDSIG`` line is found.
        """
        output, _status = self._run(['--verify', signed_file])
        lines = output.splitlines()

        err_sig = self._first_match(self._ERR_SIG_RE, lines)

        if err_sig:
            fingerprint = err_sig.group('fingerprint')
            long_id = err_sig.group('long_id')

            # gpg prints '-' in place of a fingerprint it does not know
            if fingerprint == '-':
                fingerprint = None

            raise ExceptionGnuPGNoPubKey(signed_file, fingerprint, long_id)

        if self._first_match(self._NO_DATA_RE, lines):
            raise ExceptionGnuPGNotSignedFile(
                f'{os.path.basename(signed_file)}: not a GPG signed file')

        valid_sig = self._first_match(self._VALID_SIG_RE, lines)

        if not valid_sig:
            raise ExceptionGnuPG('Unknown GPG error. Output was:'
                                 ' {}'.format(lines))

        return valid_sig.group('fingerprint')

    def import_key(self, data):
        """
        Import the given public key(s) into this instance's keyring.

        ```data```
            valid PGP public key data suitable for
            keyrings

        Returns a tuple (gpg output, exit status).

        Raises ``ExceptionGnuPG`` when gpg exits with a non-zero status and
        printed at least one line; the first line is used as error detail.
        """
        args = ['--import-options', 'import-minimal', '--import']

        (output, status) = self._run(args, stdin=data)
        lines = output.splitlines() if output else []

        if status and lines:
            raise ExceptionGnuPG(_('Cannot add key:'
                                   ' {}').format(lines[0]))

        return (output, status)

    def _run(self, args, stdin=None):
        """
        Run gpg with the given stdin and arguments and return the output and
        exit status.

        ``stdin``
            Feed gpg with this input to stdin. When omitted, gpg is given an
            empty input.

        ``args``
            a list of strings to be passed as argument(s) to gpg

        Returns a tuple (output, status). Standard error is merged into the
        output. On timeout, returns ``('gpg: timeout', -1)``.

        Raises ``ExceptionGnuPGPathNotInitialized`` when no usable gpg binary
        is configured.
        """
        if self.gpg_path is None:
            raise ExceptionGnuPGPathNotInitialized()

        env = os.environ.copy()
        env['GNUPGHOME'] = self.gpg_home.name

        cmd = [
            '--batch',
            '--no-auto-check-trustdb',
            '--no-options',
            '--no-permission-warning',
            '--status-fd',
            '1',
            '--no-tty',
            '--quiet',
            '--trust-model', 'always',
            '--with-colons',
            '--with-fingerprint'
            ] + args

        # str(None) would feed the literal text "None" to gpg: use an empty
        # input instead when the caller gave nothing.
        gpg_input = '' if stdin is None else str(stdin)

        try:
            output = debexpo_exec(self.gpg_path, cmd, env=env,
                                  stderr=subprocess.STDOUT,
                                  input=gpg_input)
        except subprocess.CalledProcessError as e:
            return (e.output, e.returncode)
        except subprocess.TimeoutExpired:
            log.warning('gpg: timeout')
            return ('gpg: timeout', -1)

        return (output, 0)
230
231


232
class KeyData():
    """
    Collects data about a key, parsed from gpg --with-colons --fixed-list-mode
    """
    def __init__(self, fpr, pub):
        """
        ``fpr``
            fingerprint of the key

        ``pub``
            the ``pub`` record of the key, split on ``:``
        """
        self.pub = pub
        self.fpr = fpr
        # Uid objects, keyed by field 7 of their ``uid`` record
        self.uids = {}
        # ``sub`` records (split on ``:``), keyed by subkey fingerprint
        self.subkeys = {}

    def get_uid(self, uid):
        """
        Register a ``uid`` record (already split on ``:``) on this key and
        return the ``Uid`` object created for it.
        """
        uidfpr = uid[7]
        self.uids[uidfpr] = res = Uid(self, uid)
        return res

    def get_algo(self):
        """Return field 3 of the ``pub`` record."""
        return self.pub[3]

    def get_size(self):
        """Return field 2 of the ``pub`` record."""
        return self.pub[2]

    def add_sub(self, sub, subfpr):
        """Attach the ``sub`` record of the subkey with fingerprint ``subfpr``."""
        self.subkeys[subfpr] = sub

    def get_all_uid(self):
        """
        Return a list of (name, email) tuples, one per uid of this key.

        Uids whose text cannot be parsed (``Uid.split`` returned ``None``)
        are skipped.
        """
        user_ids = []
        for uid_object in self.uids.values():
            uid = uid_object.split()
            if uid is None:
                continue
            user_ids.append((uid['name'], uid['email']))
        return user_ids

    @classmethod
    def read_from_gpg(cls, lines):
        """
        Read key data from the lines of a gpg colon-delimited key listing.

        ``lines``
            iterable of output lines; lines that are not ``pub``, ``fpr``,
            ``uid`` or ``sub`` records are ignored.

        Returns a dict mapping each primary key fingerprint to its
        ``KeyData``.

        Raises ``ValueError`` when a ``fpr``, ``uid`` or ``sub`` record shows
        up before the record it depends on, and ``IndexError`` when a ``fpr``
        record has fewer than ten fields.
        """
        keys = {}
        pub = None
        sub = None
        cur_key = None
        pub_fpr = None

        for lineno, line in enumerate(lines, start=1):
            if line.startswith("pub:"):
                # Keep track of this pub record, to correlate with the following
                # fpr record
                pub = line.split(":")
                sub = None
                cur_key = None
                pub_fpr = None

            elif line.startswith("fpr:"):
                fpr = line.split(":")[9]
                cur_key = keys.get(pub_fpr, None)

                # Correlate fpr with the previous pub record, and start
                # gathering information for a new key
                if pub:
                    keys[fpr] = cur_key = cls(fpr, pub)
                    pub_fpr = fpr
                    pub = None

                # Otherwise the fpr belongs to the pending sub record
                elif sub and cur_key:
                    cur_key.add_sub(sub, fpr)
                    sub = None

                # Internal parsing of GPG. No tests case covering
                # failure for that.
                else:  # pragma: no cover
                    raise ValueError(f"gpg:{lineno}: found fpr line with no"
                                     " previous pub line")

            elif line.startswith("uid:"):
                if cur_key is None:  # pragma: no cover
                    raise ValueError(f"gpg:{lineno}: found uid line with no "
                                     "previous pub+fpr lines")
                cur_key.get_uid(line.split(":"))

            elif line.startswith("sub:"):
                if cur_key is None:  # pragma: no cover
                    raise ValueError(f"gpg:{lineno}: found sub line with no "
                                     "previous pub+fpr lines")
                # Kept until the following fpr record gives its fingerprint
                sub = line.split(":")

        return keys


319
class Uid():
    """
    Collects data about a key uid, parsed from gpg --with-colons
    --fixed-list-mode
    """

    # "Name (comment) <email>", where the comment and the email are optional
    re_uid = re.compile(r"^(?P<name>.+?)"
                        r"\s*(?:\((?P<comment>.+)\))?"
                        r"\s*(?:<(?P<email>.+)>)?$")

    def __init__(self, key, uid):
        """
        ``key``
            the object this uid belongs to

        ``uid``
            the ``uid`` record, split on ``:``. Field 9 holds the uid text.
        """
        self.key = key
        self.uid = uid
        self.name = uid[9]

    def split(self):
        """
        Break the uid text into its components.

        Returns a dict with the keys ``name``, ``email`` and ``comment``
        (``None`` for a missing email or comment), or ``None`` when the uid
        text does not match the expected layout.
        """
        mo = self.re_uid.match(self.name)
        if not mo:
            return None  # pragma: no cover

        return {
                "name": mo.group("name"),
                "email": mo.group("email"),
                "comment": mo.group("comment"),
        }