Commit 27c8d54f authored by Thomas Goirand's avatar Thomas Goirand

Merge tag '2.2.2' into debian/queens

parents 77ca1d54 6e917580
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
addons:
apt:
packages:
- libssl-dev
- libsasl2-dev
- sasl2-bin
- swig
- python-dev
- python3-dev
- uuid-dev
install:
- sudo apt-get install -qq uuid-dev swig
- pip install -r test-requirements.txt
- pip install .
# command to run tests
......
include README.md
include LICENSE
include docs/User-Guide.md
exclude .gitignore
global-exclude *.pyc
......@@ -7,6 +7,68 @@ callback-based API for message passing.
See the User Guide in the docs directory for more detail.
## Release 2.2.2 ##
* verified Python 3.6 compatibility
* bumped max proton version to 0.19
## Release 2.2.1 ##
* disable the socket I/O logging - fills the debug logs with lots of
useless crap.
## Release 2.2.0 ##
* Can now use the system's default CA by specifying the 'x-ssl' option
in the 'properties' field of the create_connection call and _NOT_
specifying the 'x-ssl-ca-file' property. (contributed by
Juan Antonio Osorio Robles)
* use the most secure default setting for x-ssl-verify-mode based on
the configuration
* bump max proton version to 0.17
## Release 2.1.4 ##
* avoid using deprecated next_tick in the container
* enable Python 3.5 testing in tox
* add client authentication via SSL tests
* bump max proton version to 0.16
## Release 2.1.3 ##
* Remove chatty debug log messages
* fix pep8 violation
* add static performace test tool
* Bump max proton version to 0.15
## Release 2.1.2 ##
* Bump max proton version to 0.14
## Release 2.1.1 ##
* bugfix: under some (rare) flow/credit interactions a sender may
stall. Changed code to invoke credit_granted() callback more
frequently.
## Release 2.1.0 ##
* feature: add 'x-force-sasl' to connection property map
* bugfix: update old SASL unit test
## Release 2.0.4 ##
* Bump max proton version to 0.13
* performance tweak to link event handling
* fix perf-test.py tool
* bugfix: fix leak of timer callbacks
* enable Python 3.4 testing
* bugfix: fix receiver example (recv.py)
* several fixes to the SASL unit tests
* bugfix: fix leak of underlying proton objects
* Add SASL/SSL configuration options to examples
* bugfix: allow PLAIN or ANONYMOUS authentication in server mode
## Release 2.0.3 ##
* bugfix: fixed a memory leak
......
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Tool to gauge message passing throughput and latencies"""
import logging
import optparse
import time
import uuid
import pyngus
from proton import Message
from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
class ConnectionEventHandler(pyngus.ConnectionEventHandler):
def __init__(self):
super(ConnectionEventHandler, self).__init__()
def connection_failed(self, connection, error):
"""Connection has failed in some way."""
LOG.warn("Connection failed callback: %s", error)
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed condition=%s", pn_condition)
connection.close()
class SenderHandler(pyngus.SenderEventHandler):
def __init__(self, count):
self._count = count
self._msg = Message()
self.calls = 0
self.total_ack_latency = 0.0
self.stop_time = None
self.start_time = None
def credit_granted(self, sender_link):
if self.start_time is None:
self.start_time = time.time()
self._send_message(sender_link)
def _send_message(self, link):
now = time.time()
self._msg.body = {'tx-timestamp': now}
self._last_send = now
link.send(self._msg, self)
def __call__(self, link, handle, status, error):
now = time.time()
self.total_ack_latency += now - self._last_send
self.calls += 1
if self._count:
self._count -= 1
if self._count == 0:
self.stop_time = now
link.close()
return
self._send_message(link)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("Sender peer_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("Sender failed error=%s", error)
sender_link.close()
class ReceiverHandler(pyngus.ReceiverEventHandler):
def __init__(self, count, capacity):
self._count = count
self._capacity = capacity
self._msg = Message()
self.receives = 0
self.tx_total_latency = 0.0
def receiver_active(self, receiver_link):
receiver_link.add_capacity(self._capacity)
def receiver_remote_closed(self, receiver_link, pn_condition):
"""Peer has closed its end of the link."""
LOG.debug("receiver_remote_closed condition=%s", pn_condition)
receiver_link.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.warn("receiver_failed error=%s", error)
receiver_link.close()
def message_received(self, receiver, message, handle):
now = time.time()
receiver.message_accepted(handle)
self.tx_total_latency += now - message.body['tx-timestamp']
self.receives += 1
if self._count:
self._count -= 1
if self._count == 0:
receiver.close()
return
lc = receiver.capacity
cap = self._capacity
if lc < (cap / 2):
receiver.add_capacity(cap - lc)
def main(argv=None):
_usage = """Usage: %prog [options]"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("-a", dest="server", type="string",
default="amqp://0.0.0.0:5672",
help="The address of the server [amqp://0.0.0.0:5672]")
parser.add_option("--node", type='string', default='amq.topic',
help='Name of source/target node')
parser.add_option("--count", type='int', default=100,
help='Send N messages (send forever if N==0)')
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--trace", dest="trace", action="store_true",
help="enable protocol tracing")
opts, _ = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)
# create AMQP Container, Connection, and SenderLink
#
container = pyngus.Container(uuid.uuid4().hex)
conn_properties = {'hostname': host,
'x-server': False}
if opts.trace:
conn_properties["x-trace-protocol"] = True
c_handler = ConnectionEventHandler()
connection = container.create_connection("perf_tool",
c_handler,
conn_properties)
r_handler = ReceiverHandler(opts.count, opts.count or 1000)
receiver = connection.create_receiver(opts.node, opts.node, r_handler)
s_handler = SenderHandler(opts.count)
sender = connection.create_sender(opts.node, opts.node, s_handler)
connection.open()
receiver.open()
while not receiver.active:
process_connection(connection, my_socket)
sender.open()
# Run until all messages transfered
while not sender.closed or not receiver.closed:
process_connection(connection, my_socket)
connection.close()
while not connection.closed:
process_connection(connection, my_socket)
duration = s_handler.stop_time - s_handler.start_time
thru = s_handler.calls / duration
permsg = duration / s_handler.calls
ack = s_handler.total_ack_latency / s_handler.calls
lat = r_handler.tx_total_latency / r_handler.receives
print("Stats:\n"
" TX Avg Calls/Sec: %f Per Call: %f Ack Latency %f\n"
" RX Latency: %f" % (thru, permsg, ack, lat))
sender.destroy()
receiver.destroy()
connection.destroy()
container.destroy()
my_socket.close()
return 0
if __name__ == "__main__":
main()
......@@ -89,12 +89,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, extra = parser.parse_args(args=argv)
if opts.debug:
......@@ -111,6 +121,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -119,8 +133,12 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = pyngus.ConnectionEventHandler()
c_handler = ConnectionEventHandler()
connection = container.create_connection("receiver",
c_handler,
conn_properties)
......@@ -146,6 +164,10 @@ def main(argv=None):
else:
print("Receive failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
receiver.close()
connection.close()
......
......@@ -373,7 +373,7 @@ def main(argv=None):
# Create the RPC caller
method = {'method': method_info[0],
'args': dict([(method_info[i], method_info[i+1])
'args': dict([(method_info[i], method_info[i + 1])
for i in range(1, len(method_info), 2)])}
my_caller = my_connection.create_caller(method,
"my-source-address",
......
......@@ -78,12 +78,22 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--ssl-cert-file",
help="Self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-file",
help="Key for self-identifying certificate (PEM file)")
parser.add_option("--ssl-key-password",
help="Password to unlock SSL key file")
parser.add_option("--username", type="string",
help="User Id for authentication")
parser.add_option("--password", type="string",
help="User password for authentication")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-config-dir", type="string",
help="Path to directory containing sasl config")
parser.add_option("--sasl-config-name", type="string",
help="Name of the sasl config file (without '.config')")
opts, payload = parser.parse_args(args=argv)
if not payload:
......@@ -103,6 +113,10 @@ def main(argv=None):
conn_properties["x-trace-protocol"] = True
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
if opts.ssl_cert_file:
conn_properties["x-ssl-identity"] = (opts.ssl_cert_file,
opts.ssl_key_file,
opts.ssl_key_password)
if opts.idle_timeout:
conn_properties["idle-time-out"] = opts.idle_timeout
if opts.username:
......@@ -111,6 +125,10 @@ def main(argv=None):
conn_properties['x-password'] = opts.password
if opts.sasl_mechs:
conn_properties['x-sasl-mechs'] = opts.sasl_mechs
if opts.sasl_config_dir:
conn_properties["x-sasl-config-dir"] = opts.sasl_config_dir
if opts.sasl_config_name:
conn_properties["x-sasl-config-name"] = opts.sasl_config_name
c_handler = ConnectionEventHandler()
connection = container.create_connection("sender",
......@@ -151,6 +169,10 @@ def main(argv=None):
else:
print("Send failed due to connection failure!")
# flush any remaining output before closing (optional)
while connection.has_output > 0:
process_connection(connection, my_socket)
sender.close()
connection.close()
......
......@@ -245,19 +245,21 @@ def main(argv=None):
help="enable protocol tracing")
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--cert",
parser.add_option("--ca",
help="Certificate Authority PEM file")
parser.add_option("--cert", "--ssl-cert-file",
help="PEM File containing the server's certificate")
parser.add_option("--key",
parser.add_option("--key", "--ssl-key-file",
help="PEM File containing the server's private key")
parser.add_option("--keypass",
parser.add_option("--keypass", "--ssl-key-password",
help="Password used to decrypt key file")
parser.add_option("--require-auth", action="store_true",
help="Require clients to authenticate")
parser.add_option("--sasl-mechs", type="string",
help="The list of acceptable SASL mechs")
parser.add_option("--sasl-cfg-name", type="string",
parser.add_option("--sasl-cfg-name", "--sasl-config-name", type="string",
help="name of SASL config file (no suffix)")
parser.add_option("--sasl-cfg-dir", type="string",
parser.add_option("--sasl-cfg-dir", "--sasl-config-dir", type="string",
help="Path to the SASL config file")
opts, arguments = parser.parse_args(args=argv)
......
......@@ -126,6 +126,7 @@ def process_connection(connection, my_socket):
connection.close()
return True
# Map the send callback status to a string
SEND_STATUS = {
pyngus.SenderLink.ABORTED: "Aborted",
......
......@@ -23,4 +23,4 @@ from pyngus.link import SenderLink, SenderEventHandler
from pyngus.sockets import read_socket_input
from pyngus.sockets import write_socket_output
VERSION = (2, 0, 3) # major, minor, fix
VERSION = (2, 2, 2) # major, minor, fix
This diff is collapsed.
......@@ -31,8 +31,6 @@ class Container(object):
def __init__(self, name, properties=None):
self._name = name
self._connections = {}
self._timer_heap = [] # (next_tick, connection)
self._need_processing = set()
self._properties = properties
def destroy(self):
......@@ -67,7 +65,7 @@ class Container(object):
readers.append(c)
if c.has_output > 0:
writers.append(c)
if c.next_tick:
if c.deadline:
heapq.heappush(timer_heap, (c.next_tick, c))
timers = []
while timer_heap:
......
......@@ -188,15 +188,15 @@ class _Link(Endpoint):
def active(self):
state = self._pn_link.state
return (not self._failed and
state == (proton.Endpoint.LOCAL_ACTIVE
| proton.Endpoint.REMOTE_ACTIVE))
state == (proton.Endpoint.LOCAL_ACTIVE |
proton.Endpoint.REMOTE_ACTIVE))
@property
def closed(self):
state = self._pn_link.state
return (self._failed or
state == (proton.Endpoint.LOCAL_CLOSED
| proton.Endpoint.REMOTE_CLOSED))
state == (proton.Endpoint.LOCAL_CLOSED |
proton.Endpoint.REMOTE_CLOSED))
def reject(self, pn_condition):
self._rejected = True # prevent 'active' callback!
......@@ -248,22 +248,27 @@ class _Link(Endpoint):
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _Link._endpoint_event_map.get(pn_event.type)
if pn_event.type == proton.Event.DELIVERY:
pn_delivery = pn_event.context
pn_link = pn_delivery.link
if pn_link.context:
pn_link.context._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.context
if pn_link.context:
pn_link.context._process_credit()
elif ep_event is not None:
pn_link = pn_event.context
if pn_link.context:
etype = pn_event.type
if etype == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_delivery(pn_event.delivery)
return True
if etype == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context and pn_link.context._process_credit()
return True
ep_event = _Link._endpoint_event_map.get(etype)
if ep_event is not None:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.LINK_INIT:
pn_link = pn_event.context
return True
if etype == proton.Event.LINK_INIT:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if not c:
......@@ -278,11 +283,14 @@ class _Link(Endpoint):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
elif pn_event.type == proton.Event.LINK_FINAL:
return True
if etype == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
return True
return False # event not handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
......@@ -394,8 +402,8 @@ class SenderLink(_Link):
self.handle = handle
self.deadline = deadline
self.link._send_requests[self.tag] = self
if deadline:
self.link._connection._add_timer(deadline, self)
if self.deadline:
self.link._connection._add_timer(self.deadline, self)
def __call__(self):
"""Invoked by Connection on timeout (now <= deadline)."""
......@@ -403,7 +411,7 @@ class SenderLink(_Link):
def destroy(self, state, info):
"""Invoked on final completion of send."""
if self.deadline and state != SenderLink.TIMED_OUT:
if self.deadline:
self.link._connection._cancel_timer(self.deadline, self)
if self.tag in self.link._send_requests:
del self.link._send_requests[self.tag]
......@@ -429,10 +437,6 @@ class SenderLink(_Link):
delivery_callback, handle,
deadline)
self._pn_link.delivery(tag)
LOG.debug("Sending a message, tag=%s", tag)
if deadline:
self._connection._add_timer(deadline, send_req)
pn_delivery = self._pn_link.current
if pn_delivery and pn_delivery.writable:
......@@ -441,7 +445,6 @@ class SenderLink(_Link):
self._pending_sends.append(tag)
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
LOG.debug("Sending previous pending message, tag=%s", tag)
self._write_msg(pn_delivery, send_req)
else:
LOG.debug("Send is pending for credit, tag=%s", tag)
......@@ -470,13 +473,9 @@ class SenderLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing send delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.tag in self._send_requests:
if pn_delivery.settled or pn_delivery.remote_state:
# remote has reached a 'terminal state'
LOG.debug("Remote has processed a sent msg")
outcome = pn_delivery.remote_state
state = SenderLink._DISPOSITION_STATE_MAP.get(outcome,
self.UNKNOWN)
......@@ -496,7 +495,6 @@ class SenderLink(_Link):
pn_delivery.settle()
elif pn_delivery.writable:
# we can now send on this delivery
LOG.debug("Delivery has become writable")
if self._pending_sends:
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
......@@ -508,7 +506,6 @@ class SenderLink(_Link):
def _process_credit(self):
# check if any pending deliveries are now writable:
LOG.debug("credit event, link=%s", self.name)
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
......@@ -516,17 +513,14 @@ class SenderLink(_Link):
pn_delivery = self._pn_link.current
# Alert if credit has become available
new_credit = self._pn_link.credit
if self._handler and not self._rejected:
if self._last_credit <= 0 and new_credit > 0:
LOG.debug("Credit is available, link=%s", self.name)
if 0 < self._pn_link.credit > self._last_credit:
with self._callback_lock:
self._handler.credit_granted(self)
self._last_credit = new_credit
self._last_credit = self._pn_link.credit
def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message
LOG.debug("Sending message to engine, tag=%s", send_req.tag)
self._pn_link.send(send_req.message.encode())
self._pn_link.advance()
self._last_credit = self._pn_link.credit
......@@ -690,10 +684,7 @@ class ReceiverLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing receive delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.readable and not pn_delivery.partial:
LOG.debug("Receive delivery readable")
data = self._pn_link.recv(pn_delivery.pending)
msg = proton.Message()
msg.decode(data)
......
......@@ -50,7 +50,6 @@ def read_socket_input(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -58,6 +57,7 @@ def read_socket_input(connection, socket_obj):
# try again later
return 0
# otherwise, unrecoverable, caller must handle
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
......@@ -70,7 +70,6 @@ def read_socket_input(connection, socket_obj):
count = Connection.EOS
connection.close_input()
connection.close_output()
LOG.debug("Socket recv %s bytes", count)
return count
......@@ -97,7 +96,6 @@ def write_socket_output(connection, socket_obj):
LOG.debug("Socket timeout exception %s", str(e))
raise # caller must handle
except socket.error as e:
LOG.debug("Socket error exception %s", str(e))
err = e.errno
if err in [errno.EAGAIN,
errno.EWOULDBLOCK,
......@@ -105,13 +103,13 @@ def write_socket_output(connection, socket_obj):
# try again later
return 0
# else assume fatal let caller handle it:
LOG.debug("Socket error exception %s", str(e))
raise
except Exception as e: # beats me... assume fatal
LOG.debug("unknown socket exception %s", str(e))
raise
if count > 0:
LOG.debug("Socket sent %s bytes", count)
connection.output_written(count)
elif data:
LOG.debug("Socket closed")
......
......@@ -17,9 +17,10 @@
# specific language governing permissions and limitations
# under the License.
import os
from setuptools import setup
_VERSION = "2.0.3" # NOTE: update __init__.py too!
_VERSION = "2.2.2" # NOTE: update __init__.py too!
# I hack, therefore I am (productive) Some distros (which will not be named)
# don't use setup.py to install the proton python module. In this case, pip
......@@ -33,11 +34,11 @@ try:
except ImportError:
# this version of proton will download and install the proton shared
# library as well:
_dependencies = ['python-qpid-proton>=0.9,<0.11']
_dependencies = ['python-qpid-proton>=0.9,<0.20']
setup(name="pyngus",