Commit 93db1e0e authored by Markus Koschany's avatar Markus Koschany

New upstream version 5.14.5

parent acecaa2c
Apache ActiveMQ Copyright 2005-2016 Apache Software Foundation
Apache ActiveMQ Copyright 2005-2017 Apache Software Foundation
This product includes software developed by
The Apache Software Foundation (http://www.apache.org/).
-------------
Jabber xsds in activemq-xmpp
This software was generated using XSDs from the Jabber Software Foundation
http://www.xmpp.org/schemas/
Copyright (c) 1999-2006 The Jabber Software Foundation
http://www.xmpp.org/about/copyright.shtml
These XSD files are licensed under the Creative Commons License 2.5
http://creativecommons.org/licenses/by/2.5/
==============================================================
Jetty Web Container
Copyright 1995-2006 Mort Bay Consulting Pty Ltd
......
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.14.3</version>
<version>5.14.5</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.14.3</version>
<version>5.14.5</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -32,9 +32,13 @@ import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.Buffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class AmqpWireFormat implements WireFormat {
private static final Logger LOG = LoggerFactory.getLogger(AmqpWireFormat.class);
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int NO_AMQP_MAX_FRAME_SIZE = -1;
public static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
......@@ -137,18 +141,22 @@ public class AmqpWireFormat implements WireFormat {
*/
public boolean isHeaderValid(AmqpHeader header, boolean authenticated) {
if (!header.hasValidPrefix()) {
LOG.trace("AMQP Header arrived with invalid prefix: {}", header);
return false;
}
if (!(header.getProtocolId() == 0 || header.getProtocolId() == SASL_PROTOCOL)) {
LOG.trace("AMQP Header arrived with invalid protocol ID: {}", header);
return false;
}
if (!authenticated && !isAllowNonSaslConnections() && header.getProtocolId() != SASL_PROTOCOL) {
LOG.trace("AMQP Header arrived without SASL and server requires SASL: {}", header);
return false;
}
if (header.getMajor() != 1 || header.getMinor() != 0 || header.getRevision() != 0) {
LOG.trace("AMQP Header arrived invalid version: {}", header);
return false;
}
......
......@@ -72,7 +72,7 @@ public class AmqpTestSupport {
protected ExecutorService testService = Executors.newSingleThreadExecutor();
protected BrokerService brokerService;
protected Vector<Throwable> exceptions = new Vector<Throwable>();
protected Vector<Throwable> exceptions = new Vector<>();
protected int numberOfMessages;
protected URI amqpURI;
......@@ -150,7 +150,7 @@ public class AmqpTestSupport {
System.setProperty("javax.net.ssl.keyStorePassword", "password");
System.setProperty("javax.net.ssl.keyStoreType", "jks");
ArrayList<BrokerPlugin> plugins = new ArrayList<BrokerPlugin>();
ArrayList<BrokerPlugin> plugins = new ArrayList<>();
addAdditionalPlugins(plugins);
......@@ -182,28 +182,28 @@ public class AmqpTestSupport {
}
if (isUseTcpConnector()) {
connector = brokerService.addConnector(
"amqp://0.0.0.0:" + amqpPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp://0.0.0.0:" + amqpPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpPort = connector.getConnectUri().getPort();
amqpURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp port " + amqpPort);
}
if (isUseSslConnector()) {
connector = brokerService.addConnector(
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+ssl://0.0.0.0:" + amqpSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpSslPort = connector.getConnectUri().getPort();
amqpSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ssl port " + amqpSslPort);
}
if (isUseNioConnector()) {
connector = brokerService.addConnector(
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+nio://0.0.0.0:" + amqpNioPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPort = connector.getConnectUri().getPort();
amqpNioURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio port " + amqpNioPort);
}
if (isUseNioPlusSslConnector()) {
connector = brokerService.addConnector(
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"amqp+nio+ssl://0.0.0.0:" + amqpNioPlusSslPort + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpNioPlusSslPort = connector.getConnectUri().getPort();
amqpNioPlusSslURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+nio+ssl port " + amqpNioPlusSslPort);
......@@ -238,14 +238,14 @@ public class AmqpTestSupport {
}
if (isUseWsConnector()) {
connector = brokerService.addConnector(
"ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"ws://0.0.0.0:" + getProxyPort(amqpWsPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWsPort = connector.getConnectUri().getPort();
amqpWsURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+ws port " + amqpWsPort);
}
if (isUseWssConnector()) {
connector = brokerService.addConnector(
"wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
"wss://0.0.0.0:" + getProxyPort(amqpWssPort) + "?transport.tcpNoDelay=true&transport.transformer=" + getAmqpTransformer() + getAdditionalConfig());
amqpWssPort = connector.getConnectUri().getPort();
amqpWssURI = connector.getPublishableConnectURI();
LOG.debug("Using amqp+wss port " + amqpWssPort);
......
......@@ -165,13 +165,21 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements
}
});
if (connectTimeout <= 0) {
future.sync();
} else {
future.sync(connectTimeout, TimeUnit.MILLISECONDS);
if (getEndpoint().getRemoteState() != EndpointState.ACTIVE) {
throw new IOException("Failed to connect after configured timeout.");
try {
if (connectTimeout <= 0) {
future.sync();
} else {
future.sync(connectTimeout, TimeUnit.MILLISECONDS);
if (getEndpoint().getRemoteState() != EndpointState.ACTIVE) {
throw new IOException("Failed to connect after configured timeout.");
}
}
} catch (Throwable error) {
try {
close();
} catch (Throwable ignore) {}
throw error;
}
}
}
......
......@@ -432,6 +432,78 @@ public class AmqpMessage {
return message.getHeader().getDurable();
}
/**
* Sets the priority header on the outgoing message.
*
* @param priority the priority value to set.
*/
public void setPriority(short priority) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setPriority(priority);
}
/**
* Gets the priority header on the message.
*/
public short getPriority() {
return getWrappedMessage().getPriority();
}
/**
* Sets the ttl header on the outgoing message.
*
* @param timeToLive the ttl value to set.
*/
public void setTimeToLive(long timeToLive) {
checkReadOnly();
lazyCreateHeader();
getWrappedMessage().setTtl(timeToLive);
}
/**
* Sets the ttl header on the outgoing message.
*/
public long getTimeToLive() {
return getWrappedMessage().getTtl();
}
/**
* Sets the absolute expiration time property on the message.
*
* @param absoluteExpiryTime the expiration time value to set.
*/
public void setAbsoluteExpiryTime(long absoluteExpiryTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setExpiryTime(absoluteExpiryTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getAbsoluteExpiryTime() {
return getWrappedMessage().getExpiryTime();
}
/**
* Sets the creation time property on the message.
*
* @param creationTime the time value to set.
*/
public void setCreationTime(long creationTime) {
checkReadOnly();
lazyCreateProperties();
getWrappedMessage().setCreationTime(creationTime);
}
/**
* Gets the absolute expiration time property on the message.
*/
public long getCreationTime() {
return getWrappedMessage().getCreationTime();
}
/**
* Sets a given application property on an outbound message.
*
......@@ -615,21 +687,21 @@ public class AmqpMessage {
private void lazyCreateMessageAnnotations() {
if (messageAnnotationsMap == null) {
messageAnnotationsMap = new HashMap<Symbol,Object>();
messageAnnotationsMap = new HashMap<>();
message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
}
}
private void lazyCreateDeliveryAnnotations() {
if (deliveryAnnotationsMap == null) {
deliveryAnnotationsMap = new HashMap<Symbol,Object>();
deliveryAnnotationsMap = new HashMap<>();
message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
}
}
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
applicationPropertiesMap = new HashMap<String, Object>();
applicationPropertiesMap = new HashMap<>();
message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
}
}
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Test;
public class AmqpExpiredMessageTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testSendMessageThatIsAlreadyExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTime() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsExiredUsingAbsoluteTimeWithLongTTL() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() - 5000);
// AET should override any TTL set
message.setTimeToLive(60000);
message.setText("Test-Message");
sender.send(message);
sender.close();
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
// Broker doesn't track messages that arrived already expired.
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsExpiredUsingTTLWhenAbsoluteIsZero() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(0);
// AET should override any TTL set unless it is zero
message.setTimeToLive(1000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
Thread.sleep(1000);
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
assertEquals(1, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingAbsoluteTimeWithElspsedTTL() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setAbsoluteExpiryTime(System.currentTimeMillis() + 5000);
// AET should override any TTL set
message.setTimeToLive(10);
message.setText("Test-Message");
sender.send(message);
sender.close();
Thread.sleep(50);
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThatIsNotExpiredUsingTimeToLive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(5000);
message.setText("Test-Message");
sender.send(message);
sender.close();
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull(received);
assertEquals(0, queueView.getExpiredCount());
connection.close();
}
@Test(timeout = 60000)
public void testSendMessageThenAllowToExpiredUsingTimeToLive() throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(getTestName());
// Get the Queue View early to avoid racing the delivery.
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
AmqpMessage message = new AmqpMessage();
message.setTimeToLive(10);
message.setText("Test-Message");
sender.send(message);
sender.close();
Thread.sleep(50);
assertEquals(1, queueView.getQueueSize());
// Now try and get the message
AmqpReceiver receiver = session.createReceiver(getTestName());
receiver.flow(1);
AmqpMessage received = receiver.receive(1, TimeUnit.SECONDS);
assertNull(received);
assertEquals(1, queueView.getExpiredCount());
connection.close();
}
}
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.14.3</version>
<version>5.14.5</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.14.3</version>
<version>5.14.5</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -50,6 +50,7 @@ import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.InstanceNotFoundException;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
......@@ -2257,7 +2258,7 @@ public class BrokerService implements Service {
}
}
protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
public ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
return BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "networkConnectors", connector.getName());
}
......@@ -2590,8 +2591,8 @@ public class BrokerService implements Service {
* @throws Exception
*/
public void startAllConnectors() throws Exception {
Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<TransportConnector>();
final Set<ActiveMQDestination> durableDestinations = getBroker().getDurableDestinations();
List<TransportConnector> al = new ArrayList<>();
for (Iterator<TransportConnector> iter = getTransportConnectors().iterator(); iter.hasNext();) {
TransportConnector connector = iter.next();
al.add(startTransportConnector(connector));
......@@ -2629,26 +2630,7 @@ public class BrokerService implements Service {
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
final NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
if (getDefaultSocketURIString() != null) {
connector.setBrokerURL(getDefaultSocketURIString());
}
if (networkConnectorStartExecutor != null) {
networkConnectorStartExecutor.execute(new Runnable() {
@Override
public void run() {
try {
LOG.info("Async start of {}", connector);
connector.start();
} catch(Exception e) {
LOG.error("Async start of network connector: {} failed", connector, e);
}
}
});
} else {
connector.start();
}
startNetworkConnector(connector, durableDestinations, networkConnectorStartExecutor);
}
if (networkConnectorStartExecutor != null) {