#   gnupg.py — Debexpo gnupg functions
#
#   This file is part of debexpo -
#   https://salsa.debian.org/mentors.debian.net-team/debexpo
#
#   Copyright © 2008 Serafeim Zanikolas <serzan@hellug.gr>
#               2011 Arno Töll <debian@toell.net>
#               2019 Baptiste Beauplat <lyknode@cilg.org>
#
#   Permission is hereby granted, free of charge, to any person
#   obtaining a copy of this software and associated documentation
#   files (the "Software"), to deal in the Software without
#   restriction, including without limitation the rights to use,
#   copy, modify, merge, publish, distribute, sublicense, and/or sell
#   copies of the Software, and to permit persons to whom the
#   Software is furnished to do so, subject to the following
#   conditions:
#
#   The above copyright notice and this permission notice shall be
#   included in all copies or substantial portions of the Software.
#
#   THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#   EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
#   OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
#   NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
#   HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
#   WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
#   FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
#   OTHER DEALINGS IN THE SOFTWARE.

"""
Wrapper for a subset of GnuPG functionality.
"""

import logging
import os
37
import subprocess
38
import re
39
import tempfile
40
from os.path import basename
41
42
43

from django.conf import settings
from django.utils.translation import gettext_lazy as _
44
from debexpo.tools.proc import debexpo_exec
45
46
47

log = logging.getLogger(__name__)

Baptiste Beauplat's avatar
Baptiste Beauplat committed
48

49
class ExceptionGnuPG(Exception):
    """
    Base class of every error raised by the GnuPG wrapper in this module.

    Raised directly for failures that have no dedicated subclass, e.g. an
    unrecognised gpg verification result or a failed key import.
    """


class ExceptionGnuPGMultipleKeys(ExceptionGnuPG):
    """
    GnuPG error subclass.

    NOTE(review): not raised anywhere in the visible part of this module;
    presumably signals that more than one key was found where exactly one
    was expected - confirm against callers.
    """
55
56


57
58
59
60
61
62
63
64
65
class ExceptionGnuPGPathNotInitialized(ExceptionGnuPG):
    """
    Raised when gpg is about to be run but no usable gpg binary path is set
    (``GnuPG.gpg_path`` is None).
    """


class ExceptionGnuPGNotSignedFile(ExceptionGnuPG):
    """
    Raised by signature verification when gpg reports ``NODATA``, i.e. the
    file handed to it is not a GPG signed file.
    """


class ExceptionGnuPGNoPubKey(ExceptionGnuPG):
    """
    Raised when a signature cannot be checked because the signing public key
    is not available in the keyring.

    ``filename``
        path of the file whose signature was being verified

    ``fingerprint``
        fingerprint of the signing key, or None when gpg did not report one

    ``long_id``
        long key id of the signing key, used when no fingerprint is known
    """

    def __init__(self, filename, fingerprint, long_id):
        # Forward the arguments to Exception so that ``args`` is populated;
        # without it the exception cannot be re-created by pickle/copy,
        # which call ``cls(*self.args)``.
        super().__init__(filename, fingerprint, long_id)
        self.filename = filename
        self.fingerprint = fingerprint
        self.long_id = long_id

    def __str__(self):
        # Prefer the fingerprint, fall back to the long key id.
        key = self.fingerprint if self.fingerprint else self.long_id

        return 'Unable to verify file {}. No public key found for key {}' \
               .format(basename(self.filename), key)
77
78


