Commit 1decc249 authored by Markus Koschany's avatar Markus Koschany

New upstream version 5.15.10

parent 8c9479e3
......@@ -1256,4 +1256,62 @@ Qx9BTyOei6Jch8RDnif5y4DhatGc6GnSWyxpIyKEqcCyfMf8XO0wPaSNOkUVkq2G
69n5gjLRKX91+CJbf2eQ51EAo1z7k2BpoJJlvSIBz6NZqL9GjY33oPLmBHV7TMMS
uXXzh4h85JfEvhsuoyzzBsuNh2zQi9zpUL32Xg/IHQ0h
=O/lW
-----END PGP PUBLIC KEY BLOCK-----
pub 4096R/C8282E76 2009-09-08
uid Jean-Baptiste Onofré <jbonofre@apache.org>
sig 3 C8282E76 2009-09-08 Jean-Baptiste Onofré <jbonofre@apache.org>
sub 4096R/9F043BBC 2009-09-08
sig C8282E76 2009-09-08 Jean-Baptiste Onofré <jbonofre@apache.org>
-----BEGIN PGP PUBLIC KEY BLOCK-----
Version: GnuPG v1
mQINBEqmJkEBEADAAMOjOidXzoyK4FK9WhhRg2EEGX1gm5lK8PpJtk68Fqmz6xvv
N8VJXMIJUgeD7M35zZSQUWJY43xEU8Yfn6oLL0KR0dIqVOclxE+7G8vxXFcIbRE9
ziZFp7Z5yzsdzjiIzXv5MVQMczcAAMev/i0BnjiRy5Cg+k6kHXVpu/Gsn05JKPaG
s7ZcfSxpboyS99MVKQvoFLE5Z/Shh4gFJn2rFInqK5EgVpoZbVyysF52nx0dti/e
O0NjraQkrEDBWvsPt3cYZA0oP1gWiZiRvOLfAFIarf3poMDyoWBIwnbqb3Msv09j
yDAmcGq9wsD3alHFHcRIiJl5SzFUStml1d5x/BvUl/Xc5VfHPi2ObKF3xOPGkyTf
aZ6mYFLaRCAJ0v2MPW+4/grDXKsP8n8xPbE2VQvHBpxaZklD7q4Omn2d+m2sUOLX
NRUo4n29NyfowAffBYl7ZqrYBBodR9YngWC9LpgM+APHyiw3HzauZ94bGy5Of3+L
Yu6/riDcP4OXF6r6IH6KIsVqIkv5xzq7OGxxXmlhWg8ifNPLq5yNRccS0nWXc5BD
/9q06ta/ceQGNkXL327XPuZC+lstWGAa4dKEosRDgcO0Pv2j2a3h8W8oHyxF+gEe
O+9s0mGdQFxNiEA+JyeKCg+jvfx9Hv/2Syrlert76NEkfbaTFA7BJ4c3EQARAQAB
tCtKZWFuLUJhcHRpc3RlIE9ub2Zyw6kgPGpib25vZnJlQGFwYWNoZS5vcmc+iQI2
BBMBAgAgBQJKpiZBAhsDBgsJCAcDAgQVAggDBBYCAwECHgECF4AACgkQv/LuQsgo
LnboyRAAguqFIpiKkCCR6TR0Y5UQDFhgEMhBreQKCEW0czbGoFnxfULV9H1kJRSB
Vt0knecGaYS340WEmz4B7BMpkBCgaszgn66+fhacZTBd+Aff1k2lbhdMgdBvlPcm
q9vFGtbE515j9bPHzsPRJ2wFWd6ot9wXiLD3RJLV6c7L3Egstu3qTp0tEoFHrQps
qskGBl+mahhMyz3BUDlusavB0Y0tb6hhXCR79ErhjQrTgU947isztYWpgJlA40lx
DW0hskZWbuGNXjxUJvTT3pKiYUN32WG+2CDNYHceuhsfRLxO/Wb4BKwwDaHWAlH9
d5F9/vhdPObSv5GQbuUtmCEzeqADUd65jLLM7WSlvRJ+i4m0/TTeP8y4NfxlVbBP
WuYrQW4gPmDKEDNvEec6PH6hhBfMLJz3M6o4huwLp2kQrq6wSTMDGIoxOLP0ae3c
BMIuFM5EavLDJmuATUIWWyZt/c7mmAOOh5TGcFWTugnJ6l4FllOrFPiWyFsjMn+U
zzzaeSkYmq/xZYxjRTdWjK5Zb5rbVuCx/q5VF9Awdy4EM6UXhaqWo06VyjWNOJ86
wgres4+bVldB7+TiVi9iO6n80WNlPgIaQJlLc+FRsld4Er21kdXreX5doxFD5Iue
S4y/pLwftHfx1xxj+p2jPJ49Hb0ddNr+XrsrO5txing2pNJgfH65Ag0ESqYmQQEQ
AKPoXgIIKnyJiPvks7xBV+FqJPecVAx3SSlLyTfsh/jBat9QLd4hsfiZcv1ANZHB
n4qDeGlsmJ6uDGv8wnUZQ2Im8Heje1h7dKeLNpNnxfBS9gn6e2bXKhAsJGUE7gip
qVfijFnEY0Vj6Tztzq+Wyqg2Gbz+bJZMo1JVQiaAYyQeQlrOcoZcQHsA/Ol+y48h
Le36A1TSIPMOSI4ZAZXkqxXAumEaMaz82EvV8KDH7Ijr23Y0wZjEUJ+dJQM9ssuE
f9GMLIuCbmM/CJ5MCCwepGJd52ymllvgJTHC7B+BY/jKNMWHwAsMJ1oWcPlLzFQI
Bmyy5RjKoMifzaoSo/hTWkiwcL2Vc+qU3b3/2eUtnCnBB/nkrZkJNNc+OV5YGBSP
vNPaN43Gvjbvborv4PBvt7QhVjZYQemtXO2sWx1XWSFsucD2K4kJ8ipNWxVgIqDu
J8SJOnGigX9hMpsZ2HVAwOeKP/jI90J3voKrCPLaKcL1Ip+b28k0aj7kl44YJqw4
5pbRSx/v73bH4uleQiXSW+JczA+KLw7hX3tOWJEnLS2+Ig9sNUKYGZOg0nw613bN
fZy8Cbx/UkT10Lznx9FW6MedGyJPYT4MJMMh/PnnsWv50jFnfu2rtnRXEOUXwujL
fwrmCYbXHgE3Ka+fmRz8HxsyTmtqIHtPixw8RoqfoFfxABEBAAGJAh8EGAECAAkF
AkqmJkECGwwACgkQv/LuQsgoLnb8AQ/+POsLFdqNqSKfwBXp1YOIEjNdbVjysQc6
zC6LlMJXNSxAmUmol2g9bJYh9LdpvOTU3gfFgIanaGytC75U7/NOl0zEsN4IU18j
CLBNaD5/Or1ciQ3CVrID/lPO8s0Hm0/cUPreEjJPPrrPbXG+i9bweg3Dtfy3+WQl
PhfpvgudwtUjB3st2gztYipkUhmrH+STbbJZVJN5ZNL8mOoM5M2wGS+9VweOWbKe
z0QeZ9hIPyQNMzTn1xlvRUVNTu8fz2FGvumrd+zgzYcpTE5VpFkOxxUayr3aWXSf
Cak+HH0WjUDWc9/lJR4dVpwdjLonJfiC70W07J4CnNodYwnPUaGKTVYq3pvQzAPw
hjx4u6t5zTZy5CbCAEhZC/9GeQmtuM0rcQhz048don4s4baDrqUPKL+X3C3ev4/o
00yLrQ5rLX8K4iE/Go5xUyhzT7gqFJUPWdo8neTXXwQGThqqhVQovnn3M6i55rCg
EeOTd7uW+k3vt6kunWZFKPjzRBxMD4NYovIQXwhPxj0vq6DnE0RQa7Dfm6l3cAV7
/l3kRQcT69AWXotUJQnpY4bemTuYlxAYWCkTGNLdNNiBhiaqlR7xgYMNXS4XqcgA
6QtP8ulb2FPR0MWEtvGkbHgAAIayV+Jt1Ed2JkIsdJHGeSZO5WEiupySDQCGn6rZ
DR2E2zua3tQ=
=LCyH
-----END PGP PUBLIC KEY BLOCK-----
\ No newline at end of file
Apache ActiveMQ Copyright 2005-2017 Apache Software Foundation
Apache ActiveMQ Copyright 2005-2019 Apache Software Foundation
This product includes software developed by
The Apache Software Foundation (http://www.apache.org/).
......
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.9</version>
<version>5.15.10</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.9</version>
<version>5.15.10</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......@@ -80,6 +80,7 @@
<!-- using it for Jetty's JNDI context to work /w Joram tests. -->
<groupId>org.eclipse.jetty.aggregate</groupId>
<artifactId>jetty-all</artifactId>
<classifier>uber</classifier>
<scope>test</scope>
</dependency>
<dependency>
......
......@@ -91,6 +91,8 @@ public abstract class InboundTransformer {
if (header.getDurable() != null) {
jms.setPersistent(header.getDurable().booleanValue());
} else {
jms.setPersistent(false);
}
if (header.getPriority() != null) {
......
......@@ -319,7 +319,7 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
footerMap = new HashMap<>();
}
String name = key.substring(JMS_AMQP_FOOTER_PREFIX.length());
footerMap.put(name, value);
footerMap.put(Symbol.valueOf(name), value);
continue;
}
} else if (key.startsWith(AMQ_SCHEDULED_MESSAGE_PREFIX )) {
......
......@@ -22,6 +22,8 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.BytesMessage;
import javax.jms.Connection;
......@@ -36,6 +38,11 @@ import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.junit.After;
import org.junit.Test;
import org.slf4j.Logger;
......@@ -188,6 +195,54 @@ public class AmqpTransformerTest {
openwireConn.close();
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveCore() throws Exception {
startBrokerWithAmqpTransport(String.format(AMQP_URL, "?transport.transformer=jms"));
URI remoteURI = new URI("tcp://" + amqpConnectionURI.getHost() + ":" + amqpConnectionURI.getPort());
AmqpClient client = new AmqpClient(remoteURI, null, null);
AmqpConnection connection = client.connect();
try {
connection.connect();
String annotation = "x-opt-embedded-map";
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(TEST_QUEUE);
AmqpMessage message = createAmqpMessage((byte) 'A', 65535);
message.setApplicationProperty("IntProperty", 42);
message.setDurable(true);
message.setMessageAnnotation(annotation, embeddedMap);
sender.send(message);
session.close();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(openwireConnectionURI);
Connection connection2 = factory.createConnection();
try {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
MessageConsumer consumer = session2.createConsumer(session2.createQueue(TEST_QUEUE));
Message received = consumer.receive(5000);
assertNotNull(received);
assertEquals(42, received.getIntProperty("IntProperty"));
connection2.close();
} finally {
connection2.close();
}
} finally {
connection.close();
}
}
public void startBrokerWithAmqpTransport(String amqpUrl) throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
......@@ -211,4 +266,14 @@ public class AmqpTransformerTest {
brokerService = null;
}
}
private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[payloadSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = value;
}
message.setBytes(payload);
return message;
}
}
/**
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.http;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.http.client.HttpClient;
import org.apache.http.client.params.HttpClientParams;
import org.junit.Before;
import org.junit.Test;
/**
* Test that {@link HttpClientTransport} sets a broad-range compatibility
* cookie policy.
*
* @see <a href="https://issues.apache.org/jira/browse/AMQ-6571">AMQ-6571: HttpClientTransport refuses to accept cookies using `Expires' header</a>
*/
@SuppressWarnings("deprecation")
public class HttpClientTransportCookiePolicyTest {
private HttpClientTransport transport;
/**
* Create the transport so we can inspect it.
* @throws URISyntaxException if something goes wrong.
*/
@Before
public void setUp() throws URISyntaxException {
transport = new HttpClientTransport(mock(TextWireFormat.class), new URI("http://localhost:8080/test"));
}
package org.apache.activemq.transport.amqp.interop;
public class AmqpSendReceiveNativeTest extends AmqpSendReceiveTest {
/**
* Create a new connection and check the connection properties.
*/
@Test
public void test() {
HttpClient client = transport.createHttpClient();
assertEquals("Cookie spec", org.apache.http.client.params.CookiePolicy.BROWSER_COMPATIBILITY, HttpClientParams.getCookiePolicy(client.getParams()));
@Override
protected String getAmqpTransformer() {
return "native";
}
}
......@@ -23,14 +23,17 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.DeliveryMode;
import javax.jms.Queue;
import javax.jms.Topic;
......@@ -46,6 +49,13 @@ import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.activemq.util.Wait;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.message.Message;
import org.apache.qpid.proton.message.impl.MessageImpl;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
......@@ -60,6 +70,8 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
protected static final Logger LOG = LoggerFactory.getLogger(AmqpSendReceiveTest.class);
private static final int PAYLOAD = 110 * 1024;
@Test(timeout = 60000)
public void testSimpleSendOneReceiveOneToQueue() throws Exception {
doTestSimpleSendOneReceiveOne(Queue.class);
......@@ -496,6 +508,20 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
@Test(timeout = 60000)
public void testMessageWithNoHeaderNotMarkedDurable() throws Exception {
doMessageNotMarkedDurableTestImpl(false, false);
}
@Test(timeout = 60000)
public void testMessageWithHeaderAndDefaultedNonDurableNotMarkedDurable() throws Exception {
doMessageNotMarkedDurableTestImpl(true, false);
}
@Test(timeout = 60000)
public void testMessageWithHeaderAndMarkedNonDurableNotMarkedDurable() throws Exception {
doMessageNotMarkedDurableTestImpl(true, true);
}
private void doMessageNotMarkedDurableTestImpl(boolean sendHeaderWithPriority, boolean explicitSetNonDurable) throws Exception {
AmqpClient client = createAmqpClient();
AmqpConnection connection = trackConnection(client.connect());
AmqpSession session = connection.createSession();
......@@ -503,16 +529,39 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
AmqpSender sender = session.createSender("queue://" + getTestName());
AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
// Create default message that should be sent as non-durable
AmqpMessage message1 = new AmqpMessage();
message1.setText("Test-Message -> non-durable");
message1.setMessageId("ID:Message:1");
Message protonMessage = Message.Factory.create();
protonMessage.setMessageId("ID:Message:1");
protonMessage.setBody(new AmqpValue("Test-Message -> non-durable"));
if(sendHeaderWithPriority) {
Header header = new Header();
if(explicitSetNonDurable) {
header.setDurable(false);
}
header.setPriority(UnsignedByte.valueOf((byte) 5));
protonMessage.setHeader(header);
} else {
assertNull("Should not have a header", protonMessage.getHeader());
}
AmqpMessage message1 = new AmqpMessage(protonMessage);
sender.send(message1);
final QueueViewMBean queueView = getProxyToQueue(getTestName());
assertNotNull(queueView);
assertEquals(1, queueView.getQueueSize());
List<javax.jms.Message> messages = (List<javax.jms.Message>) queueView.browseMessages();
assertEquals(1, messages.size());
javax.jms.Message queueMessage = messages.get(0);
assertEquals("Queued message should not be persistent", DeliveryMode.NON_PERSISTENT, queueMessage.getJMSDeliveryMode());
receiver1.flow(1);
AmqpMessage message2 = receiver1.receive(50, TimeUnit.SECONDS);
assertNotNull("Should have read a message", message2);
assertFalse("Second message sent should not be durable", message2.isDurable());
assertFalse("Received message should not be durable", message2.isDurable());
message2.accept();
sender.close();
......@@ -816,4 +865,66 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
receiver.close();
connection.close();
}
@Test(timeout = 60000)
public void testSendAMQPMessageWithComplexAnnotationsReceiveAMQP() throws Exception {
String testQueueName = "ConnectionFrameSize";
int nMsgs = 200;
AmqpClient client = createAmqpClient();
Symbol annotation = Symbol.valueOf("x-opt-embedded-map");
Map<String, String> embeddedMap = new LinkedHashMap<>();
embeddedMap.put("test-key-1", "value-1");
embeddedMap.put("test-key-2", "value-2");
embeddedMap.put("test-key-3", "value-3");
{
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpSender sender = session.createSender(testQueueName);
AmqpMessage message = createAmqpMessage((byte) 'A', PAYLOAD);
message.setApplicationProperty("IntProperty", 42);
message.setDurable(true);
message.setMessageAnnotation(annotation.toString(), embeddedMap);
sender.send(message);
session.close();
connection.close();
}
{
AmqpConnection connection = client.connect();
AmqpSession session = connection.createSession();
AmqpReceiver receiver = session.createReceiver(testQueueName);
receiver.flow(nMsgs);
AmqpMessage message = receiver.receive(5, TimeUnit.SECONDS);
assertNotNull("Failed to read message with embedded map in annotations", message);
MessageImpl wrapped = (MessageImpl) message.getWrappedMessage();
if (wrapped.getBody() instanceof Data) {
Data data = (Data) wrapped.getBody();
System.out.println("received : message: " + data.getValue().getLength());
assertEquals(PAYLOAD, data.getValue().getLength());
}
assertNotNull(message.getWrappedMessage().getMessageAnnotations());
assertNotNull(message.getWrappedMessage().getMessageAnnotations().getValue());
assertEquals(embeddedMap, message.getWrappedMessage().getMessageAnnotations().getValue().get(annotation));
message.accept();
session.close();
connection.close();
}
}
private AmqpMessage createAmqpMessage(byte value, int payloadSize) {
AmqpMessage message = new AmqpMessage();
byte[] payload = new byte[payloadSize];
for (int i = 0; i < payload.length; i++) {
payload[i] = value;
}
message.setBytes(payload);
return message;
}
}
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.9</version>
<version>5.15.10</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.9</version>
<version>5.15.10</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -68,6 +68,8 @@ public class AnnotatedMBean extends StandardMBean {
}
}
private final ObjectName objectName;
private static byte byteFromProperty(String s) {
byte val = OFF;
String config = System.getProperty(s, "").toLowerCase(Locale.ENGLISH);
......@@ -88,7 +90,7 @@ public class AnnotatedMBean extends StandardMBean {
for (Class c : object.getClass().getInterfaces()) {
if (mbeanName.equals(c.getName())) {
context.registerMBean(new AnnotatedMBean(object, c), objectName);
context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
return;
}
}
......@@ -97,13 +99,15 @@ public class AnnotatedMBean extends StandardMBean {
}
/** Instance where the MBean interface is implemented by another object. */
public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException {
public <T> AnnotatedMBean(T impl, Class<T> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
super(impl, mbeanInterface);
this.objectName = objectName;
}
/** Instance where the MBean interface is implemented by this object. */
protected AnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException {
protected AnnotatedMBean(Class<?> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
super(mbeanInterface);
this.objectName = objectName;
}
/** {@inheritDoc} */
......@@ -212,6 +216,7 @@ public class AnnotatedMBean extends StandardMBean {
entry = new JMXAuditLogEntry();
entry.setUser(caller);
entry.setTimestamp(System.currentTimeMillis());
entry.setTarget(extractTargetTypeProperty(objectName));
entry.setOperation(this.getMBeanInfo().getClassName() + "." + s);
try
......@@ -245,6 +250,21 @@ public class AnnotatedMBean extends StandardMBean {
return result;
}
// keep brokerName last b/c objectNames include the brokerName
final static String[] targetPropertiesCandidates = new String[] {"destinationName", "networkConnectorName", "connectorName", "connectionName", "brokerName"};
private String extractTargetTypeProperty(ObjectName objectName) {
String result = null;
for (String attr: targetPropertiesCandidates) {
try {
result = objectName.getKeyProperty(attr);
if (result != null) {
break;
}
} catch (NullPointerException ok) {}
}
return result;
}
private Method getMBeanMethod(Class clazz, String methodName, String[] signature) throws ReflectiveOperationException {
Class[] parameterTypes = new Class[signature.length];
for (int i = 0; i < signature.length; i++) {
......
......@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit;
import javax.management.MBeanException;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectInstance;
import javax.management.ObjectName;
import javax.management.ReflectionException;
......@@ -36,15 +37,15 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
private ExecutorService executor;
private long timeout = 0;
public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class<T> mbeanInterface) throws NotCompliantMBeanException {
super(impl, mbeanInterface);
public <T> AsyncAnnotatedMBean(ExecutorService executor, long timeout, T impl, Class<T> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
super(impl, mbeanInterface, objectName);
this.executor = executor;
this.timeout = timeout;
}
protected AsyncAnnotatedMBean(Class<?> mbeanInterface) throws NotCompliantMBeanException {
super(mbeanInterface);
protected AsyncAnnotatedMBean(Class<?> mbeanInterface, ObjectName objectName) throws NotCompliantMBeanException {
super(mbeanInterface, objectName);
}
protected Object asyncInvole(String s, Object[] objects, String[] strings) throws MBeanException, ReflectionException {
......@@ -52,7 +53,7 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
}
@SuppressWarnings({ "unchecked", "rawtypes" })
public static void registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
public static ObjectInstance registerMBean(ExecutorService executor, long timeout, ManagementContext context, Object object, ObjectName objectName) throws Exception {
if (timeout < 0 && executor != null) {
throw new IllegalArgumentException("async timeout cannot be negative.");
......@@ -67,15 +68,14 @@ public class AsyncAnnotatedMBean extends AnnotatedMBean {
for (Class c : object.getClass().getInterfaces()) {
if (mbeanName.equals(c.getName())) {
if (timeout == 0) {
context.registerMBean(new AnnotatedMBean(object, c), objectName);
return context.registerMBean(new AnnotatedMBean(object, c, objectName), objectName);
} else {
context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c), objectName);
return context.registerMBean(new AsyncAnnotatedMBean(executor, timeout, object, c, objectName), objectName);
}
return;
}
}
context.registerMBean(object, objectName);
return context.registerMBean(object, objectName);
}
@Override
......
......@@ -96,7 +96,7 @@ public class ManagedRegionBroker extends RegionBroker {
private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>();
private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>();
private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>();
private final Set<ObjectName> registeredMBeans = new ConcurrentHashMap<>().newKeySet();
/* This is the first broker in the broker interceptor chain. */
private Broker contextBroker;
......@@ -324,8 +324,9 @@ public class ManagedRegionBroker extends RegionBroker {
}
}
try {
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
registeredMBeans.add(key);
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
registeredMBeans.add(key);
}
} catch (Throwable e) {
LOG.warn("Failed to register MBean {}", key);
LOG.debug("Failure reason: ", e);
......@@ -380,8 +381,9 @@ public class ManagedRegionBroker extends RegionBroker {
}
try {
AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key);
registeredMBeans.add(key);
if (AsyncAnnotatedMBean.registerMBean(asyncInvokeService, mbeanTimeout, managementContext, view, key) != null) {
registeredMBeans.add(key);
}
} catch (Throwable e) {
LOG.warn("Failed to register MBean {}", key);
LOG.debug("Failure reason: ", e);
......@@ -444,8 +446,9 @@ public class ManagedRegionBroker extends RegionBroker {
}