Commit 1f906845 authored by Markus Koschany's avatar Markus Koschany

Imported Upstream version 5.13.4+dfsg

parent 336714a4
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.13.3</version>
<version>5.13.4</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.13.3</version>
<version>5.13.4</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......@@ -103,6 +103,11 @@
<artifactId>activemq-leveldb-store</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq.tooling</groupId>
<artifactId>activemq-junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
......
......@@ -40,6 +40,7 @@ public class AmqpWireFormat implements WireFormat {
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
public static final int DEFAULT_IDLE_TIMEOUT = 30000;
public static final int DEFAULT_PRODUCER_CREDIT = 1000;
public static final boolean DEFAULT_ALLOW_NON_SASL_CONNECTIONS = true;
private static final int SASL_PROTOCOL = 3;
......@@ -50,6 +51,7 @@ public class AmqpWireFormat implements WireFormat {
private int idelTimeout = DEFAULT_IDLE_TIMEOUT;
private int producerCredit = DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_JMS;
private boolean allowNonSaslConnections = DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
private boolean magicRead = false;
private ResetListener resetListener;
......@@ -58,8 +60,6 @@ public class AmqpWireFormat implements WireFormat {
void onProtocolReset();
}
private boolean allowNonSaslConnections = true;
@Override
public ByteSequence marshal(Object command) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
......@@ -121,17 +121,30 @@ public class AmqpWireFormat implements WireFormat {
* Given an AMQP header validate that the AMQP magic is present and
* if so that the version and protocol values align with what we support.
*
* In the case where authentication occurs the client sends us two AMQP
* headers, the first being the SASL initial header which triggers the
* authentication process and then if that succeeds we should get a second
* AMQP header that does not contain the SASL protocol ID indicating the
* connection process should follow the normal path. We validate that the
* header align with these expectations.
*
* @param header
* the header instance received from the client.
* @param authenticated
* has the client already authenticated already.
*
* @return true if the header is valid against the current WireFormat.
*/
public boolean isHeaderValid(AmqpHeader header) {
public boolean isHeaderValid(AmqpHeader header, boolean authenticated) {
if (!header.hasValidPrefix()) {
return false;
}
if (!isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
if (!(header.getProtocolId() == 0 || header.getProtocolId() == SASL_PROTOCOL)) {
return false;
}
if (!authenticated && !isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
return false;
}
......
......@@ -30,6 +30,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
private int idelTimeout = AmqpWireFormat.DEFAULT_IDLE_TIMEOUT;
private int producerCredit = AmqpWireFormat.DEFAULT_PRODUCER_CREDIT;
private String transformer = InboundTransformer.TRANSFORMER_NATIVE;
private boolean allowNonSaslConnections = AmqpWireFormat.DEFAULT_ALLOW_NON_SASL_CONNECTIONS;
@Override
public WireFormat createWireFormat() {
......@@ -40,6 +41,7 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
wireFormat.setIdleTimeout(getIdelTimeout());
wireFormat.setProducerCredit(getProducerCredit());
wireFormat.setTransformer(getTransformer());
wireFormat.setAllowNonSaslConnections(isAllowNonSaslConnections());
return wireFormat;
}
......@@ -83,4 +85,12 @@ public class AmqpWireFormatFactory implements WireFormatFactory {
public void setTransformer(String transformer) {
this.transformer = transformer;
}
public boolean isAllowNonSaslConnections() {
return allowNonSaslConnections;
}
public void setAllowNonSaslConnections(boolean allowNonSaslConnections) {
this.allowNonSaslConnections = allowNonSaslConnections;
}
}
......@@ -46,7 +46,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
*
* <p>When provided a string for conversion which attempts to identify itself as an encoded binary, uuid, or
* ulong but can't be converted into the indicated format, an exception will be thrown.
*
*/
public class AMQPMessageIdHelper {
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......
......@@ -207,7 +207,7 @@ public abstract class InboundTransformer {
jms.setJMSReplyTo(vendor.createDestination(properties.getReplyTo()));
}
if (properties.getCorrelationId() != null) {
jms.setJMSCorrelationID(properties.getCorrelationId().toString());
jms.setJMSCorrelationID(AMQPMessageIdHelper.INSTANCE.toBaseMessageIdString(properties.getCorrelationId()));
}
if (properties.getContentType() != null) {
jms.setStringProperty(prefixVendor + "ContentType", properties.getContentType().toString());
......
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
......@@ -211,7 +211,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
maMap.put(LEGACY_JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationAttributes(msg.getJMSReplyTo()));
}
if (msg.getJMSCorrelationID() != null) {
props.setCorrelationId(msg.getJMSCorrelationID());
String correlationId = msg.getJMSCorrelationID();
try {
props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId));
} catch (AmqpProtocolException e) {
props.setCorrelationId(correlationId);
}
}
if (msg.getJMSExpiration() != 0) {
long ttl = msg.getJMSExpiration() - System.currentTimeMillis();
......
......@@ -142,7 +142,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command
......@@ -153,7 +153,7 @@ public abstract class AmqpAbstractLink<LINK_TYPE extends Link> implements AmqpLi
}
/**
* Shorcut method to hand off an ActiveMQ Command to the broker and assign
* Shortcut method to hand off an ActiveMQ Command to the broker and assign
* a ResponseHandler to deal with any reply from the broker.
*
* @param command
......
......@@ -173,6 +173,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
this.protonTransport.bind(this.protonConnection);
this.protonTransport.setChannelMax(CHANNEL_MAX);
this.protonTransport.setEmitFlowEventOnSend(false);
this.protonConnection.collect(eventCollector);
......@@ -327,7 +328,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
if (command.getClass() == AmqpHeader.class) {
AmqpHeader header = (AmqpHeader) command;
if (amqpWireFormat.isHeaderValid(header)) {
if (amqpWireFormat.isHeaderValid(header, authenticator != null)) {
LOG.trace("Connection from an AMQP v1.0 client initiated. {}", header);
} else {
LOG.warn("Connection attempt from non AMQP v1.0 client. {}", header);
......@@ -492,6 +493,7 @@ public class AmqpConnection implements AmqpProtocolConverter {
protonConnection.setOfferedCapabilities(getConnectionCapabilitiesOffered());
protonConnection.setProperties(getConnetionProperties());
protonConnection.setContainer(brokerService.getBrokerName());
protonConnection.open();
configureInactivityMonitor();
......
......@@ -81,7 +81,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
private final ConsumerInfo consumerInfo;
private final boolean presettle;
private int currentCredit;
private boolean draining;
private long lastDeliveredSequenceId;
......@@ -101,7 +100,6 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
public AmqpSender(AmqpSession session, Sender endpoint, ConsumerInfo consumerInfo) {
super(session, endpoint);
this.currentCredit = endpoint.getRemoteCredit();
this.consumerInfo = consumerInfo;
this.presettle = getEndpoint().getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
}
......@@ -120,7 +118,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
sendToActiveMQ(removeCommand);
session.unregisterSender(getConsumerId());
}
......@@ -133,7 +131,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
if (!isClosed() && isOpened()) {
RemoveInfo removeCommand = new RemoveInfo(getConsumerId());
removeCommand.setLastDeliveredSequenceId(lastDeliveredSequenceId);
sendToActiveMQ(removeCommand, null);
sendToActiveMQ(removeCommand);
if (consumerInfo.isDurable()) {
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
......@@ -141,7 +139,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
rsi.setSubscriptionName(getEndpoint().getName());
rsi.setClientId(session.getConnection().getClientId());
sendToActiveMQ(rsi, null);
sendToActiveMQ(rsi);
}
session.unregisterSender(getConsumerId());
......@@ -152,39 +150,56 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
@Override
public void flow() throws Exception {
int updatedCredit = getEndpoint().getCredit();
LOG.trace("Flow: drain={} credit={}, remoteCredit={}",
getEndpoint().getDrain(), getEndpoint().getCredit(), getEndpoint().getRemoteCredit());
if (LOG.isTraceEnabled()) {
LOG.trace("Flow: draining={}, drain={} credit={}, remoteCredit={}, queued={}",
draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (getEndpoint().getDrain() && (updatedCredit != currentCredit || !draining)) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
draining = true;
if (getEndpoint().getDrain() && !draining) {
// Revert to a pull consumer.
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(0);
sendToActiveMQ(control, null);
// Now request dispatch of the drain amount, we request immediate
// timeout and an completion message regardless so that we can know
// when we should marked the link as drained.
MessagePull pullRequest = new MessagePull();
pullRequest.setConsumerId(getConsumerId());
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(currentCredit);
sendToActiveMQ(pullRequest, null);
} else if (updatedCredit != currentCredit) {
currentCredit = updatedCredit >= 0 ? updatedCredit : 0;
LOG.trace("Flow: Pull case -> consumer control with prefetch (0) to control output");
sendToActiveMQ(control);
if (endpoint.getCredit() > 0) {
draining = true;
// Now request dispatch of the drain amount, we request immediate
// timeout and an completion message regardless so that we can know
// when we should marked the link as drained.
MessagePull pullRequest = new MessagePull();
pullRequest.setConsumerId(getConsumerId());
pullRequest.setDestination(getDestination());
pullRequest.setTimeout(-1);
pullRequest.setAlwaysSignalDone(true);
pullRequest.setQuantity(endpoint.getCredit());
LOG.trace("Pull case -> consumer pull request quantity = {}", endpoint.getCredit());
sendToActiveMQ(pullRequest);
} else {
LOG.trace("Pull case -> sending any Queued messages and marking drained");
pumpOutbound();
getEndpoint().drained();
session.pumpProtonToSocket();
}
} else {
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(currentCredit);
sendToActiveMQ(control, null);
control.setPrefetch(getEndpoint().getCredit());
LOG.trace("Flow: update -> consumer control with prefetch {}", control.getPrefetch());
sendToActiveMQ(control);
}
}
......@@ -401,8 +416,19 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
// It's the end of browse signal in response to a MessagePull
getEndpoint().drained();
draining = false;
currentCredit = 0;
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Sender:[{}] msgId={} draining={}, drain={}, credit={}, remoteCredit={}, queued={}",
getEndpoint().getName(), jms.getJMSMessageID(), draining, getEndpoint().getDrain(),
getEndpoint().getCredit(), getEndpoint().getRemoteCredit(), getEndpoint().getQueued());
}
if (draining && getEndpoint().getCredit() == 0) {
LOG.trace("Sender:[{}] browse complete.", getEndpoint().getName());
getEndpoint().drained();
draining = false;
}
jms.setRedeliveryCounter(md.getRedeliveryCounter());
jms.setReadOnlyBody(true);
final EncodedMessage amqp = outboundTransformer.transform(jms);
......@@ -431,6 +457,17 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
tagCache.returnTag(tag);
}
int newCredit = Math.max(0, getEndpoint().getCredit() - 1);
LOG.trace("Sender:[{}] updating conumser prefetch:{} after delivery settled.",
getEndpoint().getName(), newCredit);
ConsumerControl control = new ConsumerControl();
control.setConsumerId(getConsumerId());
control.setDestination(getDestination());
control.setPrefetch(newCredit);
sendToActiveMQ(control);
if (ackType == -1) {
// we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
delivery.settle();
......
......@@ -30,7 +30,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* SASL Authenitcation engine.
* SASL Authentication engine.
*/
public class AmqpAuthenticator {
......
......@@ -182,7 +182,7 @@ public class JMSClientContext {
factory.setUsername(username);
factory.setPassword(password);
factory.setAlwaysSyncSend(syncPublish);
factory.setForceSyncSend(syncPublish);
factory.setTopicPrefix("topic://");
factory.setQueuePrefix("queue://");
......
......@@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
......@@ -85,6 +86,45 @@ public class JMSClientTestSupport extends AmqpTestSupport {
return amqpURI;
}
protected URI getAmqpURI() {
return getAmqpURI("");
}
protected URI getAmqpURI(String uriOptions) {
boolean useSSL = getBrokerURI().getScheme().toLowerCase().contains("ssl");
String amqpURI = (useSSL ? "amqps://" : "amqp://") + getBrokerURI().getHost() + ":" + getBrokerURI().getPort();
if (uriOptions != null && !uriOptions.isEmpty()) {
if (uriOptions.startsWith("?") || uriOptions.startsWith("&")) {
uriOptions = uriOptions.substring(1);
}
} else {
uriOptions = "";
}
if (useSSL) {
amqpURI += "?transport.verifyHost=false";
}
if (!uriOptions.isEmpty()) {
if (useSSL) {
amqpURI += "&" + uriOptions;
} else {
amqpURI += "?" + uriOptions;
}
}
URI result = getBrokerURI();
try {
result = new URI(amqpURI);
} catch (URISyntaxException e) {
}
return result;
}
protected Connection createConnection() throws JMSException {
return createConnection(name.toString(), false);
}
......
......@@ -20,9 +20,14 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
......@@ -30,6 +35,7 @@ import javax.jms.TextMessage;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.jmx.SubscriptionViewMBean;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
......@@ -186,13 +192,107 @@ public class JMSClientTransactionTest extends JMSClientTestSupport {
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Trying to receive message: {}", i);
TextMessage message = (TextMessage) consumer.receive(1000);
assertNotNull("Message " + i + "should be available", message);
assertNotNull("Message " + i + " should be available", message);
assertEquals("Should get message: " + i, i , message.getIntProperty("MessageSequence"));
}
session.commit();
}
@Test(timeout = 60000)
public void testQueueTXRollbackAndCommitAsyncConsumer() throws Exception {
final int MSG_COUNT = 3;
final AtomicInteger counter = new AtomicInteger();
connection = createConnection();
connection.start();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
Queue destination = session.createQueue(getDestinationName());
MessageProducer producer = session.createProducer(destination);
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
LOG.info("Received Message {}", message.getJMSMessageID());
} catch (JMSException e) {
}
counter.incrementAndGet();
}
});
int msgIndex = 0;
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to rollback", msgIndex++);
TextMessage message = session.createTextMessage("Rolled back Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("ROLLBACK of sent message here:");
session.rollback();
assertEquals(0, getProxyToQueue(getDestinationName()).getQueueSize());
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", msgIndex++);
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("COMMIT of sent message here:");
session.commit();
assertEquals(MSG_COUNT, getProxyToQueue(getDestinationName()).getQueueSize());
SubscriptionViewMBean subscription = getProxyToQueueSubscriber(getDestinationName());
assertNotNull(subscription);
LOG.info("Subscription[{}]: prefetch size after rollback = {}", subscription.getSubscriptionId(), subscription.getPrefetchSize());
assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return counter.get() == MSG_COUNT;
}
}));
LOG.info("COMMIT of first received batch here:");
session.commit();
assertTrue(subscription.getPrefetchSize() > 0);
for (int i = 1; i <= MSG_COUNT; i++) {
LOG.info("Sending message: {} to commit", msgIndex++);
TextMessage message = session.createTextMessage("Commit Message: " + msgIndex);
message.setIntProperty("MessageSequence", msgIndex);
producer.send(message);
}
LOG.info("COMMIT of next sent message batch here:");
session.commit();
LOG.info("WAITING -> for next three messages to arrive:");
assertTrue(subscription.getPrefetchSize() > 0);
assertTrue("Should read all " + MSG_COUNT + " messages.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("Read {} messages so far", counter.get());
return counter.get() == MSG_COUNT * 2;
}
}));
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;