79
class GnuPG():
    """
    Wrapper for a subset of gpg operations: listing keys, verifying a signed
    file and importing public keys.

    Each instance runs gpg with GNUPGHOME pointing at its own temporary
    directory.
    """

    # Patterns for the status lines gpg prints because of ``--status-fd 1``.
    # Compiled once here instead of on every verify_sig() call.
    _ERR_SIG_RE = re.compile(r'\[GNUPG:\] ERRSIG (?P<long_id>\w+)'
                             r' .* (?P<fingerprint>[\w-]+)$')
    _NO_DATA_RE = re.compile(r'\[GNUPG:\] NODATA')
    _VALID_SIG_RE = re.compile(r'\[GNUPG:\] VALIDSIG .*'
                               r' (?P<fingerprint>\w+)$')

    def __init__(self):
        """
        Wrapper for certain GPG operations.

        Meant to be instantiated only once.

        Reads the gpg binary path from ``settings.GPG_PATH``. If the path is
        unset, blank, missing or not executable, an error is logged and
        ``gpg_path`` is set to None (see is_unusable()).
        """
        self.gpg_path = settings.GPG_PATH
        self.gpg_home = tempfile.TemporaryDirectory()

        if not self.gpg_path:
            # Covers both None and a blank value, as the message promises.
            log.error('debexpo.gpg_path is not set in configuration file'
                      ' (or is set to a blank value)')
            self.gpg_path = None
        elif not os.path.isfile(self.gpg_path):
            log.error('debexpo.gpg_path refers to a non-existent file')
            self.gpg_path = None
        elif not os.access(self.gpg_path, os.X_OK):
            log.error('debexpo.gpg_path refers to a non-executable file')
            self.gpg_path = None

    def is_unusable(self):
        """Returns true if the gpg binary is not installed or not executable."""
        return self.gpg_path is None

    def get_keys_data(self):
        """
        Returns the list of KeyData objects for the keys gpg lists.

        Runs ``gpg --list-keys`` and parses its output with
        KeyData.read_from_gpg. Returns None (after logging an error) when the
        output cannot be parsed.
        """
        # Run gpg outside the try block: if _run() itself fails there is no
        # output to report, and the handler below must not reference it.
        (output, status) = self._run(args=['--list-keys'])

        try:
            keys = KeyData.read_from_gpg(output.splitlines())
        except (AttributeError, IndexError):  # pragma: no cover
            log.error("Failed to extract key id from gpg output: '%s'",
                      output)
            return None

        return list(keys.values())

    @staticmethod
    def _first_match(pattern, lines):
        """Returns the first match of pattern against a line, or None."""
        return next(filter(None, map(pattern.match, lines)), None)

    def verify_sig(self, signed_file):
        """
        Returns the fingerprint that signed the file if the signature is valid.
        Otherwise, throws a ExceptionGnuPG exception.

        ``signed_file``
             path to signed file

        Raises ExceptionGnuPGNoPubKey when gpg reports ERRSIG,
        ExceptionGnuPGNotSignedFile when it reports NODATA and ExceptionGnuPG
        when no VALIDSIG status line is found.
        """
        (output, status) = self._run(args=('--verify', signed_file))
        output = output.splitlines()

        err_sig = self._first_match(self._ERR_SIG_RE, output)

        if err_sig:
            fingerprint = err_sig.group('fingerprint')
            long_id = err_sig.group('long_id')

            # gpg prints '-' in place of the fingerprint when it has none.
            if fingerprint == '-':
                fingerprint = None

            raise ExceptionGnuPGNoPubKey(signed_file, fingerprint, long_id)

        if self._first_match(self._NO_DATA_RE, output):
            raise ExceptionGnuPGNotSignedFile(
                f'{os.path.basename(signed_file)}: not a GPG signed file')

        valid_sig = self._first_match(self._VALID_SIG_RE, output)

        if not valid_sig:
            raise ExceptionGnuPG('Unknown GPG error. Output was:'
                                 ' {}'.format(output))

        return valid_sig.group('fingerprint')

    def import_key(self, data):
        """
        Import the given public key data into this instance's keyring.

        ```data```
            valid PGP public key data suitable for
            keyrings

        Returns a tuple (gpg output, exit status).

        Raises ExceptionGnuPG, quoting the first line of gpg's output, when
        gpg exits with a non-zero status and printed something.
        """
        args = ['--import-options', 'import-minimal', '--import']

        (output, status) = self._run(args=args, stdin=data)

        lines = output.splitlines() if output else []

        if status and lines:
            raise ExceptionGnuPG(_('Cannot add key:'
                                 ' {}').format(lines[0]))

        return (output, status)

    def _run(self, stdin=None, args=None):
        """
        Run gpg with the given stdin and arguments and return the output and
        exit status.

        ``stdin``
            Feed gpg with this input to stdin (converted with str()). When
            None, gpg gets an empty input.
        ``args``
            a list of strings to be passed as argument(s) to gpg

        Returns a tuple (output, status): status is 0 on success, gpg's
        return code when the command fails, and -1 (with the output
        'gpg: timeout') when it times out.

        Raises ExceptionGnuPGPathNotInitialized when no gpg binary is set.
        """
        if self.gpg_path is None:
            raise ExceptionGnuPGPathNotInitialized()

        env = os.environ.copy()
        env['GNUPGHOME'] = self.gpg_home.name

        cmd = [
            '--batch',
            '--no-auto-check-trustdb',
            '--no-options',
            '--no-permission-warning',
            '--status-fd',
            '1',
            '--no-tty',
            '--quiet',
            '--trust-model', 'always',
            '--with-colons',
            '--with-fingerprint'
            ]

        if args is not None:
            cmd.extend(args)

        # str(None) would send the literal text 'None' to gpg; send an empty
        # input instead when the caller supplied nothing.
        # NOTE(review): assumes debexpo_exec takes text input, as the
        # previous unconditional str(stdin) implied - confirm.
        gpg_input = '' if stdin is None else str(stdin)

        try:
            output = debexpo_exec(self.gpg_path, cmd, env=env,
                                  stderr=subprocess.STDOUT,
                                  input=gpg_input)
        except subprocess.CalledProcessError as e:
            return (e.output, e.returncode)
        except subprocess.TimeoutExpired:
            log.warning('gpg: timeout')
            return ('gpg: timeout', -1)

        return (output, 0)
233
234


235
class KeyData():
    """
    Collects data about a key, parsed from gpg --with-colons --fixed-list-mode
    """
    def __init__(self, fpr, pub):
        """
        ``fpr``
            fingerprint of the key, as read from its ``fpr`` record

        ``pub``
            colon-split fields of the key's ``pub`` record
        """
        self.pub = pub
        self.fpr = fpr
        # Uid objects, keyed by field 7 of their uid record.
        self.uids = {}
        # Colon-split ``sub`` records, keyed by subkey fingerprint.
        self.subkeys = {}

    def get_uid(self, uid):
        """
        Returns the Uid object for the given colon-split ``uid`` record,
        creating and storing it on first sight.
        """
        uidfpr = uid[7]
        res = self.uids.get(uidfpr, None)
        if res is None:
            self.uids[uidfpr] = res = Uid(self, uid)
        return res

    def get_algo(self):
        """Returns field 3 of the pub record."""
        return self.pub[3]

    def get_size(self):
        """Returns field 2 of the pub record."""
        return self.pub[2]

    def add_sub(self, sub, subfpr):
        """Stores the colon-split ``sub`` record under its fingerprint."""
        self.subkeys[subfpr] = sub

    def get_all_uid(self):
        """
        Returns a list of (name, email) tuples, one per uid of this key.

        Uids whose text cannot be parsed (split() returns None) are skipped
        instead of raising a TypeError.
        """
        user_ids = []
        for uid_object in self.uids.values():
            uid = uid_object.split()
            if uid is None:
                continue
            user_ids.append((uid['name'], uid['email']))
        return user_ids

    @classmethod
    def read_from_gpg(cls, lines):
        """
        Parse lines of gpg ``--with-colons`` output and read key data from
        them.

        ``lines``
            iterable of output lines (strings)

        Returns a dict mapping primary key fingerprint to KeyData object.

        Raises Exception when an fpr, uid or sub record shows up without the
        records that must precede it.
        """
        keys = {}
        pub = None
        sub = None
        cur_key = None
        pub_fpr = None

        for lineno, line in enumerate(lines, start=1):
            if line.startswith("pub:"):
                # Keep track of this pub record, to correlate with the following
                # fpr record
                pub = line.split(":")
                sub = None
                cur_key = None
                pub_fpr = None

            elif line.startswith("fpr:"):
                fpr = line.split(":")[9]
                cur_key = keys.get(pub_fpr, None)

                # Correlate fpr with the previous pub record, and start
                # gathering information for a new key
                if pub:
                    if cur_key is None:
                        keys[fpr] = cur_key = cls(fpr, pub)
                    pub_fpr = fpr
                    pub = None

                # Otherwise the fpr belongs to the pending sub record.
                elif sub and cur_key:
                    cur_key.add_sub(sub, fpr)
                    sub = None

                # Internal parsing of GPG. No tests case covering
                # failure for that.
                else:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found fpr line with no"
                                    " previous pub line")

            elif line.startswith("uid:"):
                if cur_key is None:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found uid line with no "
                                    "previous pub+fpr lines")
                cur_key.get_uid(line.split(":"))

            elif line.startswith("sub:"):
                if cur_key is None:  # pragma: no cover
                    raise Exception(f"gpg:{lineno}: found sub line with no "
                                    "previous pub+fpr lines")
                # Remember the record until its fpr line arrives.
                sub = line.split(":")

        return keys


325
class Uid():
    """
    Collects data about a key uid, parsed from gpg --with-colons
    --fixed-list-mode
    """

    # Splits a uid string of the form ``name (comment) <email>``; the
    # comment and the email parts are both optional.
    re_uid = re.compile(r"^(?P<name>.+?)"
                        r"\s*(?:\((?P<comment>.+)\))?"
                        r"\s*(?:<(?P<email>.+)>)?$")

    def __init__(self, key, uid):
        """
        ``key``
            the object this uid belongs to

        ``uid``
            colon-split fields of the ``uid`` record; field 9 holds the uid
            text
        """
        self.key = key
        self.uid = uid
        self.name = uid[9]

    def split(self):
        """
        Returns a dict with the ``name``, ``email`` and ``comment`` parts of
        the uid text (missing parts are None), or None when the text does not
        match the expected format.
        """
        mo = self.re_uid.match(self.name)
        if not mo:
            return None  # pragma: no cover

        return {
                "name": mo.group("name"),
                "email": mo.group("email"),
                "comment": mo.group("comment"),
        }