Commit 03b2c54e authored by Simo Sorce's avatar Simo Sorce

Python3 compatibility: string vs bytes fixes

Signed-off-by: default avatarSimo Sorce <simo@redhat.com>
parent ec0aa182
......@@ -9,7 +9,10 @@ import json
def base64url_encode(payload):
return urlsafe_b64encode(payload).rstrip('=')
if not isinstance(payload, bytes):
payload = payload.encode('utf-8')
encode = urlsafe_b64encode(payload)
return encode.decode('utf-8').rstrip('=')
def base64url_decode(payload):
......@@ -20,16 +23,20 @@ def base64url_decode(payload):
payload += '='
elif l != 0:
raise ValueError('Invalid base64 string')
return urlsafe_b64decode(payload)
return urlsafe_b64decode(payload.encode('utf-8'))
# JSON encoding/decoding helpers with good defaults
def json_encode(string):
if isinstance(string, bytes):
string = string.decode('utf-8')
return json.dumps(string, separators=(',', ':'))
def json_decode(string):
if isinstance(string, bytes):
string = string.decode('utf-8')
return json.loads(string)
......
......@@ -184,7 +184,7 @@ class _aes_kw(_raw_key_mgmt):
if A != Aiv:
raise InvalidJWEData('Decryption Failed')
cek = ''.join(R)
cek = b''.join(R)
return cek
......@@ -205,7 +205,7 @@ class _direct(_raw_key_mgmt):
def unwrap(self, key, ek):
self.check_key(key)
if ek != '':
if ek != b'':
raise InvalidJWEData('Invalid Encryption Key.')
return base64url_decode(key.get_op_key('decrypt'))
......@@ -357,7 +357,12 @@ class JWE(object):
:param aad(bytes): Arbitrary additional authenticated data
"""
self.objects = dict()
self.plaintext = plaintext
self.plaintext = None
if plaintext is not None:
if isinstance(plaintext, bytes):
self.plaintext = plaintext
else:
self.plaintext = plaintext.encode('utf-8')
self.cek = None
self.decryptlog = None
if aad:
......@@ -463,6 +468,8 @@ class JWE(object):
"""
if self.plaintext is None:
raise ValueError('Missing plaintext')
if not isinstance(self.plaintext, bytes):
raise ValueError("Plaintext must be 'bytes'")
if not isinstance(key, JWK):
raise ValueError('key is not a JWK object')
......@@ -481,6 +488,7 @@ class JWE(object):
aad = base64url_encode(self.objects.get('protected', ''))
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
aad = aad.encode('utf-8')
compress = jh.get('zip', None)
if compress == 'DEF':
......@@ -581,8 +589,9 @@ class JWE(object):
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
cek = alg.unwrap(key, ppe.get('encrypted_key', ''))
data = enc.decrypt(cek, aad, self.objects['iv'],
cek = alg.unwrap(key, ppe.get('encrypted_key', b''))
data = enc.decrypt(cek, aad.encode('utf-8'),
self.objects['iv'],
self.objects['ciphertext'],
self.objects['tag'])
......@@ -613,7 +622,8 @@ class JWE(object):
o['ciphertext'] = base64url_decode(str(djwe['ciphertext']))
o['tag'] = base64url_decode(str(djwe['tag']))
if 'protected' in djwe:
o['protected'] = base64url_decode(str(djwe['protected']))
p = base64url_decode(str(djwe['protected']))
o['protected'] = p.decode('utf-8')
if 'unprotected' in djwe:
o['unprotected'] = json_encode(djwe['unprotected'])
if 'aad' in djwe:
......@@ -639,7 +649,8 @@ class JWE(object):
c = raw_jwe.split('.')
if len(c) != 5:
raise InvalidJWEData()
o['protected'] = base64url_decode(str(c[0]))
p = base64url_decode(str(c[0]))
o['protected'] = p.decode('utf-8')
ekey = base64url_decode(str(c[1]))
if ekey != '':
o['encrypted_key'] = base64url_decode(str(c[1]))
......
......@@ -152,7 +152,7 @@ class _raw_none(_raw_jws):
return ''
def verify(self, key, payload, signature):
if signature != '':
if signature != b'':
raise InvalidJWSSignature('The "none" signature must be the '
'empty string')
......@@ -187,7 +187,7 @@ class JWSCore(object):
self.key = key
if header is not None:
self.protected = base64url_encode(unicode(header, 'utf-8'))
self.protected = base64url_encode(header.encode('utf-8'))
else:
self.protected = ''
self.payload = base64url_encode(payload)
......@@ -245,16 +245,16 @@ class JWSCore(object):
raise InvalidJWAAlgorithm()
def sign(self):
signing_input = str.encode('.'.join([self.protected, self.payload]))
signature = self.engine.sign(self.key, signing_input)
sigin = ('.'.join([self.protected, self.payload])).encode('utf-8')
signature = self.engine.sign(self.key, sigin)
return {'protected': self.protected,
'payload': self.payload,
'signature': base64url_encode(signature)}
def verify(self, signature):
try:
signing_input = '.'.join([self.protected, self.payload])
self.engine.verify(self.key, signing_input, signature)
sigin = ('.'.join([self.protected, self.payload])).encode('utf-8')
self.engine.verify(self.key, sigin, signature)
except Exception as e: # pylint: disable=broad-except
raise InvalidJWSSignature('Verification failed', repr(e))
return True
......@@ -338,8 +338,8 @@ class JWS(object):
os = dict()
os['signature'] = base64url_decode(str(s['signature']))
if 'protected' in s:
os['protected'] = \
base64url_decode(str(s['protected']))
p = base64url_decode(str(s['protected']))
os['protected'] = p.decode('utf-8')
if 'header' in s:
os['header'] = json_encode(s['header'])
try:
......@@ -360,8 +360,8 @@ class JWS(object):
else:
o['signature'] = base64url_decode(str(djws['signature']))
if 'protected' in djws:
o['protected'] = \
base64url_decode(str(djws['protected']))
p = base64url_decode(str(djws['protected']))
o['protected'] = p.decode('utf-8')
if 'header' in djws:
o['header'] = json_encode(djws['header'])
try:
......@@ -385,8 +385,8 @@ class JWS(object):
if len(c) != 3:
raise InvalidJWSObject('Unrecognized representation')
p = base64url_decode(str(c[0]))
if p != '':
o['protected'] = p
if len(p) > 0:
o['protected'] = p.decode('utf-8')
o['payload'] = base64url_decode(str(c[1]))
o['signature'] = base64url_decode(str(c[2]))
try:
......
......@@ -293,7 +293,8 @@ class Cookbook08JWSTests(unittest.TestCase):
def test_4_1_signing(self):
plaintext = base64url_decode(Payload_plaintext_b64_4)
protected = base64url_decode(JWS_Protected_Header_4_1_2)
protected = \
base64url_decode(JWS_Protected_Header_4_1_2).decode('utf-8')
pub_key = jwk.JWK(**RSA_Public_Key_3_3) # pylint: disable=star-args
pri_key = jwk.JWK(**RSA_Private_Key_3_4) # pylint: disable=star-args
S = jws.JWS(payload=plaintext)
......@@ -304,7 +305,8 @@ class Cookbook08JWSTests(unittest.TestCase):
def test_4_2_signing(self):
plaintext = base64url_decode(Payload_plaintext_b64_4)
protected = base64url_decode(JWS_Protected_Header_4_2_2)
protected = \
base64url_decode(JWS_Protected_Header_4_2_2).decode('utf-8')
pub_key = jwk.JWK(**RSA_Public_Key_3_3) # pylint: disable=star-args
pri_key = jwk.JWK(**RSA_Private_Key_3_4) # pylint: disable=star-args
S = jws.JWS(payload=plaintext)
......@@ -320,7 +322,8 @@ class Cookbook08JWSTests(unittest.TestCase):
def test_4_3_signing(self):
plaintext = base64url_decode(Payload_plaintext_b64_4)
protected = base64url_decode(JWS_Protected_Header_4_3_2)
protected = \
base64url_decode(JWS_Protected_Header_4_3_2).decode('utf-8')
pub_key = jwk.JWK(**EC_Public_Key_3_1) # pylint: disable=star-args
pri_key = jwk.JWK(**EC_Private_Key_3_2) # pylint: disable=star-args
S = jws.JWS(payload=plaintext)
......@@ -336,7 +339,8 @@ class Cookbook08JWSTests(unittest.TestCase):
def test_4_4_signing(self):
plaintext = base64url_decode(Payload_plaintext_b64_4)
protected = base64url_decode(JWS_Protected_Header_4_4_2)
protected = \
base64url_decode(JWS_Protected_Header_4_4_2).decode('utf-8')
key = jwk.JWK(**Symmetric_Key_MAC_3_5) # pylint: disable=star-args
S = jws.JWS(payload=plaintext)
S.add_signature(key, None, protected)
......@@ -350,7 +354,8 @@ class Cookbook08JWSTests(unittest.TestCase):
def test_4_6_signing(self):
plaintext = base64url_decode(Payload_plaintext_b64_4)
protected = base64url_decode(JWS_Protected_Header_4_6_2)
protected = \
base64url_decode(JWS_Protected_Header_4_6_2).decode('utf-8')
header = json_encode(JWS_Unprotected_Header_4_6_2)
key = jwk.JWK(**Symmetric_Key_MAC_3_5) # pylint: disable=star-args
S = jws.JWS(payload=plaintext)
......@@ -379,7 +384,8 @@ class Cookbook08JWSTests(unittest.TestCase):
plaintext = base64url_decode(Payload_plaintext_b64_4)
S = jws.JWS(payload=plaintext)
# 4_8_2
protected = base64url_decode(JWS_Protected_Header_4_8_2)
protected = \
base64url_decode(JWS_Protected_Header_4_8_2).decode('utf-8')
header = json_encode(JWS_Unprotected_Header_4_8_2)
pri_key = jwk.JWK(**RSA_Private_Key_3_4) # pylint: disable=star-args
S.add_signature(pri_key, None, protected, header)
......@@ -388,7 +394,8 @@ class Cookbook08JWSTests(unittest.TestCase):
pri_key = jwk.JWK(**EC_Private_Key_3_2) # pylint: disable=star-args
S.add_signature(pri_key, None, None, header)
# 4_8_4
protected = base64url_decode(JWS_Protected_Header_4_8_4)
protected = \
base64url_decode(JWS_Protected_Header_4_8_4).decode('utf-8')
sym_key = jwk.JWK(**Symmetric_Key_MAC_3_5) # pylint: disable=star-args
S.add_signature(sym_key, None, protected)
sig = S.serialize()
......@@ -407,11 +414,11 @@ class Cookbook08JWSTests(unittest.TestCase):
# 5.0
Payload_plaintext_5 = \
"You can trust us to stick with you through thick and " + \
"thin\xe2\x80\x93to the bitter end. And you can trust us to " + \
"keep any secret of yours\xe2\x80\x93closer than you keep it " + \
"yourself. But you cannot trust us to let you face trouble " + \
"alone, and go off without a word. We are your friends, Frodo."
b"You can trust us to stick with you through thick and " + \
b"thin\xe2\x80\x93to the bitter end. And you can trust us to " + \
b"keep any secret of yours\xe2\x80\x93closer than you keep it " + \
b"yourself. But you cannot trust us to let you face trouble " + \
b"alone, and go off without a word. We are your friends, Frodo."
# 5.1
RSA_key_5_1_1 = {
......
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
from __future__ import unicode_literals
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
from jwcrypto import jwk
......@@ -199,7 +200,7 @@ A1_signature = \
132, 141, 121]
A1_example = {'key': SymmetricKeys['keys'][1],
'alg': 'HS256',
'protected': bytes(bytearray(A1_protected)),
'protected': bytes(bytearray(A1_protected)).decode('utf-8'),
'payload': bytes(bytearray(A1_payload)),
'signature': bytes(bytearray(A1_signature))}
......@@ -258,7 +259,7 @@ A2_signature = \
251, 71]
A2_example = {'key': A2_key,
'alg': 'RS256',
'protected': bytes(bytearray(A2_protected)),
'protected': bytes(bytearray(A2_protected)).decode('utf-8'),
'payload': bytes(bytearray(A2_payload)),
'signature': bytes(bytearray(A2_signature))}
......@@ -281,7 +282,7 @@ A3_signature = \
143, 63, 127, 138, 131, 163, 84, 213]
A3_example = {'key': A3_key,
'alg': 'ES256',
'protected': bytes(bytearray(A3_protected)),
'protected': bytes(bytearray(A3_protected)).decode('utf-8'),
'payload': bytes(bytearray(A3_payload)),
'signature': bytes(bytearray(A3_signature))}
......@@ -312,7 +313,7 @@ A4_signature = \
232, 148, 188, 222, 59, 242, 103]
A4_example = {'key': A4_key,
'alg': 'ES512',
'protected': bytes(bytearray(A4_protected)),
'protected': bytes(bytearray(A4_protected)).decode('utf-8'),
'payload': bytes(bytearray(A4_payload)),
'signature': bytes(bytearray(A4_signature))}
......@@ -322,10 +323,10 @@ A5_protected = 'eyJhbGciOiJub25lIn0'
A5_payload = A2_payload
A5_key = \
{"kty": "oct", "k": ""}
A5_signature = ""
A5_signature = b''
A5_example = {'key': A5_key,
'alg': 'none',
'protected': base64url_decode(A5_protected),
'protected': base64url_decode(A5_protected).decode('utf-8'),
'payload': bytes(bytearray(A5_payload)),
'signature': A5_signature}
......@@ -355,10 +356,10 @@ A6_serialized = \
A6_example = {
'payload': bytes(bytearray(A2_payload)),
'key1': jwk.JWK(**A2_key), # pylint: disable=star-args
'protected1': bytes(bytearray(A2_protected)),
'protected1': bytes(bytearray(A2_protected)).decode('utf-8'),
'header1': json_encode({"kid": "2010-12-29"}),
'key2': jwk.JWK(**A3_key), # pylint: disable=star-args
'protected2': bytes(bytearray(A3_protected)),
'protected2': bytes(bytearray(A3_protected)).decode('utf-8'),
'header2': json_encode({"kid": "e9bc097a-ce51-4036-9562-d2ade882db0d"}),
'serialized': A6_serialized}
......@@ -489,7 +490,7 @@ E_A1_vector = \
E_A1_ex = {'key': jwk.JWK(**E_A1_key), # pylint: disable=star-args
'protected': base64url_decode(E_A1_protected),
'plaintext': E_A1_plaintext,
'plaintext': bytes(bytearray(E_A1_plaintext)),
'vector': E_A1_vector}
E_A2_plaintext = "Live long and prosper."
......@@ -552,7 +553,7 @@ E_A3_vector = \
"U0m_YmjN04DJvceFICbCVQ"
E_A3_ex = {'key': jwk.JWK(**E_A3_key), # pylint: disable=star-args
'protected': base64url_decode(E_A3_protected),
'protected': base64url_decode(E_A3_protected).decode('utf-8'),
'plaintext': E_A3_plaintext,
'vector': E_A3_vector}
......@@ -651,45 +652,45 @@ class ConformanceTests(unittest.TestCase):
def test_jwe_no_protected_header(self):
enc = jwe.JWE(plaintext='plain')
enc.add_recipient(jwk.JWK(kty='oct', k=base64url_encode('A'*16)),
enc.add_recipient(jwk.JWK(kty='oct', k=base64url_encode(b'A'*16)),
'{"alg":"A128KW","enc":"A128GCM"}')
def test_jwe_no_alg_in_jose_headers(self):
enc = jwe.JWE(plaintext='plain')
self.assertRaises(jwe.InvalidJWEData, enc.add_recipient,
jwk.JWK(kty='oct', k=base64url_encode('A'*16)),
jwk.JWK(kty='oct', k=base64url_encode(b'A'*16)),
'{"enc":"A128GCM"}')
def test_jwe_no_enc_in_jose_headers(self):
enc = jwe.JWE(plaintext='plain')
self.assertRaises(jwe.InvalidJWEData, enc.add_recipient,
jwk.JWK(kty='oct', k=base64url_encode('A'*16)),
jwk.JWK(kty='oct', k=base64url_encode(b'A'*16)),
'{"alg":"A128KW"}')
def test_aes_128(self):
enc = jwe.JWE(plaintext='plain')
key128 = jwk.JWK(kty='oct', k=base64url_encode('A' * (128 // 8)))
key128 = jwk.JWK(kty='oct', k=base64url_encode(b'A' * (128 // 8)))
enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128CBC-HS256"}')
enc.add_recipient(key128, '{"alg":"A128KW","enc":"A128GCM"}')
def test_aes_192(self):
enc = jwe.JWE(plaintext='plain')
key192 = jwk.JWK(kty='oct', k=base64url_encode('B' * (192 // 8)))
key192 = jwk.JWK(kty='oct', k=base64url_encode(b'B' * (192 // 8)))
enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192CBC-HS384"}')
enc.add_recipient(key192, '{"alg":"A192KW","enc":"A192GCM"}')
def test_aes_256(self):
enc = jwe.JWE(plaintext='plain')
key256 = jwk.JWK(kty='oct', k=base64url_encode('C' * (256 // 8)))
key256 = jwk.JWK(kty='oct', k=base64url_encode(b'C' * (256 // 8)))
enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256CBC-HS512"}')
enc.add_recipient(key256, '{"alg":"A256KW","enc":"A256GCM"}')
def test_jws_loopback(self):
sign = jws.JWS(payload='message')
sign.add_signature(jwk.JWK(kty='oct', k=base64url_encode('A'*16)),
sign.add_signature(jwk.JWK(kty='oct', k=base64url_encode(b'A'*16)),
alg="HS512")
o = sign.serialize()
check = jws.JWS()
check.deserialize(o, jwk.JWK(kty='oct', k=base64url_encode('A'*16)),
check.deserialize(o, jwk.JWK(kty='oct', k=base64url_encode(b'A'*16)),
alg="HS512")
self.assertTrue(check.objects['valid'])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment