Commit b69aaeee authored by Simo Sorce's avatar Simo Sorce

Add way to get JOSE header(s) to both JWs and JWE

This property allows to retrieve the (merged) JOSE header(s) from a an
object regardless of whether it is a JWE or JWS token.
Signed-off-by: default avatarSimo Sorce <simo@redhat.com>
parent b4712a01
......@@ -694,3 +694,10 @@ class JWE(object):
if not self.plaintext:
raise InvalidJWEOperation("Plaintext not available")
return self.plaintext
@property
def jose_header(self):
jh = self.get_jose_header()
if len(jh) == 0:
raise InvalidJWEOperation("JOSE Header not available")
return jh
......@@ -286,6 +286,13 @@ class JWS(object):
def is_valid(self):
return self.objects.get('valid', False)
def merge_headers(self, h1, h2):
for k in list(h1.keys()):
if k in h2:
raise InvalidJWSObject('Duplicate header: "%s"' % k)
h1.update(h2)
return h1
# TODO: support selecting key with 'kid' and passing in multiple keys
def _verify(self, alg, key, payload, signature, protected, header=None):
# verify it is a valid JSON object and keep a decode copy
......@@ -300,10 +307,7 @@ class JWS(object):
h = json_decode(header)
if not isinstance(h, dict):
raise InvalidJWSSignature('Invalid Unprotected header')
for k in list(p.keys()):
if k in h:
raise InvalidJWSSignature('Duplicate header: "%s"' % k)
p.update(h)
p = self.merge_headers(p, h)
# verify critical headers
# TODO: allow caller to specify list of headers it understands
if 'crit' in p:
......@@ -413,31 +417,20 @@ class JWS(object):
p = dict()
if protected:
p = json_decode(protected)
if 'alg' in p:
if alg is None:
alg = p['alg']
elif alg != p['alg']:
raise ValueError('"alg" value mismatch, specified "alg" '
'does not match "protected" header '
'alg value')
# TODO: allow caller to specify list of headers it understands
if 'crit' in p:
self.check_crit(p['crit'])
if header:
h = json_decode(header)
for k in list(p.keys()):
if k in h:
raise ValueError('Duplicate header: "%s"' % k)
if 'alg' in h:
if alg is None:
alg = h['alg']
elif alg != h['alg']:
raise ValueError('"alg" value mismatch, specified "alg" '
'does not match "unprotected" header '
'alg value')
p = self.merge_headers(p, h)
if 'alg' in p:
if alg is None:
alg = p['alg']
elif alg != p['alg']:
raise ValueError('"alg" value mismatch, specified "alg" '
'does not match JOSE header value')
if alg is None:
raise ValueError('"alg" not specified')
......@@ -527,3 +520,26 @@ class JWS(object):
if not self.is_valid:
raise InvalidJWSOperation("Payload not verified")
return self.objects['payload']
@property
def jose_header(self):
obj = self.objects
if 'signature' in obj:
jh = dict()
if 'protected' in obj:
p = json_decode(obj['protected'])
jh = self.merge_headers(jh, p)
jh = self.merge_headers(jh, obj.get('header', dict()))
return jh
elif 'signatures' in self.objects:
jhl = list()
for o in obj['signatures']:
jh = dict()
if 'protected' in obj:
p = json_decode(o['protected'])
jh = self.merge_headers(jh, p)
jh = self.merge_headers(jh, o.get('header', dict()))
jhl.append(jh)
return jhl
else:
raise InvalidJWSOperation("JOSE Header(s) not available")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment