Commit 19b260c5 authored by Markus Koschany's avatar Markus Koschany

New upstream version 5.15.6

parent bd8960b3
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.4</version>
<version>5.15.6</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.4</version>
<version>5.15.6</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -41,11 +41,15 @@ public class AMQPNativeOutboundTransformer implements OutboundTransformer {
}
static EncodedMessage transform(OutboundTransformer options, ActiveMQBytesMessage message) throws JMSException {
long messageFormat;
try {
messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
} catch (MessageFormatException e) {
return null;
final long messageFormat;
if (message.propertyExists(JMS_AMQP_MESSAGE_FORMAT)) {
try {
messageFormat = message.getLongProperty(JMS_AMQP_MESSAGE_FORMAT);
} catch (MessageFormatException e) {
return null;
}
} else {
messageFormat = 0;
}
Binary encodedMessage = getBinaryFromMessageBody(message);
......
......@@ -88,7 +88,6 @@ import org.apache.qpid.proton.amqp.messaging.Section;
import org.apache.qpid.proton.codec.AMQPDefinedTypes;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.fusesource.hawtbuf.UTF8Buffer;
public class JMSMappingOutboundTransformer implements OutboundTransformer {
......@@ -102,11 +101,17 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
public static final byte TEMP_QUEUE_TYPE = 0x02;
public static final byte TEMP_TOPIC_TYPE = 0x03;
private final UTF8BufferType utf8BufferEncoding;
// For now Proton requires that we create a decoder to create an encoder
private final DecoderImpl decoder = new DecoderImpl();
private final EncoderImpl encoder = new EncoderImpl(decoder);
{
AMQPDefinedTypes.registerAllTypes(decoder, encoder);
utf8BufferEncoding = new UTF8BufferType(encoder, decoder);
encoder.register(utf8BufferEncoding);
}
@Override
......@@ -159,7 +164,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
}
properties.setTo(destination.getQualifiedName());
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
maMap = new HashMap<>();
}
maMap.put(JMS_DEST_TYPE_MSG_ANNOTATION, destinationType(destination));
}
......@@ -170,7 +175,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
}
properties.setReplyTo(replyTo.getQualifiedName());
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
maMap = new HashMap<>();
}
maMap.put(JMS_REPLY_TO_TYPE_MSG_ANNOTATION, destinationType(replyTo));
}
......@@ -250,9 +255,6 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
for (Map.Entry<String, Object> entry : entries.entrySet()) {
String key = entry.getKey();
Object value = entry.getValue();
if (value instanceof UTF8Buffer) {
value = value.toString();
}
if (key.startsWith(JMS_AMQP_PREFIX)) {
if (key.startsWith(NATIVE, JMS_AMQP_PREFIX_LENGTH)) {
......@@ -276,7 +278,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
continue;
} else if (key.startsWith(MESSAGE_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (maMap == null) {
maMap = new HashMap<Symbol, Object>();
maMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_MESSAGE_ANNOTATION_PREFIX.length());
maMap.put(Symbol.valueOf(name), value);
......@@ -307,14 +309,14 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
continue;
} else if (key.startsWith(DELIVERY_ANNOTATION_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (daMap == null) {
daMap = new HashMap<Symbol, Object>();
daMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_DELIVERY_ANNOTATION_PREFIX.length());
daMap.put(Symbol.valueOf(name), value);
continue;
} else if (key.startsWith(FOOTER_PREFIX, JMS_AMQP_PREFIX_LENGTH)) {
if (footerMap == null) {
footerMap = new HashMap<Object, Object>();
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(name, value);
......@@ -328,7 +330,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
// The property didn't map into any other slot so we store it in the
// Application Properties section of the message.
if (apMap == null) {
apMap = new HashMap<String, Object>();
apMap = new HashMap<>();
}
apMap.put(key, value);
}
......@@ -409,7 +411,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
} else if (messageType == CommandTypes.ACTIVEMQ_MAP_MESSAGE) {
body = new AmqpValue(getMapFromMessageBody((ActiveMQMapMessage) message));
} else if (messageType == CommandTypes.ACTIVEMQ_STREAM_MESSAGE) {
ArrayList<Object> list = new ArrayList<Object>();
ArrayList<Object> list = new ArrayList<>();
final ActiveMQStreamMessage m = (ActiveMQStreamMessage) message;
try {
while (true) {
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;
import java.util.Arrays;
import java.util.Collection;
import org.apache.qpid.proton.codec.DecoderImpl;
import org.apache.qpid.proton.codec.EncoderImpl;
import org.apache.qpid.proton.codec.EncodingCodes;
import org.apache.qpid.proton.codec.PrimitiveType;
import org.apache.qpid.proton.codec.PrimitiveTypeEncoding;
import org.apache.qpid.proton.codec.TypeEncoding;
import org.apache.qpid.proton.codec.WritableBuffer;
import org.fusesource.hawtbuf.UTF8Buffer;
/**
* AMQP Type used to allow to proton-j codec to deal with UTF8Buffer types as if
* they were String elements.
*/
public class UTF8BufferType implements PrimitiveType<UTF8Buffer> {
private final UTF8BufferEncoding largeBufferEncoding;
private final UTF8BufferEncoding smallBufferEncoding;
public UTF8BufferType(EncoderImpl encoder, DecoderImpl decoder) {
this.largeBufferEncoding = new LargeUTF8BufferEncoding(encoder, decoder);
this.smallBufferEncoding = new SmallUTF8BufferEncoding(encoder, decoder);
}
@Override
public Class<UTF8Buffer> getTypeClass() {
return UTF8Buffer.class;
}
@Override
public PrimitiveTypeEncoding<UTF8Buffer> getEncoding(UTF8Buffer value) {
return value.getLength() <= 255 ? smallBufferEncoding : largeBufferEncoding;
}
@Override
public PrimitiveTypeEncoding<UTF8Buffer> getCanonicalEncoding() {
return largeBufferEncoding;
}
@Override
public Collection<? extends PrimitiveTypeEncoding<UTF8Buffer>> getAllEncodings() {
return Arrays.asList(smallBufferEncoding, largeBufferEncoding);
}
@Override
public void write(UTF8Buffer value) {
final TypeEncoding<UTF8Buffer> encoding = getEncoding(value);
encoding.writeConstructor();
encoding.writeValue(value);
}
public abstract class UTF8BufferEncoding implements PrimitiveTypeEncoding<UTF8Buffer> {
private final EncoderImpl encoder;
private final DecoderImpl decoder;
public UTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
this.encoder = encoder;
this.decoder = decoder;
}
@Override
public int getConstructorSize() {
return 1;
}
@Override
public boolean isFixedSizeVal() {
return false;
}
@Override
public boolean encodesJavaPrimitive() {
return false;
}
/**
* @return the number of bytes the size portion of the encoded value requires.
*/
public abstract int getSizeBytes();
@Override
public void writeConstructor() {
getEncoder().writeRaw(getEncodingCode());
}
@Override
public void writeValue(UTF8Buffer value) {
writeSize(value);
WritableBuffer buffer = getEncoder().getBuffer();
buffer.put(value.getData(), value.getOffset(), value.getLength());
}
/**
* Write the size of the buffer using the appropriate type (byte or int) depending
* on the encoding type being used.
*
* @param value
* The UTF8Buffer value that is being encoded.
*/
public abstract void writeSize(UTF8Buffer value);
@Override
public int getValueSize(UTF8Buffer value) {
return getSizeBytes() + value.getLength();
}
@Override
public Class<UTF8Buffer> getTypeClass() {
return UTF8Buffer.class;
}
@Override
public PrimitiveType<UTF8Buffer> getType() {
return UTF8BufferType.this;
}
@Override
public boolean encodesSuperset(TypeEncoding<UTF8Buffer> encoding) {
return (getType() == encoding.getType());
}
@Override
public UTF8Buffer readValue() {
throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
}
@Override
public void skipValue() {
throw new UnsupportedOperationException("No decoding to UTF8Buffer exists");
}
public DecoderImpl getDecoder() {
return decoder;
}
public EncoderImpl getEncoder() {
return encoder;
}
}
public class LargeUTF8BufferEncoding extends UTF8BufferEncoding {
public LargeUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
super(encoder, decoder);
}
@Override
public byte getEncodingCode() {
return EncodingCodes.STR32;
}
@Override
public int getSizeBytes() {
return Integer.BYTES;
}
@Override
public void writeSize(UTF8Buffer value) {
getEncoder().getBuffer().putInt(value.getLength());
}
}
public class SmallUTF8BufferEncoding extends UTF8BufferEncoding {
public SmallUTF8BufferEncoding(EncoderImpl encoder, DecoderImpl decoder) {
super(encoder, decoder);
}
@Override
public byte getEncodingCode() {
return EncodingCodes.STR8;
}
@Override
public int getSizeBytes() {
return Byte.BYTES;
}
@Override
public void writeSize(UTF8Buffer value) {
getEncoder().getBuffer().put((byte) value.getLength());
}
}
}
......@@ -17,7 +17,6 @@
package org.apache.activemq.transport.amqp.protocol;
import static org.apache.activemq.transport.amqp.AmqpSupport.toLong;
import static org.apache.activemq.transport.amqp.message.AmqpMessageSupport.JMS_AMQP_MESSAGE_FORMAT;
import java.io.IOException;
import java.util.LinkedList;
......@@ -449,21 +448,7 @@ public class AmqpSender extends AmqpAbstractLink<Sender> {
ActiveMQMessage temp = null;
if (md.getMessage() != null) {
// Topics can dispatch the same Message to more than one consumer
// so we must copy to prevent concurrent read / write to the same
// message object.
if (md.getDestination().isTopic()) {
synchronized (md.getMessage()) {
temp = (ActiveMQMessage) md.getMessage().copy();
}
} else {
temp = (ActiveMQMessage) md.getMessage();
}
if (!temp.getProperties().containsKey(JMS_AMQP_MESSAGE_FORMAT)) {
temp.setProperty(JMS_AMQP_MESSAGE_FORMAT, 0);
}
temp = (ActiveMQMessage) md.getMessage().copy();
}
final ActiveMQMessage jms = temp;
......
......@@ -36,6 +36,7 @@ import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
......@@ -99,6 +100,33 @@ public class JMSClientTest extends JMSClientTestSupport {
}
}
@Test(timeout = 60000)
public void testSendJMSMapMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
connection = createConnection();
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertNotNull(session);
Queue queue = session.createQueue(name.getMethodName());
MessageProducer producer = session.createProducer(queue);
MapMessage message = session.createMapMessage();
message.setBoolean("Boolean", false);
message.setString("STRING", "TEST");
producer.send(message);
QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
assertEquals(1, proxy.getQueueSize());
MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000);
assertNotNull(received);
assertTrue(received instanceof MapMessage);
MapMessage map = (MapMessage) received;
assertEquals("TEST", map.getString("STRING"));
assertEquals(false, map.getBooleanProperty("Boolean"));
}
}
@Test(timeout=30000)
public void testAnonymousProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
......
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.interop;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.util.Arrays;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.apache.activemq.store.kahadb.KahaDBStore.PROPERTY_CANCELED_TASK_MOD_METRIC;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@RunWith(Parameterized.class)
public class OpenWireToAmqpConcurrentStoreAndDispatchTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(OpenWireToAmqpConcurrentStoreAndDispatchTest.class);
private final String transformer;
@Parameters(name="Transformer->{0}")
public static Collection<Object[]> data() {
System.setProperty(PROPERTY_CANCELED_TASK_MOD_METRIC, "100");
return Arrays.asList(new Object[][] {
{"jms"}
});
}
public OpenWireToAmqpConcurrentStoreAndDispatchTest(String transformer) {
this.transformer = transformer;
}
@Override
protected String getAmqpTransformer() {
return transformer;
}
@Override
protected boolean isPersistent() {
return true;
}
@Override
protected boolean isUseOpenWireConnector() {
return true;
}
@Test(timeout = 6000000)
@Ignore("takes more than 6 mins to complete but fails earlier without fix")
public void testNoErrorOnSend() throws Exception {
final int numIterations = 100;
int numConsumers = 3;
final int numProducers = 10;
final int numMessages = 2000;
final AtomicBoolean done = new AtomicBoolean(false);
final AtomicInteger sent = new AtomicInteger();
final AtomicInteger received = new AtomicInteger();
final AtomicBoolean errorOnSend = new AtomicBoolean(false);
final AtomicInteger toSend = new AtomicInteger(numMessages);
final Random random = new Random();
for (int i=0; i<numIterations; i++) {
done.set(false);
sent.set(0);
received.set(0);
toSend.set(numMessages);
ExecutorService executorService = Executors.newCachedThreadPool();
for (int j = 0; j < numConsumers; j++) {
executorService.execute(new Runnable() {
@Override
public void run() {
AmqpConnection connection = null;
try {
AmqpClient client = createAmqpClient();
connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver("queue://" + getTestName(), null, false, true);
while (!done.get() && received.get() < numMessages) {
receiver.flow(1);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
if (message != null) {
received.incrementAndGet();
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
if (connection != null) {
connection.close();
}
}
}
});
}
final byte[] payload = new byte[100];
for (int k = 0; k < numProducers; k++) {
executorService.execute(new Runnable() {
@Override
public void run() {
Connection connection = null;
try {
ActiveMQConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString());
connection = connectionFactory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(new ActiveMQQueue(getTestName()));
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes(payload);
bytesMessage.setStringProperty("PP", "VALUE");
while (!done.get() && toSend.decrementAndGet() >= 0) {
producer.send(bytesMessage);
sent.incrementAndGet();
}
} catch (Exception e) {
e.printStackTrace();
errorOnSend.set(true);
} finally {
if (connection != null) {
try {
connection.close();
} catch (JMSException ignored) {}
}
}
}
});
}
executorService.shutdown();
executorService.awaitTermination(30, TimeUnit.SECONDS);
done.set(true);
assertEquals("[" + i + "] sent all requested", numMessages, sent.get());
assertEquals("[" + i + "] got all sent", numMessages, received.get());
assertFalse("[" + i + "] no error on send", errorOnSend.get());
}
}
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp.message;