Commit 77fbd97a authored by Kenneth Giusti's avatar Kenneth Giusti

fix and enhance the perf-test.py tool

parent 74b5ca5b
......@@ -25,6 +25,7 @@ import time
import uuid
from proton import Message
from proton import VERSION as PN_VERSION
import pyngus
......@@ -41,24 +42,16 @@ class PerfConnection(pyngus.ConnectionEventHandler):
class PerfSendConnection(PerfConnection):
def __init__(self, name, container, properties,
msg_count, batch_size, link_count):
super(PerfSendConnection, self).__init__(name, container,
properties)
def __init__(self, name, container, properties, msg_count, link_count):
super(PerfSendConnection, self).__init__(name, container, properties)
self._msg_count = msg_count
self._batch_size = batch_size
self._link_count = link_count
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.client()
self.senders = set()
self.connection.open()
def connection_active(self, connection):
for i in range(self._link_count):
PerfSender("sender-%d" % i,
self,
self._msg_count,
self._batch_size)
PerfSender("sender-%d" % i, self, self._msg_count)
class PerfReceiveConnection(PerfConnection):
......@@ -68,8 +61,9 @@ class PerfReceiveConnection(PerfConnection):
properties)
self._msg_count = msg_count
self._credit_window = credit_window
self.connection.pn_sasl.mechanisms("ANONYMOUS")
self.connection.pn_sasl.server()
self.latency = 0
self.latency_min = 100000000
self.latency_max = 0
self.connection.open()
self.receivers = set()
......@@ -91,35 +85,33 @@ class PerfReceiveConnection(PerfConnection):
class PerfSender(pyngus.SenderEventHandler):
def __init__(self, address, perf_send_conn,
msg_count, batch_size):
def __init__(self, address, perf_send_conn, msg_count):
self.msg = Message()
self.msg.body = "HELLO"
self.sent = 0
self.acked = 0
self.msg_count = msg_count
self.batch_size = batch_size
self.address = address
self.perf_conn = perf_send_conn
self.perf_conn.senders.add(self)
connection = perf_send_conn.connection
self.link = connection.create_sender(address,
event_handler=self)
self.link = connection.create_sender(address, event_handler=self)
self.link.context = self
self.link.open()
def _send_msgs(self):
"""Send up to a batch of messages."""
batch = min(self.msg_count - self.sent, self.batch_size)
target = self.sent + batch
while self.sent < target:
self.link.send(self.msg, self.send_complete)
self.sent += 1
def sender_active(self, sender_link):
self._send_msgs()
def send_complete(self, link, handle, result, info):
def credit_granted(self, sender_link):
self._send_msgs()
def _send_msgs(self):
# Send as many messages as credit allows
while self.link.credit > 0 and self.sent < self.msg_count:
self.msg.body = {"timestamp": time.time()}
self.link.send(self.msg, self._send_complete)
self.sent += 1
def _send_complete(self, link, handle, result, info):
self.acked += 1
if self.acked == self.msg_count:
# test done, shutdown
......@@ -127,9 +119,6 @@ class PerfSender(pyngus.SenderEventHandler):
self.perf_conn.senders.discard(self)
if len(self.perf_conn.senders) == 0:
self.link.connection.close()
elif self.acked == self.sent:
# send next batch
self._send_msgs()
class PerfReceiver(pyngus.ReceiverEventHandler):
......@@ -138,6 +127,7 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.msg_count = msg_count
self.received = 0
self.credit_window = credit_window if credit_window else msg_count
self.credit_low = (credit_window + 1) / 2
self.address = address
self.perf_conn = perf_receive_conn
self.perf_conn.receivers.add(self)
......@@ -150,20 +140,25 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
self.link.open()
def message_received(self, receiver_link, message, handle):
"""Acknowledge receipt, grant more credit if needed."""
# Acknowledge receipt, grant more credit if needed
self.link.message_accepted(handle)
self.received += 1
if self.received < self.msg_count:
if self.link.capacity == 0:
self.link.add_capacity(self.credit_window)
else:
timestamp = message.body["timestamp"]
latency = time.time() - timestamp
self.perf_conn.latency += latency
self.perf_conn.latency_min = min(latency, self.perf_conn.latency_min)
self.perf_conn.latency_max = max(latency, self.perf_conn.latency_max)
if (self.link.capacity < self.credit_low
and self.received < self.msg_count):
self.link.add_capacity(self.credit_window - self.link.capacity)
elif self.received == self.msg_count:
# link done
self.link.close()
self.perf_conn.receivers.discard(self)
def process_connections(c1, c2):
"""Transfer I/O, then process each connection."""
# Transfer I/O, then process each connection
def _do_io(src, dst):
count = min(src.has_output, dst.needs_input)
if count > 0:
......@@ -183,15 +178,15 @@ def main(argv=None):
parser = optparse.OptionParser(usage=_usage)
parser.add_option("--count", dest="count", type="int",
default=10000,
help="# of messages to transfer.")
help="# of messages to transfer per link.")
parser.add_option("--links", dest="link_count", type="int",
default=100,
default=20,
help="# of link pairs.")
parser.add_option("--send-batch", dest="send_batch", type="int",
default=10,
help="# of msgs sender queues at once.")
parser.add_option("--credit-batch", dest="credit_batch", type="int",
default=5,
help="DEPRECATED")
parser.add_option("--credit", dest="credit_window", type="int",
default=10,
help="Credit window issued by receiver.")
parser.add_option("--ca",
help="Certificate Authority PEM file")
......@@ -208,36 +203,46 @@ def main(argv=None):
# sender acts like SSL client
conn_properties = {'hostname': "test.server.com",
'x-trace-protocol': False}
'x-trace-protocol': False,
'x-sasl-mechs': "ANONYMOUS"}
if opts.ca:
conn_properties["x-ssl-ca-file"] = opts.ca
sender_conn = PerfSendConnection("send-conn",
container,
conn_properties,
opts.count, opts.send_batch,
opts.count,
opts.link_count)
# receiver acts as SSL server
conn_properties = {'hostname': "my.client.com"}
conn_properties = {'hostname': "my.client.com",
'x-server': True,
'x-sasl-mechs': "ANONYMOUS"}
if opts.cert:
conn_properties["x-ssl-server"] = True
identity = (opts.cert, opts.key, opts.keypass)
conn_properties["x-ssl-identity"] = identity
receiver_conn = PerfReceiveConnection("recv-conn",
container,
conn_properties,
opts.count,
opts.credit_batch)
opts.credit_window)
# process connections until finished:
start = time.time()
while ((not sender_conn.connection.closed) or
(not receiver_conn.connection.closed)):
process_connections(sender_conn.connection,
receiver_conn.connection)
process_connections(sender_conn.connection, receiver_conn.connection)
sender_conn.connection.destroy()
receiver_conn.connection.destroy()
container.destroy()
delta = time.time() - start
total = opts.count * opts.link_count
print("Total: %s messages; credit window: %s; proton %s"
% (total, opts.credit_window, PN_VERSION))
print("%d Messages/second; Latency avg: %.3fms min: %.3fms max: %.3fms"
% (total / delta, (receiver_conn.latency / total) * 1000.0,
receiver_conn.latency_min * 1000.0,
receiver_conn.latency_max * 1000.0))
return 0
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment