Commit 336714a4 authored by Markus Koschany's avatar Markus Koschany

Imported Upstream version 5.13.3+dfsg

parent 0ed42d96
...@@ -14,7 +14,7 @@ ...@@ -14,7 +14,7 @@
<parent> <parent>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId> <artifactId>activemq-parent</artifactId>
<version>5.13.2</version> <version>5.13.3</version>
</parent> </parent>
<artifactId>activemq-all</artifactId> <artifactId>activemq-all</artifactId>
...@@ -127,6 +127,7 @@ ...@@ -127,6 +127,7 @@
<include>org.springframework:spring-context</include> <include>org.springframework:spring-context</include>
<include>org.springframework:spring-expression</include> <include>org.springframework:spring-expression</include>
<include>org.springframework:spring-jms</include> <include>org.springframework:spring-jms</include>
<include>org.springframework:spring-messaging</include>
<include>org.springframework:spring-tx</include> <include>org.springframework:spring-tx</include>
<include>org.apache.xbean:xbean-spring</include> <include>org.apache.xbean:xbean-spring</include>
<include>org.apache.camel:camel-jms</include> <include>org.apache.camel:camel-jms</include>
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
<parent> <parent>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId> <artifactId>activemq-parent</artifactId>
<version>5.13.2</version> <version>5.13.3</version>
</parent> </parent>
<artifactId>activemq-amqp</artifactId> <artifactId>activemq-amqp</artifactId>
......
/** /*
* Licensed to the Apache Software Foundation (ASF) under one or more * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with * contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. * this work for additional information regarding copyright ownership.
...@@ -118,14 +118,17 @@ public abstract class InboundTransformer { ...@@ -118,14 +118,17 @@ public abstract class InboundTransformer {
} else { } else {
jms.setJMSDeliveryMode(defaultDeliveryMode); jms.setJMSDeliveryMode(defaultDeliveryMode);
} }
if (header.getPriority() != null) { if (header.getPriority() != null) {
jms.setJMSPriority(header.getPriority().intValue()); jms.setJMSPriority(header.getPriority().intValue());
} else { } else {
jms.setJMSPriority(defaultPriority); jms.setJMSPriority(defaultPriority);
} }
if (header.getFirstAcquirer() != null) { if (header.getFirstAcquirer() != null) {
jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer()); jms.setBooleanProperty(prefixVendor + "FirstAcquirer", header.getFirstAcquirer());
} }
if (header.getDeliveryCount() != null) { if (header.getDeliveryCount() != null) {
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue()); vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
} }
...@@ -188,7 +191,7 @@ public abstract class InboundTransformer { ...@@ -188,7 +191,7 @@ public abstract class InboundTransformer {
final Properties properties = amqp.getProperties(); final Properties properties = amqp.getProperties();
if (properties != null) { if (properties != null) {
if (properties.getMessageId() != null) { if (properties.getMessageId() != null) {
jms.setJMSMessageID(properties.getMessageId().toString()); jms.setJMSMessageID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getMessageId()));
} }
Binary userId = properties.getUserId(); Binary userId = properties.getUserId();
if (userId != null) { if (userId != null) {
...@@ -236,6 +239,7 @@ public abstract class InboundTransformer { ...@@ -236,6 +239,7 @@ public abstract class InboundTransformer {
if (header.getTtl() != null) { if (header.getTtl() != null) {
ttl = header.getTtl().longValue(); ttl = header.getTtl().longValue();
} }
if (ttl == 0) { if (ttl == 0) {
jms.setJMSExpiration(0); jms.setJMSExpiration(0);
} else { } else {
......
...@@ -41,6 +41,7 @@ import javax.jms.Topic; ...@@ -41,6 +41,7 @@ import javax.jms.Topic;
import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.MessageId; import org.apache.activemq.command.MessageId;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedByte;
...@@ -180,7 +181,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { ...@@ -180,7 +181,11 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
MessageId msgId = amqMsg.getMessageId(); MessageId msgId = amqMsg.getMessageId();
if (msgId.getTextView() != null) { if (msgId.getTextView() != null) {
props.setMessageId(msgId.getTextView()); try {
props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId.getTextView()));
} catch (AmqpProtocolException e) {
props.setMessageId(msgId.getTextView().toString());
}
} else { } else {
props.setMessageId(msgId.toString()); props.setMessageId(msgId.toString());
} }
......
...@@ -198,6 +198,32 @@ public class AmqpMessage { ...@@ -198,6 +198,32 @@ public class AmqpMessage {
return message.getProperties().getMessageId().toString(); return message.getProperties().getMessageId().toString();
} }
/**
* Return the set MessageId value in the original form, if there are no properties
* in the given message return null.
*
* @return the set message ID in its original form or null if not set.
*/
public Object getRawMessageId() {
if (message.getProperties() == null) {
return null;
}
return message.getProperties().getMessageId();
}
/**
* Sets the MessageId property on an outbound message using the provided value
*
* @param messageId
* the message ID value to set.
*/
public void setRawMessageId(Object messageId) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setMessageId(messageId);
}
/** /**
* Sets the GroupId property on an outbound message using the provided String * Sets the GroupId property on an outbound message using the provided String
* *
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.junit.Test;
/**
* Tests that the AMQP MessageID value and type are preserved.
*/
public class AmqpMessageIdPreservationTest extends AmqpClientTestSupport {
@Override
protected boolean isPersistent() {
return true;
}
@Test(timeout = 60000)
public void testStringMessageIdIsPreserved() throws Exception {
doTestMessageIdPreservation("msg-id-string:1");
}
@Test(timeout = 60000)
public void testStringMessageIdIsPreservedAfterRestart() throws Exception {
doTestMessageIdPreservationOnBrokerRestart("msg-id-string:1");
}
@Test(timeout = 60000)
public void testUUIDMessageIdIsPreserved() throws Exception {
doTestMessageIdPreservation(UUID.randomUUID());
}
@Test(timeout = 60000)
public void testUUIDMessageIdIsPreservedAfterRestart() throws Exception {
doTestMessageIdPreservationOnBrokerRestart(UUID.randomUUID());
}
@Test(timeout = 60000)
public void testUnsignedLongMessageIdIsPreserved() throws Exception {
doTestMessageIdPreservation(new UnsignedLong(255l));
}
@Test(timeout = 60000)
public void testUnsignedLongMessageIdIsPreservedAfterRestart() throws Exception {
doTestMessageIdPreservationOnBrokerRestart(new UnsignedLong(255l));
}
@Test(timeout = 60000)
public void testBinaryLongMessageIdIsPreserved() throws Exception {
byte[] payload = new byte[32];
for (int i = 0; i < 32; ++i) {
payload[i] = (byte) ('a' + i);
}
doTestMessageIdPreservation(new Binary(payload));
}
@Test(timeout = 60000)
public void testBinaryLongMessageIdIsPreservedAfterRestart() throws Exception {
byte[] payload = new byte[32];
for (int i = 0; i < 32; ++i) {
payload[i] = (byte) ('a' + i);
}
doTestMessageIdPreservationOnBrokerRestart(new Binary(payload));
}
@Test(timeout = 60000)
public void testStringMessageIdPrefixIsPreserved() throws Exception {
doTestMessageIdPreservation("ID:msg-id-string:1");
}
public void doTestMessageIdPreservation(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpMessage message = new AmqpMessage();
message.setRawMessageId(messageId);
message.setText("Test-Message");
sender.send(message);
sender.close();
QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getQueueSize());
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received);
assertEquals(received.getRawMessageId().getClass(), messageId.getClass());
assertEquals(messageId, received.getRawMessageId());
receiver.close();
connection.close();
}
public void doTestMessageIdPreservationOnBrokerRestart(Object messageId) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpMessage message = new AmqpMessage();
message.setRawMessageId(messageId);
message.setText("Test-Message");
message.setDurable(true);
sender.send(message);
sender.close();
connection.close();
restartBroker();
QueueViewMBean queue = getProxyToQueue(getTestName());
assertEquals(1, queue.getQueueSize());
connection = client.connect();
session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Should have got a message", received);
assertEquals(received.getRawMessageId().getClass(), messageId.getClass());
assertEquals(messageId, received.getRawMessageId());
receiver.close();
connection.close();
}
}
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
<parent> <parent>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId> <artifactId>activemq-parent</artifactId>
<version>5.13.2</version> <version>5.13.3</version>
</parent> </parent>
<artifactId>activemq-blueprint</artifactId> <artifactId>activemq-blueprint</artifactId>
......
...@@ -22,7 +22,7 @@ ...@@ -22,7 +22,7 @@
<parent> <parent>
<groupId>org.apache.activemq</groupId> <groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId> <artifactId>activemq-parent</artifactId>
<version>5.13.2</version> <version>5.13.3</version>
</parent> </parent>
<artifactId>activemq-broker</artifactId> <artifactId>activemq-broker</artifactId>
......
...@@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -289,11 +289,10 @@ public class AdvisoryBroker extends BrokerFilter {
//in case of multiple matches //in case of multiple matches
VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination); VirtualConsumerPair key = new VirtualConsumerPair(virtualDestination, destination);
ConsumerInfo i = brokerConsumerDests.get(key); ConsumerInfo i = brokerConsumerDests.get(key);
if (consumerInfo.equals(i)) { if (consumerInfo.equals(i) && brokerConsumerDests.remove(key) != null) {
if (brokerConsumerDests.remove(key) != null) { LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", key, i);
fireVirtualDestinationRemoveAdvisory(context, consumerInfo); fireVirtualDestinationRemoveAdvisory(context, consumerInfo);
break; break;
}
} }
} }
} }
...@@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -549,6 +548,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationAdded(context, virtualDestination); super.virtualDestinationAdded(context, virtualDestination);
if (virtualDestinations.add(virtualDestination)) { if (virtualDestinations.add(virtualDestination)) {
LOG.debug("Virtual destination added: {}", virtualDestination);
try { try {
// Don't advise advisory topics. // Don't advise advisory topics.
if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) { if (!AdvisorySupport.isAdvisoryTopic(virtualDestination.getVirtualDestination())) {
...@@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -592,20 +592,25 @@ public class AdvisoryBroker extends BrokerFilter {
//if no consumer info, we need to create one - this is the case when an advisory is fired //if no consumer info, we need to create one - this is the case when an advisory is fired
//because of the existence of a destination matching a virtual destination //because of the existence of a destination matching a virtual destination
if (info == null) { if (info == null) {
ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
info = new ConsumerInfo(consumerId);
//store the virtual destination and the activeMQDestination as a pair so that we can keep track //store the virtual destination and the activeMQDestination as a pair so that we can keep track
//of all matching forwarded destinations that caused demand //of all matching forwarded destinations that caused demand
if(brokerConsumerDests.putIfAbsent(new VirtualConsumerPair(virtualDestination, activeMQDest), info) == null) { VirtualConsumerPair pair = new VirtualConsumerPair(virtualDestination, activeMQDest);
info.setDestination(virtualDestination.getVirtualDestination()); if (brokerConsumerDests.get(pair) == null) {
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); ConnectionId connectionId = new ConnectionId(connectionIdGenerator.generateId());
SessionId sessionId = new SessionId(connectionId, sessionIdGenerator.getNextSequenceId());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { ConsumerId consumerId = new ConsumerId(sessionId, consumerIdGenerator.getNextSequenceId());
fireConsumerAdvisory(context, info.getDestination(), topic, info); info = new ConsumerInfo(consumerId);
if(brokerConsumerDests.putIfAbsent(pair, info) == null) {
LOG.debug("Virtual consumer pair added: {} for consumer: {} ", pair, info);
info.setDestination(virtualDestination.getVirtualDestination());
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
fireConsumerAdvisory(context, info.getDestination(), topic, info);
}
} }
} }
//this is the case of a real consumer coming online //this is the case of a real consumer coming online
...@@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -615,6 +620,7 @@ public class AdvisoryBroker extends BrokerFilter {
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination()); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(info.getDestination());
if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) { if (virtualDestinationConsumers.putIfAbsent(info, virtualDestination) == null) {
LOG.debug("Virtual consumer added: {}, for virtual destination: {}", info, virtualDestination);
fireConsumerAdvisory(context, info.getDestination(), topic, info); fireConsumerAdvisory(context, info.getDestination(), topic, info);
} }
} }
...@@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -626,6 +632,7 @@ public class AdvisoryBroker extends BrokerFilter {
super.virtualDestinationRemoved(context, virtualDestination); super.virtualDestinationRemoved(context, virtualDestination);
if (virtualDestinations.remove(virtualDestination)) { if (virtualDestinations.remove(virtualDestination)) {
LOG.debug("Virtual destination removed: {}", virtualDestination);
try { try {
consumersLock.readLock().lock(); consumersLock.readLock().lock();
try { try {
...@@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -636,16 +643,17 @@ public class AdvisoryBroker extends BrokerFilter {
//find all consumers for this virtual destination //find all consumers for this virtual destination
if (virtualDestinationConsumers.get(info).equals(virtualDestination)) { if (virtualDestinationConsumers.get(info).equals(virtualDestination)) {
fireVirtualDestinationRemoveAdvisory(context, info); fireVirtualDestinationRemoveAdvisory(context, info);
}
//check consumers created for the existence of a destination to see if they //check consumers created for the existence of a destination to see if they
//match the consumerinfo and clean up //match the consumerinfo and clean up
for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) { for (VirtualConsumerPair activeMQDest : brokerConsumerDests.keySet()) {
ConsumerInfo i = brokerConsumerDests.get(activeMQDest); ConsumerInfo i = brokerConsumerDests.get(activeMQDest);
if (info.equals(i)) { if (info.equals(i) && brokerConsumerDests.remove(activeMQDest) != null) {
brokerConsumerDests.remove(activeMQDest); LOG.debug("Virtual consumer pair removed: {} for consumer: {} ", activeMQDest, i);
}
} }
} }
} }
} }
} }
...@@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -663,6 +671,7 @@ public class AdvisoryBroker extends BrokerFilter {
VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info); VirtualDestination virtualDestination = virtualDestinationConsumers.remove(info);
if (virtualDestination != null) { if (virtualDestination != null) {
LOG.debug("Virtual consumer removed: {}, for virtual destination: {}", info, virtualDestination);
ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination()); ActiveMQTopic topic = AdvisorySupport.getVirtualDestinationConsumerAdvisoryTopic(virtualDestination.getVirtualDestination());
ActiveMQDestination dest = info.getDestination(); ActiveMQDestination dest = info.getDestination();
...@@ -897,6 +906,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -897,6 +906,7 @@ public class AdvisoryBroker extends BrokerFilter {
this.virtualDestination = virtualDestination; this.virtualDestination = virtualDestination;
this.activeMQDestination = activeMQDestination; this.activeMQDestination = activeMQDestination;
} }
@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; final int prime = 31;
...@@ -912,6 +922,7 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -912,6 +922,7 @@ public class AdvisoryBroker extends BrokerFilter {
.hashCode()); .hashCode());
return result; return result;
} }
@Override @Override
public boolean equals(Object obj) { public boolean equals(Object obj) {
if (this == obj) if (this == obj)
...@@ -935,6 +946,13 @@ public class AdvisoryBroker extends BrokerFilter { ...@@ -935,6 +946,13 @@ public class AdvisoryBroker extends BrokerFilter {
return false; return false;
return true; return true;
} }
@Override
public String toString() {
return "VirtualConsumerPair [virtualDestination=" + virtualDestination + ", activeMQDestination="
+ activeMQDestination + "]";
}
private AdvisoryBroker getOuterType() { private AdvisoryBroker getOuterType() {
return AdvisoryBroker.this; return AdvisoryBroker.this;
} }
......
...@@ -35,8 +35,8 @@ import org.apache.activemq.usage.SystemUsage; ...@@ -35,8 +35,8 @@ import org.apache.activemq.usage.SystemUsage;
public class HealthView implements HealthViewMBean { public class HealthView implements HealthViewMBean {
ManagedRegionBroker broker; private ManagedRegionBroker broker;
String currentState = "Good"; private volatile String currentState = "Good";
public HealthView(ManagedRegionBroker broker) { public HealthView(ManagedRegionBroker broker) {
this.broker = broker; this.broker = broker;
...@@ -87,6 +87,7 @@ public class HealthView implements HealthViewMBean { ...@@ -87,6 +87,7 @@ public class HealthView implements HealthViewMBean {
while (dir != null && !dir.isDirectory()) { while (dir != null && !dir.isDirectory()) {
dir = dir.getParentFile(); dir = dir.getParentFile();
} }
long storeSize = adapter.size(); long storeSize = adapter.size();
long storeLimit = usage.getStoreUsage().getLimit(); long storeLimit = usage.getStoreUsage().getLimit();
long dirFreeSpace = dir.getUsableSpace(); long dirFreeSpace = dir.getUsableSpace();
...@@ -166,18 +167,30 @@ public class HealthView implements HealthViewMBean { ...@@ -166,18 +167,30 @@ public class HealthView implements HealthViewMBean {
} }
} }
StringBuilder currentState = new StringBuilder();
if (answer != null && !answer.isEmpty()) { if (answer != null && !answer.isEmpty()) {
this.currentState = "Getting Worried {"; currentState.append("Getting Worried {");
for (HealthStatus hs : answer) { for (HealthStatus hs : answer) {
currentState += hs + " , "; currentState.append(hs).append(" , ");
} }
currentState += " }"; currentState.append(" }");
} else { } else {
this.currentState = "Good"; currentState.append("Good");
} }
this.currentState = currentState.toString();
return answer; return answer;
} }
@Override
public String healthStatus() throws Exception {
// Must invoke healthList in order to update state.
healthList();