Commit 429bd25b authored by Simo Sorce's avatar Simo Sorce

Add AESGCMKW Key Wrapping Algorithms

Signed-off-by: default avatarSimo Sorce <simo@redhat.com>
parent 75d03695
......@@ -240,6 +240,55 @@ class _AesKw(_RawKeyMgmt):
return cek
class _AesGcmKw(_RawKeyMgmt):
def __init__(self, keysize):
self.backend = default_backend()
self.keysize = keysize // 8
def _get_key(self, key, op):
if key.key_type != 'oct':
raise InvalidJWEKeyType('oct', key.key_type)
rk = base64url_decode(key.get_op_key(op))
if len(rk) != self.keysize:
raise InvalidJWEKeyLength(self.keysize * 8, len(rk) * 8)
return rk
def wrap(self, key, keylen, cek, headers):
rk = self._get_key(key, 'encrypt')
if not cek:
cek = os.urandom(keylen)
iv = os.urandom(96 // 8)
cipher = Cipher(algorithms.AES(rk), modes.GCM(iv),
backend=self.backend)
encryptor = cipher.encryptor()
ek = encryptor.update(cek) + encryptor.finalize()
tag = encryptor.tag
return {'cek': cek, 'ek': ek,
'header': {'iv': base64url_encode(iv),
'tag': base64url_encode(tag)}}
def unwrap(self, key, keylen, ek, headers):
rk = self._get_key(key, 'decrypt')
if 'iv' not in headers:
raise InvalidJWEData('Invalid Header, missing "iv" parameter')
iv = base64url_decode(headers['iv'])
if 'tag' not in headers:
raise InvalidJWEData('Invalid Header, missing "tag" parameter')
tag = base64url_decode(headers['tag'])
cipher = Cipher(algorithms.AES(rk), modes.GCM(iv, tag),
backend=self.backend)
decryptor = cipher.decryptor()
cek = decryptor.update(ek) + decryptor.finalize()
if len(cek) != keylen:
raise InvalidJWEKeyLength(keylen, len(cek))
return cek
class _Direct(_RawKeyMgmt):
def _check_key(self, key):
......@@ -459,6 +508,15 @@ class JWE(object):
def _jwa_A256KW(self):
return _AesKw(256)
def _jwa_A128GCMKW(self):
return _AesGcmKw(128)
def _jwa_A192GCMKW(self):
return _AesGcmKw(192)
def _jwa_A256GCMKW(self):
return _AesGcmKw(256)
def _jwa_dir(self):
return _Direct()
......@@ -542,6 +600,25 @@ class JWE(object):
enc = self._jwa(encname)
return alg, enc
def _encrypt(self, alg, enc, jh):
aad = base64url_encode(self.objects.get('protected', ''))
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
aad = aad.encode('utf-8')
compress = jh.get('zip', None)
if compress == 'DEF':
data = zlib.compress(self.plaintext)[2:-4]
elif compress is None:
data = self.plaintext
else:
raise ValueError('Unknown compression')
iv, ciphertext, tag = enc.encrypt(self.cek, aad, data)
self.objects['iv'] = iv
self.objects['ciphertext'] = ciphertext
self.objects['tag'] = tag
def add_recipient(self, key, header=None):
"""Encrypt the plaintext with the given key.
......@@ -575,24 +652,13 @@ class JWE(object):
if 'ek' in wrapped:
rec['encrypted_key'] = wrapped['ek']
if 'ciphertext' not in self.objects:
aad = base64url_encode(self.objects.get('protected', ''))
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
aad = aad.encode('utf-8')
compress = jh.get('zip', None)
if compress == 'DEF':
data = zlib.compress(self.plaintext)[2:-4]
elif compress is None:
data = self.plaintext
else:
raise ValueError('Unknown compression')
if 'header' in wrapped:
h = json_decode(rec.get('header', '{}'))
nh = self._merge_headers(h, wrapped['header'])
rec['header'] = json_encode(nh)
iv, ciphertext, tag = enc.encrypt(self.cek, aad, data)
self.objects['iv'] = iv
self.objects['ciphertext'] = ciphertext
self.objects['tag'] = tag
if 'ciphertext' not in self.objects:
self._encrypt(alg, enc, jh)
if 'recipients' in self.objects:
self.objects['recipients'].append(rec)
......@@ -635,6 +701,20 @@ class JWE(object):
rec = self.objects['recipients'][0]
else:
rec = self.objects
if 'header' in rec:
# The AESGCMKW algorithm generates data (iv, tag) we put in the
# per-recipient unpotected header by default. Move it to the
# protected header and re-encrypt the payload, as the protected
# header is used as additional authenticated data.
h = json_decode(rec['header'])
ph = json_decode(self.objects['protected'])
nph = self._merge_headers(h, ph)
self.objects['protected'] = json_encode(nph)
jh = self._get_jose_header()
alg, enc = self._get_alg_enc_from_headers(jh)
self._encrypt(alg, enc, jh)
del rec['header']
return '.'.join([base64url_encode(self.objects['protected']),
base64url_encode(rec.get('encrypted_key', '')),
base64url_encode(self.objects['iv']),
......
......@@ -737,6 +737,60 @@ JWE_general_5_6_4 = {
"tag": JWE_Authentication_Tag_5_6_3}
# 5.7 - A256GCMKW not implemented yet
AES_key_5_7_1 = {
"kty": "oct",
"kid": "18ec08e1-bfa9-4d95-b205-2b4dd1d4321d",
"use": "enc",
"alg": "A256GCMKW",
"k": "qC57l_uxcm7Nm3K-ct4GFjx8tM1U8CZ0NLBvdQstiS8"}
JWE_IV_5_7_2 = "gz6NjyEFNm_vm8Gj6FwoFQ"
JWE_Encrypted_Key_5_7_3 = "lJf3HbOApxMEBkCMOoTnnABxs_CvTWUmZQ2ElLvYNok"
JWE_Protected_Header_no_ivtag = {
"alg": "A256GCMKW",
"kid": "18ec08e1-bfa9-4d95-b205-2b4dd1d4321d",
"enc": "A128CBC-HS256"}
JWE_Protected_Header_5_7_4 = \
"eyJhbGciOiJBMjU2R0NNS1ciLCJraWQiOiIxOGVjMDhlMS1iZmE5LTRkOTUtYj" + \
"IwNS0yYjRkZDFkNDMyMWQiLCJ0YWciOiJrZlBkdVZRM1QzSDZ2bmV3dC0ta3N3" + \
"IiwiaXYiOiJLa1lUMEdYXzJqSGxmcU5fIiwiZW5jIjoiQTEyOENCQy1IUzI1Ni" + \
"J9"
JWE_Ciphertext_5_7_4 = \
"Jf5p9-ZhJlJy_IQ_byKFmI0Ro7w7G1QiaZpI8OaiVgD8EqoDZHyFKFBupS8iaE" + \
"eVIgMqWmsuJKuoVgzR3YfzoMd3GxEm3VxNhzWyWtZKX0gxKdy6HgLvqoGNbZCz" + \
"LjqcpDiF8q2_62EVAbr2uSc2oaxFmFuIQHLcqAHxy51449xkjZ7ewzZaGV3eFq" + \
"hpco8o4DijXaG5_7kp3h2cajRfDgymuxUbWgLqaeNQaJtvJmSMFuEOSAzw9Hde" + \
"b6yhdTynCRmu-kqtO5Dec4lT2OMZKpnxc_F1_4yDJFcqb5CiDSmA-psB2k0Jtj" + \
"xAj4UPI61oONK7zzFIu4gBfjJCndsZfdvG7h8wGjV98QhrKEnR7xKZ3KCr0_qR" + \
"1B-gxpNk3xWU"
JWE_Authentication_Tag_5_7_4 = "DKW7jrb4WaRSNfbXVPlT5g"
JWE_compact_5_7_5 = \
"%s.%s.%s.%s.%s" % (JWE_Protected_Header_5_7_4,
JWE_Encrypted_Key_5_7_3,
JWE_IV_5_7_2,
JWE_Ciphertext_5_7_4,
JWE_Authentication_Tag_5_7_4)
JWE_general_5_7_5 = {
"recipients": [{
"encrypted_key": JWE_Encrypted_Key_5_7_3}],
"protected": JWE_Protected_Header_5_7_4,
"iv": JWE_IV_5_7_2,
"ciphertext": JWE_Ciphertext_5_7_4,
"tag": JWE_Authentication_Tag_5_7_4}
JWE_flattened_5_7_5 = {
"protected": JWE_Protected_Header_5_7_4,
"encrypted_key": JWE_Encrypted_Key_5_7_3,
"iv": JWE_IV_5_7_2,
"ciphertext": JWE_Ciphertext_5_7_4,
"tag": JWE_Authentication_Tag_5_7_4}
# 5.8
AES_key_5_8_1 = {
......@@ -1008,8 +1062,20 @@ class Cookbook08JWETests(unittest.TestCase):
e.deserialize(json_encode(JWE_general_5_6_4), aes_key)
self.assertEqual(e.payload, plaintext)
# 5.7 - AES-GCM key wrapping not implemented yet
# def test_5_7_encryption(self):
def test_5_7_encryption(self):
plaintext = Payload_plaintext_5
aes_key = jwk.JWK(**AES_key_5_7_1)
e = jwe.JWE(plaintext, json_encode(JWE_Protected_Header_no_ivtag))
e.add_recipient(aes_key)
enc = e.serialize(compact=True)
e.deserialize(enc, aes_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(JWE_compact_5_7_5, aes_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(json_encode(JWE_general_5_7_5), aes_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(json_encode(JWE_flattened_5_7_5), aes_key)
self.assertEqual(e.payload, plaintext)
def test_5_8_encryption(self):
plaintext = Payload_plaintext_5
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment