Commit f1355ee5 authored by Markus Koschany's avatar Markus Koschany

Update upstream source from tag 'upstream/5.15.7'

Update to upstream version '5.15.7'
with Debian dir a5d2f0e0ee5bd8c4e55738f280c0bb0dd7317615
parents 9b7d681f 50714d8e
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
......@@ -23,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.util.Arrays;
import java.util.Collection;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
......@@ -45,6 +47,8 @@ import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
@Parameters(name="{0}")
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
......@@ -55,46 +59,38 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
});
}
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
@Rule
public TestName testName = new TestName();
protected static final Logger LOG = LoggerFactory.getLogger(JMSLargeMessageSendRecvTest.class);
private String createLargeString(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < sizeInBytes; i++) {
builder.append(base[i % base.length]);
}
public JMSLargeMessageSendRecvTest(String connectorScheme, boolean secure) {
super(connectorScheme, secure);
}
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
return builder.toString();
@Test(timeout = 60 * 1000)
public void testSendSmallerTextMessage() throws JMSException {
doTestSendTextMessageOfGivenSize(1024);
}
@Test(timeout = 60 * 1000)
public void testSendSmallerMessages() throws JMSException {
public void testSendSeriesOfSmallerTextMessages() throws JMSException {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendLargeMessage(i);
doTestSendTextMessageOfGivenSize(i);
}
}
@Test(timeout = 60 * 1000)
public void testSendFixedSizedMessages() throws JMSException {
doTestSendLargeMessage(65536);
doTestSendLargeMessage(65536 * 2);
doTestSendLargeMessage(65536 * 4);
public void testSendFixedSizedTextMessages() throws JMSException {
doTestSendTextMessageOfGivenSize(65536);
doTestSendTextMessageOfGivenSize(65536 * 2);
doTestSendTextMessageOfGivenSize(65536 * 4);
}
@Test(timeout = 60 * 1000)
public void testSendHugeMessage() throws JMSException {
doTestSendLargeMessage(1024 * 1024 * 10);
public void testSendHugeTextMessage() throws JMSException {
doTestSendTextMessageOfGivenSize(1024 * 1024 * 5);
}
public void doTestSendLargeMessage(int expectedSize) throws JMSException{
public void doTestSendTextMessageOfGivenSize(int expectedSize) throws JMSException{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
String payload = createLargeString(expectedSize);
assertEquals(expectedSize, payload.getBytes().length);
......@@ -126,4 +122,86 @@ public class JMSLargeMessageSendRecvTest extends AmqpClientTestSupport {
assertEquals(payload, receivedText);
connection.close();
}
@Test(timeout = 60 * 1000)
public void testSendSmallerBytesMessage() throws JMSException {
doTestSendBytesMessageOfGivenSize(1024);
}
@Test(timeout = 60 * 1000)
public void testSendSeriesOfSmallerBytesMessages() throws JMSException {
for (int i = 512; i <= (8 * 1024); i += 512) {
doTestSendBytesMessageOfGivenSize(i);
}
}
@Test(timeout = 60 * 1000)
public void testSendFixedSizedBytesMessages() throws JMSException {
doTestSendBytesMessageOfGivenSize(65536);
doTestSendBytesMessageOfGivenSize(65536 * 2);
doTestSendBytesMessageOfGivenSize(65536 * 4);
}
@Test(timeout = 60 * 1000)
public void testSendHugeBytesMessage() throws JMSException {
doTestSendBytesMessageOfGivenSize(1024 * 1024 * 5);
}
public void doTestSendBytesMessageOfGivenSize(int expectedSize) throws JMSException{
LOG.info("doTestSendLargeMessage called with expectedSize " + expectedSize);
byte[] payload = createLargeByteArray(expectedSize);
assertEquals(expectedSize, payload.length);
Connection connection = JMSClientContext.INSTANCE.createConnection(getBrokerAmqpConnectionURI());
long startTime = System.currentTimeMillis();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(testName.getMethodName());
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(payload);
producer.send(message);
long endTime = System.currentTimeMillis();
LOG.info("Returned from send after {} ms", endTime - startTime);
startTime = System.currentTimeMillis();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
LOG.info("Calling receive");
Message receivedMessage = consumer.receive();
assertNotNull(receivedMessage);
assertTrue(receivedMessage instanceof BytesMessage);
BytesMessage receivedBytesMessage = (BytesMessage) receivedMessage;
assertNotNull(receivedMessage);
endTime = System.currentTimeMillis();
LOG.info("Returned from receive after {} ms", endTime - startTime);
byte[] receivedBytes = new byte[(int) receivedBytesMessage.getBodyLength()];
receivedBytesMessage.readBytes(receivedBytes);
assertEquals(expectedSize, receivedBytes.length);
assertArrayEquals(payload, receivedBytes);
connection.close();
}
private String createLargeString(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
StringBuilder builder = new StringBuilder();
for (int i = 0; i < sizeInBytes; i++) {
builder.append(base[i % base.length]);
}
LOG.debug("Created string with size : " + builder.toString().getBytes().length + " bytes");
return builder.toString();
}
private byte[] createLargeByteArray(int sizeInBytes) {
byte[] base = {1, 2, 3, 4, 5, 6, 7, 8, 9, 0};
byte[] payload = new byte[sizeInBytes];
for (int i = 0; i < sizeInBytes; i++) {
payload[i] = (base[i % base.length]);
}
LOG.debug("Created byte array with size : " + payload.length + " bytes");
return payload;
}
}
......@@ -17,6 +17,7 @@
package org.apache.activemq.transport.amqp.client;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
......@@ -457,7 +458,7 @@ public class AmqpSender extends AmqpAbstractResource<Sender> {
try {
encodedSize = message.encode(encodeBuffer, 0, encodeBuffer.length);
break;
} catch (java.nio.BufferOverflowException e) {
} catch (BufferOverflowException | IndexOutOfBoundsException e) {
encodeBuffer = new byte[encodeBuffer.length * 2];
}
}
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -40,6 +40,7 @@ import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.Usage;
......@@ -217,7 +218,8 @@ public class DurableTopicSubscription extends PrefetchSubscription implements Us
for (final MessageReference node : dispatched) {
// Mark the dispatched messages as redelivered for next time.
if (lastDeliveredSequenceId == 0 || (lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
if (lastDeliveredSequenceId == RemoveInfo.LAST_DELIVERED_UNKNOWN || lastDeliveredSequenceId == 0 ||
(lastDeliveredSequenceId > 0 && node.getMessageId().getBrokerSequenceId() <= lastDeliveredSequenceId)) {
Integer count = redeliveredMessages.get(node.getMessageId());
if (count != null) {
redeliveredMessages.put(node.getMessageId(), Integer.valueOf(count.intValue() + 1));
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-camel</artifactId>
......
......@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-cf</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-client</artifactId>
......
......@@ -46,6 +46,10 @@ import org.apache.activemq.wireformat.WireFormat;
*/
public class SslTransport extends TcpTransport {
/**
* Default to null as there are different defaults between server and client, initialiseSocket
* for more details
*/
private Boolean verifyHostName = null;
/**
......@@ -80,18 +84,40 @@ public class SslTransport extends TcpTransport {
@Override
protected void initialiseSocket(Socket sock) throws SocketException, IllegalArgumentException {
//This needs to default to null because this transport class is used for both a server transport
//and a client connection and if we default it to a value it might override the transport server setting
//that was configured inside TcpTransportServer
//The idea here is that if this is a server transport then verifyHostName will be set by the setter
//below and not be null (if using transport.verifyHostName) but if a client uses socket.verifyHostName
//then it will be null and we can check socketOptions
//Unfortunately we have to do this to stay consistent because every other SSL option on the client
//side is configured using socket. but this particular option isn't actually part of the socket
//so it makes it tricky
/**
* This needs to default to null because this transport class is used for both a server transport
* and a client connection and we have different defaults for both.
* If we default it to a value it might override the transport server setting
* that was configured inside TcpTransportServer (which sets a default to false for server side)
*
* The idea here is that if this is a server transport then verifyHostName will be set by the setter
* and not be null as TcpTransportServer will set a default value of false (or a user will set it
* using transport.verifyHostName) but if this is a client connection the value will be null by default
* and will stay null if the user uses socket.verifyHostName to set the value or doesn't use the setter
* If it is null then we can check socketOptions for the value and if not set there then we can
* just set a default of true as this will be a client
*
* Unfortunately we have to do this to stay consistent because every other SSL option on the client
* side can be configured using socket. but this particular option isn't actually part of the socket
* so it makes it tricky from a user standpoint. For consistency sake I think it makes sense to allow
* using the socket. prefix that has been established so users do not get confused (as well as
* allow using no prefix which just calls the setter directly)
*
* Because of this there are actually two ways a client can configure this value, the client can either use
* socket.verifyHostName=<value> as mentioned or just simply use verifyHostName=<value> without using the socket.
* prefix and that will also work as the value will be set using the setter on the transport
*
* example server transport config:
* ssl://localhost:61616?transport.verifyHostName=true
*
* example from client:
* ssl://localhost:61616?verifyHostName=true
* OR
* ssl://localhost:61616?socket.verifyHostName=true
*
*/
if (verifyHostName == null) {
//Check to see if the user included the value as part of socket options and if so then use that value
if (socketOptions != null && socketOptions.containsKey("verifyHostName")) {
verifyHostName = Boolean.parseBoolean(socketOptions.get("verifyHostName").toString());
socketOptions.remove("verifyHostName");
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-console</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-http</artifactId>
......
......@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport;
import java.io.File;
import java.net.InetAddress;
import java.net.URI;
import java.util.Map;
......@@ -27,15 +28,21 @@ import org.eclipse.jetty.security.ConstraintSecurityHandler;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.security.Constraint;
import org.eclipse.jetty.xml.XmlConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
abstract public class WebTransportServerSupport extends TransportServerSupport {
private final static Logger LOG = LoggerFactory.getLogger(WebTransportServerSupport.class);
protected URI bindAddress;
protected Server server;
protected Connector connector;
protected SocketConnectorFactory socketConnectorFactory;
protected String host;
protected final HttpOptions httpOptions = new HttpOptions();
protected final JettyOptions jettyOptions = new JettyOptions();
public WebTransportServerSupport(URI location) {
super(location);
......@@ -46,7 +53,22 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
}
protected void createServer() {
server = new Server();
LOG.info("Starting Jetty server");
if (jettyOptions.getConfig() != null) {
try {
LOG.info("Configuring Jetty server using {}", jettyOptions.getConfig());
File file = new File(jettyOptions.getConfig());
if (!file.exists()) {
throw new IllegalArgumentException("Jetty XML not found: " + file.getAbsolutePath());
}
XmlConfiguration xmlConfiguration = new XmlConfiguration(file.toURI().toURL());
server = (Server) xmlConfiguration.configure();
} catch (Throwable t) {
throw new IllegalStateException("Jetty configuration can't be loaded", t);
}
} else {
server = new Server();
}
try {
server.getClass().getMethod("setStopTimeout", Long.TYPE).invoke(server, 500l);
} catch (Throwable t) {
......@@ -55,21 +77,31 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
}
public URI bind() throws Exception {
URI bind = getBindLocation();
String bindHost = bind.getHost();
bindHost = (bindHost == null || bindHost.length() == 0) ? "localhost" : bindHost;
InetAddress addr = InetAddress.getByName(bindHost);
host = addr.getCanonicalHostName();
setConnectorProperty("Host", String.class, host);
setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
server.addConnector(connector);
if (server.getConnectors().length == 0) {
LOG.info("Creating Jetty connector");
setConnectorProperty("Host", String.class, host);
setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
server.addConnector(connector);
} else {
LOG.info("Using Jetty configured connector");
connector = server.getConnectors()[0];
for (Connector c : server.getConnectors()) {
if (c.getName() != null && c.getName().equalsIgnoreCase("activemq")) {
connector = c;
}
}
setConnectorProperty("Host", String.class, host);
setConnectorProperty("Port", Integer.TYPE, bindAddress.getPort());
server.addConnector(connector);
}
if (addr.isAnyLocalAddress()) {
host = InetAddressUtil.getLocalHostName();
}
URI boundUri = new URI(bind.getScheme(), bind.getUserInfo(), host, bindAddress.getPort(), bind.getPath(), bind.getQuery(), bind.getFragment());
setConnectURI(boundUri);
return boundUri;
......@@ -94,6 +126,12 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
}
}
public void setJettyOptions(Map<String, Object> options) {
if (options != null) {
IntrospectionSupport.setProperties(this.jettyOptions, options);
}
}
protected static class HttpOptions {
private boolean enableTrace = false;
......@@ -105,4 +143,17 @@ abstract public class WebTransportServerSupport extends TransportServerSupport {
this.enableTrace = enableTrace;
}
}
protected static class JettyOptions {
private String config;
public String getConfig() {
return config;
}
public void setConfig(String config) {
this.config = config;
}
}
}
......@@ -45,8 +45,10 @@ public class HttpTransportFactory extends TransportFactory {
try {
Map<String, String> options = new HashMap<String, String>(URISupport.parseParameters(location));
HttpTransportServer result = new HttpTransportServer(location, this);
Map<String, Object> jettyOptions = IntrospectionSupport.extractProperties(options, "jetty.");
Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http.");
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
result.setJettyOptions(jettyOptions);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
return result;
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.http;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class HttpJettyConfigurationTest {
private BrokerService brokerService;
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setUseJmx(false);
brokerService.deleteAllMessages();
brokerService.addConnector("http://0.0.0.0:0?jetty.config=src/test/resources/jetty.xml");
brokerService.start();
}
@After
public void tearDown() throws Exception {
brokerService.stop();
}
@Test
public void test() throws Exception {
// nothing to do
}
}
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure.dtd">
<Configure id="server" class="org.eclipse.jetty.server.Server">
<Arg>
<New class="org.eclipse.jetty.util.thread.QueuedThreadPool">
<Set name="minThreads">10</Set>
<Set name="maxThreads">1000</Set>
</New>
</Arg>
<Call name="addConnector">
<Arg>
<New id="activemq" class="org.eclipse.jetty.server.ServerConnector">
<Arg name="server"><Ref refid="server"/></Arg>
<Arg name="acceptors" type="int"><Property name="jetty.http.acceptors" default="-1"/></Arg>
<Arg name="selectors" type="int"><Property name="jetty.http.selectors" default="-1"/></Arg>
</New>
</Arg>
</Call>
</Configure>
\ No newline at end of file
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-itests-spring31</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-jaas</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-jdbc-store</artifactId>
......
## ---------------------------------------------------------------------------
## Licensed to the Apache Software Foundation (ASF) under one or more
## contributor license agreements. See the NOTICE file distributed with
## this work for additional information regarding copyright ownership.
## The ASF licenses this file to You under the Apache License, Version 2.0
## (the "License"); you may not use this file except in compliance with
## the License. You may obtain a copy of the License at
##
## http://www.apache.org/licenses/LICENSE-2.0
##
## Unless required by applicable law or agreed to in writing, software
## distributed under the License is distributed on an "AS IS" BASIS,
## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
class=org.apache.activemq.store.jdbc.adapter.PostgresqlJDBCAdapter
\ No newline at end of file
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-jms-pool</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.6</version>
<version>5.15.7</version>
</parent>
<artifactId>activemq-kahadb-store</artifactId>
......
......@@ -1401,6 +1401,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
public void execute(Transaction tx) throws IOException {
for (Operation op : messagingTx) {
op.execute(tx);
recordAckMessageReferenceLocation(location, op.getLocation());
}
}
});
......@@ -1413,12 +1414,23 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
@SuppressWarnings("rawtypes")