Commit ad9119a2 authored by Kenneth Giusti's avatar Kenneth Giusti

add message performace tool

parent fe4d2389
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""Tool to gauge message passing throughput and latencies"""
import logging
import optparse
import sys
import time
import uuid
import pyngus
from proton import Message
from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
class ConnectionEventHandler(pyngus.ConnectionEventHandler):
def __init__(self):
super(ConnectionEventHandler, self).__init__()
def connection_failed(self, connection, error):
"""Connection has failed in some way."""
LOG.warn("Connection failed callback: %s", error)
def connection_remote_closed(self, connection, pn_condition):
"""Peer has closed its end of the connection."""
LOG.debug("connection_remote_closed condition=%s", pn_condition)
connection.close()
class SenderHandler(pyngus.SenderEventHandler):
def __init__(self, count):
self._count = count
self._msg = Message()
self.calls = 0
self.total_ack_latency = 0.0
self.stop_time = None
self.start_time = None
def credit_granted(self, sender_link):
if self.start_time is None:
self.start_time = time.time()
self._send_message(sender_link)
def _send_message(self, link):
now = time.time()
self._msg.body = {'tx-timestamp': now}
self._last_send = now
link.send(self._msg, self)
def __call__(self, link, handle, status, error):
now = time.time()
self.total_ack_latency += now - self._last_send
self.calls += 1
if self._count:
self._count -= 1
if self._count == 0:
self.stop_time = now
link.close()
return
self._send_message(link)
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("Sender peer_closed condition=%s", pn_condition)
sender_link.close()
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("Sender failed error=%s", error)
sender_link.close()
class ReceiverHandler(pyngus.ReceiverEventHandler):
def __init__(self, count, capacity):
self._count = count
self._capacity = capacity
self._msg = Message()
self.receives = 0
self.tx_total_latency = 0.0
def receiver_active(self, receiver_link):
receiver_link.add_capacity(self._capacity)
def receiver_remote_closed(self, receiver_link, pn_condition):
"""Peer has closed its end of the link."""
LOG.debug("receiver_remote_closed condition=%s", pn_condition)
receiver_link.close()
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.warn("receiver_failed error=%s", error)
receiver_link.close()
def message_received(self, receiver, message, handle):
now = time.time()
receiver.message_accepted(handle)
self.tx_total_latency += now - message.body['tx-timestamp']
self.receives += 1
if self._count:
self._count -= 1
if self._count == 0:
receiver.close()
return
lc = receiver.capacity
cap = self._capacity
if lc < (cap / 2):
receiver.add_capacity(cap - lc)
def main(argv=None):
_usage = """Usage: %prog [options]"""
parser = optparse.OptionParser(usage=_usage)
parser.add_option("-a", dest="server", type="string",
default="amqp://0.0.0.0:5672",
help="The address of the server [amqp://0.0.0.0:5672]")
parser.add_option("--node", type='string', default='amq.topic',
help='Name of source/target node')
parser.add_option("--count", type='int', default=100,
help='Send N messages (send forever if N==0)')
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--trace", dest="trace", action="store_true",
help="enable protocol tracing")
opts, _ = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)
# create AMQP Container, Connection, and SenderLink
#
container = pyngus.Container(uuid.uuid4().hex)
conn_properties = {'hostname': host,
'x-server': False}
if opts.trace:
conn_properties["x-trace-protocol"] = True
c_handler = ConnectionEventHandler()
connection = container.create_connection("perf_tool",
c_handler,
conn_properties)
r_handler = ReceiverHandler(opts.count, opts.count or 1000)
receiver = connection.create_receiver(opts.node, opts.node, r_handler)
s_handler = SenderHandler(opts.count)
sender = connection.create_sender(opts.node, opts.node, s_handler)
connection.open()
receiver.open()
while not receiver.active:
process_connection(connection, my_socket)
sender.open()
# Run until all messages transfered
while not sender.closed or not receiver.closed:
process_connection(connection, my_socket)
connection.close()
while not connection.closed:
process_connection(connection, my_socket)
duration = s_handler.stop_time - s_handler.start_time
thru = s_handler.calls / duration
permsg = duration / s_handler.calls
ack = s_handler.total_ack_latency / s_handler.calls
lat = r_handler.tx_total_latency / r_handler.receives
print("Stats:\n"
" TX Avg Calls/Sec: %f Per Call: %f Ack Latency %f\n"
" RX Latency: %f" % (thru, permsg, ack, lat))
sender.destroy()
receiver.destroy()
connection.destroy()
container.destroy()
my_socket.close()
return 0
if __name__ == "__main__":
main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment