Commit 575b568f authored by Simo Sorce's avatar Simo Sorce

JWE: make internal functions private

Signed-off-by: default avatarSimo Sorce <simo@redhat.com>
parent d546bcc5
......@@ -33,7 +33,7 @@ JWEHeaderRegistry = {'alg': ('Algorithm', True),
# Note: l is the number of bits, which should be a multiple of 16
def encode_int(n, l):
def _encode_int(n, l):
e = hex(n).rstrip("L").lstrip("0x")
el = len(e)
L = ((l + 7) // 8) * 2 # number of bytes rounded up times 2 chars/bytes
......@@ -44,7 +44,7 @@ def encode_int(n, l):
return unhexlify(e)
def decode_int(n):
def _decode_int(n):
return int(hexlify(n), 16)
......@@ -154,7 +154,7 @@ class _aes_kw(_raw_key_mgmt):
e = Cipher(algorithms.AES(rk), modes.ECB(),
backend=self.backend).encryptor()
B = e.update(A + R[i]) + e.finalize()
A = encode_int(decode_int(B[:8]) ^ ((n*j)+i+1), 64)
A = _encode_int(_decode_int(B[:8]) ^ ((n*j)+i+1), 64)
R[i] = B[-8:]
ek = A
for i in range(0, n):
......@@ -174,7 +174,7 @@ class _aes_kw(_raw_key_mgmt):
n = len(R)
for j in range(5, -1, -1):
for i in range(n - 1, -1, -1):
AtR = encode_int((decode_int(A) ^ ((n*j)+i+1)), 64) + R[i]
AtR = _encode_int((_decode_int(A) ^ ((n*j)+i+1)), 64) + R[i]
d = Cipher(algorithms.AES(rk), modes.ECB(),
backend=self.backend).decryptor()
B = d.update(AtR) + d.finalize()
......@@ -232,7 +232,7 @@ class _aes_cbc_hmac_sha2(_raw_jwe):
return self.keysize * 2
def _mac(self, k, a, iv, e):
al = encode_int(len(a * 8), 64)
al = _encode_int(len(a * 8), 64)
h = hmac.HMAC(k, self.hashfn, backend=self.backend)
h.update(a)
h.update(iv)
......@@ -426,27 +426,27 @@ class JWE(object):
except (KeyError, AttributeError):
raise InvalidJWAAlgorithm()
def merge_headers(self, h1, h2):
def _merge_headers(self, h1, h2):
for k in list(h1.keys()):
if k in h2:
raise InvalidJWEData('Duplicate header: "%s"' % k)
h1.update(h2)
return h1
def get_jose_header(self, header=None):
def _get_jose_header(self, header=None):
jh = dict()
if 'protected' in self.objects:
ph = json_decode(self.objects['protected'])
jh = self.merge_headers(jh, ph)
jh = self._merge_headers(jh, ph)
if 'unprotected' in self.objects:
uh = json_decode(self.objects['unprotected'])
jh = self.merge_headers(jh, uh)
jh = self._merge_headers(jh, uh)
if header:
rh = json_decode(header)
jh = self.merge_headers(jh, rh)
jh = self._merge_headers(jh, rh)
return jh
def get_alg_enc_from_headers(self, jh):
def _get_alg_enc_from_headers(self, jh):
algname = jh.get('alg', None)
if algname is None:
raise InvalidJWEData('Missing "alg" from headers')
......@@ -473,8 +473,8 @@ class JWE(object):
if not isinstance(key, JWK):
raise ValueError('key is not a JWK object')
jh = self.get_jose_header(header)
alg, enc = self.get_alg_enc_from_headers(jh)
jh = self._get_jose_header(header)
alg, enc = self._get_alg_enc_from_headers(jh)
rec = dict()
if header:
......@@ -568,7 +568,7 @@ class JWE(object):
enc['header'] = json_decode(obj['header'])
return json_encode(enc)
def check_crit(self, crit):
def _check_crit(self, crit):
for k in crit:
if k not in JWEHeaderRegistry:
raise InvalidJWEData('Unknown critical header: "%s"' % k)
......@@ -580,10 +580,10 @@ class JWE(object):
# FIXME: allow to specify which algorithms to accept as valid
def _decrypt(self, key, ppe):
jh = self.get_jose_header(ppe.get('header', None))
jh = self._get_jose_header(ppe.get('header', None))
# TODO: allow caller to specify list of headers it understands
self.check_crit(jh.get('crit', dict()))
self._check_crit(jh.get('crit', dict()))
alg = self._jwa(jh.get('alg', None))
enc = self._jwa(jh.get('enc', None))
......@@ -700,7 +700,7 @@ class JWE(object):
@property
def jose_header(self):
jh = self.get_jose_header()
jh = self._get_jose_header()
if len(jh) == 0:
raise InvalidJWEOperation("JOSE Header not available")
return jh
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment