Skip to content

Commits on Source 11

......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-all</artifactId>
......@@ -115,7 +115,7 @@
<include>org.fusesource.hawtbuf:hawtbuf</include>
<include>org.jasypt:jasypt</include>
<include>org.apache.geronimo.specs:geronimo-jms_1.1_spec</include>
<include>org.apache.geronimo.specs:geronimo-jta_1.0.1B_spec</include>
<include>org.apache.geronimo.specs:geronimo-jta_1.1_spec</include>
<include>org.apache.geronimo.specs:geronimo-j2ee-management_1.1_spec</include>
<include>org.apache.geronimo.specs:geronimo-annotation_1.0_spec</include>
<include>org.slf4j:slf4j-api</include>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -52,6 +52,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.jms.JMSException;
......@@ -67,7 +68,11 @@ import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQStreamMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.transport.amqp.AmqpProtocolException;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.TypeConversionSupport;
......@@ -333,6 +338,15 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
apMap = new HashMap<>();
}
apMap.put(key, value);
int messageType = message.getDataStructureType();
if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
// Type of command to recognize advisory message
Object data = message.getDataStructure();
if(data != null) {
apMap.put("ActiveMqDataStructureType", data.getClass().getSimpleName());
}
}
}
final AmqpWritableBuffer buffer = new AmqpWritableBuffer();
......@@ -376,7 +390,39 @@ public class JMSMappingOutboundTransformer implements OutboundTransformer {
int messageType = message.getDataStructureType();
if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
if (messageType == CommandTypes.ACTIVEMQ_MESSAGE) {
Object data = message.getDataStructure();
if (data instanceof ConnectionInfo) {
ConnectionInfo connectionInfo = (ConnectionInfo)data;
final HashMap<String, Object> connectionMap = new LinkedHashMap<String, Object>();
connectionMap.put("ConnectionId", connectionInfo.getConnectionId().getValue());
connectionMap.put("ClientId", connectionInfo.getClientId());
connectionMap.put("ClientIp", connectionInfo.getClientIp());
connectionMap.put("UserName", connectionInfo.getUserName());
connectionMap.put("BrokerMasterConnector", connectionInfo.isBrokerMasterConnector());
connectionMap.put("Manageable", connectionInfo.isManageable());
connectionMap.put("ClientMaster", connectionInfo.isClientMaster());
connectionMap.put("FaultTolerant", connectionInfo.isFaultTolerant());
connectionMap.put("FailoverReconnect", connectionInfo.isFailoverReconnect());
body = new AmqpValue(connectionMap);
} else if (data instanceof RemoveInfo) {
RemoveInfo removeInfo = (RemoveInfo)message.getDataStructure();
final HashMap<String, Object> removeMap = new LinkedHashMap<String, Object>();
if (removeInfo.isConnectionRemove()) {
removeMap.put(ConnectionId.class.getSimpleName(), ((ConnectionId)removeInfo.getObjectId()).getValue());
} else if (removeInfo.isConsumerRemove()) {
removeMap.put(ConsumerId.class.getSimpleName(), ((ConsumerId)removeInfo.getObjectId()).getValue());
removeMap.put("SessionId", ((ConsumerId)removeInfo.getObjectId()).getSessionId());
removeMap.put("ConnectionId", ((ConsumerId)removeInfo.getObjectId()).getConnectionId());
removeMap.put("ParentId", ((ConsumerId)removeInfo.getObjectId()).getParentId());
}
body = new AmqpValue(removeMap);
}
} else if (messageType == CommandTypes.ACTIVEMQ_BYTES_MESSAGE) {
Binary payload = getBinaryFromMessageBody((ActiveMQBytesMessage) message);
if (payload == null) {
......
......@@ -39,6 +39,7 @@ import java.io.ObjectInputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
......@@ -57,6 +58,9 @@ import org.apache.activemq.command.ActiveMQTempQueue;
import org.apache.activemq.command.ActiveMQTempTopic;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.RemoveInfo;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
......@@ -772,6 +776,82 @@ public class JMSMappingOutboundTransformerTest {
assertEquals(contentString, contents);
}
@Test
public void testConvertConnectionInfo() throws Exception {
String connectionId = "myConnectionId";
String clientId = "myClientId";
ConnectionInfo dataStructure = new ConnectionInfo();
dataStructure.setConnectionId(new ConnectionId(connectionId));
dataStructure.setClientId(clientId);
ActiveMQMessage outbound = createMessage();
Map<String, String> properties = new HashMap<String, String>();
properties.put("originUrl", "localhost");
outbound.setProperties(properties);
outbound.setDataStructure(dataStructure);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
EncodedMessage encoded = transformer.transform(outbound);
assertNotNull(encoded);
Message amqp = encoded.decode();
assertNotNull(amqp.getApplicationProperties());
Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
assertEquals(ConnectionInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
assertTrue(clientId.equals(amqpMap.get("ClientId")));
}
@Test
public void testConvertRemoveInfo() throws Exception {
String connectionId = "myConnectionId";
RemoveInfo dataStructure = new RemoveInfo(new ConnectionId(connectionId));
ActiveMQMessage outbound = createMessage();
Map<String, String> properties = new HashMap<String, String>();
properties.put("originUrl", "localhost");
outbound.setProperties(properties);
outbound.setDataStructure(dataStructure);
outbound.onSend();
outbound.storeContent();
JMSMappingOutboundTransformer transformer = new JMSMappingOutboundTransformer();
EncodedMessage encoded = transformer.transform(outbound);
assertNotNull(encoded);
Message amqp = encoded.decode();
assertNotNull(amqp.getApplicationProperties());
Map<String, Object> apMap = amqp.getApplicationProperties().getValue();
assertEquals(RemoveInfo.class.getSimpleName(), apMap.get("ActiveMqDataStructureType"));
assertNotNull(amqp.getBody());
assertTrue(amqp.getBody() instanceof AmqpValue);
assertTrue(((AmqpValue) amqp.getBody()).getValue() instanceof Map);
@SuppressWarnings("unchecked")
Map<Object, Object> amqpMap = (Map<Object, Object>) ((AmqpValue) amqp.getBody()).getValue();
assertTrue(connectionId.equals(amqpMap.get("ConnectionId")));
}
//----- Test JMSDestination Handling -------------------------------------//
@Test
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -175,7 +175,7 @@ public class BrokerService implements Service {
private TaskRunnerFactory persistenceTaskRunnerFactory;
private SystemUsage systemUsage;
private SystemUsage producerSystemUsage;
private SystemUsage consumerSystemUsaage;
private SystemUsage consumerSystemUsage;
private PersistenceAdapter persistenceAdapter;
private PersistenceAdapterFactory persistenceFactory;
protected DestinationFactory destinationFactory;
......@@ -1207,29 +1207,29 @@ public class BrokerService implements Service {
* @throws IOException
*/
public SystemUsage getConsumerSystemUsage() throws IOException {
if (this.consumerSystemUsaage == null) {
if (this.consumerSystemUsage == null) {
if (splitSystemUsageForProducersConsumers) {
this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
this.consumerSystemUsage = new SystemUsage(getSystemUsage(), "Consumer");
float portion = consumerSystemUsagePortion / 100f;
this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(portion);
addService(this.consumerSystemUsaage);
this.consumerSystemUsage.getMemoryUsage().setUsagePortion(portion);
addService(this.consumerSystemUsage);
} else {
consumerSystemUsaage = getSystemUsage();
consumerSystemUsage = getSystemUsage();
}
}
return this.consumerSystemUsaage;
return this.consumerSystemUsage;
}
/**
* @param consumerSystemUsaage
* @param consumerSystemUsage
* the storeSystemUsage to set
*/
public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
if (this.consumerSystemUsaage != null) {
removeService(this.consumerSystemUsaage);
public void setConsumerSystemUsage(SystemUsage consumerSystemUsage) {
if (this.consumerSystemUsage != null) {
removeService(this.consumerSystemUsage);
}
this.consumerSystemUsaage = consumerSystemUsaage;
addService(this.consumerSystemUsaage);
this.consumerSystemUsage = consumerSystemUsage;
addService(this.consumerSystemUsage);
}
/**
......
......@@ -240,10 +240,10 @@ public class TransportConnector implements Connector, BrokerServiceAware {
private void onAcceptError(Exception error, String remoteHost) {
if (brokerService != null && brokerService.isStopping()) {
LOG.info("Could not accept connection during shutdown {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error);
LOG.info("Could not accept connection during shutdown {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error.getLocalizedMessage());
} else {
LOG.error("Could not accept connection {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error);
LOG.debug("Reason: " + error, error);
LOG.warn("Could not accept connection {} : {}", (remoteHost == null ? "" : "from " + remoteHost), error.getLocalizedMessage());
LOG.debug("Reason: " + error.getMessage(), error);
}
}
});
......
......@@ -131,8 +131,10 @@ public class SubQueueSelectorCacheBroker extends BrokerFilter implements Runnabl
@Override
public Subscription addConsumer(ConnectionContext context, ConsumerInfo info) throws Exception {
// don't track selectors for advisory topics or temp destinations
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()) {
// don't track selectors for advisory topics, temp destinations or console
// related consumers
if (!AdvisorySupport.isAdvisoryTopic(info.getDestination()) && !info.getDestination().isTemporary()
&& !info.isBrowser()) {
String destinationName = info.getDestination().getQualifiedName();
LOG.debug("Caching consumer selector [{}] on '{}'", info.getSelector(), destinationName);
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-camel</artifactId>
......
......@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-cf</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-client</artifactId>
......
......@@ -1377,7 +1377,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
private void forceCloseOnSecurityException(Throwable exception) {
LOG.trace("force close on security exception:" + this + ", transport=" + transport, exception);
LOG.trace("force close on security exception:{}, transport={}", this, transport, exception);
onException(new IOException("Force close due to SecurityException on connect", exception));
}
......@@ -1941,8 +1941,8 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
}
});
} else {
LOG.debug("Async client internal exception occurred with no exception listener registered: "
+ error, error);
LOG.debug("Async client internal exception occurred with no exception listener registered: {}",
error, error);
}
}
}
......@@ -1969,7 +1969,7 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
});
} else {
LOG.debug("Async exception with no exception listener: " + error, error);
LOG.debug("Async exception with no exception listener: {}", error, error);
}
}
}
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-console</artifactId>
......@@ -125,7 +125,7 @@
</dependency>
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity</artifactId>
<artifactId>velocity-engine-core</artifactId>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.10</version>
<version>5.15.11</version>
</parent>
<artifactId>activemq-http</artifactId>
......
......@@ -201,7 +201,7 @@ public class HttpClientTransport extends HttpTransportSupport {
}
stream.close();
}
} catch (IOException e) {
} catch (Exception e) { // handle RuntimeException from unmarshal
onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
break;
} finally {
......@@ -414,4 +414,10 @@ public class HttpClientTransport extends HttpTransportSupport {
public WireFormat getWireFormat() {
return getTextWireFormat();
}
@Override
protected String getSystemPropertyPrefix() {
return "http.";
}
}
......@@ -48,9 +48,11 @@ public class HttpTransportFactory extends TransportFactory {
Map<String, Object> jettyOptions = IntrospectionSupport.extractProperties(options, "jetty.");
Map<String, Object> httpOptions = IntrospectionSupport.extractProperties(options, "http.");
Map<String, Object> transportOptions = IntrospectionSupport.extractProperties(options, "transport.");
Map<String, Object> wireFormatOptions = IntrospectionSupport.extractProperties(options, "wireFormat.");
result.setJettyOptions(jettyOptions);
result.setTransportOption(transportOptions);
result.setHttpOptions(httpOptions);
result.setWireFormatOptions(wireFormatOptions);
return result;
} catch (URISyntaxException e) {
throw IOExceptionSupport.create(e);
......
......@@ -18,6 +18,7 @@ package org.apache.activemq.transport.http;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.command.BrokerInfo;
......@@ -38,6 +39,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
private TextWireFormat wireFormat;
private final HttpTransportFactory transportFactory;
private Map<String, Object> wireFormatOptions = new HashMap<>();
public HttpTransportServer(URI uri, HttpTransportFactory factory) {
super(uri);
......@@ -93,6 +95,7 @@ public class HttpTransportServer extends WebTransportServerSupport {
contextHandler.setAttribute("wireFormat", getWireFormat());
contextHandler.setAttribute("transportFactory", transportFactory);
contextHandler.setAttribute("transportOptions", transportOptions);
contextHandler.setAttribute("wireFormatOptions", wireFormatOptions);
//AMQ-6182 - disabling trace by default
configureTraceMethod((ConstraintSecurityHandler) contextHandler.getSecurityHandler(),
......@@ -171,6 +174,10 @@ public class HttpTransportServer extends WebTransportServerSupport {
super.setTransportOption(transportOptions);
}
public void setWireFormatOptions(Map<String, Object> wireFormatOptions) {
this.wireFormatOptions = wireFormatOptions;
}
@Override
public boolean isSslServer() {
return false;
......
......@@ -27,10 +27,16 @@ import org.apache.activemq.transport.util.TextWireFormat;
*
*/
public abstract class HttpTransportSupport extends TransportThreadSupport {
private static final int DEFAULT_PROXY_PORT = 8080;
private static final String PROPERTY_PROXY_HOST = "proxyHost";
private static final String PROPERTY_PROXY_PORT = "proxyPort";
private static final String PROPERTY_PROXY_USER = "proxyUser";
private static final String PROPERTY_PROXY_PASSWORD = "proxyPassword";
private TextWireFormat textWireFormat;
private URI remoteUrl;
private String proxyHost;
private int proxyPort = 8080;
private Integer proxyPort;
private String proxyUser;
private String proxyPassword;
......@@ -62,7 +68,7 @@ public abstract class HttpTransportSupport extends TransportThreadSupport {
}
public String getProxyHost() {
return proxyHost;
return proxyHost != null ? proxyHost : getSystemProperty(PROPERTY_PROXY_HOST);
}
public void setProxyHost(String proxyHost) {
......@@ -70,7 +76,9 @@ public abstract class HttpTransportSupport extends TransportThreadSupport {
}
public int getProxyPort() {
return proxyPort;
return proxyPort != null ? proxyPort
: (getSystemProperty(PROPERTY_PROXY_PORT) != null
? Integer.parseInt(getSystemProperty(PROPERTY_PROXY_PORT)) : DEFAULT_PROXY_PORT);
}
public void setProxyPort(int proxyPort) {
......@@ -78,7 +86,7 @@ public abstract class HttpTransportSupport extends TransportThreadSupport {
}
public String getProxyUser() {
return proxyUser;
return proxyUser != null ? proxyUser : getSystemProperty(PROPERTY_PROXY_USER);
}
public void setProxyUser(String proxyUser) {
......@@ -86,10 +94,17 @@ public abstract class HttpTransportSupport extends TransportThreadSupport {
}
public String getProxyPassword() {
return proxyPassword;
return proxyPassword != null ? proxyPassword : getSystemProperty(PROPERTY_PROXY_PASSWORD);
}
public void setProxyPassword(String proxyPassword) {
this.proxyPassword = proxyPassword;
}
protected abstract String getSystemPropertyPrefix();
private String getSystemProperty(String propertyName) {
return System.getProperty(getSystemPropertyPrefix() + propertyName);
}
}
......@@ -60,6 +60,7 @@ public class HttpTunnelServlet extends HttpServlet {
private ConcurrentMap<String, BlockingQueueTransport> clients = new ConcurrentHashMap<String, BlockingQueueTransport>();
private final long requestTimeout = 30000L;
private HashMap<String, Object> transportOptions;
private HashMap<String, Object> wireFormatOptions;
@SuppressWarnings("unchecked")
@Override
......@@ -74,6 +75,7 @@ public class HttpTunnelServlet extends HttpServlet {
throw new ServletException("No such attribute 'transportFactory' available in the ServletContext");
}
transportOptions = (HashMap<String, Object>)getServletContext().getAttribute("transportOptions");
wireFormatOptions = (HashMap<String, Object>)getServletContext().getAttribute("wireFormatOptions");
wireFormat = (TextWireFormat)getServletContext().getAttribute("wireFormat");
if (wireFormat == null) {
wireFormat = createWireFormat();
......@@ -118,6 +120,10 @@ public class HttpTunnelServlet extends HttpServlet {
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
if (wireFormatOptions.get("maxFrameSize") != null && request.getContentLength() > Integer.parseInt(wireFormatOptions.get("maxFrameSize").toString())) {
throw new ServletException("maxFrameSize exceeded");
}
InputStream stream = request.getInputStream();
String contentType = request.getContentType();
if (contentType != null && contentType.equals("application/x-gzip")) {
......@@ -197,7 +203,7 @@ public class HttpTunnelServlet extends HttpServlet {
// Record the client's transport and ensure that it has not already registered; this is thread-safe and only allows one
// thread to register the client
if (clients.putIfAbsent(clientID, answer) != null) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for clientID '" + clientID + "' has already been established");
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "A session for the given clientID has already been established");
LOG.warn("A session for clientID '" + clientID + "' has already been established");
return null;
}
......@@ -237,7 +243,7 @@ public class HttpTunnelServlet extends HttpServlet {
// Ensure that the transport was not prematurely disposed.
if (transport.isDisposed()) {
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for clientID '" + clientID + "' was prematurely disposed");
response.sendError(HttpServletResponse.SC_BAD_REQUEST, "The session for the given clientID was prematurely disposed");
LOG.warn("The session for clientID '" + clientID + "' was prematurely disposed");
return null;
}
......