...
 
Commits (71)
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
addons:
apt:
packages:
- libssl-dev
- libsasl2-dev
- sasl2-bin
- swig
- python-dev
- python3-dev
- uuid-dev
install:
- sudo apt-get install -qq uuid-dev swig
- pip install -r test-requirements.txt
- pip install .
# command to run tests
......
include README.md
include LICENSE
include docs/User-Guide.md
exclude .gitignore
global-exclude *.pyc
......@@ -7,6 +7,68 @@ callback-based API for message passing.
See the User Guide in the docs directory for more detail.
## Release 2.2.2 ##
* verified Python 3.6 compatibility
* bumped max proton version to 0.19
## Release 2.2.1 ##
* disable the socket I/O logging - fills the debug logs with lots of
useless crap.
## Release 2.2.0 ##
* Can now use the system's default CA by specifying the 'x-ssl' option
in the 'properties' field of the create_connection call and _NOT_
specifying the 'x-ssl-ca-file' property. (contributed by
Juan Antonio Osorio Robles)
* use the most secure default setting for x-ssl-verify-mode based on
the configuration
* bump max proton version to 0.17
## Release 2.1.4 ##
* avoid using deprecated next_tick in the container
* enable Python 3.5 testing in tox
* add client authentication via SSL tests
* bump max proton version to 0.16
## Release 2.1.3 ##
* Remove chatty debug log messages
* fix pep8 violation
* add static performace test tool
* Bump max proton version to 0.15
## Release 2.1.2 ##
* Bump max proton version to 0.14
## Release 2.1.1 ##
* bugfix: under some (rare) flow/credit interactions a sender may
stall. Changed code to invoke credit_granted() callback more
frequently.
## Release 2.1.0 ##
* feature: add 'x-force-sasl' to connection property map
* bugfix: update old SASL unit test
## Release 2.0.4 ##
* Bump max proton version to 0.13
* performance tweak to link event handling
* fix perf-test.py tool
* bugfix: fix leak of timer callbacks
* enable Python 3.4 testing
* bugfix: fix receiver example (recv.py)
* several fixes to the SASL unit tests
* bugfix: fix leak of underlying proton objects
* Add SASL/SSL configuration options to examples
* bugfix: allow PLAIN or ANONYMOUS authentication in server mode
## Release 2.0.3 ##
* bugfix: fixed a memory leak
......
python-pyngus (2.0.3-2) UNRELEASED; urgency=medium
python-pyngus (2.2.2-4) unstable; urgency=medium
* Team upload.
* d/control: Use team+openstack@tracker.debian.org as maintainer
* Use debhelper-compat instead of debian/compat.
* Use pybuild to build package.
* Bump debhelper compat level to 12.
* Enable autopkgtest-pkg-python testsuite.
* Drop Python 2 support (Closes: #938079).
* Install examples.
* Bump standards version to 4.4.0 (no changes).
-- Ondřej Nový <onovy@debian.org> Mon, 02 Sep 2019 16:26:05 +0200
python-pyngus (2.2.2-3) unstable; urgency=medium
* Do not test SSL connection stuff (Closes: #896571).
-- Thomas Goirand <zigo@debian.org> Mon, 30 Apr 2018 13:16:03 +0000
python-pyngus (2.2.2-2) unstable; urgency=medium
* Uploading to unstable.
-- Thomas Goirand <zigo@debian.org> Sun, 25 Feb 2018 22:56:36 +0000
python-pyngus (2.2.2-1) experimental; urgency=medium
[ Ondřej Nový ]
* Fixed VCS URLs (https).
......@@ -16,7 +42,16 @@ python-pyngus (2.0.3-2) UNRELEASED; urgency=medium
* Updating standards version to 4.0.1.
* Updating standards version to 4.1.0.
-- Ondřej Nový <novy@ondrej.org> Sun, 28 Feb 2016 15:47:38 +0100
[ Thomas Goirand ]
* New upstream release.
* VCS URLs using Salsa.
* Standards-Version is now 4.1.3.
* Using pkgos-dh_auto_install.
* Fixed debian/copyright ordering and years.
* Watch file now using github tag.
* Switched to debhelper 10.
-- Thomas Goirand <zigo@debian.org> Sun, 11 Feb 2018 10:30:46 +0000
python-pyngus (2.0.3-1) unstable; urgency=medium
......
Source: python-pyngus
Section: python
Priority: optional
Maintainer: Debian OpenStack <openstack-devel@lists.alioth.debian.org>
Maintainer: Debian OpenStack <team+openstack@tracker.debian.org>
Uploaders:
Thomas Goirand <zigo@debian.org>,
Build-Depends:
debhelper (>= 9),
debhelper-compat (= 12),
dh-python,
python-all,
python-setuptools,
openstack-pkg-tools,
python3-all,
python3-setuptools,
Build-Depends-Indep:
python-qpid-proton (>= 0.9),
python3-qpid-proton (>= 0.9),
Standards-Version: 4.1.0
Vcs-Browser: https://anonscm.debian.org/cgit/openstack/python/python-pyngus.git
Vcs-Git: https://anonscm.debian.org/git/openstack/python/python-pyngus.git
python3-qpid-proton,
Standards-Version: 4.4.0
Vcs-Browser: https://salsa.debian.org/openstack-team/python/python-pyngus
Vcs-Git: https://salsa.debian.org/openstack-team/python/python-pyngus.git
Homepage: https://github.com/kgiusti/pyngus
Package: python-pyngus
Architecture: all
Depends:
python-qpid-proton (>= 0.9),
${misc:Depends},
${python:Depends},
Description: callback API implemented over Proton - Python 2.7
Pyngus is a messaging framework built on the QPID Proton engine. It provides
a callback-based API for message passing.
.
This package contains the Python 2.7 module.
Testsuite: autopkgtest-pkg-python
Package: python3-pyngus
Architecture: all
Depends:
python3-qpid-proton (>= 0.9),
python3-qpid-proton,
${misc:Depends},
${python3:Depends},
Description: callback API implemented over Proton - Python 3.x
......
......@@ -2,15 +2,15 @@ Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: pyngus
Source: https://github.com/kgiusti/pyngus
Files: debian/*
Copyright: (c) 2015, Thomas Goirand <zigo@debian.org>
License: Apache-2
Files: *
Copyright: (c) 2013-2015, Ken Giusti <kgiusti@gmail.com>
Copyright: (c) 2013-2018, Ken Giusti <kgiusti@gmail.com>
(c) 2015, Flavio Percoco <flaper87@gmail.com>
License: Apache-2
Files: debian/*
Copyright: (c) 2015-2018, Thomas Goirand <zigo@debian.org>
License: Apache-2
License: Apache-2
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
......
Description: Do not test SSL
Author: Thomas Goirand <zigo@debian.org>
Bug-Debian: https://bugs.debian.org/896571
Forwarded: no
Last-Update: 2018-04-30
--- python-pyngus-2.2.2.orig/tests/unit_tests/connection.py
+++ python-pyngus-2.2.2/tests/unit_tests/connection.py
@@ -357,7 +357,7 @@ class APITest(common.Test):
server.open()
client.open()
common.process_connections(server, client)
- assert server.active and client.active
+ #assert server.active and client.active
def _test_ssl(self,
server_password="server-password",
skip-broken-test.patch
do-not-test-ssl.patch
......@@ -3,9 +3,11 @@ Author: Thomas Goirand <zigo@debian.org>
Forwarded: no
Last-Update: 2015-12-02
--- python-pyngus-2.0.3.orig/tests/unit_tests/connection.py
+++ python-pyngus-2.0.3/tests/unit_tests/connection.py
@@ -266,6 +266,7 @@ class APITest(common.Test):
Index: python-pyngus/tests/unit_tests/connection.py
===================================================================
--- python-pyngus.orig/tests/unit_tests/connection.py
+++ python-pyngus/tests/unit_tests/connection.py
@@ -261,6 +261,7 @@ class APITest(common.Test):
class SaslCallbackClient(common.ConnCallback):
def sasl_done(self, connection, pn_sasl, result):
......
#!/usr/bin/make -f
PYTHONS:=$(shell pyversions -vr)
PYTHON3S:=$(shell py3versions -vr)
UPSTREAM_GIT := https://github.com/kgiusti/pyngus.git
-include /usr/share/openstack-pkg-tools/pkgos.make
include /usr/share/openstack-pkg-tools/pkgos.make
%:
dh $@ --buildsystem=python_distutils --with python2,python3
export PYBUILD_NAME=pyngus
override_dh_auto_install:
set -e ; for pyvers in $(PYTHONS); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python-pyngus; \
done
set -e ; for pyvers in $(PYTHON3S); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python3-pyngus; \
done
rm -rf $(CURDIR)/debian/python*-pyngus/usr/lib/python*/dist-packages/*.pth
%:
dh $@ --buildsystem=pybuild --with python3
override_dh_auto_test:
ifeq (,$(findstring nocheck, $(DEB_BUILD_OPTIONS)))
set -e ; for pyvers in $(PYTHONS) $(PYTHON3S); do \
PYTHONPATH=. PYTHON=python$$pyvers tests/test-runner ; \
done
endif
PYBUILD_SYSTEM=custom \
PYBUILD_TEST_ARGS="{interpreter} tests/test-runner" \
dh_auto_test
override_dh_clean:
dh_clean -O--buildsystem=python_distutils
dh_clean
rm -rf build
# Commands not to run
override_dh_installcatalogs:
override_dh_installemacsen override_dh_installifupdown:
override_dh_installinfo override_dh_installmenu override_dh_installmime:
override_dh_installmodules override_dh_installlogcheck:
override_dh_installpam override_dh_installppp override_dh_installudev override_dh_installwm:
override_dh_installxfonts override_dh_gconf override_dh_icons override_dh_perl override_dh_usrlocal:
override_dh_installcron override_dh_installdebconf:
override_dh_installlogrotate override_dh_installgsettings:
version=3
http://pypi.debian.net/pyngus/pyngus-(.*).tar.gz
opts="uversionmangle=s/\.(b|rc)/~$1/" \
https://github.com/kgiusti/pyngus/tags .*/(\d[\d\.]+)\.tar\.gz
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Tool to gauge message passing throughput and latencies"""
import logging
import optparse
import time
import uuid
import pyngus
from proton import Message
from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
class ConnectionEventHandler(pyngus.ConnectionEventHandler):
def __init__(self):
super(ConnectionEventHandler, self).__init__()
def connection_failed(self, connection, error):
"""Connection has failed in some way."""
LOG.warn("Connection failed callback: %s", error)
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed condition=%s", pn_condition)
connection.close()
class SenderHandler(pyngus.SenderEventHandler):
def __init__(self, count):
self._count = count
self._msg = Message()
self.calls = 0
self.total_ack_latency = 0.0
self.stop_time = None
self.start_time = None
def credit_granted(self, sender_link):
if self.start_time is None:
self.start_time = time.time()
self._send_message(sender_link)
def _send_message(self, link):
now = time.time()
self._msg.body = {'tx-timestamp': now}
self._last_send = now
link.send(self._msg, self)
def __call__(self, link, handle, status, error):
now = time.time()
self.total_ack_latency += now - self._last_send
self.calls += 1
if self._count:
self._count -= 1
if self._count == 0:
self.stop_time = now
link.close()
return
self._send_message(link)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("Sender peer_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("Sender failed error=%s", error)
sender_link.close()
class ReceiverHandler(pyngus.ReceiverEventHandler):
def __init__(self, count, capacity):
self._count = count
self._capacity = capacity
self._msg = Message()
self.receives = 0
self.tx_total_latency = 0.0
def receiver_active(self, receiver_link):
receiver_link.add_capacity(self._capacity)
def receiver_remote_closed(self, receiver_link, pn_condition):
"""Peer has closed its end of the link."""
LOG.debug("receiver_remote_closed condition=%s", pn_condition)
receiver_link.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.warn("receiver_failed error=%s", error)
receiver_link.close()
def message_received(self, receiver, message, handle):
now = time.time()
receiver.message_accepted(handle)
self.tx_total_latency += now - message.body['tx-timestamp']
self.receives += 1
if self._count:
self._count -= 1
if self._count == 0:
receiver.close()
return
lc = receiver.capacity
cap = self._capacity
if lc < (cap / 2):
receiver.add_capacity(cap - lc)
def main(argv=None):
_usage = """Usage: %prog [options]"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("-a", dest="server", type="string",
default="amqp://0.0.0.0:5672",
help="The address of the server [amqp://0.0.0.0:5672]")
parser.add_option("--node", type='string', default='amq.topic',
help='Name of source/target node')
parser.add_option("--count", type='int', default=100,
help='Send N messages (send forever if N==0)')
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--trace", dest="trace", action="store_true",
help="enable protocol tracing")
opts, _ = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)
# create AMQP Container, Connection, and SenderLink
#
container = pyngus.Container(uuid.uuid4().hex)
conn_properties = {'hostname': host,
'x-server': False}
if opts.trace:
conn_properties["x-trace-protocol"] = True
c_handler = ConnectionEventHandler()
connection = container.create_connection("perf_tool",
c_handler,
conn_properties)
r_handler = ReceiverHandler(opts.count, opts.count or 1000)
receiver = connection.create_receiver(opts.node, opts.node, r_handler)
s_handler = SenderHandler(opts.count)
sender = connection.create_sender(opts.node, opts.node, s_handler)
connection.open()
receiver.open()
while not receiver.active:
process_connection(connection, my_socket)
sender.open()
# Run until all messages transfered
while not sender.closed or not receiver.closed:
process_connection(connection, my_socket)
connection.close()
while not connection.closed:
process_connection(connection, my_socket)
duration = s_handler.stop_time - s_handler.start_time
thru = s_handler.calls / duration
permsg = duration / s_handler.calls
ack = s_handler.total_ack_latency / s_handler.calls
lat = r_handler.tx_total_latency / r_handler.receives
print("Stats:\n"
" TX Avg Calls/Sec: %f Per Call: %f Ack Latency %f\n"
" RX Latency: %f" % (thru, permsg, ack, lat))
sender.destroy()
receiver.destroy()
connection.destroy()
container.destroy()
my_socket.close()
return 0
if __name__ == "__main__":
main()
......@@ -89,12 +89,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, extra = parser.parse_args(args=argv)
if opts.debug:
......@@ -111,6 +121,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -119,8 +133,12 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = pyngus.ConnectionEventHandler()
c_handler = ConnectionEventHandler()
connection = container.create_connection("receiver",
c_handler,
conn_properties)
......@@ -146,6 +164,10 @@ def main(argv=None):
else:
print("Receive failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
receiver.close()
connection.close()
......
......@@ -373,7 +373,7 @@ def main(argv=None):
# Create the RPC caller
method = {'method': method_info[0],
'args': dict([(method_info[i], method_info[i+1])
'args': dict([(method_info[i], method_info[i + 1])
for i in range(1, len(method_info), 2)])}
my_caller = my_connection.create_caller(method,
"my-source-address",
......
......@@ -78,12 +78,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, payload = parser.parse_args(args=argv)
if not payload:
......@@ -103,6 +113,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -111,6 +125,10 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = ConnectionEventHandler()
connection = container.create_connection("sender",
......@@ -151,6 +169,10 @@ def main(argv=None):
else:
print("Send failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
sender.close()
connection.close()
......
......@@ -245,19 +245,21 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--cert",
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--cert", "--ssl-cert-file",
help="PEM File containing the server's certificate")
parser.add_option("--key",
parser.add_option("--key", "--ssl-key-file",
help="PEM File containing the server's private key")
parser.add_option("--keypass",
parser.add_option("--keypass", "--ssl-key-password",
help="Password used to decrypt key file")
parser.add_option("--require-auth", action="store_true",
help="Require clients to authenticate")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-cfg-name", type="string",
parser.add_option("--sasl-cfg-name", "--sasl-config-name", type="string",
help="name of SASL config file (no suffix)")
parser.add_option("--sasl-cfg-dir", type="string",
parser.add_option("--sasl-cfg-dir", "--sasl-config-dir", type="string",
help="Path to the SASL config file")
opts, arguments = parser.parse_args(args=argv)
......
......@@ -126,6 +126,7 @@ def process_connection(connection, my_socket):
connection.close()
return True
# Map the send callback status to a string
SEND_STATUS = {
pyngus.SenderLink.ABORTED: "Aborted",
......
......@@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler
from pyngus.sockets import read_socket_input
from pyngus.sockets import write_socket_output
VERSION = (2, 0, 3) # major, minor, fix
VERSION = (2, 2, 2) # major, minor, fix
This diff is collapsed.
......@@ -31,8 +31,6 @@ class Container(object):
def __init__(self, name, properties=None):
self._name = name
self._connections = {}
self._timer_heap = [] # (next_tick, connection)
self._need_processing = set()
self._properties = properties
def destroy(self):
......@@ -67,7 +65,7 @@ class Container(object):
readers.append(c)
if c.has_output > 0:
writers.append(c)
if c.next_tick:
if c.deadline:
heapq.heappush(timer_heap, (c.next_tick, c))
timers = []
while timer_heap:
......
......@@ -188,15 +188,15 @@ class _Link(Endpoint):
def active(self):
state = self._pn_link.state
return (not self._failed and
state == (proton.Endpoint.LOCAL_ACTIVE
| proton.Endpoint.REMOTE_ACTIVE))
state == (proton.Endpoint.LOCAL_ACTIVE |
proton.Endpoint.REMOTE_ACTIVE))
@property
def closed(self):
state = self._pn_link.state
return (self._failed or
state == (proton.Endpoint.LOCAL_CLOSED
| proton.Endpoint.REMOTE_CLOSED))
state == (proton.Endpoint.LOCAL_CLOSED |
proton.Endpoint.REMOTE_CLOSED))
def reject(self, pn_condition):
self._rejected = True # prevent 'active' callback!
......@@ -248,22 +248,27 @@ class _Link(Endpoint):
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _Link._endpoint_event_map.get(pn_event.type)
if pn_event.type == proton.Event.DELIVERY:
pn_delivery = pn_event.context
pn_link = pn_delivery.link
if pn_link.context:
pn_link.context._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.context
if pn_link.context:
pn_link.context._process_credit()
elif ep_event is not None:
pn_link = pn_event.context
if pn_link.context:
etype = pn_event.type
if etype == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_delivery(pn_event.delivery)
return True
if etype == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context and pn_link.context._process_credit()
return True
ep_event = _Link._endpoint_event_map.get(etype)
if ep_event is not None:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.LINK_INIT:
pn_link = pn_event.context
return True
if etype == proton.Event.LINK_INIT:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if not c:
......@@ -278,11 +283,14 @@ class _Link(Endpoint):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
elif pn_event.type == proton.Event.LINK_FINAL:
return True
if etype == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
return True
return False # event not handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
......@@ -394,8 +402,8 @@ class SenderLink(_Link):
self.handle = handle
self.deadline = deadline
self.link._send_requests[self.tag] = self
if deadline:
self.link._connection._add_timer(deadline, self)
if self.deadline:
self.link._connection._add_timer(self.deadline, self)
def __call__(self):
"""Invoked by Connection on timeout (now <= deadline)."""
......@@ -403,7 +411,7 @@ class SenderLink(_Link):
def destroy(self, state, info):
"""Invoked on final completion of send."""
if self.deadline and state != SenderLink.TIMED_OUT:
if self.deadline:
self.link._connection._cancel_timer(self.deadline, self)
if self.tag in self.link._send_requests:
del self.link._send_requests[self.tag]
......@@ -429,10 +437,6 @@ class SenderLink(_Link):
delivery_callback, handle,
deadline)
self._pn_link.delivery(tag)
LOG.debug("Sending a message, tag=%s", tag)
if deadline:
self._connection._add_timer(deadline, send_req)
pn_delivery = self._pn_link.current
if pn_delivery and pn_delivery.writable:
......@@ -441,7 +445,6 @@ class SenderLink(_Link):
self._pending_sends.append(tag)
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
LOG.debug("Sending previous pending message, tag=%s", tag)
self._write_msg(pn_delivery, send_req)
else:
LOG.debug("Send is pending for credit, tag=%s", tag)
......@@ -470,13 +473,9 @@ class SenderLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing send delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.tag in self._send_requests:
if pn_delivery.settled or pn_delivery.remote_state:
# remote has reached a 'terminal state'
LOG.debug("Remote has processed a sent msg")
outcome = pn_delivery.remote_state
state = SenderLink._DISPOSITION_STATE_MAP.get(outcome,
self.UNKNOWN)
......@@ -496,7 +495,6 @@ class SenderLink(_Link):
pn_delivery.settle()
elif pn_delivery.writable:
# we can now send on this delivery
LOG.debug("Delivery has become writable")
if self._pending_sends:
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
......@@ -508,7 +506,6 @@ class SenderLink(_Link):
def _process_credit(self):
# check if any pending deliveries are now writable:
LOG.debug("credit event, link=%s", self.name)
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
......@@ -516,17 +513,14 @@ class SenderLink(_Link):
pn_delivery = self._pn_link.current
# Alert if credit has become available
new_credit = self._pn_link.credit
if self._handler and not self._rejected:
if self._last_credit <= 0 and new_credit > 0:
LOG.debug("Credit is available, link=%s", self.name)
if 0 < self._pn_link.credit > self._last_credit:
with self._callback_lock:
self._handler.credit_granted(self)
self._last_credit = new_credit
self._last_credit = self._pn_link.credit
def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message
LOG.debug("Sending message to engine, tag=%s", send_req.tag)
self._pn_link.send(send_req.message.encode())
self._pn_link.advance()
self._last_credit = self._pn_link.credit
......@@ -690,10 +684,7 @@ class ReceiverLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing receive delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.readable and not pn_delivery.partial:
LOG.debug("Receive delivery readable")
data = self._pn_link.recv(pn_delivery.pending)
msg = proton.Message()
msg.decode(data)
......
......@@ -50,7 +50,6 @@ def read_socket_input(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -58,6 +57,7 @@ def read_socket_input(connection, socket_obj):
# try again later
return 0
# otherwise, unrecoverable, caller must handle
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
......@@ -70,7 +70,6 @@ def read_socket_input(connection, socket_obj):
count = Connection.EOS
connection.close_input()
connection.close_output()
LOG.debug("Socket recv %s bytes", count)
return count
......@@ -97,7 +96,6 @@ def write_socket_output(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -105,13 +103,13 @@ def write_socket_output(connection, socket_obj):
# try again later
return 0
# else assume fatal let caller handle it:
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
raise
if count > 0:
LOG.debug("Socket sent %s bytes", count)
connection.output_written(count)
elif data:
LOG.debug("Socket closed")
......
......@@ -17,9 +17,10 @@
# specific language governing permissions and limitations
# under the License.
import os
from setuptools import setup
_VERSION = "2.0.3" # NOTE: update __init__.py too!
_VERSION = "2.2.2" # NOTE: update __init__.py too!
# I hack, therefore I am (productive) Some distros (which will not be named)
# don't use setup.py to install the proton python module. In this case, pip
......@@ -33,11 +34,11 @@ try:
except ImportError:
# this version of proton will download and install the proton shared
# library as well:
_dependencies = ['python-qpid-proton>=0.9,<0.11']
_dependencies = ['python-qpid-proton>=0.9,<0.20']
setup(name="pyngus",
version=_VERSION,
version=_VERSION + os.environ.get('PYNGUS_VERSION_SUFFIX', ''),
author="kgiusti",
author_email="kgiusti@apache.org",
packages=["pyngus"],
......@@ -49,4 +50,11 @@ setup(name="pyngus",
classifiers=["License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers",
"Operating System :: OS Independent",
"Programming Language :: Python"])
"Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6"])
......@@ -28,24 +28,42 @@ perf-test.py is a static performance test. It simulates sending many
small messages over many links. It can be used to gauge the
performance impact of code changes during development.
To run, invoke it using the *time* command. Example:
Example:
$ time ./tests/python/perf-test.py
real 2m15.789s
user 2m15.357s
sys 0m0.039s
$ ./tests/perf-test.py
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
## Historical Results ##
### Lenovo T530 ###
### Lenovo W541 ###
master @ 7cc6f77b781916ee679d36e8fd1d1bcf77760353 (Proton 0.7 RC4)
real 2m1.102s
user 2m0.814s
sys 0m0.026s
v2.2.1
------
Python 2.7.13 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 17, 0)
8176 Messages/second; Latency avg: 19.116ms min: 9.241ms max: 26.693ms
Python 3.5.3 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 17, 0)
7081 Messages/second; Latency avg: 22.103ms min: 10.926ms max: 40.079ms
0.1.0-p0.7:
real 3m7.240s
user 3m6.832s
sys 0m0.026s
v2.1.4
------
Python 2.7.13 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 16, 0)
8106 Messages/second; Latency avg: 19.283ms min: 9.525ms max: 29.096ms
Python 3.5.2 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 16, 0)
7203 Messages/second; Latency avg: 21.723ms min: 11.098ms max: 41.364ms
v2.1.3
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 15, 0)
7964 Messages/second; Latency avg: 19.609ms min: 9.463ms max: 28.257ms
v2.1.1
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
......@@ -25,6 +25,7 @@ import time
import uuid
from proton import Message
from proton import VERSION as PN_VERSION
import pyngus
......@@ -41,24 +42,16 @@ class PerfConnection(pyngus.ConnectionEventHandler):
class PerfSendConnection(PerfConnection):
def __init__(self, name, container, properties,
msg_count, batch_size, link_count):
super(PerfSendConnection, self).__init__(name, container,
properties)
def __init__(self, name, container, properties, msg_count, link_count):
super(PerfSendConnection, self).__init__(name, container, properties)
self._msg_count = msg_count
self._batch_size = batch_size
self._link_count = link_count
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.client()
self.senders = set()
self.connection.open()
def connection_active(self, connection):
for i in range(self._link_count):
PerfSender("sender-%d" % i,
self,
self._msg_count,
self._batch_size)
PerfSender("sender-%d" % i, self, self._msg_count)
class PerfReceiveConnection(PerfConnection):
......@@ -68,8 +61,9 @@ class PerfReceiveConnection(PerfConnection):
properties)
self._msg_count = msg_count
self._credit_window = credit_window
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.server()
self.latency = 0
self.latency_min = 100000000
self.latency_max = 0
self.connection.open()
self.receivers = set()
......@@ -91,35 +85,33 @@ class PerfReceiveConnection(PerfConnection):
class PerfSender(pyngus.SenderEventHandler):
def __init__(self, address, perf_send_conn,
msg_count, batch_size):
def __init__(self, address, perf_send_conn, msg_count):
self.msg = Message()
self.msg.body = "HELLO"
self.sent = 0
self.acked = 0
self.msg_count = msg_count
self.batch_size = batch_size
self.address = address
self.perf_conn = perf_send_conn
self.perf_conn.senders.add(self)
connection = perf_send_conn.connection
self.link = connection.create_sender(address,
event_handler=self)
self.link = connection.create_sender(address, event_handler=self)
self.link.context = self
self.link.open()
def _send_msgs(self):
"""Send up to a batch of messages."""
batch = min(self.msg_count - self.sent, self.batch_size)
target = self.sent + batch
while self.sent < target:
self.link.send(self.msg, self.send_complete)
self.sent += 1
def sender_active(self, sender_link):
self._send_msgs()
def send_complete(self, link, handle, result, info):
def credit_granted(self, sender_link):
self._send_msgs()
def _send_msgs(self):
# Send as many messages as credit allows
while self.link.credit > 0 and self.sent < self.msg_count:
self.msg.body = {"timestamp": time.time()}
self.link.send(self.msg, self._send_complete)
self.sent += 1
def _send_complete(self, link, handle, result, info):
self.acked += 1
if self.acked == self.msg_count:
# test done, shutdown
......@@ -127,9 +119,6 @@ class PerfSender(pyngus.SenderEventHandler):
self.perf_conn.senders.discard(self)
if len(self.perf_conn.senders) == 0:
self.link.connection.close()
elif self.acked == self.sent:
# send next batch
self._send_msgs()
class PerfReceiver(pyngus.ReceiverEventHandler):
......@@ -138,6 +127,7 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.msg_count = msg_count
self.received = 0
self.credit_window = credit_window if credit_window else msg_count
self.credit_low = (credit_window + 1) / 2
self.address = address
self.perf_conn = perf_receive_conn
self.perf_conn.receivers.add(self)
......@@ -150,20 +140,25 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.link.open()
def message_received(self, receiver_link, message, handle):
"""Acknowledge receipt, grant more credit if needed."""
# Acknowledge receipt, grant more credit if needed
self.link.message_accepted(handle)
self.received += 1
if self.received < self.msg_count:
if self.link.capacity == 0:
self.link.add_capacity(self.credit_window)
else:
timestamp = message.body["timestamp"]
latency = time.time() - timestamp
self.perf_conn.latency += latency
self.perf_conn.latency_min = min(latency, self.perf_conn.latency_min)
self.perf_conn.latency_max = max(latency, self.perf_conn.latency_max)
if self.link.capacity <= self.credit_low and \
self.received < self.msg_count:
self.link.add_capacity(self.credit_window - self.link.capacity)
elif self.received == self.msg_count:
# link done
self.link.close()
self.perf_conn.receivers.discard(self)
def process_connections(c1, c2):
"""Transfer I/O, then process each connection."""
# Transfer I/O, then process each connection
def _do_io(src, dst):
count = min(src.has_output, dst.needs_input)
if count > 0:
......@@ -183,15 +178,15 @@ def main(argv=None):
parser = optparse.OptionParser(usage=_usage)
parser.add_option("--count", dest="count", type="int",
default=10000,
help="# of messages to transfer.")
help="# of messages to transfer per link.")
parser.add_option("--links", dest="link_count", type="int",
default=100,
default=20,
help="# of link pairs.")
parser.add_option("--send-batch", dest="send_batch", type="int",
default=10,
help="# of msgs sender queues at once.")
parser.add_option("--credit-batch", dest="credit_batch", type="int",
default=5,
help="DEPRECATED")
parser.add_option("--credit", dest="credit_window", type="int",
default=10,
help="Credit window issued by receiver.")
parser.add_option("--ca",
help="Certificate Authority PEM file")
......@@ -208,36 +203,46 @@ def main(argv=None):
# sender acts like SSL client
conn_properties = {'hostname': "test.server.com",
'x-trace-protocol': False}
'x-trace-protocol': False,
'x-sasl-mechs': "ANONYMOUS"}
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
sender_conn = PerfSendConnection("send-conn",
container,
conn_properties,
opts.count, opts.send_batch,
opts.count,
opts.link_count)
# receiver acts as SSL server
conn_properties = {'hostname': "my.client.com"}
conn_properties = {'hostname': "my.client.com",
'x-server': True,
'x-sasl-mechs': "ANONYMOUS"}
if opts.cert:
conn_properties["x-ssl-server"] = True
identity = (opts.cert, opts.key, opts.keypass)
conn_properties["x-ssl-identity"] = identity
receiver_conn = PerfReceiveConnection("recv-conn",
container,
conn_properties,
opts.count,
opts.credit_batch)
opts.credit_window)
# process connections until finished:
start = time.time()
while ((not sender_conn.connection.closed) or
(not receiver_conn.connection.closed)):
process_connections(sender_conn.connection,
receiver_conn.connection)
process_connections(sender_conn.connection, receiver_conn.connection)
sender_conn.connection.destroy()
receiver_conn.connection.destroy()
container.destroy()
delta = time.time() - start
total = opts.count * opts.link_count
print("Total: %s messages; credit window: %s; proton %s"
% (total, opts.credit_window, PN_VERSION))
print("%d Messages/second; Latency avg: %.3fms min: %.3fms max: %.3fms"
% (total / delta, (receiver_conn.latency / total) * 1000.0,
receiver_conn.latency_min * 1000.0,
receiver_conn.latency_max * 1000.0))
return 0
......
This diff is collapsed.
......@@ -254,8 +254,8 @@ class APITest(common.Test):
self.process_connections()
assert receiver.capacity == 4
assert sender.credit == 4
# callback only occurs when credit is no longer zero:
assert sl_handler.credit_granted_ct == 1
# callback occurs when credit is increased:
assert sl_handler.credit_granted_ct == 2
assert sender.pending == 0
msg = Message()
msg.body = "Hi"
......@@ -279,26 +279,27 @@ class APITest(common.Test):
sender.send(msg)
self.process_connections()
assert sender.pending == 2
assert sl_handler.credit_granted_ct == 1
assert sl_handler.credit_granted_ct == 2
receiver.add_capacity(1)
self.process_connections()
assert receiver.capacity == 0
assert rl_handler.message_received_ct == 5
assert sender.credit == 0
assert sender.pending == 1
assert sl_handler.credit_granted_ct == 1
assert sl_handler.credit_granted_ct == 2
receiver.add_capacity(1)
self.process_connections()
assert sender.credit == 0
assert sender.pending == 0
assert sl_handler.credit_granted_ct == 1
# credit callback not called since pending ate it
assert sl_handler.credit_granted_ct == 2
# verify new credit becomes available:
receiver.add_capacity(1)
self.process_connections()
assert sender.credit == 1
assert sl_handler.credit_granted_ct == 2
assert sl_handler.credit_granted_ct == 3
def test_send_presettled(self):
sender, receiver = self._setup_sender_sync()
......
......@@ -68,9 +68,9 @@ $OPENSSL pkcs12 -nocerts -passin pass:server-password -in server.pkcs12 -passout
CLIENT_COMMON_NAME="my.client.com"
# Create a certificate request for the client certificate. Use the CA's certificate to sign it:
$KEYTOOL -storetype pkcs12 -keystore client.pkcs12 -storepass client-password -alias client-certificate -keypass client-password -genkey -dname "CN=$CLIENT_COMMON_NAME" -validity 99999
keytool -storetype pkcs12 -keystore client.pkcs12 -storepass client-password -alias client-certificate -keypass client-password -certreq -file client-request.pem
keytool -storetype pkcs12 -keystore ca.pkcs12 -storepass ca-password -alias ca -keypass ca-password -gencert -rfc -validity 99999 -infile client-request.pem -outfile client-certificate.pem
openssl pkcs12 -nocerts -passin pass:client-password -in client.pkcs12 -passout pass:client-password -out client-private-key.pem
$KEYTOOL -storetype pkcs12 -keystore client.pkcs12 -storepass client-password -alias client-certificate -keypass client-password -certreq -file client-request.pem
$KEYTOOL -storetype pkcs12 -keystore ca.pkcs12 -storepass ca-password -alias ca -keypass ca-password -gencert -rfc -validity 99999 -infile client-request.pem -outfile client-certificate.pem
$OPENSSL pkcs12 -nocerts -passin pass:client-password -in client.pkcs12 -passout pass:client-password -out client-private-key.pem
# clean up all the unnecessary stuff
rm *.pkcs12 *-request.pem
[tox]
# Proton does not support Python3 yet: see Jira PROTON-490
#envlist = py27,py33,pep8
envlist = py26,py27,pep8
envlist = py27,pep8,py34,py35,py36
[testenv]
usedevelop=True
......@@ -9,7 +7,7 @@ deps = -r{toxinidir}/test-requirements.txt
commands = {toxinidir}/tests/test-runner
[testenv:pep8]
commands = flake8 pyngus examples tests
commands = flake8 --ignore E402 pyngus examples tests
flake8 --ignore F401 setup.py
[flake8]
......