Skip to content
Commits on Source (3)
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -45,7 +45,7 @@ public class AmqpWireFormat implements WireFormat {
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = false;
public static final int DEFAULT_ANQP_FRAME_SIZE = NO_AMQP_MAX_FRAME_SIZE;
public static final int DEFAULT_ANQP_FRAME_SIZE = 128 * 1024;
private static final int SASL_PROTOCOL = 3;
......
......@@ -18,11 +18,9 @@ package org.apache.activemq.transport.amqp.message;
import java.nio.ByteBuffer;
import org.apache.qpid.proton.codec.ReadableBuffer;
import org.apache.qpid.proton.codec.WritableBuffer;
/**
*
*/
public class AmqpWritableBuffer implements WritableBuffer {
public final static int DEFAULT_CAPACITY = 4 * 1024;
......@@ -148,6 +146,12 @@ public class AmqpWritableBuffer implements WritableBuffer {
return Integer.MAX_VALUE;
}
@Override
public void put(ReadableBuffer src) {
ensureCapacity(position);
src.get(this);
}
/**
* Ensures the the buffer has at least the minimumCapacity specified.
*
......
......@@ -171,6 +171,11 @@ public class AmqpConnection implements AmqpProtocolConverter {
int maxFrameSize = amqpWireFormat.getMaxAmqpFrameSize();
if (maxFrameSize > AmqpWireFormat.NO_AMQP_MAX_FRAME_SIZE) {
this.protonTransport.setMaxFrameSize(maxFrameSize);
try {
this.protonTransport.setOutboundFrameSizeLimit(maxFrameSize);
} catch (Throwable e) {
// Ignore if older proton-j was injected.
}
}
this.protonTransport.bind(this.protonConnection);
......@@ -328,6 +333,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
}
}
@SuppressWarnings("deprecation")
@Override
public void onAMQPData(Object command) throws Exception {
Buffer frame;
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -71,10 +71,11 @@ public class TopicSubscription extends AbstractSubscription {
protected ActiveMQMessageAudit audit;
protected boolean active = false;
protected boolean discarding = false;
private boolean useTopicSubscriptionInflightStats = true;
//Used for inflight message size calculations
protected final Object dispatchLock = new Object();
protected final List<MessageReference> dispatched = new ArrayList<MessageReference>();
protected final List<DispatchedNode> dispatched = new ArrayList<>();
public TopicSubscription(Broker broker,ConnectionContext context, ConsumerInfo info, SystemUsage usageManager) throws Exception {
super(broker, context, info);
......@@ -257,8 +258,10 @@ public class TopicSubscription extends AbstractSubscription {
synchronized(dispatchLock) {
matched.remove();
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
node.decrementReferenceCount();
}
break;
......@@ -381,10 +384,12 @@ public class TopicSubscription extends AbstractSubscription {
* @param ack
*/
private void updateStatsOnAck(final MessageAck ack) {
//Allow disabling inflight stats to save memory usage
if (isUseTopicSubscriptionInflightStats()) {
synchronized(dispatchLock) {
boolean inAckRange = false;
List<MessageReference> removeList = new ArrayList<MessageReference>();
for (final MessageReference node : dispatched) {
List<DispatchedNode> removeList = new ArrayList<>();
for (final DispatchedNode node : dispatched) {
MessageId messageId = node.getMessageId();
if (ack.getFirstMessageId() == null
|| ack.getFirstMessageId().equals(messageId)) {
......@@ -398,22 +403,36 @@ public class TopicSubscription extends AbstractSubscription {
}
}
for (final MessageReference node : removeList) {
for (final DispatchedNode node : removeList) {
dispatched.remove(node);
getSubscriptionStatistics().getInflightMessageSize().addSize(-node.getSize());
getSubscriptionStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getDequeues().increment();
((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
if (info.isNetworkSubscription()) {
((Destination)node.getRegionDestination()).getDestinationStatistics().getForwards().add(ack.getMessageCount());
final Destination destination = node.getDestination();
incrementStatsOnAck(destination, ack, 1);
if (!ack.isInTransaction()) {
contractPrefetchExtension(1);
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(ack.getMessageCount());
}
}
} else {
if (singleDestination && destination != null) {
incrementStatsOnAck(destination, ack, ack.getMessageCount());
}
if (!ack.isInTransaction()) {
contractPrefetchExtension(1);
contractPrefetchExtension(ack.getMessageCount());
}
}
}
private void incrementStatsOnAck(final Destination destination, final MessageAck ack, final int count) {
getSubscriptionStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getDequeues().add(count);
destination.getDestinationStatistics().getInflight().subtract(count);
if (info.isNetworkSubscription()) {
destination.getDestinationStatistics().getForwards().add(count);
}
if (ack.isExpiredAck()) {
destination.getDestinationStatistics().getExpired().add(count);
}
}
......@@ -648,9 +667,11 @@ public class TopicSubscription extends AbstractSubscription {
md.setDestination(((Destination)node.getRegionDestination()).getActiveMQDestination());
synchronized(dispatchLock) {
getSubscriptionStatistics().getDispatched().increment();
dispatched.add(node);
if (isUseTopicSubscriptionInflightStats()) {
dispatched.add(new DispatchedNode(node));
getSubscriptionStatistics().getInflightMessageSize().addSize(node.getSize());
}
}
// Keep track if this subscription is receiving messages from a single destination.
if (singleDestination) {
......@@ -759,4 +780,38 @@ public class TopicSubscription extends AbstractSubscription {
}
}
public boolean isUseTopicSubscriptionInflightStats() {
return useTopicSubscriptionInflightStats;
}
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
private static class DispatchedNode {
private final int size;
private final MessageId messageId;
private final Destination destination;
public DispatchedNode(final MessageReference node) {
super();
this.size = node.getSize();
this.messageId = node.getMessageId();
this.destination = node.getRegionDestination() instanceof Destination ?
((Destination)node.getRegionDestination()) : null;
}
public long getSize() {
return size;
}
public MessageId getMessageId() {
return messageId;
}
public Destination getDestination() {
return destination;
}
}
}
......@@ -188,7 +188,7 @@ public class StoreDurableSubscriberCursor extends AbstractPendingMessageCursor {
Message msg = node.getMessage();
if (isStarted()) {
if (!msg.isPersistent()) {
nonPersistent.addMessageLast(node);
nonPersistent.tryAddMessageLast(node, wait);
}
}
if (msg.isPersistent()) {
......
......@@ -103,6 +103,7 @@ public class PolicyEntry extends DestinationMapEntry {
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
private boolean doOptimzeMessageStorage = true;
private int maxDestinations = -1;
private boolean useTopicSubscriptionInflightStats = true;
/*
* percentage of in-flight messages above which optimize message store is disabled
......@@ -315,6 +316,7 @@ public class PolicyEntry extends DestinationMapEntry {
configurePrefetch(subscription);
subscription.setUsePrefetchExtension(isUsePrefetchExtension());
subscription.setCursorMemoryHighWaterMark(getCursorMemoryHighWaterMark());
subscription.setUseTopicSubscriptionInflightStats(isUseTopicSubscriptionInflightStats());
if (pendingMessageLimitStrategy != null) {
int value = pendingMessageLimitStrategy.getMaximumPendingMessageLimit(subscription);
int consumerLimit = subscription.getInfo().getMaximumPendingMessageLimit();
......@@ -1100,4 +1102,12 @@ public class PolicyEntry extends DestinationMapEntry {
public String toString() {
return "PolicyEntry [" + destination + "]";
}
public boolean isUseTopicSubscriptionInflightStats() {
return useTopicSubscriptionInflightStats;
}
public void setUseTopicSubscriptionInflightStats(boolean useTopicSubscriptionInflightStats) {
this.useTopicSubscriptionInflightStats = useTopicSubscriptionInflightStats;
}
}
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-camel</artifactId>
......
......@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-cf</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-client</artifactId>
......
......@@ -950,7 +950,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
@Override
public void afterRollback() throws Exception {
if (LOG.isTraceEnabled()) {
LOG.trace("rollback {}", ack, new Throwable("here"));
}
// ensure we don't filter this as a duplicate
connection.rollbackDuplicate(ActiveMQSession.this, md.getMessage());
......
......@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.activemq.command.ActiveMQDestination;
......@@ -46,6 +48,7 @@ public class DestinationMap {
private DestinationMapNode topicRootNode = new DestinationMapNode(null);
private DestinationMapNode tempTopicRootNode = new DestinationMapNode(null);
/**
* Looks up the value(s) matching the given Destination key. For simple
* destinations this is typically a List of one single value, for wildcards
......@@ -202,19 +205,38 @@ public class DestinationMap {
* @return the largest matching value or null if no value matches
*/
@SuppressWarnings({"rawtypes", "unchecked"})
public Object chooseValue(final ActiveMQDestination destination) {
Set set = get(destination);
public DestinationMapEntry chooseValue(final ActiveMQDestination destination) {
Set<DestinationMapEntry> set = get(destination);
if (set == null || set.isEmpty()) {
return null;
}
SortedSet sortedSet = new TreeSet(new Comparator<DestinationMapEntry>() {
//Comparator to sort in order - we want to pick the exact match by destination or the
//closest parent that applies
final Comparator<DestinationMapEntry> comparator = new Comparator<DestinationMapEntry>() {
@Override
public int compare(DestinationMapEntry entry1, DestinationMapEntry entry2) {
return destination.equals(entry1.destination) ? -1 : (destination.equals(entry2.destination) ? 1 : entry1.compareTo(entry2));
}
});
sortedSet.addAll(set);
return sortedSet.first();
};
//Sort and filter out any children and non matching entries
final SortedSet<DestinationMapEntry> sortedSet = set.stream()
.filter(entry -> isMatchOrParent(destination, (DestinationMapEntry)entry))
.collect(Collectors.toCollection(() -> new TreeSet<DestinationMapEntry>(comparator)));
return sortedSet.size() > 0 ? sortedSet.first() : null;
}
@SuppressWarnings("rawtypes")
//Used to filter out any child/unmatching entries
private boolean isMatchOrParent(final ActiveMQDestination destination, final DestinationMapEntry entry) {
//If destination not set then do not filter out
if (entry.getDestination() == null) {
return true;
}
final DestinationFilter filter = DestinationFilter.parseFilter(entry.getDestination());
return destination.equals(entry.getDestination()) || filter.matches(destination);
}
/**
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-console</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-http</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-itests-spring31</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-jaas</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.3</version>
<version>5.15.4</version>
</parent>
<artifactId>activemq-jdbc-store</artifactId>
......