Commit fd12e794 authored by Niels Thykier's avatar Niels Thykier Committed by Joerg Jaspert

SignedFile: Assumes bytes as input and produces bytes output

With this, SignedFile work under python3 as well as python2 making
additional tests pass.
Signed-off-by: Niels Thykier's avatarNiels Thykier <>
parent d9a42ea8
......@@ -317,7 +317,7 @@ def read_changes_or_dsc(suite, filename, session=None):
return formatted_text("can't parse .dsc control info")
filecontents = strip_pgp_signature(filename)
filecontents = strip_pgp_signature(filename).decode('utf-8')
keysinorder = []
for l in filecontents.split('\n'):
m = re.match(r'([-a-zA-Z0-9]*):', l)
......@@ -70,7 +70,7 @@ class SignedFile(object):
"""handle files signed with PGP
The following attributes are available:
contents - string with the content (after removing PGP armor)
contents - byte-string with the content (after removing PGP armor)
valid - Boolean indicating a valid signature was found
weak_signature - signature uses a weak algorithm (e.g. SHA-1)
fingerprint - fingerprint of the key used for signing
......@@ -79,7 +79,7 @@ class SignedFile(object):
def __init__(self, data, keyrings, require_signature=True, gpg="/usr/bin/gpg"):
@param data: string containing the message
@param data: byte-string containing the message
@param keyrings: sequence of keyrings
@param require_signature: if True (the default), will raise an exception if no valid signature was found
@param gpg: location of the gpg binary
......@@ -135,8 +135,9 @@ class SignedFile(object):
self.status = read[status.r]
self.stderr = read[stderr.r]
if self.status == "":
raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
if self.status == b"":
stderr = self.stderr.decode('ascii', errors='replace')
raise GpgException("No status output from GPG. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
for line in self.status.splitlines():
......@@ -145,7 +146,8 @@ class SignedFile(object):
self.valid = False
if require_signature and not self.valid:
raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, self.stderr))
stderr = self.stderr.decode('ascii', errors='replace')
raise GpgException("No valid signature found. (GPG exited with status code %s)\n%s" % (exit_code, stderr))
assert len(self.fingerprints) == len(self.primary_fingerprints)
assert len(self.fingerprints) == len(self.signature_ids)
......@@ -177,7 +179,7 @@ class SignedFile(object):
bytes_written = os.write(fd, data)
write_pos[fd] += bytes_written
return dict((fd, "".join(read_lines[fd])) for fd in read_lines)
return dict((fd, b"".join(read_lines[fd])) for fd in read_lines)
def _parse_timestamp(self, timestamp, datestring=None):
"""parse timestamp in GnuPG's format
......@@ -189,25 +191,25 @@ class SignedFile(object):
# used this for replay production, return the legacy value for
# old signatures.
if datestring is not None:
year, month, day = datestring.split('-')
year, month, day = datestring.split(b'-')
date =, int(month), int(day))
time = datetime.time(0, 0)
if date <, 8, 4):
return datetime.datetime.combine(date, time)
if 'T' in timestamp:
if b'T' in timestamp:
raise Exception('No support for ISO 8601 timestamps.')
return datetime.datetime.utcfromtimestamp(long(timestamp))
def _parse_status(self, line):
fields = line.split()
if fields[0] != "[GNUPG:]":
if fields[0] != b"[GNUPG:]":
raise GpgException("Unexpected output on status-fd: %s" % line)
# VALIDSIG <fingerprint in hex> <sig_creation_date> <sig-timestamp>
# <expire-timestamp> <sig-version> <reserved> <pubkey-algo>
# <hash-algo> <sig-class> <primary-key-fpr>
if fields[1] == "VALIDSIG":
if fields[1] == b"VALIDSIG":
# GnuPG accepted MD5 as a hash algorithm until gnupg 1.4.20,
# which Debian 8 does not yet include. We want to make sure
# to not accept uploads covered by a MD5-based signature.
......@@ -215,9 +217,9 @@ class SignedFile(object):
# 1 - MD5
# 2 - SHA-1
# 3 - RIPE-MD/160
if fields[9] == "1":
if fields[9] == b"1":
raise GpgException("Digest algorithm MD5 is not trusted.")
if fields[9] in ("2", "3"):
if fields[9] in (b"2", b"3"):
self.weak_signature = True
self.valid = True
......@@ -225,36 +227,39 @@ class SignedFile(object):
self.signature_timestamp = self._parse_timestamp(fields[4], fields[3])
elif fields[1] == "BADARMOR":
elif fields[1] == b"BADARMOR":
raise GpgException("Bad armor.")
elif fields[1] == "NODATA":
elif fields[1] == b"NODATA":
raise GpgException("No data.")
elif fields[1] == "DECRYPTION_FAILED":
elif fields[1] == b"DECRYPTION_FAILED":
raise GpgException("Decryption failed.")
elif fields[1] == "ERROR":
raise GpgException("Other error: %s %s" % (fields[2], fields[3]))
elif fields[1] == b"ERROR":
f2 = fields[2].decode('ascii', errors='replace')
f3 = fields[3].decode('ascii', errors='replace')
raise GpgException("Other error: %s %s" % (f2, f3))
elif fields[1] == "SIG_ID":
elif fields[1] == b"SIG_ID":
elif fields[1] in ('PLAINTEXT', 'GOODSIG', 'KEY_CONSIDERED',
elif fields[1] in (b'PLAINTEXT', b'GOODSIG', b'KEY_CONSIDERED',
elif fields[1] in ('EXPSIG', 'EXPKEYSIG'):
elif fields[1] in (b'EXPSIG', b'EXPKEYSIG'):
self.expired = True
self.invalid = True
elif fields[1] in ('REVKEYSIG', 'BADSIG', 'ERRSIG', 'KEYREVOKED', 'NO_PUBKEY'):
elif fields[1] in (b'REVKEYSIG', b'BADSIG', b'ERRSIG', b'KEYREVOKED', b'NO_PUBKEY'):
self.invalid = True
raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(fields[1]))
field = fields[1].decode('ascii', errors='replace')
raise GpgException("Keyword '{0}' from GnuPG was not expected.".format(field))
def _exec_gpg(self, stdin, stdout, stderr, statusfd):
......@@ -140,7 +140,7 @@ def parse_deb822(armored_contents, signing_rules=0, keyrings=None, session=None)
require_signature = False
signed_file = SignedFile(armored_contents, keyrings=keyrings, require_signature=require_signature)
contents = signed_file.contents
contents = signed_file.contents.decode('utf-8')
error = ""
changes = {}
