Commit 3b2f3218 authored by SVN-Git Migration's avatar SVN-Git Migration

Imported Upstream version 1.16

parents
*.pyc
issue*.xml
/.idea
/PySimpleSOAP.egg-info
/.coverage
lib/*
bin/*
include/*
build/
dist/
syntax: glob
*.pyc
lib/*
bin/*
include/*
1b656e9df5e25b15aa72326f581f168696b3ec46 1.10
57bca8475baba500b23bdae40f15e3be644ef698 update
0000000000000000000000000000000000000000 update
57bca8475baba500b23bdae40f15e3be644ef698 f289e16a0972
0000000000000000000000000000000000000000 f289e16a0972
f289e16a0972be7d3a64901d713778381c919c2d 1.11-git
1a76eb59cdf5921344c9ec667af7b25ff8774e46 1.12
2ba439cbd4fc525bc2c62c5fe7b3bd9a83ad6336 1.13
language: python
python:
- "2.6"
- "2.7"
- "3.2"
- "3.3"
- "pypy"
script: nosetests -a !internal,!disabled
Maintainer
========
- Mariano Reingart <reingart@gmail.com>
Contributors
========
- Dean Gardiner <fuzeman91@gmail.com> @fuzeman
- Piotr Staroszczyk <piotr.staroczyk@get24.org> @oczkers
- Rui Carmo @rcarmo
Patches and Suggestions
========
- Marcelo Alaniz
- Margarita Manterola
- Gerardo Allende
- Darell Tan
- Alan Etkin
- Richard Jones
- matee <matee@matee.net>
- Yuan Liu
- Pat Kujawa
- Alfredo Saglimbeni <a.saglimbeni@scsitaly.com> @asaglimbeni
- Jor S
- Akram Parvez
- beppler
- Remco Boerma
- Sergei Lemeshkin
- Tim Alexander
- Ralf Henschkowski
- Allan Crooks
- Paul Jimenez
- Jonathan Balsano
- Kevin Bullmann
- Pedro Hdez
- jpc <jpc@alumni.rice.edu>
- Luka Birsa
- Bill Bennert @billwebreply
Features
========
This fork has the following features:
* Corrected support for multiple SOAP ports/bindings
* Support for both `import` and `include` stanzas in WSDL
* Support for a WSDL base directory to deal with relative pathnames in import/include stanzas
* Somewhat saner traceing/logging (traces now go to log.debug(), which you can handle per module)
* Somewhat more readable logic (by removing a bunch of helpers to a separate file)
Testing
=======
Using Python 2.7+:
python -m unittest discover
Using older Python versions:
python -m unittest tests/suite.py
Code coverage:
sudo pip install coverage
coverage run tests/suite.py
coverage report -m
coverage html
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""PySimpleSOAP"""
__author__ = "Mariano Reingart"
__author_email__ = "reingart@gmail.com"
__copyright__ = "Copyright (C) 2013 Mariano Reingart"
__license__ = "LGPL 3.0"
__version__ = "1.16"
TIMEOUT = 60
from . import client, server, simplexml, transport
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
"""Pythonic simple SOAP Client transport"""
import logging
import sys
try:
import urllib2
from cookielib import CookieJar
except ImportError:
from urllib import request as urllib2
from http.cookiejar import CookieJar
from . import __author__, __copyright__, __license__, __version__, TIMEOUT
from .simplexml import SimpleXMLElement, TYPE_MAP, Struct
log = logging.getLogger(__name__)
#
# Socket wrapper to enable socket.TCP_NODELAY - this greatly speeds up transactions in Linux
# WARNING: this will modify the standard library socket module, use with care!
# TODO: implement this as a transport faciliy
# (to pass options directly to httplib2 or pycurl)
# be aware of metaclasses and socks.py (SocksiPy) used by httplib2
if False:
import socket
realsocket = socket.socket
def socketwrap(family=socket.AF_INET, type=socket.SOCK_STREAM, proto=0):
sockobj = realsocket(family, type, proto)
if type == socket.SOCK_STREAM:
sockobj.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
return sockobj
socket.socket = socketwrap
#
# We store metadata about what available transport mechanisms we have available.
#
_http_connectors = {} # libname: classimpl mapping
_http_facilities = {} # functionalitylabel: [sequence of libname] mapping
class TransportBase:
@classmethod
def supports_feature(cls, feature_name):
return cls._wrapper_name in _http_facilities[feature_name]
#
# httplib2 support.
#
try:
import httplib2
if sys.version > '3' and httplib2.__version__ <= "0.7.7":
import http.client
# httplib2 workaround: check_hostname needs a SSL context with either
# CERT_OPTIONAL or CERT_REQUIRED
# see https://code.google.com/p/httplib2/issues/detail?id=173
orig__init__ = http.client.HTTPSConnection.__init__
def fixer(self, host, port, key_file, cert_file, timeout, context,
check_hostname, *args, **kwargs):
chk = kwargs.get('disable_ssl_certificate_validation', True) ^ True
orig__init__(self, host, port=port, key_file=key_file,
cert_file=cert_file, timeout=timeout, context=context,
check_hostname=chk)
http.client.HTTPSConnection.__init__ = fixer
except ImportError:
TIMEOUT = None # timeout not supported by urllib2
pass
else:
class Httplib2Transport(httplib2.Http, TransportBase):
_wrapper_version = "httplib2 %s" % httplib2.__version__
_wrapper_name = 'httplib2'
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
# httplib2.debuglevel=4
kwargs = {}
if proxy:
import socks
kwargs['proxy_info'] = httplib2.ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, **proxy)
log.info("using proxy %s" % proxy)
# set optional parameters according supported httplib2 version
if httplib2.__version__ >= '0.3.0':
kwargs['timeout'] = timeout
if httplib2.__version__ >= '0.7.0':
kwargs['disable_ssl_certificate_validation'] = cacert is None
kwargs['ca_certs'] = cacert
httplib2.Http.__init__(self, **kwargs)
_http_connectors['httplib2'] = Httplib2Transport
_http_facilities.setdefault('proxy', []).append('httplib2')
_http_facilities.setdefault('cacert', []).append('httplib2')
import inspect
if 'timeout' in inspect.getargspec(httplib2.Http.__init__)[0]:
_http_facilities.setdefault('timeout', []).append('httplib2')
#
# urllib2 support.
#
class urllib2Transport(TransportBase):
_wrapper_version = "urllib2 %s" % urllib2.__version__
_wrapper_name = 'urllib2'
def __init__(self, timeout=None, proxy=None, cacert=None, sessions=False):
if (timeout is not None) and not self.supports_feature('timeout'):
raise RuntimeError('timeout is not supported with urllib2 transport')
if proxy:
raise RuntimeError('proxy is not supported with urllib2 transport')
if cacert:
raise RuntimeError('cacert is not support with urllib2 transport')
self.request_opener = urllib2.urlopen
if sessions:
opener = urllib2.build_opener(urllib2.HTTPCookieProcessor(CookieJar()))
self.request_opener = opener.open
self._timeout = timeout
def request(self, url, method="GET", body=None, headers={}):
req = urllib2.Request(url, body, headers)
try:
f = self.request_opener(req, timeout=self._timeout)
return f.info(), f.read()
except urllib2.HTTPError as f:
if f.code != 500:
raise
return f.info(), f.read()
_http_connectors['urllib2'] = urllib2Transport
_http_facilities.setdefault('sessions', []).append('urllib2')
import sys
if sys.version_info >= (2, 6):
_http_facilities.setdefault('timeout', []).append('urllib2')
del sys
#
# pycurl support.
# experimental: pycurl seems faster + better proxy support (NTLM) + ssl features
#
try:
import pycurl
except ImportError:
pass
else:
try:
from cStringIO import StringIO
except ImportError:
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
class pycurlTransport(TransportBase):
_wrapper_version = pycurl.version
_wrapper_name = 'pycurl'
def __init__(self, timeout, proxy=None, cacert=None, sessions=False):
self.timeout = timeout
self.proxy = proxy or {}
self.cacert = cacert
def request(self, url, method, body, headers):
c = pycurl.Curl()
c.setopt(pycurl.URL, url)
if 'proxy_host' in self.proxy:
c.setopt(pycurl.PROXY, self.proxy['proxy_host'])
if 'proxy_port' in self.proxy:
c.setopt(pycurl.PROXYPORT, self.proxy['proxy_port'])
if 'proxy_user' in self.proxy:
c.setopt(pycurl.PROXYUSERPWD, "%(proxy_user)s:%(proxy_pass)s" % self.proxy)
self.buf = StringIO()
c.setopt(pycurl.WRITEFUNCTION, self.buf.write)
#c.setopt(pycurl.READFUNCTION, self.read)
#self.body = StringIO(body)
#c.setopt(pycurl.HEADERFUNCTION, self.header)
if self.cacert:
c.setopt(c.CAINFO, self.cacert)
c.setopt(pycurl.SSL_VERIFYPEER, self.cacert and 1 or 0)
c.setopt(pycurl.SSL_VERIFYHOST, self.cacert and 2 or 0)
c.setopt(pycurl.CONNECTTIMEOUT, self.timeout / 6)
c.setopt(pycurl.TIMEOUT, self.timeout)
if method == 'POST':
c.setopt(pycurl.POST, 1)
c.setopt(pycurl.POSTFIELDS, body)
if headers:
hdrs = ['%s: %s' % (k, v) for k, v in headers.items()]
log.debug(hdrs)
c.setopt(pycurl.HTTPHEADER, hdrs)
c.perform()
c.close()
return {}, self.buf.getvalue()
_http_connectors['pycurl'] = pycurlTransport
_http_facilities.setdefault('proxy', []).append('pycurl')
_http_facilities.setdefault('cacert', []).append('pycurl')
_http_facilities.setdefault('timeout', []).append('pycurl')
class DummyTransport:
"""Testing class to load a xml response"""
def __init__(self, xml_response):
self.xml_response = xml_response
def request(self, location, method, body, headers):
log.debug("%s %s", method, location)
log.debug(headers)
log.debug(body)
return {}, self.xml_response
def get_http_wrapper(library=None, features=[]):
# If we are asked for a specific library, return it.
if library is not None:
try:
return _http_connectors[library]
except KeyError:
raise RuntimeError('%s transport is not available' % (library,))
# If we haven't been asked for a specific feature either, then just return our favourite
# implementation.
if not features:
return _http_connectors.get('httplib2', _http_connectors['urllib2'])
# If we are asked for a connector which supports the given features, then we will
# try that.
current_candidates = _http_connectors.keys()
new_candidates = []
for feature in features:
for candidate in current_candidates:
if candidate in _http_facilities.get(feature, []):
new_candidates.append(candidate)
current_candidates = new_candidates
new_candidates = []
# Return the first candidate in the list.
try:
candidate_name = current_candidates[0]
except IndexError:
raise RuntimeError("no transport available which supports these features: %s" % (features,))
else:
return _http_connectors[candidate_name]
def set_http_wrapper(library=None, features=[]):
"""Set a suitable HTTP connection wrapper."""
global Http
Http = get_http_wrapper(library, features)
return Http
def get_Http():
"""Return current transport class"""
global Http
return Http
# define the default HTTP connection class (it can be changed at runtime!):
set_http_wrapper()
#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation; either version 3, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
"""Pythonic simple SOAP Client plugins for WebService Security extensions"""
from __future__ import unicode_literals
import sys
if sys.version > '3':
basestring = unicode = str
import datetime
from decimal import Decimal
import os
import logging
import hashlib
import warnings
from . import __author__, __copyright__, __license__, __version__
from .simplexml import SimpleXMLElement
# Namespaces:
WSSE_URI = 'http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd'
WSU_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd"
XMLDSIG_URI = "http://www.w3.org/2000/09/xmldsig#"
X509v3_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"
Base64Binary_URI = "http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary"
class UsernameToken:
"WebService Security extension to add a basic credentials to xml request"
def __init__(self, username="", password=""):
self.token = {
'wsse:UsernameToken': {
'wsse:Username': username,
'wsse:Password': password,
}
}
def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
"Add basic credentials to outgoing message"
# always extract WS Security header and send it
header = request('Header', ns=soap_uri, )
k = 'wsse:Security'
# for backward compatibility, use header if given:
if k in headers:
self.token = headers[k]
# convert the token to xml
header.marshall(k, self.token, ns=False, add_children_ns=False)
header(k)['xmlns:wsse'] = WSSE_URI
#<wsse:UsernameToken xmlns:wsu='http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd'>
def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
"Analyze incoming credentials"
# TODO: add some password validation callback?
pass
BIN_TOKEN_TMPL = """<?xml version="1.0" encoding="UTF-8"?>
<wsse:Security soapenv:mustUnderstand="1" xmlns:soapenv="http://schemas.xmlsoap.org/soap/envelope/" xmlns:wsse="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-secext-1.0.xsd">
<wsse:BinarySecurityToken EncodingType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-soap-message-security-1.0#Base64Binary" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3" wsu:Id="CertId-45851B081998E431E8132880700036719" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
%(certificate)s</wsse:BinarySecurityToken>
<ds:Signature Id="Signature-13" xmlns:ds="http://www.w3.org/2000/09/xmldsig#">
%(signed_info)s
<ds:SignatureValue>%(signature_value)s</ds:SignatureValue>
<ds:KeyInfo Id="KeyId-45851B081998E431E8132880700036720">
<wsse:SecurityTokenReference wsu:Id="STRId-45851B081998E431E8132880700036821" xmlns:wsu="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-wssecurity-utility-1.0.xsd">
<wsse:Reference URI="#CertId-45851B081998E431E8132880700036719" ValueType="http://docs.oasis-open.org/wss/2004/01/oasis-200401-wss-x509-token-profile-1.0#X509v3"/>
</wsse:SecurityTokenReference>
</ds:KeyInfo>
</ds:Signature>
</wsse:Security>
"""
class BinaryTokenSignature:
"WebService Security extension to add a basic signature to xml request"
def __init__(self, certificate="", private_key="", password=None, cacert=None):
# read the X509v3 certificate (PEM)
self.certificate = ''.join([line for line in open(certificate)
if not line.startswith("---")])
self.private_key = private_key
self.password = password
self.cacert = cacert
def preprocess(self, client, request, method, args, kwargs, headers, soap_uri):
"Sign the outgoing SOAP request"
# get xml elements:
body = request('Body', ns=soap_uri, )
header = request('Header', ns=soap_uri, )
# prepare body xml attributes to be signed (reference)
body['wsu:Id'] = "id-14"
body['xmlns:wsu'] = WSU_URI
# workaround: copy namespaces so lxml can parse the xml to be signed
for attr, value in request[:]:
if attr.startswith("xmlns"):
body[attr] = value
# use the internal tag xml representation (not the full xml document)
ref_xml = repr(body)
# sign using RSA-SHA1 (XML Security)
from . import xmlsec
vars = xmlsec.rsa_sign(ref_xml, "#id-14",
self.private_key, self.password)
vars['certificate'] = self.certificate
# generate the xml (filling the placeholders)
wsse = SimpleXMLElement(BIN_TOKEN_TMPL % vars)
header.import_node(wsse)
def postprocess(self, client, response, method, args, kwargs, headers, soap_uri):
"Verify the signature of the incoming response"
from . import xmlsec
# get xml elements:
body = response('Body', ns=soap_uri, )
header = response('Header', ns=soap_uri, )
wsse = header("Security", ns=WSSE_URI)
cert = wsse("BinarySecurityToken", ns=WSSE_URI)
# check that the cert (binary token) is coming in the correct format:
self.__check(cert["EncodingType"], Base64Binary_URI)
self.__check(cert["ValueType"], X509v3_URI)
# extract the certificate (in DER to avoid new line & padding issues!)
cert_der = str(cert).decode("base64")
public_key = xmlsec.x509_extract_rsa_public_key(cert_der, binary=True)
# validate the certificate using the certification authority:
if not self.cacert:
warnings.warn("No CA provided, WSSE not validating certificate")
elif not xmlsec.x509_verify(self.cacert, cert_der, binary=True):
raise RuntimeError("WSSE certificate validation failed")
# check body xml attributes was signed correctly (reference)
self.__check(body['xmlns:wsu'], WSU_URI)
ref_uri = body['wsu:Id']
signature = wsse("Signature", ns=XMLDSIG_URI)
signed_info = signature("SignedInfo")
signature_value = signature("SignatureValue")
# TODO: these sanity checks should be moved to xmlsec?
self.__check(signed_info("Reference")['URI'], "#" + ref_uri)
self.__check(signed_info("SignatureMethod")['Algorithm'],
XMLDSIG_URI + "rsa-sha1")
self.__check(signed_info("Reference")("DigestMethod")['Algorithm'],
XMLDSIG_URI + "sha1")
# TODO: check KeyInfo uses the correct SecurityTokenReference
# workaround: copy namespaces so lxml can parse the xml to be signed
for attr, value in response[:]:
if attr.startswith("xmlns"):
body[attr] = value
# use the internal tag xml representation (not the full xml document)
ref_xml = xmlsec.canonicalize(repr(body))
# verify the signed hash
computed_hash = xmlsec.sha1_hash_digest(ref_xml)
digest_value = str(signed_info("Reference")("DigestValue"))
if computed_hash != digest_value:
raise RuntimeError("WSSE SHA1 hash digests mismatch")
# workaround: prepare the signed info (assure the parent ns is present)
signed_info['xmlns'] = XMLDSIG_URI
xml = repr(signed_info)
# verify the signature using RSA-SHA1 (XML Security)
ok = xmlsec.rsa_verify(xml, str(signature_value), public_key)
if not ok:
raise RuntimeError("WSSE RSA-SHA1 signature verification failed")
# TODO: remove any unsigned part from the xml?
def __check(self, value, expected, msg="WSSE sanity check failed"):
if value != expected:
raise RuntimeError(msg)
#!/usr/bin/python
# -*- coding: utf-8 -*-
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by the
# Free Software Foundation; either version 3, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
"""Pythonic XML Security Library implementation"""
import base64
import hashlib
import os
from cStringIO import StringIO
from M2Crypto import BIO, EVP, RSA, X509, m2
# if lxml is not installed, use c14n.py native implementation
try:
import lxml.etree
except ImportError:
lxml = None
# Features:
# * Uses M2Crypto and lxml (libxml2) but it is independent from libxmlsec1
# * Sign, Verify, Encrypt & Decrypt XML documents
# Enveloping templates ("by reference": signature is parent):
SIGN_REF_TMPL = """
<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
<CanonicalizationMethod Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#" />
<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1" />
<Reference URI="%(ref_uri)s">
<Transforms>
<Transform Algorithm="http://www.w3.org/2001/10/xml-exc-c14n#" />
</Transforms>
<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1" />
<DigestValue>%(digest_value)s</DigestValue>
</Reference>
</SignedInfo>
"""
SIGNED_TMPL = """
<?xml version="1.0" encoding="UTF-8"?>
<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">
%(signed_info)s
<SignatureValue>%(signature_value)s</SignatureValue>
%(key_info)s
%(ref_xml)s
</Signature>
"""
# Enveloped templates (signature is child, the reference is the root object):
SIGN_ENV_TMPL = """
<SignedInfo xmlns="http://www.w3.org/2000/09/xmldsig#">
<CanonicalizationMethod Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
<SignatureMethod Algorithm="http://www.w3.org/2000/09/xmldsig#rsa-sha1"/>
<Reference URI="">
<Transforms>
<Transform Algorithm="http://www.w3.org/2000/09/xmldsig#enveloped-signature"/>
<Transform Algorithm="http://www.w3.org/TR/2001/REC-xml-c14n-20010315"/>
</Transforms>
<DigestMethod Algorithm="http://www.w3.org/2000/09/xmldsig#sha1"/>
<DigestValue>%(digest_value)s</DigestValue>
</Reference>
</SignedInfo>
"""
SIGNATURE_TMPL = """<Signature xmlns="http://www.w3.org/2000/09/xmldsig#">
%(signed_info)s
<SignatureValue>%(signature_value)s</SignatureValue>
%(key_info)s
</Signature>"""
KEY_INFO_RSA_TMPL = """
<KeyInfo>
<KeyValue>
<RSAKeyValue>
<Modulus>%(modulus)s</Modulus>
<Exponent>%(exponent)s</Exponent>
</RSAKeyValue>
</KeyValue>
</KeyInfo>
"""
KEY_INFO_X509_TMPL = """
<KeyInfo>
<X509Data>
<X509IssuerSerial>
<X509IssuerName>%(issuer_name)s</X509IssuerName>
<X509SerialNumber>%(serial_number)s</X509SerialNumber>
</X509IssuerSerial>
</X509Data>
</KeyInfo>
"""
def canonicalize(xml, c14n_exc=True):
"Return the canonical (c14n) form of the xml document for hashing"
# UTF8, normalization of line feeds/spaces, quoting, attribute ordering...
output = StringIO()
if lxml is not None:
# use faster libxml2 / lxml canonicalization function if available
et = lxml.etree.parse(StringIO(xml))
et.write_c14n(output, exclusive=c14n_exc)
else:
# use pure-python implementation: c14n.py (avoid recursive import)
from .simplexml import SimpleXMLElement
SimpleXMLElement(xml).write_c14n(output, exclusive=c14n_exc)
return output.getvalue()
def sha1_hash_digest(payload):
"Create a SHA1 hash and return the base64 string"
return base64.b64encode(hashlib.sha1(payload).digest())
def rsa_sign(xml, ref_uri, private_key, password=None, cert=None, c14n_exc=True,
sign_template=SIGN_REF_TMPL, key_info_template=KEY_INFO_RSA_TMPL):
"Sign an XML document usign RSA (templates: enveloped -ref- or enveloping)"
# normalize the referenced xml (to compute the SHA1 hash)
ref_xml = canonicalize(xml, c14n_exc)
# create the signed xml normalized (with the referenced uri and hash value)
signed_info = sign_template % {'ref_uri': ref_uri,
'digest_value': sha1_hash_digest(ref_xml)}
signed_info = canonicalize(signed_info, c14n_exc)
# Sign the SHA1 digest of the signed xml using RSA cipher
pkey = RSA.load_key(private_key, lambda *args, **kwargs: password)
signature = pkey.sign(hashlib.sha1(signed_info).digest())
# build the mapping (placeholders) to create the final xml signed message
return {
'ref_xml': ref_xml, 'ref_uri': ref_uri,
'signed_info': signed_info,
'signature_value': base64.b64encode(signature),
'key_info': key_info(pkey, cert, key_info_template),
}