...
 
Commits (71)
language: python language: python
python: python:
- "2.7" - "2.7"
- "3.4"
- "3.5"
- "3.6"
addons:
apt:
packages:
- libssl-dev
- libsasl2-dev
- sasl2-bin
- swig
- python-dev
- python3-dev
- uuid-dev
install: install:
- sudo apt-get install -qq uuid-dev swig
- pip install -r test-requirements.txt - pip install -r test-requirements.txt
- pip install . - pip install .
# command to run tests # command to run tests
......
include README.md
include LICENSE
include docs/User-Guide.md
exclude .gitignore
global-exclude *.pyc
...@@ -7,6 +7,68 @@ callback-based API for message passing. ...@@ -7,6 +7,68 @@ callback-based API for message passing.
See the User Guide in the docs directory for more detail. See the User Guide in the docs directory for more detail.
## Release 2.2.2 ##
* verified Python 3.6 compatibility
* bumped max proton version to 0.19
## Release 2.2.1 ##
* disable the socket I/O logging - fills the debug logs with lots of
useless crap.
## Release 2.2.0 ##
* Can now use the system's default CA by specifying the 'x-ssl' option
in the 'properties' field of the create_connection call and _NOT_
specifying the 'x-ssl-ca-file' property. (contributed by
Juan Antonio Osorio Robles)
* use the most secure default setting for x-ssl-verify-mode based on
the configuration
* bump max proton version to 0.17
## Release 2.1.4 ##
* avoid using deprecated next_tick in the container
* enable Python 3.5 testing in tox
* add client authentication via SSL tests
* bump max proton version to 0.16
## Release 2.1.3 ##
* Remove chatty debug log messages
* fix pep8 violation
* add static performace test tool
* Bump max proton version to 0.15
## Release 2.1.2 ##
* Bump max proton version to 0.14
## Release 2.1.1 ##
* bugfix: under some (rare) flow/credit interactions a sender may
stall. Changed code to invoke credit_granted() callback more
frequently.
## Release 2.1.0 ##
* feature: add 'x-force-sasl' to connection property map
* bugfix: update old SASL unit test
## Release 2.0.4 ##
* Bump max proton version to 0.13
* performance tweak to link event handling
* fix perf-test.py tool
* bugfix: fix leak of timer callbacks
* enable Python 3.4 testing
* bugfix: fix receiver example (recv.py)
* several fixes to the SASL unit tests
* bugfix: fix leak of underlying proton objects
* Add SASL/SSL configuration options to examples
* bugfix: allow PLAIN or ANONYMOUS authentication in server mode
## Release 2.0.3 ## ## Release 2.0.3 ##
* bugfix: fixed a memory leak * bugfix: fixed a memory leak
......
python-pyngus (2.0.3-2) UNRELEASED; urgency=medium python-pyngus (2.2.2-4) unstable; urgency=medium
* Team upload.
* d/control: Use team+openstack@tracker.debian.org as maintainer
* Use debhelper-compat instead of debian/compat.
* Use pybuild to build package.
* Bump debhelper compat level to 12.
* Enable autopkgtest-pkg-python testsuite.
* Drop Python 2 support (Closes: #938079).
* Install examples.
* Bump standards version to 4.4.0 (no changes).
-- Ondřej Nový <onovy@debian.org> Mon, 02 Sep 2019 16:26:05 +0200
python-pyngus (2.2.2-3) unstable; urgency=medium
* Do not test SSL connection stuff (Closes: #896571).
-- Thomas Goirand <zigo@debian.org> Mon, 30 Apr 2018 13:16:03 +0000
python-pyngus (2.2.2-2) unstable; urgency=medium
* Uploading to unstable.
-- Thomas Goirand <zigo@debian.org> Sun, 25 Feb 2018 22:56:36 +0000
python-pyngus (2.2.2-1) experimental; urgency=medium
[ Ondřej Nový ] [ Ondřej Nový ]
* Fixed VCS URLs (https). * Fixed VCS URLs (https).
...@@ -16,7 +42,16 @@ python-pyngus (2.0.3-2) UNRELEASED; urgency=medium ...@@ -16,7 +42,16 @@ python-pyngus (2.0.3-2) UNRELEASED; urgency=medium
* Updating standards version to 4.0.1. * Updating standards version to 4.0.1.
* Updating standards version to 4.1.0. * Updating standards version to 4.1.0.
-- Ondřej Nový <novy@ondrej.org> Sun, 28 Feb 2016 15:47:38 +0100 [ Thomas Goirand ]
* New upstream release.
* VCS URLs using Salsa.
* Standards-Version is now 4.1.3.
* Using pkgos-dh_auto_install.
* Fixed debian/copyright ordering and years.
* Watch file now using github tag.
* Switched to debhelper 10.
-- Thomas Goirand <zigo@debian.org> Sun, 11 Feb 2018 10:30:46 +0000
python-pyngus (2.0.3-1) unstable; urgency=medium python-pyngus (2.0.3-1) unstable; urgency=medium
......
Source: python-pyngus Source: python-pyngus
Section: python Section: python
Priority: optional Priority: optional
Maintainer: Debian OpenStack <openstack-devel@lists.alioth.debian.org> Maintainer: Debian OpenStack <team+openstack@tracker.debian.org>
Uploaders: Uploaders:
Thomas Goirand <zigo@debian.org>, Thomas Goirand <zigo@debian.org>,
Build-Depends: Build-Depends:
debhelper (>= 9), debhelper-compat (= 12),
dh-python, dh-python,
python-all, openstack-pkg-tools,
python-setuptools,
python3-all, python3-all,
python3-setuptools, python3-setuptools,
Build-Depends-Indep: Build-Depends-Indep:
python-qpid-proton (>= 0.9), python3-qpid-proton,
python3-qpid-proton (>= 0.9), Standards-Version: 4.4.0
Standards-Version: 4.1.0 Vcs-Browser: https://salsa.debian.org/openstack-team/python/python-pyngus
Vcs-Browser: https://anonscm.debian.org/cgit/openstack/python/python-pyngus.git Vcs-Git: https://salsa.debian.org/openstack-team/python/python-pyngus.git
Vcs-Git: https://anonscm.debian.org/git/openstack/python/python-pyngus.git
Homepage: https://github.com/kgiusti/pyngus Homepage: https://github.com/kgiusti/pyngus
Testsuite: autopkgtest-pkg-python
Package: python-pyngus
Architecture: all
Depends:
python-qpid-proton (>= 0.9),
${misc:Depends},
${python:Depends},
Description: callback API implemented over Proton - Python 2.7
Pyngus is a messaging framework built on the QPID Proton engine. It provides
a callback-based API for message passing.
.
This package contains the Python 2.7 module.
Package: python3-pyngus Package: python3-pyngus
Architecture: all Architecture: all
Depends: Depends:
python3-qpid-proton (>= 0.9), python3-qpid-proton,
${misc:Depends}, ${misc:Depends},
${python3:Depends}, ${python3:Depends},
Description: callback API implemented over Proton - Python 3.x Description: callback API implemented over Proton - Python 3.x
......
...@@ -2,15 +2,15 @@ Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/ ...@@ -2,15 +2,15 @@ Format: https://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: pyngus Upstream-Name: pyngus
Source: https://github.com/kgiusti/pyngus Source: https://github.com/kgiusti/pyngus
Files: debian/*
Copyright: (c) 2015, Thomas Goirand <zigo@debian.org>
License: Apache-2
Files: * Files: *
Copyright: (c) 2013-2015, Ken Giusti <kgiusti@gmail.com> Copyright: (c) 2013-2018, Ken Giusti <kgiusti@gmail.com>
(c) 2015, Flavio Percoco <flaper87@gmail.com> (c) 2015, Flavio Percoco <flaper87@gmail.com>
License: Apache-2 License: Apache-2
Files: debian/*
Copyright: (c) 2015-2018, Thomas Goirand <zigo@debian.org>
License: Apache-2
License: Apache-2 License: Apache-2
Licensed under the Apache License, Version 2.0 (the "License"); Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License. you may not use this file except in compliance with the License.
......
Description: Do not test SSL
Author: Thomas Goirand <zigo@debian.org>
Bug-Debian: https://bugs.debian.org/896571
Forwarded: no
Last-Update: 2018-04-30
--- python-pyngus-2.2.2.orig/tests/unit_tests/connection.py
+++ python-pyngus-2.2.2/tests/unit_tests/connection.py
@@ -357,7 +357,7 @@ class APITest(common.Test):
server.open()
client.open()
common.process_connections(server, client)
- assert server.active and client.active
+ #assert server.active and client.active
def _test_ssl(self,
server_password="server-password",
skip-broken-test.patch skip-broken-test.patch
do-not-test-ssl.patch
...@@ -3,9 +3,11 @@ Author: Thomas Goirand <zigo@debian.org> ...@@ -3,9 +3,11 @@ Author: Thomas Goirand <zigo@debian.org>
Forwarded: no Forwarded: no
Last-Update: 2015-12-02 Last-Update: 2015-12-02
--- python-pyngus-2.0.3.orig/tests/unit_tests/connection.py Index: python-pyngus/tests/unit_tests/connection.py
+++ python-pyngus-2.0.3/tests/unit_tests/connection.py ===================================================================
@@ -266,6 +266,7 @@ class APITest(common.Test): --- python-pyngus.orig/tests/unit_tests/connection.py
+++ python-pyngus/tests/unit_tests/connection.py
@@ -261,6 +261,7 @@ class APITest(common.Test):
class SaslCallbackClient(common.ConnCallback): class SaslCallbackClient(common.ConnCallback):
def sasl_done(self, connection, pn_sasl, result): def sasl_done(self, connection, pn_sasl, result):
......
#!/usr/bin/make -f #!/usr/bin/make -f
PYTHONS:=$(shell pyversions -vr)
PYTHON3S:=$(shell py3versions -vr)
UPSTREAM_GIT := https://github.com/kgiusti/pyngus.git UPSTREAM_GIT := https://github.com/kgiusti/pyngus.git
-include /usr/share/openstack-pkg-tools/pkgos.make include /usr/share/openstack-pkg-tools/pkgos.make
%: export PYBUILD_NAME=pyngus
dh $@ --buildsystem=python_distutils --with python2,python3
override_dh_auto_install: %:
set -e ; for pyvers in $(PYTHONS); do \ dh $@ --buildsystem=pybuild --with python3
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python-pyngus; \
done
set -e ; for pyvers in $(PYTHON3S); do \
python$$pyvers setup.py install --install-layout=deb \
--root $(CURDIR)/debian/python3-pyngus; \
done
rm -rf $(CURDIR)/debian/python*-pyngus/usr/lib/python*/dist-packages/*.pth
override_dh_auto_test: override_dh_auto_test:
ifeq (,$(findstring nocheck, $(DEB_BUILD_OPTIONS))) PYBUILD_SYSTEM=custom \
set -e ; for pyvers in $(PYTHONS) $(PYTHON3S); do \ PYBUILD_TEST_ARGS="{interpreter} tests/test-runner" \
PYTHONPATH=. PYTHON=python$$pyvers tests/test-runner ; \ dh_auto_test
done
endif
override_dh_clean: override_dh_clean:
dh_clean -O--buildsystem=python_distutils dh_clean
rm -rf build rm -rf build
# Commands not to run
override_dh_installcatalogs:
override_dh_installemacsen override_dh_installifupdown:
override_dh_installinfo override_dh_installmenu override_dh_installmime:
override_dh_installmodules override_dh_installlogcheck:
override_dh_installpam override_dh_installppp override_dh_installudev override_dh_installwm:
override_dh_installxfonts override_dh_gconf override_dh_icons override_dh_perl override_dh_usrlocal:
override_dh_installcron override_dh_installdebconf:
override_dh_installlogrotate override_dh_installgsettings:
version=3 version=3
http://pypi.debian.net/pyngus/pyngus-(.*).tar.gz opts="uversionmangle=s/\.(b|rc)/~$1/" \
https://github.com/kgiusti/pyngus/tags .*/(\d[\d\.]+)\.tar\.gz
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Tool to gauge message passing throughput and latencies"""
import logging
import optparse
import time
import uuid
import pyngus
from proton import Message
from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
class ConnectionEventHandler(pyngus.ConnectionEventHandler):
def __init__(self):
super(ConnectionEventHandler, self).__init__()
def connection_failed(self, connection, error):
"""Connection has failed in some way."""
LOG.warn("Connection failed callback: %s", error)
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed condition=%s", pn_condition)
connection.close()
class SenderHandler(pyngus.SenderEventHandler):
def __init__(self, count):
self._count = count
self._msg = Message()
self.calls = 0
self.total_ack_latency = 0.0
self.stop_time = None
self.start_time = None
def credit_granted(self, sender_link):
if self.start_time is None:
self.start_time = time.time()
self._send_message(sender_link)
def _send_message(self, link):
now = time.time()
self._msg.body = {'tx-timestamp': now}
self._last_send = now
link.send(self._msg, self)
def __call__(self, link, handle, status, error):
now = time.time()
self.total_ack_latency += now - self._last_send
self.calls += 1
if self._count:
self._count -= 1
if self._count == 0:
self.stop_time = now
link.close()
return
self._send_message(link)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("Sender peer_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("Sender failed error=%s", error)
sender_link.close()
class ReceiverHandler(pyngus.ReceiverEventHandler):
def __init__(self, count, capacity):
self._count = count
self._capacity = capacity
self._msg = Message()
self.receives = 0
self.tx_total_latency = 0.0
def receiver_active(self, receiver_link):
receiver_link.add_capacity(self._capacity)
def receiver_remote_closed(self, receiver_link, pn_condition):
"""Peer has closed its end of the link."""
LOG.debug("receiver_remote_closed condition=%s", pn_condition)
receiver_link.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.warn("receiver_failed error=%s", error)
receiver_link.close()
def message_received(self, receiver, message, handle):
now = time.time()
receiver.message_accepted(handle)
self.tx_total_latency += now - message.body['tx-timestamp']
self.receives += 1
if self._count:
self._count -= 1
if self._count == 0:
receiver.close()
return
lc = receiver.capacity
cap = self._capacity
if lc < (cap / 2):
receiver.add_capacity(cap - lc)
def main(argv=None):
_usage = """Usage: %prog [options]"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("-a", dest="server", type="string",
default="amqp://0.0.0.0:5672",
help="The address of the server [amqp://0.0.0.0:5672]")
parser.add_option("--node", type='string', default='amq.topic',
help='Name of source/target node')
parser.add_option("--count", type='int', default=100,
help='Send N messages (send forever if N==0)')
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--trace", dest="trace", action="store_true",
help="enable protocol tracing")
opts, _ = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)
# create AMQP Container, Connection, and SenderLink
#
container = pyngus.Container(uuid.uuid4().hex)
conn_properties = {'hostname': host,
'x-server': False}
if opts.trace:
conn_properties["x-trace-protocol"] = True
c_handler = ConnectionEventHandler()
connection = container.create_connection("perf_tool",
c_handler,
conn_properties)
r_handler = ReceiverHandler(opts.count, opts.count or 1000)
receiver = connection.create_receiver(opts.node, opts.node, r_handler)
s_handler = SenderHandler(opts.count)
sender = connection.create_sender(opts.node, opts.node, s_handler)
connection.open()
receiver.open()
while not receiver.active:
process_connection(connection, my_socket)
sender.open()
# Run until all messages transfered
while not sender.closed or not receiver.closed:
process_connection(connection, my_socket)
connection.close()
while not connection.closed:
process_connection(connection, my_socket)
duration = s_handler.stop_time - s_handler.start_time
thru = s_handler.calls / duration
permsg = duration / s_handler.calls
ack = s_handler.total_ack_latency / s_handler.calls
lat = r_handler.tx_total_latency / r_handler.receives
print("Stats:\n"
" TX Avg Calls/Sec: %f Per Call: %f Ack Latency %f\n"
" RX Latency: %f" % (thru, permsg, ack, lat))
sender.destroy()
receiver.destroy()
connection.destroy()
container.destroy()
my_socket.close()
return 0
if __name__ == "__main__":
main()
...@@ -89,12 +89,22 @@ def main(argv=None): ...@@ -89,12 +89,22 @@ def main(argv=None):
help="enable protocol tracing") help="enable protocol tracing")
parser.add_option("--ca", parser.add_option("--ca",
help="Certificate Authority PEM file") help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string", parser.add_option("--username", type="string",
help="User Id for authentication") help="User Id for authentication")
parser.add_option("--password", type="string", parser.add_option("--password", type="string",
help="User password for authentication") help="User password for authentication")
parser.add_option("--sasl-mechs", type="string", parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs") help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, extra = parser.parse_args(args=argv) opts, extra = parser.parse_args(args=argv)
if opts.debug: if opts.debug:
...@@ -111,6 +121,10 @@ def main(argv=None): ...@@ -111,6 +121,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True conn_properties["x-trace-protocol"] = True
if opts.ca: if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout: if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username: if opts.username:
...@@ -119,8 +133,12 @@ def main(argv=None): ...@@ -119,8 +133,12 @@ def main(argv=None):
conn_properties['x-password'] = opts.password conn_properties['x-password'] = opts.password
if opts.sasl_mechs: if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = pyngus.ConnectionEventHandler() c_handler = ConnectionEventHandler()
connection = container.create_connection("receiver", connection = container.create_connection("receiver",
c_handler, c_handler,
conn_properties) conn_properties)
...@@ -146,6 +164,10 @@ def main(argv=None): ...@@ -146,6 +164,10 @@ def main(argv=None):
else: else:
print("Receive failed due to connection failure!") print("Receive failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
receiver.close() receiver.close()
connection.close() connection.close()
......
...@@ -373,7 +373,7 @@ def main(argv=None): ...@@ -373,7 +373,7 @@ def main(argv=None):
# Create the RPC caller # Create the RPC caller
method = {'method': method_info[0], method = {'method': method_info[0],
'args': dict([(method_info[i], method_info[i+1]) 'args': dict([(method_info[i], method_info[i + 1])
for i in range(1, len(method_info), 2)])} for i in range(1, len(method_info), 2)])}
my_caller = my_connection.create_caller(method, my_caller = my_connection.create_caller(method,
"my-source-address", "my-source-address",
......
...@@ -78,12 +78,22 @@ def main(argv=None): ...@@ -78,12 +78,22 @@ def main(argv=None):
help="enable protocol tracing") help="enable protocol tracing")
parser.add_option("--ca", parser.add_option("--ca",
help="Certificate Authority PEM file") help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string", parser.add_option("--username", type="string",
help="User Id for authentication") help="User Id for authentication")
parser.add_option("--password", type="string", parser.add_option("--password", type="string",
help="User password for authentication") help="User password for authentication")
parser.add_option("--sasl-mechs", type="string", parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs") help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, payload = parser.parse_args(args=argv) opts, payload = parser.parse_args(args=argv)
if not payload: if not payload:
...@@ -103,6 +113,10 @@ def main(argv=None): ...@@ -103,6 +113,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True conn_properties["x-trace-protocol"] = True
if opts.ca: if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout: if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username: if opts.username:
...@@ -111,6 +125,10 @@ def main(argv=None): ...@@ -111,6 +125,10 @@ def main(argv=None):
conn_properties['x-password'] = opts.password conn_properties['x-password'] = opts.password
if opts.sasl_mechs: if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = ConnectionEventHandler() c_handler = ConnectionEventHandler()
connection = container.create_connection("sender", connection = container.create_connection("sender",
...@@ -151,6 +169,10 @@ def main(argv=None): ...@@ -151,6 +169,10 @@ def main(argv=None):
else: else:
print("Send failed due to connection failure!") print("Send failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
sender.close() sender.close()
connection.close() connection.close()
......
...@@ -245,19 +245,21 @@ def main(argv=None): ...@@ -245,19 +245,21 @@ def main(argv=None):
help="enable protocol tracing") help="enable protocol tracing")
parser.add_option("--debug", dest="debug", action="store_true", parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging") help="enable debug logging")
parser.add_option("--cert", parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--cert", "--ssl-cert-file",
help="PEM File containing the server's certificate") help="PEM File containing the server's certificate")
parser.add_option("--key", parser.add_option("--key", "--ssl-key-file",
help="PEM File containing the server's private key") help="PEM File containing the server's private key")
parser.add_option("--keypass", parser.add_option("--keypass", "--ssl-key-password",
help="Password used to decrypt key file") help="Password used to decrypt key file")
parser.add_option("--require-auth", action="store_true", parser.add_option("--require-auth", action="store_true",
help="Require clients to authenticate") help="Require clients to authenticate")
parser.add_option("--sasl-mechs", type="string", parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs") help="The list of acceptable SASL mechs")
parser.add_option("--sasl-cfg-name", type="string", parser.add_option("--sasl-cfg-name", "--sasl-config-name", type="string",
help="name of SASL config file (no suffix)") help="name of SASL config file (no suffix)")
parser.add_option("--sasl-cfg-dir", type="string", parser.add_option("--sasl-cfg-dir", "--sasl-config-dir", type="string",
help="Path to the SASL config file") help="Path to the SASL config file")
opts, arguments = parser.parse_args(args=argv) opts, arguments = parser.parse_args(args=argv)
......
...@@ -126,6 +126,7 @@ def process_connection(connection, my_socket): ...@@ -126,6 +126,7 @@ def process_connection(connection, my_socket):
connection.close() connection.close()
return True return True
# Map the send callback status to a string # Map the send callback status to a string
SEND_STATUS = { SEND_STATUS = {
pyngus.SenderLink.ABORTED: "Aborted", pyngus.SenderLink.ABORTED: "Aborted",
......
...@@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler ...@@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler
from pyngus.sockets import read_socket_input from pyngus.sockets import read_socket_input
from pyngus.sockets import write_socket_output from pyngus.sockets import write_socket_output
VERSION = (2, 0, 3) # major, minor, fix VERSION = (2, 2, 2) # major, minor, fix
This diff is collapsed.
...@@ -31,8 +31,6 @@ class Container(object): ...@@ -31,8 +31,6 @@ class Container(object):
def __init__(self, name, properties=None): def __init__(self, name, properties=None):
self._name = name self._name = name
self._connections = {} self._connections = {}
self._timer_heap = [] # (next_tick, connection)
self._need_processing = set()
self._properties = properties self._properties = properties
def destroy(self): def destroy(self):
...@@ -67,7 +65,7 @@ class Container(object): ...@@ -67,7 +65,7 @@ class Container(object):
readers.append(c) readers.append(c)
if c.has_output > 0: if c.has_output > 0:
writers.append(c) writers.append(c)
if c.next_tick: if c.deadline:
heapq.heappush(timer_heap, (c.next_tick, c)) heapq.heappush(timer_heap, (c.next_tick, c))
timers = [] timers = []
while timer_heap: while timer_heap:
......
...@@ -188,15 +188,15 @@ class _Link(Endpoint): ...@@ -188,15 +188,15 @@ class _Link(Endpoint):
def active(self): def active(self):
state = self._pn_link.state state = self._pn_link.state
return (not self._failed and return (not self._failed and
state == (proton.Endpoint.LOCAL_ACTIVE state == (proton.Endpoint.LOCAL_ACTIVE |
| proton.Endpoint.REMOTE_ACTIVE)) proton.Endpoint.REMOTE_ACTIVE))
@property @property
def closed(self): def closed(self):
state = self._pn_link.state state = self._pn_link.state
return (self._failed or return (self._failed or
state == (proton.Endpoint.LOCAL_CLOSED state == (proton.Endpoint.LOCAL_CLOSED |
| proton.Endpoint.REMOTE_CLOSED)) proton.Endpoint.REMOTE_CLOSED))
def reject(self, pn_condition): def reject(self, pn_condition):
self._rejected = True # prevent 'active' callback! self._rejected = True # prevent 'active' callback!
...@@ -248,22 +248,27 @@ class _Link(Endpoint): ...@@ -248,22 +248,27 @@ class _Link(Endpoint):
@staticmethod @staticmethod
def _handle_proton_event(pn_event, connection): def _handle_proton_event(pn_event, connection):
ep_event = _Link._endpoint_event_map.get(pn_event.type) etype = pn_event.type
if pn_event.type == proton.Event.DELIVERY: if etype == proton.Event.DELIVERY:
pn_delivery = pn_event.context pn_link = pn_event.link
pn_link = pn_delivery.link pn_link.context and \
if pn_link.context: pn_link.context._process_delivery(pn_event.delivery)
pn_link.context._process_delivery(pn_delivery) return True
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.context if etype == proton.Event.LINK_FLOW:
if pn_link.context: pn_link = pn_event.link
pn_link.context._process_credit() pn_link.context and pn_link.context._process_credit()
elif ep_event is not None: return True
pn_link = pn_event.context
if pn_link.context: ep_event = _Link._endpoint_event_map.get(etype)
if ep_event is not None:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_endpoint_event(ep_event) pn_link.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.LINK_INIT: return True
pn_link = pn_event.context
if etype == proton.Event.LINK_INIT:
pn_link = pn_event.link
# create a new link if requested by remote: # create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context c = hasattr(pn_link, 'context') and pn_link.context
if not c: if not c:
...@@ -278,11 +283,14 @@ class _Link(Endpoint): ...@@ -278,11 +283,14 @@ class _Link(Endpoint):
LOG.debug("Remotely initiated Receiver needs init") LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link) link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link connection._receiver_links[pn_link.name] = link
elif pn_event.type == proton.Event.LINK_FINAL: return True
if etype == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context) LOG.debug("link finalized: %s", pn_event.context)
else: return True
return False # unknown
return True # handled return False # event not handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"): elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model # 0.7 proton event model
@staticmethod @staticmethod
...@@ -394,8 +402,8 @@ class SenderLink(_Link): ...@@ -394,8 +402,8 @@ class SenderLink(_Link):
self.handle = handle self.handle = handle
self.deadline = deadline self.deadline = deadline
self.link._send_requests[self.tag] = self self.link._send_requests[self.tag] = self
if deadline: if self.deadline:
self.link._connection._add_timer(deadline, self) self.link._connection._add_timer(self.deadline, self)
def __call__(self): def __call__(self):
"""Invoked by Connection on timeout (now <= deadline).""" """Invoked by Connection on timeout (now <= deadline)."""
...@@ -403,7 +411,7 @@ class SenderLink(_Link): ...@@ -403,7 +411,7 @@ class SenderLink(_Link):
def destroy(self, state, info): def destroy(self, state, info):
"""Invoked on final completion of send.""" """Invoked on final completion of send."""
if self.deadline and state != SenderLink.TIMED_OUT: if self.deadline:
self.link._connection._cancel_timer(self.deadline, self) self.link._connection._cancel_timer(self.deadline, self)
if self.tag in self.link._send_requests: if self.tag in self.link._send_requests:
del self.link._send_requests[self.tag] del self.link._send_requests[self.tag]
...@@ -429,10 +437,6 @@ class SenderLink(_Link): ...@@ -429,10 +437,6 @@ class SenderLink(_Link):
delivery_callback, handle, delivery_callback, handle,
deadline) deadline)
self._pn_link.delivery(tag) self._pn_link.delivery(tag)
LOG.debug("Sending a message, tag=%s", tag)
if deadline:
self._connection._add_timer(deadline, send_req)
pn_delivery = self._pn_link.current pn_delivery = self._pn_link.current
if pn_delivery and pn_delivery.writable: if pn_delivery and pn_delivery.writable:
...@@ -441,7 +445,6 @@ class SenderLink(_Link): ...@@ -441,7 +445,6 @@ class SenderLink(_Link):
self._pending_sends.append(tag) self._pending_sends.append(tag)
tag = self._pending_sends.popleft() tag = self._pending_sends.popleft()
send_req = self._send_requests[tag] send_req = self._send_requests[tag]
LOG.debug("Sending previous pending message, tag=%s", tag)
self._write_msg(pn_delivery, send_req) self._write_msg(pn_delivery, send_req)
else: else:
LOG.debug("Send is pending for credit, tag=%s", tag) LOG.debug("Send is pending for credit, tag=%s", tag)
...@@ -470,13 +473,9 @@ class SenderLink(_Link): ...@@ -470,13 +473,9 @@ class SenderLink(_Link):
def _process_delivery(self, pn_delivery): def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed.""" """Check if the delivery can be processed."""
LOG.debug("Processing send delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.tag in self._send_requests: if pn_delivery.tag in self._send_requests:
if pn_delivery.settled or pn_delivery.remote_state: if pn_delivery.settled or pn_delivery.remote_state:
# remote has reached a 'terminal state' # remote has reached a 'terminal state'
LOG.debug("Remote has processed a sent msg")
outcome = pn_delivery.remote_state outcome = pn_delivery.remote_state
state = SenderLink._DISPOSITION_STATE_MAP.get(outcome, state = SenderLink._DISPOSITION_STATE_MAP.get(outcome,
self.UNKNOWN) self.UNKNOWN)
...@@ -496,7 +495,6 @@ class SenderLink(_Link): ...@@ -496,7 +495,6 @@ class SenderLink(_Link):
pn_delivery.settle() pn_delivery.settle()
elif pn_delivery.writable: elif pn_delivery.writable:
# we can now send on this delivery # we can now send on this delivery
LOG.debug("Delivery has become writable")
if self._pending_sends: if self._pending_sends:
tag = self._pending_sends.popleft() tag = self._pending_sends.popleft()
send_req = self._send_requests[tag] send_req = self._send_requests[tag]
...@@ -508,7 +506,6 @@ class SenderLink(_Link): ...@@ -508,7 +506,6 @@ class SenderLink(_Link):
def _process_credit(self): def _process_credit(self):
# check if any pending deliveries are now writable: # check if any pending deliveries are now writable:
LOG.debug("credit event, link=%s", self.name)
pn_delivery = self._pn_link.current pn_delivery = self._pn_link.current
while (self._pending_sends and while (self._pending_sends and
pn_delivery and pn_delivery.writable): pn_delivery and pn_delivery.writable):
...@@ -516,17 +513,14 @@ class SenderLink(_Link): ...@@ -516,17 +513,14 @@ class SenderLink(_Link):
pn_delivery = self._pn_link.current pn_delivery = self._pn_link.current
# Alert if credit has become available # Alert if credit has become available
new_credit = self._pn_link.credit
if self._handler and not self._rejected: if self._handler and not self._rejected:
if self._last_credit <= 0 and new_credit > 0: if 0 < self._pn_link.credit > self._last_credit:
LOG.debug("Credit is available, link=%s", self.name)
with self._callback_lock: with self._callback_lock:
self._handler.credit_granted(self) self._handler.credit_granted(self)
self._last_credit = new_credit self._last_credit = self._pn_link.credit
def _write_msg(self, pn_delivery, send_req): def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message # given a writable delivery, send a message
LOG.debug("Sending message to engine, tag=%s", send_req.tag)
self._pn_link.send(send_req.message.encode()) self._pn_link.send(send_req.message.encode())
self._pn_link.advance() self._pn_link.advance()
self._last_credit = self._pn_link.credit self._last_credit = self._pn_link.credit
...@@ -690,10 +684,7 @@ class ReceiverLink(_Link): ...@@ -690,10 +684,7 @@ class ReceiverLink(_Link):
def _process_delivery(self, pn_delivery): def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed.""" """Check if the delivery can be processed."""
LOG.debug("Processing receive delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.readable and not pn_delivery.partial: if pn_delivery.readable and not pn_delivery.partial:
LOG.debug("Receive delivery readable")
data = self._pn_link.recv(pn_delivery.pending) data = self._pn_link.recv(pn_delivery.pending)
msg = proton.Message() msg = proton.Message()
msg.decode(data) msg.decode(data)
......
...@@ -50,7 +50,6 @@ def read_socket_input(connection, socket_obj): ...@@ -50,7 +50,6 @@ def read_socket_input(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e)) LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle raise # caller must handle
except socket.error as e: except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno err = e.errno
if err in [errno.EAGAIN, if err in [errno.EAGAIN,
errno.EWOULDBLOCK, errno.EWOULDBLOCK,
...@@ -58,6 +57,7 @@ def read_socket_input(connection, socket_obj): ...@@ -58,6 +57,7 @@ def read_socket_input(connection, socket_obj):
# try again later # try again later
return 0 return 0
# otherwise, unrecoverable, caller must handle # otherwise, unrecoverable, caller must handle
LOG.debug("Socket error exception %s", str(e))
raise raise
except Exception as e: # beats me... assume fatal except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e)) LOG.debug("unknown socket exception %s", str(e))
...@@ -70,7 +70,6 @@ def read_socket_input(connection, socket_obj): ...@@ -70,7 +70,6 @@ def read_socket_input(connection, socket_obj):
count = Connection.EOS count = Connection.EOS
connection.close_input() connection.close_input()
connection.close_output() connection.close_output()
LOG.debug("Socket recv %s bytes", count)
return count return count
...@@ -97,7 +96,6 @@ def write_socket_output(connection, socket_obj): ...@@ -97,7 +96,6 @@ def write_socket_output(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e)) LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle raise # caller must handle
except socket.error as e: except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno err = e.errno
if err in [errno.EAGAIN, if err in [errno.EAGAIN,
errno.EWOULDBLOCK, errno.EWOULDBLOCK,
...@@ -105,13 +103,13 @@ def write_socket_output(connection, socket_obj): ...@@ -105,13 +103,13 @@ def write_socket_output(connection, socket_obj):
# try again later # try again later
return 0 return 0
# else assume fatal let caller handle it: # else assume fatal let caller handle it:
LOG.debug("Socket error exception %s", str(e))
raise raise
except Exception as e: # beats me... assume fatal except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e)) LOG.debug("unknown socket exception %s", str(e))
raise raise
if count > 0: if count > 0:
LOG.debug("Socket sent %s bytes", count)
connection.output_written(count) connection.output_written(count)
elif data: elif data:
LOG.debug("Socket closed") LOG.debug("Socket closed")
......
...@@ -17,9 +17,10 @@ ...@@ -17,9 +17,10 @@
# specific language governing permissions and limitations # specific language governing permissions and limitations
# under the License. # under the License.
import os
from setuptools import setup from setuptools import setup
_VERSION = "2.0.3" # NOTE: update __init__.py too! _VERSION = "2.2.2" # NOTE: update __init__.py too!
# I hack, therefore I am (productive) Some distros (which will not be named) # I hack, therefore I am (productive) Some distros (which will not be named)
# don't use setup.py to install the proton python module. In this case, pip # don't use setup.py to install the proton python module. In this case, pip
...@@ -33,11 +34,11 @@ try: ...@@ -33,11 +34,11 @@ try:
except ImportError: except ImportError:
# this version of proton will download and install the proton shared # this version of proton will download and install the proton shared
# library as well: # library as well:
_dependencies = ['python-qpid-proton>=0.9,<0.11'] _dependencies = ['python-qpid-proton>=0.9,<0.20']
setup(name="pyngus", setup(name="pyngus",
version=_VERSION, version=_VERSION + os.environ.get('PYNGUS_VERSION_SUFFIX', ''),
author="kgiusti", author="kgiusti",
author_email="kgiusti@apache.org", author_email="kgiusti@apache.org",
packages=["pyngus"], packages=["pyngus"],
...@@ -49,4 +50,11 @@ setup(name="pyngus", ...@@ -49,4 +50,11 @@ setup(name="pyngus",
classifiers=["License :: OSI Approved :: Apache Software License", classifiers=["License :: OSI Approved :: Apache Software License",
"Intended Audience :: Developers", "Intended Audience :: Developers",
"Operating System :: OS Independent", "Operating System :: OS Independent",
"Programming Language :: Python"]) "Programming Language :: Python",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6"])
...@@ -28,24 +28,42 @@ perf-test.py is a static performance test. It simulates sending many ...@@ -28,24 +28,42 @@ perf-test.py is a static performance test. It simulates sending many
small messages over many links. It can be used to gauge the small messages over many links. It can be used to gauge the
performance impact of code changes during development. performance impact of code changes during development.
To run, invoke it using the *time* command. Example: Example:
$ time ./tests/python/perf-test.py $ ./tests/perf-test.py
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
real 2m15.789s 6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
user 2m15.357s
sys 0m0.039s
## Historical Results ## ## Historical Results ##
### Lenovo T530 ### ### Lenovo W541 ###
master @ 7cc6f77b781916ee679d36e8fd1d1bcf77760353 (Proton 0.7 RC4) v2.2.1
real 2m1.102s ------
user 2m0.814s Python 2.7.13 (Fedora 24)
sys 0m0.026s Total: 200000 messages; credit window: 10; proton (0, 17, 0)
8176 Messages/second; Latency avg: 19.116ms min: 9.241ms max: 26.693ms
Python 3.5.3 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 17, 0)
7081 Messages/second; Latency avg: 22.103ms min: 10.926ms max: 40.079ms
0.1.0-p0.7: v2.1.4
real 3m7.240s ------
user 3m6.832s Python 2.7.13 (Fedora 24)
sys 0m0.026s Total: 200000 messages; credit window: 10; proton (0, 16, 0)
8106 Messages/second; Latency avg: 19.283ms min: 9.525ms max: 29.096ms
Python 3.5.2 (Fedora 24)
Total: 200000 messages; credit window: 10; proton (0, 16, 0)
7203 Messages/second; Latency avg: 21.723ms min: 11.098ms max: 41.364ms
v2.1.3
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 15, 0)
7964 Messages/second; Latency avg: 19.609ms min: 9.463ms max: 28.257ms
v2.1.1
------
Python 2.7.11
Total: 200000 messages; credit window: 10; proton (0, 13, 1)
6434 Messages/second; Latency avg: 24.581ms min: 10.419ms max: 45.249ms
...@@ -25,6 +25,7 @@ import time ...@@ -25,6 +25,7 @@ import time
import uuid import uuid
from proton import Message from proton import Message
from proton import VERSION as PN_VERSION
import pyngus import pyngus
...@@ -41,24 +42,16 @@ class PerfConnection(pyngus.ConnectionEventHandler): ...@@ -41,24 +42,16 @@ class PerfConnection(pyngus.ConnectionEventHandler):
class PerfSendConnection(PerfConnection): class PerfSendConnection(PerfConnection):
def __init__(self, name, container, properties, def __init__(self, name, container, properties, msg_count, link_count):
msg_count, batch_size, link_count): super(PerfSendConnection, self).__init__(name, container, properties)
super(PerfSendConnection, self).__init__(name, container,
properties)
self._msg_count = msg_count self._msg_count = msg_count
self._batch_size = batch_size
self._link_count = link_count self._link_count = link_count
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.client()
self.senders = set() self.senders = set()
self.connection.open() self.connection.open()
def connection_active(self, connection): def connection_active(self, connection):
for i in range(self._link_count): for i in range(self._link_count):
PerfSender("sender-%d" % i, PerfSender("sender-%d" % i, self, self._msg_count)
self,
self._msg_count,
self._batch_size)
class PerfReceiveConnection(PerfConnection): class PerfReceiveConnection(PerfConnection):
...@@ -68,8 +61,9 @@ class PerfReceiveConnection(PerfConnection): ...@@ -68,8 +61,9 @@ class PerfReceiveConnection(PerfConnection):
properties) properties)
self._msg_count = msg_count self._msg_count = msg_count
self._credit_window = credit_window self._credit_window = credit_window
self.connection.pn_sasl.mechanisms("ANONYMOUS") self.latency = 0
self.connection.pn_sasl.server() self.latency_min = 100000000
self.latency_max = 0
self.connection.open() self.connection.open()
self.receivers = set() self.receivers = set()
...@@ -91,35 +85,33 @@ class PerfReceiveConnection(PerfConnection): ...@@ -91,35 +85,33 @@ class PerfReceiveConnection(PerfConnection):
class PerfSender(pyngus.SenderEventHandler): class PerfSender(pyngus.SenderEventHandler):
def __init__(self, address, perf_send_conn, def __init__(self, address, perf_send_conn, msg_count):
msg_count, batch_size):
self.msg = Message() self.msg = Message()
self.msg.body = "HELLO"
self.sent = 0 self.sent = 0
self.acked = 0 self.acked = 0
self.msg_count = msg_count self.msg_count = msg_count
self.batch_size = batch_size
self.address = address self.address = address
self.perf_conn = perf_send_conn self.perf_conn = perf_send_conn
self.perf_conn.senders.add(self) self.perf_conn.senders.add(self)
connection = perf_send_conn.connection connection = perf_send_conn.connection
self.link = connection.create_sender(address, self.link = connection.create_sender(address, event_handler=self)
event_handler=self)
self.link.context = self self.link.context = self
self.link.open() self.link.open()
def _send_msgs(self):
"""Send up to a batch of messages."""
batch = min(self.msg_count - self.sent, self.batch_size)
target = self.sent + batch
while self.sent < target:
self.link.send(self.msg, self.send_complete)
self.sent += 1
def sender_active(self, sender_link): def sender_active(self, sender_link):
self._send_msgs() self._send_msgs()
def send_complete(self, link, handle, result, info): def credit_granted(self, sender_link):
self._send_msgs()
def _send_msgs(self):
# Send as many messages as credit allows
while self.link.credit > 0 and self.sent < self.msg_count:
self.msg.body = {"timestamp": time.time()}
self.link.send(self.msg, self._send_complete)
self.sent += 1
def _send_complete(self, link, handle, result, info):
self.acked += 1 self.acked += 1
if self.acked == self.msg_count: if self.acked == self.msg_count:
# test done, shutdown # test done, shutdown
...@@ -127,9 +119,6 @@ class PerfSender(pyngus.SenderEventHandler): ...@@ -127,9 +119,6 @@ class PerfSender(pyngus.SenderEventHandler):
self.perf_conn.senders.discard(self) self.perf_conn.senders.discard(self)
if len(self.perf_conn.senders) == 0: if len(self.perf_conn.senders) == 0:
self.link.connection.close() self.link.connection.close()
elif self.acked == self.sent:
# send next batch
self._send_msgs()
class PerfReceiver(pyngus.ReceiverEventHandler): class PerfReceiver(pyngus.ReceiverEventHandler):
...@@ -138,6 +127,7 @@ class PerfReceiver(pyngus.ReceiverEventHandler): ...@@ -138,6 +127,7 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.msg_count = msg_count self.msg_count = msg_count
self.received = 0 self.received = 0
self.credit_window = credit_window if credit_window else msg_count self.credit_window = credit_window if credit_window else msg_count
self.credit_low = (credit_window + 1) / 2
self.address = address self.address = address
self.perf_conn = perf_receive_conn self.perf_conn = perf_receive_conn
self.perf_conn.receivers.add(self) self.perf_conn.receivers.add(self)
...@@ -150,20 +140,25 @@ class PerfReceiver(pyngus.ReceiverEventHandler): ...@@ -150,20 +140,25 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.link.open() self.link.open()
def message_received(self, receiver_link, message, handle): def message_received(self, receiver_link, message, handle):
"""Acknowledge receipt, grant more credit if needed.""" # Acknowledge receipt, grant more credit if needed
self.link.message_accepted(handle) self.link.message_accepted(handle)
self.received += 1 self.received += 1
if self.received < self.msg_count: timestamp = message.body["timestamp"]
if self.link.capacity == 0: latency = time.time() - timestamp
self.link.add_capacity(self.credit_window) self.perf_conn.latency += latency
else: self.perf_conn.latency_min = min(latency, self.perf_conn.latency_min)
self.perf_conn.latency_max = max(latency, self.perf_conn.latency_max)
if self.link.capacity <= self.credit_low and \
self.received < self.msg_count:
self.link.add_capacity(self.credit_window - self.link.capacity)
elif self.received == self.msg_count:
# link done # link done
self.link.close() self.link.close()
self.perf_conn.receivers.discard(self) self.perf_conn.receivers.discard(self)
def process_connections(c1, c2): def process_connections(c1, c2):
"""Transfer I/O, then process each connection.""" # Transfer I/O, then process each connection
def _do_io(src, dst): def _do_io(src, dst):
count = min(src.has_output, dst.needs_input) count = min(src.has_output, dst.needs_input)
if count > 0: if count > 0:
...@@ -183,15 +178,15 @@ def main(argv=None): ...@@ -183,15 +178,15 @@ def main(argv=None):
parser = optparse.OptionParser(usage=_usage) parser = optparse.OptionParser(usage=_usage)
parser.add_option("--count", dest="count", type="int", parser.add_option("--count", dest="count", type="int",
default=10000, default=10000,
help="# of messages to transfer.") help="# of messages to transfer per link.")
parser.add_option("--links", dest="link_count", type="int", parser.