Commit 7c452b96 authored by Simo Sorce's avatar Simo Sorce

Allow to pass in dictionaries for headers

Where it makes sense, allow to pass in a dictionary for the various headers
and auto-encode them as needed.

Resolves #53
Signed-off-by: 's avatarSimo Sorce <simo@redhat.com>
Reviewed-by: 's avatarHanno Schlichting <hanno@hannosch.eu>
Closes #55
parent af69ffda
......@@ -864,10 +864,16 @@ class JWE(object):
if aad:
self.objects['aad'] = aad
if protected:
json_decode(protected) # check header encoding
if isinstance(protected, dict):
protected = json_encode(protected)
else:
json_decode(protected) # check header encoding
self.objects['protected'] = protected
if unprotected:
json_decode(unprotected) # check header encoding
if isinstance(unprotected, dict):
unprotected = json_encode(unprotected)
else:
json_decode(unprotected) # check header encoding
self.objects['unprotected'] = unprotected
if algs:
self.allowed_algs = algs
......@@ -968,6 +974,9 @@ class JWE(object):
if not isinstance(self.plaintext, bytes):
raise ValueError("Plaintext must be 'bytes'")
if isinstance(header, dict):
header = json_encode(header)
jh = self._get_jose_header(header)
alg, enc = self._get_alg_enc_from_headers(jh)
......
......@@ -215,6 +215,8 @@ class JWSCore(object):
self.key = key
if header is not None:
if isinstance(header, dict):
header = json_encode(header)
self.protected = base64url_encode(header.encode('utf-8'))
else:
self.protected = ''
......@@ -517,12 +519,16 @@ class JWS(object):
p = dict()
if protected:
if isinstance(protected, dict):
protected = json_encode(protected)
p = json_decode(protected)
# TODO: allow caller to specify list of headers it understands
if 'crit' in p:
self._check_crit(p['crit'])
if header:
if isinstance(header, dict):
header = json_encode(header)
h = json_decode(header)
p = self._merge_headers(p, h)
......
......@@ -883,3 +883,27 @@ class ConformanceTests(unittest.TestCase):
check.deserialize(o, jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16)),
alg="HS512")
self.assertTrue(check.objects['valid'])
def test_jws_headers_as_dicts(self):
sign = jws.JWS(payload='message')
key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * 16))
sign.add_signature(key, protected={'alg': 'HS512'},
header={'kid': key.thumbprint()})
o = sign.serialize()
check = jws.JWS()
check.deserialize(o, key, alg="HS512")
self.assertTrue(check.objects['valid'])
self.assertEqual(check.jose_header['kid'], key.thumbprint())
def test_jwe_headers_as_dicts(self):
enc = jwe.JWE(plaintext='message',
protected={"alg": "A256KW", "enc": "A256CBC-HS512"})
key = jwk.JWK(kty='oct', k=base64url_encode(b'A' * 32))
enc.add_recipient(key, {'kid': key.thumbprint()})
o = enc.serialize()
check = jwe.JWE()
check.deserialize(o)
check.decrypt(key)
self.assertEqual(check.payload, b'message')
self.assertEqual(
json_decode(check.objects['header'])['kid'], key.thumbprint())
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment