Commit 562cdbbf authored by Simo Sorce's avatar Simo Sorce

Add ECDH-ES Key Wrapping Algorithms

Signed-off-by: default avatarSimo Sorce <simo@redhat.com>

Closes #3
Closes #38
parent 3eff0fe9
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
import os
import struct
import zlib
from binascii import hexlify, unhexlify
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import constant_time, hashes, hmac
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from cryptography.hazmat.primitives.padding import PKCS7
......@@ -487,6 +490,126 @@ class _Direct(_RawKeyMgmt):
return cek
class _EcdhEs(_RawKeyMgmt):
@property
def name(self):
return 'ECDH-ES'
def __init__(self, keydatalen=None):
self.backend = default_backend()
self.keydatalen = keydatalen
def _check_key(self, key):
if key.key_type != 'EC':
raise InvalidJWEKeyType('EC', key.key_type)
def _derive(self, privkey, pubkey, alg, keydatalen, headers):
# OtherInfo is defined in NIST SP 56A 5.8.1.2.1
# AlgorithmID
otherinfo = struct.pack('>I', len(alg))
otherinfo += bytes(alg.encode('utf8'))
# PartyUInfo
apu = base64url_decode(headers['apu']) if 'apu' in headers else b''
otherinfo += struct.pack('>I', len(apu))
otherinfo += apu
# PartyVInfo
apv = base64url_decode(headers['apv']) if 'apv' in headers else b''
otherinfo += struct.pack('>I', len(apv))
otherinfo += apv
# SuppPubInfo
otherinfo += struct.pack('>I', keydatalen)
# no SuppPrivInfo
shared_key = privkey.exchange(ec.ECDH(), pubkey)
ckdf = ConcatKDFHash(algorithm=hashes.SHA256(),
length=keydatalen // 8,
otherinfo=otherinfo,
backend=self.backend)
return ckdf.derive(shared_key)
def wrap(self, key, keylen, cek, headers):
self._check_key(key)
if self.keydatalen is None:
if cek is not None:
raise InvalidJWEOperation('ECDH-ES cannot use an existing CEK')
keydatalen = keylen * 8
alg = headers['enc']
else:
keydatalen = self.keydatalen
alg = headers['alg']
epk = JWK.generate(kty=key.key_type, crv=key.key_curve)
dk = self._derive(epk.get_op_key('unwrapKey'),
key.get_op_key('wrapKey'),
alg, keydatalen, headers)
if self.keydatalen is None:
ret = {'cek': dk}
else:
aeskw = _AesKw(keydatalen)
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
ret = aeskw.wrap(kek, keydatalen // 8, cek, headers)
ret['header'] = {'epk': json_decode(epk.export_public())}
return ret
def unwrap(self, key, keylen, ek, headers):
if 'epk' not in headers:
raise InvalidJWEData('Invalid Header, missing "epk" parameter')
self._check_key(key)
if self.keydatalen is None:
keydatalen = keylen * 8
alg = headers['enc']
else:
keydatalen = self.keydatalen
alg = headers['alg']
epk = JWK(**headers['epk'])
dk = self._derive(key.get_op_key('unwrapKey'),
epk.get_op_key('wrapKey'),
alg, keydatalen, headers)
if self.keydatalen is None:
return dk
else:
aeskw = _AesKw(keydatalen)
kek = JWK(kty="oct", use="enc", k=base64url_encode(dk))
cek = aeskw.unwrap(kek, keydatalen // 8, ek, headers)
return cek
class _EcdhEsAes128Kw(_EcdhEs):
def __init__(self):
super(_EcdhEsAes128Kw, self).__init__(128)
@property
def name(self):
return 'ECDH-ES+A128KW'
class _EcdhEsAes192Kw(_EcdhEs):
def __init__(self):
super(_EcdhEsAes192Kw, self).__init__(192)
@property
def name(self):
return 'ECDH-ES+A192KW'
class _EcdhEsAes256Kw(_EcdhEs):
def __init__(self):
super(_EcdhEsAes256Kw, self).__init__(256)
@property
def name(self):
return 'ECDH-ES+A256KW'
class _RawJWE(object):
def encrypt(self, k, a, m):
......@@ -689,6 +812,10 @@ class JWE(object):
'A192KW': _A192KW,
'A256KW': _A256KW,
'dir': _Direct,
'ECDH-ES': _EcdhEs,
'ECDH-ES+A128KW': _EcdhEsAes128Kw,
'ECDH-ES+A192KW': _EcdhEsAes192Kw,
'ECDH-ES+A256KW': _EcdhEsAes256Kw,
'A128GCMKW': _A128GcmKw,
'A192GCMKW': _A192GcmKw,
'A256GCMKW': _A256GcmKw,
......
......@@ -410,6 +410,13 @@ class JWK(object):
"""
return self._params.get('kid', None)
@property
def key_curve(self):
"""The Curve Name."""
if self._params['kty'] != 'EC':
raise InvalidJWKType('Not an EC key')
return self._key['crv']
def get_curve(self, arg):
"""Gets the Elliptic Curve associated with the key.
......
......@@ -692,9 +692,113 @@ JWE_flattened_5_3_5 = {
"ciphertext": JWE_Ciphertext_5_3_4,
"tag": JWE_Authentication_Tag_5_3_4}
# 5.4 - ECDH-ES key agreement not implemented yet
# 5.4
EC_key_5_4_1 = {
"kty": "EC",
"kid": "peregrin.took@tuckborough.example",
"use": "enc",
"crv": "P-384",
"x": "YU4rRUzdmVqmRtWOs2OpDE_T5fsNIodcG8G5FWPrTPMyxpzsSOGaQLpe2FpxBmu2",
"y": "A8-yxCHxkfBz3hKZfI1jUYMjUhsEveZ9THuwFjH2sCNdtksRJU7D5-SkgaFL1ETP",
"d": "iTx2pk7wW-GqJkHcEkFQb2EFyYcO7RugmaW3mRrQVAOUiPommT0IdnYK2xDlZh-j"}
JWE_IV_5_4_2 = "mH-G2zVqgztUtnW_"
# 5.5 - ECDH-ES key agreement not implemented yet
JWE_Encrypted_Key_5_4_3 = \
"0DJjBXri_kBcC46IkU5_Jk9BqaQeHdv2"
JWE_Protected_Header_no_epk_5_4_4 = {
"alg": "ECDH-ES+A128KW",
"kid": "peregrin.took@tuckborough.example",
"enc": "A128GCM"}
JWE_Protected_Header_5_4_4 = \
"eyJhbGciOiJFQ0RILUVTK0ExMjhLVyIsImtpZCI6InBlcmVncmluLnRvb2tAdH" + \
"Vja2Jvcm91Z2guZXhhbXBsZSIsImVwayI6eyJrdHkiOiJFQyIsImNydiI6IlAt" + \
"Mzg0IiwieCI6InVCbzRrSFB3Nmtiang1bDB4b3dyZF9vWXpCbWF6LUdLRlp1NH" + \
"hBRkZrYllpV2d1dEVLNml1RURzUTZ3TmROZzMiLCJ5Ijoic3AzcDVTR2haVkMy" + \
"ZmFYdW1JLWU5SlUyTW84S3BvWXJGRHI1eVBOVnRXNFBnRXdaT3lRVEEtSmRhWT" + \
"h0YjdFMCJ9LCJlbmMiOiJBMTI4R0NNIn0"
JWE_Ciphertext_5_4_4 = \
"tkZuOO9h95OgHJmkkrfLBisku8rGf6nzVxhRM3sVOhXgz5NJ76oID7lpnAi_cP" + \
"WJRCjSpAaUZ5dOR3Spy7QuEkmKx8-3RCMhSYMzsXaEwDdXta9Mn5B7cCBoJKB0" + \
"IgEnj_qfo1hIi-uEkUpOZ8aLTZGHfpl05jMwbKkTe2yK3mjF6SBAsgicQDVCkc" + \
"Y9BLluzx1RmC3ORXaM0JaHPB93YcdSDGgpgBWMVrNU1ErkjcMqMoT_wtCex3w0" + \
"3XdLkjXIuEr2hWgeP-nkUZTPU9EoGSPj6fAS-bSz87RCPrxZdj_iVyC6QWcqAu" + \
"07WNhjzJEPc4jVntRJ6K53NgPQ5p99l3Z408OUqj4ioYezbS6vTPlQ"
JWE_Authentication_Tag_5_4_4 = "WuGzxmcreYjpHGJoa17EBg"
JWE_compact_5_4_5 = \
"%s.%s.%s.%s.%s" % (JWE_Protected_Header_5_4_4,
JWE_Encrypted_Key_5_4_3,
JWE_IV_5_4_2,
JWE_Ciphertext_5_4_4,
JWE_Authentication_Tag_5_4_4)
JWE_general_5_4_5 = {
"recipients": [{
"encrypted_key": JWE_Encrypted_Key_5_4_3}],
"protected": JWE_Protected_Header_5_4_4,
"iv": JWE_IV_5_4_2,
"ciphertext": JWE_Ciphertext_5_4_4,
"tag": JWE_Authentication_Tag_5_4_4}
JWE_flattened_5_4_5 = {
"protected": JWE_Protected_Header_5_4_4,
"encrypted_key": JWE_Encrypted_Key_5_4_3,
"iv": JWE_IV_5_4_2,
"ciphertext": JWE_Ciphertext_5_4_4,
"tag": JWE_Authentication_Tag_5_4_4}
# 5.5
EC_key_5_5_1 = {
"kty": "EC",
"kid": "meriadoc.brandybuck@buckland.example",
"use": "enc",
"crv": "P-256",
"x": "Ze2loSV3wrroKUN_4zhwGhCqo3Xhu1td4QjeQ5wIVR0",
"y": "HlLtdXARY_f55A3fnzQbPcm6hgr34Mp8p-nuzQCE0Zw",
"d": "r_kHyZ-a06rmxM3yESK84r1otSg-aQcVStkRhA-iCM8"}
JWE_IV_5_5_2 = "yc9N8v5sYyv3iGQT926IUg"
JWE_Protected_Header_no_epk_5_5_4 = {
"alg": "ECDH-ES",
"kid": "meriadoc.brandybuck@buckland.example",
"enc": "A128CBC-HS256"
}
JWE_Protected_Header_5_5_4 = \
"eyJhbGciOiJFQ0RILUVTIiwia2lkIjoibWVyaWFkb2MuYnJhbmR5YnVja0BidW" + \
"NrbGFuZC5leGFtcGxlIiwiZXBrIjp7Imt0eSI6IkVDIiwiY3J2IjoiUC0yNTYi" + \
"LCJ4IjoibVBVS1RfYkFXR0hJaGcwVHBqanFWc1AxclhXUXVfdndWT0hIdE5rZF" + \
"lvQSIsInkiOiI4QlFBc0ltR2VBUzQ2ZnlXdzVNaFlmR1RUMElqQnBGdzJTUzM0" + \
"RHY0SXJzIn0sImVuYyI6IkExMjhDQkMtSFMyNTYifQ"
JWE_Ciphertext_5_5_4 = \
"BoDlwPnTypYq-ivjmQvAYJLb5Q6l-F3LIgQomlz87yW4OPKbWE1zSTEFjDfhU9" + \
"IPIOSA9Bml4m7iDFwA-1ZXvHteLDtw4R1XRGMEsDIqAYtskTTmzmzNa-_q4F_e" + \
"vAPUmwlO-ZG45Mnq4uhM1fm_D9rBtWolqZSF3xGNNkpOMQKF1Cl8i8wjzRli7-" + \
"IXgyirlKQsbhhqRzkv8IcY6aHl24j03C-AR2le1r7URUhArM79BY8soZU0lzwI" + \
"-sD5PZ3l4NDCCei9XkoIAfsXJWmySPoeRb2Ni5UZL4mYpvKDiwmyzGd65KqVw7" + \
"MsFfI_K767G9C9Azp73gKZD0DyUn1mn0WW5LmyX_yJ-3AROq8p1WZBfG-ZyJ61" + \
"95_JGG2m9Csg"
JWE_Authentication_Tag_5_5_4 = "WCCkNa-x4BeB9hIDIfFuhg"
JWE_compact_5_5_5 = \
"%s..%s.%s.%s" % (JWE_Protected_Header_5_5_4,
JWE_IV_5_5_2,
JWE_Ciphertext_5_5_4,
JWE_Authentication_Tag_5_5_4)
JWE_general_5_5_5 = {
"protected": JWE_Protected_Header_5_5_4,
"iv": JWE_IV_5_5_2,
"ciphertext": JWE_Ciphertext_5_5_4,
"tag": JWE_Authentication_Tag_5_5_4}
# 5.6
AES_key_5_6_1 = {
......@@ -1051,11 +1155,33 @@ class Cookbook08JWETests(unittest.TestCase):
e.deserialize(json_encode(JWE_flattened_5_3_5), password)
self.assertEqual(e.payload, plaintext)
# 5.4 - ECDH-ES key agreement not implemented yet
# def test_5_4_encryption(self):
def test_5_4_encryption(self):
plaintext = Payload_plaintext_5
protected = json_encode(JWE_Protected_Header_no_epk_5_4_4)
ec_key = jwk.JWK(**EC_key_5_4_1)
e = jwe.JWE(plaintext, protected)
e.add_recipient(ec_key)
enc = e.serialize(compact=True)
e.deserialize(enc, ec_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(JWE_compact_5_4_5, ec_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(json_encode(JWE_general_5_4_5), ec_key)
self.assertEqual(e.payload, plaintext)
# 5.5 - ECDH-ES key agreement not implemented yet
# def test_5_5_encryption(self):
def test_5_5_encryption(self):
plaintext = Payload_plaintext_5
protected = json_encode(JWE_Protected_Header_no_epk_5_5_4)
ec_key = jwk.JWK(**EC_key_5_5_1)
e = jwe.JWE(plaintext, protected)
e.add_recipient(ec_key)
enc = e.serialize(compact=True)
e.deserialize(enc, ec_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(JWE_compact_5_5_5, ec_key)
self.assertEqual(e.payload, plaintext)
e.deserialize(json_encode(JWE_general_5_5_5), ec_key)
self.assertEqual(e.payload, plaintext)
def test_5_6_encryption(self):
plaintext = Payload_plaintext_5
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment