Commit 92466152 authored by Simo Sorce's avatar Simo Sorce

JWS: Add explicit, separate verify step

Instead of always assuming we can verify with a key at deserialization
time, allow a key to be None and perform verification as a separate step.

The code reads better and we can easily try multiple keys after
deserialization is performed once. this also gets rid of the paramter
to raise on error, as a None key takes the role of not raising on
verification errors (still raises on format errors, but that is fine.
Signed-off-by: default avatarSimo Sorce <simo@redhat.com>
parent 3323b32e
......@@ -33,7 +33,7 @@ class InvalidJWSSignature(Exception):
def __init__(self, message=None, exception=None):
msg = None
if message:
msg = message
msg = str(message)
else:
msg = 'Unknown Signature Verification Failure'
if exception:
......@@ -270,6 +270,7 @@ class JWS(object):
self.objects = dict()
if payload:
self.objects['payload'] = payload
self.verifylog = None
def check_crit(self, crit):
for k in crit:
......@@ -281,8 +282,12 @@ class JWS(object):
raise InvalidJWSSignature('Unsupported critical '
'header: "%s"' % k)
@property
def is_valid(self):
return self.objects.get('valid', False)
# TODO: support selecting key with 'kid' and passing in multiple keys
def verify(self, alg, key, payload, signature, protected, header=None):
def _verify(self, alg, key, payload, signature, protected, header=None):
# verify it is a valid JSON object and keep a decode copy
if protected is not None:
p = json_decode(protected)
......@@ -320,7 +325,41 @@ class JWS(object):
S = JWSCore(a, key, protected, payload)
S.verify(signature)
def deserialize(self, raw_jws, key=None, alg=None, raise_invalid=True):
def verify(self, key, alg=None):
self.verifylog = list()
self.objects['valid'] = False
obj = self.objects
if 'signature' in obj:
try:
self._verify(alg, key,
obj['payload'],
obj['signature'],
obj.get('protected', None),
obj.get('header', None))
obj['valid'] = True
except Exception as e: # pylint: disable=broad-except
self.verifylog.append('Failed: [%s]' % repr(e))
elif 'signatures' in obj:
for o in obj['signatures']:
try:
self._verify(alg, key,
obj['payload'],
o['signature'],
o.get('protected', None),
o.get('header', None))
# Ok if at least one verifies
obj['valid'] = True
except Exception as e: # pylint: disable=broad-except
self.verifylog.append('Failed: [%s]' % repr(e))
else:
raise InvalidJWSSignature('No signatures availble')
if not self.is_valid:
raise InvalidJWSSignature('Verification failed for all '
'signatures' + repr(self.verifylog))
def deserialize(self, raw_jws, key=None, alg=None):
""" Destroys any current status and tries to import the raw
JWS provided.
"""
......@@ -332,8 +371,6 @@ class JWS(object):
o['payload'] = base64url_decode(str(djws['payload']))
if 'signatures' in djws:
o['signatures'] = list()
valid = False
faillog = []
for s in djws['signatures']:
os = dict()
os['signature'] = base64url_decode(str(s['signature']))
......@@ -342,21 +379,7 @@ class JWS(object):
os['protected'] = p.decode('utf-8')
if 'header' in s:
os['header'] = json_encode(s['header'])
try:
self.verify(alg, key, o['payload'],
os['signature'],
os.get('protected'),
os.get('header'))
os['valid'] = True
# Ok if at least one verifies
valid = True
except Exception as e: # pylint: disable=broad-except
faillog.append(str(e))
os['valid'] = False
o['signatures'].append(os)
if raise_invalid and not valid:
raise InvalidJWSSignature('Verification failed',
faillog)
else:
o['signature'] = base64url_decode(str(djws['signature']))
if 'protected' in djws:
......@@ -364,21 +387,6 @@ class JWS(object):
o['protected'] = p.decode('utf-8')
if 'header' in djws:
o['header'] = json_encode(djws['header'])
try:
self.verify(alg, key, o['payload'],
o['signature'],
o.get('protected'),
o.get('header'))
o['valid'] = True
except InvalidJWSSignature:
o['valid'] = False
if raise_invalid:
raise
except Exception as e: # pylint: disable=broad-except
o['valid'] = False
if raise_invalid:
raise InvalidJWSSignature('Verification failed', e)
except ValueError:
c = raw_jws.split('.')
......@@ -389,25 +397,14 @@ class JWS(object):
o['protected'] = p.decode('utf-8')
o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2]))
try:
self.verify(alg, key, o['payload'], o['signature'],
o.get('protected'), None)
o['valid'] = True
except InvalidJWSSignature:
o['valid'] = False
if raise_invalid:
raise
except Exception as e: # pylint: disable=broad-except
o['valid'] = False
if raise_invalid:
raise InvalidJWSSignature('Verification failed', e)
except InvalidJWSSignature:
raise
self.objects = o
except Exception as e: # pylint: disable=broad-except
raise InvalidJWSObject('Invalid format', e)
raise InvalidJWSObject('Invalid format', repr(e))
self.objects = o
if key:
self.verify(key, alg)
def add_signature(self, key, alg=None, protected=None, header=None):
if not self.objects.get('payload', None):
......
......@@ -434,8 +434,9 @@ class TestJWS(unittest.TestCase):
def test_E(self):
S = jws.JWS(A6_example['payload'])
self.assertRaises(jws.InvalidJWSSignature,
S.deserialize, E_negative)
with self.assertRaises(jws.InvalidJWSSignature):
jws.InvalidJWSSignature(S.deserialize, E_negative)
S.verify(None)
E_A1_plaintext = \
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment