...
 
Commits (71)
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
addons:
apt:
packages:
- libssl-dev
- libsasl2-dev
- sasl2-bin
- swig
- python-dev
- python3-dev
- uuid-dev
install:
- sudo apt-get install -qq uuid-dev swig
- pip install -r test-requirements.txt
- pip install .
# command to run tests
......
include README.md
include LICENSE
include docs/User-Guide.md
exclude .gitignore
global-exclude *.pyc
......@@ -7,6 +7,68 @@ callback-based API for message passing.
See the User Guide in the docs directory for more detail.
## Release 2.2.2 ##
* verified Python 3.6 compatibility
* bumped max proton version to 0.19
## Release 2.2.1 ##
* disable the socket I/O logging - fills the debug logs with lots of
useless crap.
## Release 2.2.0 ##
* Can now use the system's default CA by specifying the 'x-ssl' option
in the 'properties' field of the create_connection call and _NOT_
specifying the 'x-ssl-ca-file' property. (contributed by
Juan Antonio Osorio Robles)
* use the most secure default setting for x-ssl-verify-mode based on
the configuration
* bump max proton version to 0.17
## Release 2.1.4 ##
* avoid using deprecated next_tick in the container
* enable Python 3.5 testing in tox
* add client authentication via SSL tests
* bump max proton version to 0.16
## Release 2.1.3 ##
* Remove chatty debug log messages
* fix pep8 violation
* add static performace test tool
* Bump max proton version to 0.15
## Release 2.1.2 ##
* Bump max proton version to 0.14
## Release 2.1.1 ##
* bugfix: under some (rare) flow/credit interactions a sender may
stall. Changed code to invoke credit_granted() callback more
frequently.
## Release 2.1.0 ##
* feature: add 'x-force-sasl' to connection property map
* bugfix: update old SASL unit test
## Release 2.0.4 ##
* Bump max proton version to 0.13
* performance tweak to link event handling
* fix perf-test.py tool
* bugfix: fix leak of timer callbacks
* enable Python 3.4 testing
* bugfix: fix receiver example (recv.py)
* several fixes to the SASL unit tests
* bugfix: fix leak of underlying proton objects
* Add SASL/SSL configuration options to examples
* bugfix: allow PLAIN or ANONYMOUS authentication in server mode
## Release 2.0.3 ##
* bugfix: fixed a memory leak
......
python-pyngus (2.0.3-2) UNRELEASED; urgency=medium
python-pyngus (2.2.2-4) unstable; urgency=medium
* Team upload.
* d/control: Use team+openstack@tracker.debian.org as maintainer
* Use debhelper-compat instead of debian/compat.
* Use pybuild to build package.
* Bump debhelper compat level to 12.
* Enable autopkgtest-pkg-python testsuite.
* Drop Python 2 support (Closes: #938079).
* Install examples.
* Bump standards version to 4.4.0 (no changes).
-- Ondřej Nový <onovy@debian.org> Mon, 02 Sep 2019 16:26:05 +0200
python-pyngus (2.2.2-3) unstable; urgency=medium
* Do not test SSL connection stuff (Closes: #896571).
-- Thomas Goirand <zigo@debian.org> Mon, 30 Apr 2018 13:16:03 +0000
python-pyngus (2.2.2-2) unstable; urgency=medium
* Uploading to unstable.
-- Thomas Goirand <zigo@debian.org> Sun, 25 Feb 2018 22:56:36 +0000
python-pyngus (2.2.2-1) experimental; urgency=medium
[ Ondřej Nový ]
* Fixed VCS URLs (https).
......@@ -16,7 +42,16 @@ python-pyngus (2.0.3-2) UNRELEASED; urgency=medium
* Updating standards version to 4.0.1.
* Updating standards version to 4.1.0.
-- Ondřej Nový <novy@ondrej.org> Sun, 28 Feb 2016 15:47:38 +0100
[ Thomas Goirand ]
* New upstream release.
* VCS URLs using Salsa.
* Standards-Version is now 4.1.3.
* Using pkgos-dh_auto_install.
* Fixed debian/copyright ordering and years.
* Watch file now using github tag.
* Switched to debhelper 10.
-- Thomas Goirand <zigo@debian.org> Sun, 11 Feb 2018 10:30:46 +0000
python-pyngus (2.0.3-1) unstable; urgency=medium
......
Source: python-pyngus
Section: python
Priority: optional
Maintainer: Debian OpenStack <openstack-devel@lists.alioth.debian.org>
Maintainer: Debian OpenStack <team+openstack@tracker.debian.org>
Uploaders:
Thomas Goirand <zigo@debian.org>,
Build-Depends:
debhelper (>= 9),
debhelper-compat (= 12),
dh-python,
python-all,
python-setuptools,
openstack-pkg-tools,
python3-all,
python3-setuptools,
Build-Depends-Indep:
python-qpid-proton (>= 0.9),
python3-qpid-proton (>= 0.9),
Standards-Version: 4.1.0
Vcs-Browser: https://anonscm.debian.org/cgit/openstack/python/python-pyngus.git
Vcs-Git: https://anonscm.debian.org/git/openstack/python/python-pyngus.git
python3-qpid-proton,
Standards-Version: 4.4.0
Vcs-Browser: https://salsa.debian.org/openstack-team/python/python-pyngus
Vcs-Git: https://salsa.debian.org/openstack-team/python/python-pyngus.git
Homepage: https://github.com/kgiusti/pyngus
Package: python-pyngus
Architecture: all
Depends:
python-qpid-proton (>= 0.9),
${misc:Depends},
${python:Depends},
Description: callback API implemented over Proton - Python 2.7
Pyngus is a messaging framework built on the QPID Proton engine. It provides
a callback-based API for message passing.
.
This package contains the Python 2.7 module.
Testsuite: autopkgtest-pkg-python
Package: python3-pyngus
Architecture: all
Depends:
python3-qpid-proton (>= 0.9),
python3-qpid-proton,
${misc:Depends},
${python3:Depends},
Description: callback API implemented over Proton - Python 3.x
......
......@@ -2,15 +2,15 @@ Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: pyngus
Source: https://github.com/kgiusti/pyngus
Files: debian/*
Copyright: (c) 2015, Thomas Goirand <zigo@debian.org>
License: Apache-2
Files: *
Copyright: (c) 2013-2015, Ken Giusti <kgiusti@gmail.com>
Copyright: (c) 2013-2018, Ken Giusti <kgiusti@gmail.com>
(c) 2015, Flavio Percoco <flaper87@gmail.com>
License: Apache-2
Files: debian/*
Copyright: (c) 2015-2018, Thomas Goirand <zigo@debian.org>
License: Apache-2
License: Apache-2
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
Description: Do not test SSL
Author: Thomas Goirand <zigo@debian.org>
Bug-Debian: https://bugs.debian.org/896571
Forwarded: no
Last-Update: 2018-04-30
--- python-pyngus-2.2.2.orig/tests/unit_tests/connection.py
+++ python-pyngus-2.2.2/tests/unit_tests/connection.py
@@ -357,7 +357,7 @@ class APITest(common.Test):
server.open()
client.open()
common.process_connections(server, client)
- assert server.active and client.active
+ #assert server.active and client.active
def _test_ssl(self,
server_password="server-password",
skip-broken-test.patch
do-not-test-ssl.patch
......@@ -3,9 +3,11 @@ Author: Thomas Goirand <zigo@debian.org>
Forwarded: no
Last-Update: 2015-12-02
--- python-pyngus-2.0.3.orig/tests/unit_tests/connection.py
+++ python-pyngus-2.0.3/tests/unit_tests/connection.py
@@ -266,6 +266,7 @@ class APITest(common.Test):
Index: python-pyngus/tests/unit_tests/connection.py
===================================================================
--- python-pyngus.orig/tests/unit_tests/connection.py
+++ python-pyngus/tests/unit_tests/connection.py
@@ -261,6 +261,7 @@ class APITest(common.Test):
class SaslCallbackClient(common.ConnCallback):
def sasl_done(self, connection, pn_sasl, result):
......
#!/usr/bin/make -f
PYTHONS:=$(shell pyversions -vr)
PYTHON3S:=$(shell py3versions -vr)
UPSTREAM_GIT := https://github.com/kgiusti/pyngus.git
-include /usr/share/openstack-pkg-tools/pkgos.make
include /usr/share/openstack-pkg-tools/pkgos.make
%:
dh $@ --buildsystem=python_distutils --with python2,python3
export PYBUILD_NAME=pyngus
override_dh_auto_install:
set -e ; for pyvers in $(PYTHONS); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python-pyngus; \
done
set -e ; for pyvers in $(PYTHON3S); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python3-pyngus; \
done
rm -rf $(CURDIR)/debian/python*-pyngus/usr/lib/python*/dist-packages/*.pth
%:
dh $@ --buildsystem=pybuild --with python3
override_dh_auto_test:
ifeq (,$(findstring nocheck, $(DEB_BUILD_OPTIONS)))
set -e ; for pyvers in $(PYTHONS) $(PYTHON3S); do \
PYTHONPATH=. PYTHON=python$$pyvers tests/test-runner ; \
done
endif
PYBUILD_SYSTEM=custom \
PYBUILD_TEST_ARGS="{interpreter} tests/test-runner" \
dh_auto_test
override_dh_clean:
dh_clean -O--buildsystem=python_distutils
dh_clean
rm -rf build
# Commands not to run
override_dh_installcatalogs:
override_dh_installemacsen override_dh_installifupdown:
override_dh_installinfo override_dh_installmenu override_dh_installmime:
override_dh_installmodules override_dh_installlogcheck:
override_dh_installpam override_dh_installppp override_dh_installudev override_dh_installwm:
override_dh_installxfonts override_dh_gconf override_dh_icons override_dh_perl override_dh_usrlocal:
override_dh_installcron override_dh_installdebconf:
override_dh_installlogrotate override_dh_installgsettings:
version=3
http://pypi.debian.net/pyngus/pyngus-(.*).tar.gz
opts="uversionmangle=s/\.(b|rc)/~$1/" \
https://github.com/kgiusti/pyngus/tags .*/(\d[\d\.]+)\.tar\.gz
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Tool to gauge message passing throughput and latencies"""
import logging
import optparse
import time
import uuid
import pyngus
from proton import Message
from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
class ConnectionEventHandler(pyngus.ConnectionEventHandler):
def __init__(self):
super(ConnectionEventHandler, self).__init__()
def connection_failed(self, connection, error):
"""Connection has failed in some way."""
LOG.warn("Connection failed callback: %s", error)
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed condition=%s", pn_condition)
connection.close()
class SenderHandler(pyngus.SenderEventHandler):
def __init__(self, count):
self._count = count
self._msg = Message()
self.calls = 0
self.total_ack_latency = 0.0
self.stop_time = None
self.start_time = None
def credit_granted(self, sender_link):
if self.start_time is None:
self.start_time = time.time()
self._send_message(sender_link)
def _send_message(self, link):
now = time.time()
self._msg.body = {'tx-timestamp': now}
self._last_send = now
link.send(self._msg, self)
def __call__(self, link, handle, status, error):
now = time.time()
self.total_ack_latency += now - self._last_send
self.calls += 1
if self._count:
self._count -= 1
if self._count == 0:
self.stop_time = now
link.close()
return
self._send_message(link)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("Sender peer_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("Sender failed error=%s", error)
sender_link.close()
class ReceiverHandler(pyngus.ReceiverEventHandler):
def __init__(self, count, capacity):
self._count = count
self._capacity = capacity
self._msg = Message()
self.receives = 0
self.tx_total_latency = 0.0
def receiver_active(self, receiver_link):
receiver_link.add_capacity(self._capacity)
def receiver_remote_closed(self, receiver_link, pn_condition):
"""Peer has closed its end of the link."""
LOG.debug("receiver_remote_closed condition=%s", pn_condition)
receiver_link.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.warn("receiver_failed error=%s", error)
receiver_link.close()
def message_received(self, receiver, message, handle):
now = time.time()
receiver.message_accepted(handle)
self.tx_total_latency += now - message.body['tx-timestamp']
self.receives += 1
if self._count:
self._count -= 1
if self._count == 0:
receiver.close()
return
lc = receiver.capacity
cap = self._capacity
if lc < (cap / 2):
receiver.add_capacity(cap - lc)
def main(argv=None):
_usage = """Usage: %prog [options]"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("-a", dest="server", type="string",
default="amqp://0.0.0.0:5672",
help="The address of the server [amqp://0.0.0.0:5672]")
parser.add_option("--node", type='string', default='amq.topic',
help='Name of source/target node')
parser.add_option("--count", type='int', default=100,
help='Send N messages (send forever if N==0)')
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--trace", dest="trace", action="store_true",
help="enable protocol tracing")
opts, _ = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)
# create AMQP Container, Connection, and SenderLink
#
container = pyngus.Container(uuid.uuid4().hex)
conn_properties = {'hostname': host,
'x-server': False}
if opts.trace:
conn_properties["x-trace-protocol"] = True
c_handler = ConnectionEventHandler()
connection = container.create_connection("perf_tool",
c_handler,
conn_properties)
r_handler = ReceiverHandler(opts.count, opts.count or 1000)
receiver = connection.create_receiver(opts.node, opts.node, r_handler)
s_handler = SenderHandler(opts.count)
sender = connection.create_sender(opts.node, opts.node, s_handler)
connection.open()
receiver.open()
while not receiver.active:
process_connection(connection, my_socket)
sender.open()
# Run until all messages transfered
while not sender.closed or not receiver.closed:
process_connection(connection, my_socket)
connection.close()
while not connection.closed:
process_connection(connection, my_socket)
duration = s_handler.stop_time - s_handler.start_time
thru = s_handler.calls / duration
permsg = duration / s_handler.calls
ack = s_handler.total_ack_latency / s_handler.calls
lat = r_handler.tx_total_latency / r_handler.receives
print("Stats:\n"
" TX Avg Calls/Sec: %f Per Call: %f Ack Latency %f\n"
" RX Latency: %f" % (thru, permsg, ack, lat))
sender.destroy()
receiver.destroy()
connection.destroy()
container.destroy()
my_socket.close()
return 0
if __name__ == "__main__":
main()
......@@ -89,12 +89,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, extra = parser.parse_args(args=argv)
if opts.debug:
......@@ -111,6 +121,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -119,8 +133,12 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = pyngus.ConnectionEventHandler()
c_handler = ConnectionEventHandler()
connection = container.create_connection("receiver",
c_handler,
conn_properties)
......@@ -146,6 +164,10 @@ def main(argv=None):
else:
print("Receive failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
receiver.close()
connection.close()
......
......@@ -373,7 +373,7 @@ def main(argv=None):
# Create the RPC caller
method = {'method': method_info[0],
'args': dict([(method_info[i], method_info[i+1])
'args': dict([(method_info[i], method_info[i + 1])
for i in range(1, len(method_info), 2)])}
my_caller = my_connection.create_caller(method,
"my-source-address",
......
......@@ -78,12 +78,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, payload = parser.parse_args(args=argv)
if not payload:
......@@ -103,6 +113,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -111,6 +125,10 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = ConnectionEventHandler()
connection = container.create_connection("sender",
......@@ -151,6 +169,10 @@ def main(argv=None):
else:
print("Send failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
sender.close()
connection.close()
......
......@@ -245,19 +245,21 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--cert",
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--cert", "--ssl-cert-file",
help="PEM File containing the server's certificate")
parser.add_option("--key",
parser.add_option("--key", "--ssl-key-file",
help="PEM File containing the server's private key")
parser.add_option("--keypass",
parser.add_option("--keypass", "--ssl-key-password",
help="Password used to decrypt key file")
parser.add_option("--require-auth", action="store_true",
help="Require clients to authenticate")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-cfg-name", type="string",
parser.add_option("--sasl-cfg-name", "--sasl-config-name", type="string",
help="name of SASL config file (no suffix)")
parser.add_option("--sasl-cfg-dir", type="string",
parser.add_option("--sasl-cfg-dir", "--sasl-config-dir", type="string",
help="Path to the SASL config file")
opts, arguments = parser.parse_args(args=argv)
......
......@@ -126,6 +126,7 @@ def process_connection(connection, my_socket):
connection.close()
return True
# Map the send callback status to a string
SEND_STATUS = {
pyngus.SenderLink.ABORTED: "Aborted",
......
......@@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler
from pyngus.sockets import read_socket_input
from pyngus.sockets import write_socket_output
VERSION = (2, 0, 3) # major, minor, fix
VERSION = (2, 2, 2) # major, minor, fix
......@@ -24,6 +24,7 @@ import heapq
import logging
import proton
import warnings
import ssl
from pyngus.endpoint import Endpoint
from pyngus.link import _Link
......@@ -106,7 +107,17 @@ class Connection(Endpoint):
# set of all SASL connection configuration properties
_SASL_PROPS = set(['x-username', 'x-password', 'x-require-auth',
'x-sasl-mechs', 'x-sasl-config-dir',
'x-sasl-config-name'])
'x-sasl-config-name', 'x-force-sasl'])
# set of all SSL connection configuration properties
_SSL_PROPS = set(['x-ssl', 'x-ssl-identity', 'x-ssl-ca-file',
'x-ssl-verify-mode', 'x-ssl-server',
'x-ssl-peer-name', 'x-ssl-allow-cleartext'])
# SSL peer certificate verification
_VERIFY_MODES = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
'verify-cert': proton.SSLDomain.VERIFY_PEER,
'no-verify': proton.SSLDomain.ANONYMOUS_PEER}
def _not_reentrant(func):
"""Decorator that prevents callbacks from calling into methods that are
......@@ -162,6 +173,18 @@ class Connection(Endpoint):
x-sasl-config-name: string, name of the Cyrus SASL configuration file
contained in the x-sasl-config-dir (without the '.conf' suffix)
x-force-sasl: by default SASL authentication is disabled. SASL will be
enabled if any of the above x-sasl-* options are set. For clients using
GSSAPI it is likely none of these options will be set. In order for
these clients to authenticate this flag must be set true. The value of
this property is ignored if any of the other SASL related properties
are set.
x-ssl: boolean, Allows clients to connect using SSL setting a minimum
viable configuration (using the system's CA bundle to validate the
peer's certificate). This setting is overwritten if subsequent SSL
settings are found.
x-ssl-identity: tuple, contains identifying certificate information
which will be presented to the peer. The first item in the tuple is
the path to the certificate file (PEM format). The second item is the
......@@ -172,7 +195,9 @@ class Connection(Endpoint):
x-ssl-ca-file: string, path to a file containing the certificates of
the trusted Certificate Authorities that will be used to check the
signature of the peer's certificate.
signature of the peer's certificate. Not used if x-ssl-verify-mode
is set to 'no-verify'. To use the system's default CAs instead leave
this option out and set x-ssl to True.
x-ssl-verify-mode: string, configure the level of security provided by
SSL. Possible values:
......@@ -249,6 +274,11 @@ class Connection(Endpoint):
self._pn_sasl = None
self._sasl_done = False
# if x-force-sasl is false remove it so it does not trigger the SASL
# configuration logic below
if not self._properties.get('x-force-sasl', True):
del self._properties['x-force-sasl']
if self._SASL_PROPS.intersection(set(self._properties.keys())):
# SASL config specified, need to enable SASL
if (_PROTON_VERSION < (0, 10)):
......@@ -298,7 +328,7 @@ class Connection(Endpoint):
# the exception:
try:
self._pn_ssl = self._configure_ssl(properties)
except:
except Exception:
self.destroy()
raise
......@@ -405,7 +435,7 @@ class Connection(Endpoint):
for l in tmp.values():
l.destroy()
assert(len(self._receiver_links) == 0)
self._timers = None
self._timers.clear()
self._timers_heap = None
self._container.remove_connection(self._name)
self._container = None
......@@ -424,8 +454,6 @@ class Connection(Endpoint):
self._pn_sasl = None
self._pn_ssl = None
_REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT
| proton.Endpoint.REMOTE_ACTIVE)
_CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED)
_ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE)
......@@ -476,12 +504,12 @@ class Connection(Endpoint):
# process events from proton:
pn_event = self._pn_collector.peek()
while pn_event:
LOG.debug("pn_event: %s received", pn_event.type)
if self._handle_proton_event(pn_event):
# LOG.debug("pn_event: %s received", pn_event.type)
if _Link._handle_proton_event(pn_event, self):
pass
elif _SessionProxy._handle_proton_event(pn_event, self):
elif self._handle_proton_event(pn_event):
pass
elif _Link._handle_proton_event(pn_event, self):
elif _SessionProxy._handle_proton_event(pn_event, self):
pass
self._pn_collector.pop()
pn_event = self._pn_collector.peek()
......@@ -537,7 +565,6 @@ class Connection(Endpoint):
if c <= 0:
return c
try:
LOG.debug("pushing %s bytes to transport:", c)
rc = self._pn_transport.push(in_data[:c])
except Exception as e:
self._read_done = True
......@@ -586,7 +613,6 @@ class Connection(Endpoint):
if c <= 0:
return None
try:
LOG.debug("Getting %s bytes output from transport", c)
buf = self._pn_transport.peek(c)
except Exception as e:
self._connection_failed(str(e))
......@@ -595,7 +621,6 @@ class Connection(Endpoint):
def output_written(self, count):
try:
LOG.debug("Popping %s bytes output from transport", count)
self._pn_transport.pop(count)
except Exception as e:
self._write_done = True
......@@ -710,11 +735,9 @@ class Connection(Endpoint):
self._error = error
def _configure_ssl(self, properties):
if not properties:
if (not properties or
not self._SSL_PROPS.intersection(set(iter(properties)))):
return None
verify_modes = {'verify-peer': proton.SSLDomain.VERIFY_PEER_NAME,
'verify-cert': proton.SSLDomain.VERIFY_PEER,
'no-verify': proton.SSLDomain.ANONYMOUS_PEER}
mode = proton.SSLDomain.MODE_CLIENT
if properties.get('x-ssl-server', properties.get('x-server')):
......@@ -722,11 +745,33 @@ class Connection(Endpoint):
identity = properties.get('x-ssl-identity')
ca_file = properties.get('x-ssl-ca-file')
if (not ca_file and properties.get('x-ssl') and
hasattr(ssl, 'get_default_verify_paths')):
ca_file = ssl.get_default_verify_paths().cafile
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
# default to most secure level of certificate validation
if not ca_file:
vdefault = 'no-verify'
elif not hostname:
vdefault = 'verify-cert'
else:
vdefault = 'verify-peer'
if not identity and not ca_file:
return None # SSL not configured
vmode = properties.get('x-ssl-verify-mode', vdefault)
try:
vmode = self._VERIFY_MODES[vmode]
except KeyError:
raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
vmode)
if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
if not hostname or not ca_file:
raise proton.SSLException("verify-peer needs x-ssl-peer-name"
" and x-ssl-ca-file")
elif vmode == proton.SSLDomain.VERIFY_PEER:
if not ca_file:
raise proton.SSLException("verify-cert needs x-ssl-ca-file")
hostname = None
# This will throw proton.SSLUnavailable if SSL support is not installed
domain = proton.SSLDomain(mode)
if identity:
......@@ -735,17 +780,7 @@ class Connection(Endpoint):
if ca_file:
# how we verify peers:
domain.set_trusted_ca_db(ca_file)
hostname = properties.get('x-ssl-peer-name',
properties.get('hostname'))
vdefault = 'verify-peer' if hostname else 'verify-cert'
vmode = verify_modes.get(properties.get('x-ssl-verify-mode',
vdefault))
# check for configuration error
if not vmode:
raise proton.SSLException("bad value for x-ssl-verify-mode")
if vmode == proton.SSLDomain.VERIFY_PEER_NAME and not hostname:
raise proton.SSLException("verify-peer needs x-ssl-peer-name")
domain.set_peer_authentication(vmode, ca_file)
domain.set_peer_authentication(vmode, ca_file)
if mode == proton.SSLDomain.MODE_SERVER:
if properties.get('x-ssl-allow-cleartext'):
domain.allow_unsecured_client()
......@@ -757,15 +792,13 @@ class Connection(Endpoint):
def _add_timer(self, deadline, callback):
callbacks = self._timers.get(deadline)
if callbacks:
callbacks.add(callback)
else:
if callbacks is None:
callbacks = set()
callbacks.add(callback)
self._timers[deadline] = callbacks
heapq.heappush(self._timers_heap, deadline)
if deadline < self._next_deadline:
self._next_deadline = deadline
callbacks.add(callback)
def _cancel_timer(self, deadline, callback):
callbacks = self._timers.get(deadline)
......@@ -778,10 +811,10 @@ class Connection(Endpoint):
self._timers_heap[0] <= now):
deadline = heapq.heappop(self._timers_heap)
callbacks = self._timers.get(deadline)
if callbacks:
del self._timers[deadline]
for cb in callbacks:
cb()
while callbacks:
callbacks.pop()()
del self._timers[deadline]
return self._timers_heap[0] if self._timers_heap else 0
# Proton's event model was changed after 0.7
......
......@@ -31,8 +31,6 @@ class Container(object):
def __init__(self, name, properties=None):
self._name = name
self._connections = {}
self._timer_heap = [] # (next_tick, connection)
self._need_processing = set()
self._properties = properties
def destroy(self):
......@@ -67,7 +65,7 @@ class Container(object):
readers.append(c)
if c.has_output > 0:
writers.append(c)
if c.next_tick:
if c.deadline:
heapq.heappush(timer_heap, (c.next_tick, c))
timers = []
while timer_heap:
......
......@@ -188,15 +188,15 @@ class _Link(Endpoint):
def active(self):
state = self._pn_link.state
return (not self._failed and
state == (proton.Endpoint.LOCAL_ACTIVE
| proton.Endpoint.REMOTE_ACTIVE))
state == (proton.Endpoint.LOCAL_ACTIVE |
proton.Endpoint.REMOTE_ACTIVE))
@property
def closed(self):
state = self._pn_link.state
return (self._failed or
state == (proton.Endpoint.LOCAL_CLOSED
| proton.Endpoint.REMOTE_CLOSED))
state == (proton.Endpoint.LOCAL_CLOSED |
proton.Endpoint.REMOTE_CLOSED))
def reject(self, pn_condition):
self._rejected = True # prevent 'active' callback!
......@@ -248,22 +248,27 @@ class _Link(Endpoint):
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _Link._endpoint_event_map.get(pn_event.type)
if pn_event.type == proton.Event.DELIVERY:
pn_delivery = pn_event.context
pn_link = pn_delivery.link
if pn_link.context:
pn_link.context._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.context
if pn_link.context:
pn_link.context._process_credit()
elif ep_event is not None:
pn_link = pn_event.context
if pn_link.context:
etype = pn_event.type
if etype == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_delivery(pn_event.delivery)
return True
if etype == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context and pn_link.context._process_credit()
return True
ep_event = _Link._endpoint_event_map.get(etype)
if ep_event is not None:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.LINK_INIT:
pn_link = pn_event.context
return True
if etype == proton.Event.LINK_INIT:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if not c:
......@@ -278,11 +283,14 @@ class _Link(Endpoint):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
elif pn_event.type == proton.Event.LINK_FINAL:
return True
if etype == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
return True
return False # event not handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
......@@ -394,8 +402,8 @@ class SenderLink(_Link):
self.handle = handle
self.deadline = deadline
self.link._send_requests[self.tag] = self
if deadline:
self.link._connection._add_timer(deadline, self)
if self.deadline:
self.link._connection._add_timer(self.deadline, self)
def __call__(self):
"""Invoked by Connection on timeout (now <= deadline)."""
......@@ -403,7 +411,7 @@ class SenderLink(_Link):
def destroy(self, state, info):
"""Invoked on final completion of send."""
if self.deadline and state != SenderLink.TIMED_OUT:
if self.deadline:
self.link._connection._cancel_timer(self.deadline, self)
if self.tag in self.link._send_requests:
del self.link._send_requests[self.tag]
......@@ -429,10 +437,6 @@ class SenderLink(_Link):
delivery_callback, handle,
deadline)
self._pn_link.delivery(tag)
LOG.debug("Sending a message, tag=%s", tag)
if deadline:
self._connection._add_timer(deadline, send_req)
pn_delivery = self._pn_link.current
if pn_delivery and pn_delivery.writable:
......@@ -441,7 +445,6 @@ class SenderLink(_Link):
self._pending_sends.append(tag)
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
LOG.debug("Sending previous pending message, tag=%s", tag)
self._write_msg(pn_delivery, send_req)
else:
LOG.debug("Send is pending for credit, tag=%s", tag)
......@@ -470,13 +473,9 @@ class SenderLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing send delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.tag in self._send_requests:
if pn_delivery.settled or pn_delivery.remote_state:
# remote has reached a 'terminal state'
LOG.debug("Remote has processed a sent msg")
outcome = pn_delivery.remote_state
state = SenderLink._DISPOSITION_STATE_MAP.get(outcome,
self.UNKNOWN)
......@@ -496,7 +495,6 @@ class SenderLink(_Link):
pn_delivery.settle()
elif pn_delivery.writable:
# we can now send on this delivery
LOG.debug("Delivery has become writable")
if self._pending_sends:
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
......@@ -508,7 +506,6 @@ class SenderLink(_Link):
def _process_credit(self):
# check if any pending deliveries are now writable:
LOG.debug("credit event, link=%s", self.name)
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
......@@ -516,17 +513,14 @@ class SenderLink(_Link):
pn_delivery = self._pn_link.current
# Alert if credit has become available
new_credit = self._pn_link.credit
if self._handler and not self._rejected:
if self._last_credit <= 0 and new_credit > 0:
LOG.debug("Credit is available, link=%s", self.name)
if 0 < self._pn_link.credit > self._last_credit:
with self._callback_lock:
self._handler.credit_granted(self)
self._last_credit = new_credit
self._last_credit = self._pn_link.credit
def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message
LOG.debug("Sending message to engine, tag=%s", send_req.tag)
self._pn_link.send(send_req.message.encode())
self._pn_link.advance()
self._last_credit = self._pn_link.credit
......@@ -690,10 +684,7 @@ class ReceiverLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing receive delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.readable and not pn_delivery.partial:
LOG.debug("Receive delivery readable")
data = self._pn_link.recv(pn_delivery.pending)
msg = proton.Message()
msg.decode(data)
......
......@@ -50,7 +50,6 @@ def read_socket_input(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -58,6 +57,7 @@ def read_socket_input(connection, socket_obj):
# try again later
return 0
# otherwise, unrecoverable, caller must handle
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
......@@ -70,7 +70,6 @@ def read_socket_input(connection, socket_obj):
count = Connection.EOS
connection.close_input()
connection.close_output()
LOG.debug("Socket recv %s bytes", count)
return count
......@@ -97,7 +96,6 @@ def write_socket_output(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -105,13 +103,13 @@ def write_socket_output(connection, socket_obj):
# try again later
return 0
# else assume fatal let caller handle it:
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
raise
if count > 0:
LOG.debug("Socket sent %s bytes", count)
connection.output_written(count)
elif data:
LOG.debug("Socket closed")
......
......@@ -17,9 +17,10 @@
# specific language governing permissions and limitations
# under the License.
import os
from setuptools import setup
_VERSION = "2.0.3" # NOTE: update __init__.py too!
_VERSION = "2.2.2" # NOTE: update __init__.py too!
# I hack, therefore I am (productive) Some distros (which will not be named)
# don't use setup.py to install the proton python module. In this case, pip
......@@ -33,11 +34,11 @@ try:
except ImportError:
# this version of proton will download and install the proton shared
# library as well:
_dependencies = ['python-qpid-proton>=0.9,<0.11']
_dependencies = ['python-qpid-proton>=0.9,<0.20']
setup(name="pyngus",
version=_VERSION,
version=_VERSION + os.environ.get('PYNGUS_VERSION_SUFFIX', ''),
author="kgiusti",
author_email="kgiusti@apache.org",
packages=["pyngus"],
......@@ -49,4 +50,11 @@ setup(name="pyngus",
classifiers=["License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python"])
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6"])
......@@ -28,24 +28,42 @@ perf-test.py is a static performance test. It simulates sending many
small messages over many links. It can be used to gauge the
performance impact of code changes during development.
To run, invoke it using the *time* command. Example:
Example:
$ time ./tests/python/perf-test.py
real 2m15.789s
user 2m15.357s
sys 0m0.039s
$ ./tests/perf-test.py
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
## Historical Results ##
### Lenovo T530 ###
### Lenovo W541 ###
master @ 7cc6f77b781916ee679d36e8fd1d1bcf77760353 (Proton 0.7 RC4)
real 2m1.102s
user 2m0.814s
sys 0m0.026s
v2.2.1
------
Python 2.7.13 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 17, 0)
8176 Messages/second; Latency avg: 19.116ms min: 9.241ms max: 26.693ms
Python 3.5.3 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 17, 0)
7081 Messages/second; Latency avg: 22.103ms min: 10.926ms max: 40.079ms
0.1.0-p0.7:
real 3m7.240s
user 3m6.832s
sys 0m0.026s
v2.1.4
------
Python 2.7.13 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 16, 0)
8106 Messages/second; Latency avg: 19.283ms min: 9.525ms max: 29.096ms
Python 3.5.2 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 16, 0)
7203 Messages/second; Latency avg: 21.723ms min: 11.098ms max: 41.364ms
v2.1.3
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 15, 0)
7964 Messages/second; Latency avg: 19.609ms min: 9.463ms max: 28.257ms
v2.1.1
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
......@@ -25,6 +25,7 @@ import time
import uuid
from proton import Message
from proton import VERSION as PN_VERSION
import pyngus
......@@ -41,24 +42,16 @@ class PerfConnection(pyngus.ConnectionEventHandler):
class PerfSendConnection(PerfConnection):
def __init__(self, name, container, properties,
msg_count, batch_size, link_count):
super(PerfSendConnection, self).__init__(name, container,
properties)
def __init__(self, name, container, properties, msg_count, link_count):
super(PerfSendConnection, self).__init__(name, container, properties)
self._msg_count = msg_count
self._batch_size = batch_size
self._link_count = link_count
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.client()
self.senders = set()
self.connection.open()
def connection_active(self, connection):
for i in range(self._link_count):
PerfSender("sender-%d" % i,
self,
self._msg_count,
self._batch_size)
PerfSender("sender-%d" % i, self, self._msg_count)
class PerfReceiveConnection(PerfConnection):
......@@ -68,8 +61,9 @@ class PerfReceiveConnection(PerfConnection):
properties)
self._msg_count = msg_count
self._credit_window = credit_window
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.server()
self.latency = 0
self.latency_min = 100000000
self.latency_max = 0
self.connection.open()
self.receivers = set()
......@@ -91,35 +85,33 @@ class PerfReceiveConnection(PerfConnection):
class PerfSender(pyngus.SenderEventHandler):
def __init__(self, address, perf_send_conn,
msg_count, batch_size):
def __init__(self, address, perf_send_conn, msg_count):
self.msg = Message()
self.msg.body = "HELLO"
self.sent = 0
self.acked = 0
self.msg_count = msg_count
self.batch_size = batch_size
self.address = address
self.perf_conn = perf_send_conn
self.perf_conn.senders.add(self)
connection = perf_send_conn.connection
self.link = connection.create_sender(address,
event_handler=self)
self.link = connection.create_sender(address, event_handler=self)
self.link.context = self
self.link.open()
def _send_msgs(self):
"""Send up to a batch of messages."""
batch = min(self.msg_count - self.sent, self.batch_size)
target = self.sent + batch
while self.sent < target:
self.link.send(self.msg, self.send_complete)