Skip to content
Commits on Source (7)
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.8</version>
<version>5.15.9</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.8</version>
<version>5.15.9</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.8</version>
<version>5.15.9</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.8</version>
<version>5.15.9</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -138,8 +138,14 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
private boolean blocked;
private boolean connected;
private boolean active;
private final AtomicBoolean starting = new AtomicBoolean();
private final AtomicBoolean pendingStop = new AtomicBoolean();
// state management around pending stop
private static final int NEW = 0;
private static final int STARTING = 1;
private static final int STARTED = 2;
private static final int PENDING_STOP = 3;
private final AtomicInteger status = new AtomicInteger(NEW);
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
private final CountDownLatch stopped = new CountDownLatch(1);
......@@ -229,7 +235,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
public void serviceTransportException(IOException e) {
if (!stopping.get() && !pendingStop.get()) {
if (!stopping.get() && status.get() != PENDING_STOP) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
TRANSPORTLOG.debug(this + " failed: " + e, e);
......@@ -308,7 +314,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
ConnectionError ce = new ConnectionError();
ce.setException(e);
if (pendingStop.get()) {
if (status.get() == PENDING_STOP) {
dispatchSync(ce);
} else {
dispatchAsync(ce);
......@@ -326,7 +332,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
boolean responseRequired = command.isResponseRequired();
int commandId = command.getCommandId();
try {
if (!pendingStop.get()) {
if (status.get() != PENDING_STOP) {
response = command.visit(this);
} else {
response = new ExceptionResponse(transportException.get());
......@@ -998,7 +1004,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
@Override
public boolean iterate() {
try {
if (pendingStop.get() || stopping.get()) {
if (status.get() == PENDING_STOP || stopping.get()) {
if (dispatchStopped.compareAndSet(false, true)) {
if (transportException.get() == null) {
try {
......@@ -1054,39 +1060,39 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
@Override
public void start() throws Exception {
try {
synchronized (this) {
starting.set(true);
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
} else {
taskRunner = null;
if (status.compareAndSet(NEW, STARTING)) {
try {
synchronized (this) {
if (taskRunnerFactory != null) {
taskRunner = taskRunnerFactory.createTaskRunner(this, "ActiveMQ Connection Dispatcher: "
+ getRemoteAddress());
} else {
taskRunner = null;
}
transport.start();
active = true;
BrokerInfo info = connector.getBrokerInfo().copy();
if (connector.isUpdateClusterClients()) {
info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
} else {
info.setPeerBrokerInfos(null);
}
dispatchAsync(info);
connector.onStarted(this);
}
transport.start();
active = true;
BrokerInfo info = connector.getBrokerInfo().copy();
if (connector.isUpdateClusterClients()) {
info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
} else {
info.setPeerBrokerInfos(null);
} catch (Exception e) {
// Force clean up on an error starting up.
status.set(PENDING_STOP);
throw e;
} finally {
// stop() can be called from within the above block,
// but we want to be sure start() completes before
// stop() runs, so queue the stop until right now:
if (!status.compareAndSet(STARTING, STARTED)) {
LOG.debug("Calling the delayed stop() after start() {}", this);
stop();
}
dispatchAsync(info);
connector.onStarted(this);
}
} catch (Exception e) {
// Force clean up on an error starting up.
pendingStop.set(true);
throw e;
} finally {
// stop() can be called from within the above block,
// but we want to be sure start() completes before
// stop() runs, so queue the stop until right now:
setStarting(false);
if (isPendingStop()) {
LOG.debug("Calling the delayed stop() after start() {}", this);
stop();
}
}
}
......@@ -1104,10 +1110,8 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void delayedStop(final int waitTime, final String reason, Throwable cause) {
if (waitTime > 0) {
synchronized (this) {
pendingStop.set(true);
transportException.set(cause);
}
status.compareAndSet(STARTING, PENDING_STOP);
transportException.set(cause);
try {
stopTaskRunnerFactory.execute(new Runnable() {
@Override
......@@ -1133,12 +1137,9 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
public void stopAsync() {
// If we're in the middle of starting then go no further... for now.
synchronized (this) {
pendingStop.set(true);
if (starting.get()) {
LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
return;
}
if (status.compareAndSet(STARTING, PENDING_STOP)) {
LOG.debug("stopAsync() called in the middle of start(). Delaying till start completes..");
return;
}
if (stopping.compareAndSet(false, true)) {
// Let all the connection contexts know we are shutting down
......@@ -1347,7 +1348,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
* @return true if the Connection is starting
*/
public boolean isStarting() {
return starting.get();
return status.get() == STARTING;
}
@Override
......@@ -1360,19 +1361,11 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
return this.faultTolerantConnection;
}
protected void setStarting(boolean starting) {
this.starting.set(starting);
}
/**
* @return true if the Connection needs to stop
*/
public boolean isPendingStop() {
return pendingStop.get();
}
protected void setPendingStop(boolean pendingStop) {
this.pendingStop.set(pendingStop);
return status.get() == PENDING_STOP;
}
private NetworkBridgeConfiguration getNetworkConfiguration(final BrokerInfo info) throws IOException {
......
......@@ -157,6 +157,21 @@ public class DestinationView implements DestinationViewMBean {
return destination.getDestinationStatistics().getProcessTime().getAverageTime();
}
@Override
public int getTempUsagePercentUsage() {
return destination.getTempUsage().getPercentUsage();
}
@Override
public long getTempUsageLimit() {
return destination.getTempUsage().getLimit();
}
@Override
public void setTempUsageLimit(long limit) {
destination.getTempUsage().setLimit(limit);
}
@Override
public long getMaxEnqueueTime() {
return destination.getDestinationStatistics().getProcessTime().getMaxTime();
......
......@@ -238,6 +238,24 @@ public interface DestinationViewMBean {
*/
void setMemoryLimit(long limit);
/**
* @return the percentage of amount of temp usage used
*/
@MBeanInfo("The percentage of the temp usage limit used")
int getTempUsagePercentUsage();
/**
* @return the amount of temp usage allocated to this destination
*/
@MBeanInfo("Temp usage limit, in bytes, assigned to this destination.")
long getTempUsageLimit();
/**
* set the amount of temp usage allocated to this destination
* @param limit the amount of temp usage allocated to this destination
*/
void setTempUsageLimit(long limit);
/**
* @return the portion of memory from the broker memory limit for this destination
*/
......
......@@ -16,16 +16,25 @@
*/
package org.apache.activemq.broker.jmx;
import java.util.concurrent.Callable;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.activemq.management.TimeStatisticImpl;
import org.apache.activemq.store.PersistenceAdapter;
import org.apache.activemq.store.PersistenceAdapterStatistics;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
public class PersistenceAdapterView implements PersistenceAdapterViewMBean {
private final static ObjectMapper mapper = new ObjectMapper();
private final String name;
private final PersistenceAdapter persistenceAdapter;
private Callable<String> inflightTransactionViewCallable;
private Callable<String> dataViewCallable;
private PersistenceAdapterStatistics persistenceAdapterStatistics;
public PersistenceAdapterView(PersistenceAdapter adapter) {
this.name = adapter.toString();
......@@ -52,6 +61,22 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean {
return persistenceAdapter.size();
}
@Override
public String getStatistics() {
return serializePersistenceAdapterStatistics();
}
@Override
public String resetStatistics() {
final String result = serializePersistenceAdapterStatistics();
if (persistenceAdapterStatistics != null) {
persistenceAdapterStatistics.reset();
}
return result;
}
private String invoke(Callable<String> callable) {
String result = null;
if (callable != null) {
......@@ -64,6 +89,36 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean {
return result;
}
private String serializePersistenceAdapterStatistics() {
if (persistenceAdapterStatistics != null) {
try {
Map<String, Object> result = new HashMap<String, Object>();
result.put("writeTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getWriteTime()));
result.put("readTime", getTimeStatisticAsMap(persistenceAdapterStatistics.getReadTime()));
return mapper.writeValueAsString(result);
} catch (IOException e) {
return e.toString();
}
}
return null;
}
private Map<String, Object> getTimeStatisticAsMap(final TimeStatisticImpl timeStatistic) {
Map<String, Object> result = new HashMap<String, Object>();
result.put("count", timeStatistic.getCount());
result.put("maxTime", timeStatistic.getMaxTime());
result.put("minTime", timeStatistic.getMinTime());
result.put("totalTime", timeStatistic.getTotalTime());
result.put("averageTime", timeStatistic.getAverageTime());
result.put("averageTimeExMinMax", timeStatistic.getAverageTimeExcludingMinMax());
result.put("averagePerSecond", timeStatistic.getAveragePerSecond());
result.put("averagePerSecondExMinMax", timeStatistic.getAveragePerSecondExcludingMinMax());
return result;
}
public void setDataViewCallable(Callable<String> dataViewCallable) {
this.dataViewCallable = dataViewCallable;
}
......@@ -71,4 +126,8 @@ public class PersistenceAdapterView implements PersistenceAdapterViewMBean {
public void setInflightTransactionViewCallable(Callable<String> inflightTransactionViewCallable) {
this.inflightTransactionViewCallable = inflightTransactionViewCallable;
}
public void setPersistenceAdapterStatistics(PersistenceAdapterStatistics persistenceAdapterStatistics) {
this.persistenceAdapterStatistics = persistenceAdapterStatistics;
}
}
......@@ -29,4 +29,10 @@ public interface PersistenceAdapterViewMBean {
@MBeanInfo("Current size.")
long getSize();
@MBeanInfo("Statistics related to the PersistentAdapter.")
String getStatistics();
@MBeanInfo("Resets statistics.")
String resetStatistics();
}
......@@ -297,6 +297,9 @@ public abstract class AbstractRegion implements Region {
}
}
destinationMap.unsynchronizedRemove(destination, dest);
if (dest instanceof Queue){
((Queue) dest).purge();
}
dispose(context, dest);
DestinationInterceptor destinationInterceptor = broker.getDestinationInterceptor();
if (destinationInterceptor != null) {
......
......@@ -42,6 +42,7 @@ import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
import org.slf4j.Logger;
......@@ -277,6 +278,11 @@ public abstract class BaseDestination implements Destination {
this.memoryUsage = memoryUsage;
}
@Override
public TempUsage getTempUsage() {
return systemUsage.getTempUsage();
}
@Override
public DestinationStatistics getDestinationStatistics() {
return destinationStatistics;
......
......@@ -33,6 +33,7 @@ import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.thread.Task;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
/**
......@@ -70,6 +71,8 @@ public interface Destination extends Service, Task, Message.MessageDestination {
void setMemoryUsage(MemoryUsage memoryUsage);
TempUsage getTempUsage();
void dispose(ConnectionContext context) throws IOException;
boolean isDisposed();
......
......@@ -32,6 +32,7 @@ import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.usage.TempUsage;
import org.apache.activemq.usage.Usage;
import org.apache.activemq.util.SubscriptionKey;
......@@ -122,6 +123,11 @@ public class DestinationFilter implements Destination {
next.setMemoryUsage(memoryUsage);
}
@Override
public TempUsage getTempUsage() {
return next.getTempUsage();
}
@Override
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception {
next.removeSubscription(context, sub, lastDeliveredSequenceId);
......
......@@ -987,6 +987,9 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
if (systemUsage.getStoreUsage() != null) {
systemUsage.getStoreUsage().start();
}
if (systemUsage.getTempUsage() != null) {
systemUsage.getTempUsage().start();
}
systemUsage.getMemoryUsage().addUsageListener(this);
messages.start();
if (getExpireMessagesPeriod() > 0) {
......
......@@ -177,6 +177,16 @@ public abstract class AbstractPendingMessageCursor implements PendingMessageCurs
return systemUsage != null ? (!isParentFull() && systemUsage.getMemoryUsage().getPercentUsage() < memoryUsageHighWaterMark) : true;
}
boolean parentHasSpace(int waterMark) {
boolean result = true;
if (systemUsage != null) {
if (systemUsage.getMemoryUsage().getParent() != null) {
return systemUsage.getMemoryUsage().getParent().getPercentUsage() <= waterMark;
}
}
return result;
}
private boolean isParentFull() {
boolean result = false;
if (systemUsage != null) {
......
......@@ -276,6 +276,12 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
return useCache && size==0 && hasSpace() && isStarted();
}
@Override
public boolean canRecoveryNextMessage() {
// Should be safe to recovery messages if the overall memory usage if < 90%
return parentHasSpace(90);
}
private void syncWithStore(Message currentAdd) throws Exception {
pruneLastCached();
for (ListIterator<MessageId> it = pendingCachedIds.listIterator(pendingCachedIds.size()); it.hasPrevious(); ) {
......
......@@ -120,6 +120,23 @@ public class BrokerDestinationView {
return destination.getMemoryUsage().getLimit();
}
/**
* Gets the temp usage as a percentage for this Destination.
*
* @return Gets the temp usage as a percentage for this Destination.
*/
public int getTempPercentUsage() {
return destination.getTempUsage().getPercentUsage();
}
/**
* Gets the temp usage limit in bytes.
*
* @return the temp usage limit in bytes.
*/
public long getTempUsageLimit() {
return destination.getTempUsage().getLimit();
}
/**
* @return the average time it takes to store a message on this destination (ms)
......
......@@ -26,6 +26,9 @@ public interface MessageRecoveryListener {
boolean recoverMessage(Message message) throws Exception;
boolean recoverMessageReference(MessageId ref) throws Exception;
boolean hasSpace();
default boolean canRecoveryNextMessage() {
return true;
}
/**
* check if ref is a duplicate but do not record the reference
* @param ref
......
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store;
import org.apache.activemq.management.StatsImpl;
import org.apache.activemq.management.TimeStatisticImpl;
public class PersistenceAdapterStatistics extends StatsImpl {
protected TimeStatisticImpl writeTime;
protected TimeStatisticImpl readTime;
public PersistenceAdapterStatistics() {
writeTime = new TimeStatisticImpl("writeTime", "Time to write data to the PersistentAdapter.");
readTime = new TimeStatisticImpl("readTime", "Time to read data from the PersistentAdapter.");
addStatistic("writeTime", writeTime);
addStatistic("readTime", readTime);
}
public void addWriteTime(final long time) {
writeTime.addTime(time);
}
public void addReadTime(final long time) {
readTime.addTime(time);
}
@Override
public void setEnabled(boolean enabled) {
super.setEnabled(enabled);
writeTime.setEnabled(enabled);
readTime.setEnabled(enabled);
}
public TimeStatisticImpl getWriteTime() {
return writeTime;
}
public TimeStatisticImpl getReadTime() { return readTime; }
@Override
public void reset() {
if (isDoReset()) {
writeTime.reset();
readTime.reset();
}
}
public void setParent(PersistenceAdapterStatistics parent) {
if (parent != null) {
writeTime.setParent(parent.writeTime);
readTime.setParent(parent.readTime);
} else {
writeTime.setParent(null);
readTime.setParent(null);
}
}
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import javax.jms.*;
import javax.management.ObjectName;
import static org.junit.Assert.assertEquals;
/**
* Tests to ensure when the temp usage limit is updated on the broker the queues also have their
* temp usage limits automatically updated.
*/
public class AMQ7085Test
{
private BrokerService brokerService;
private String testQueueName = "testAMQ7085Queue";
private ActiveMQQueue queue = new ActiveMQQueue(testQueueName);
@Before
public void setUp() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setUseJmx(true);
String connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
brokerService.start();
brokerService.waitUntilStarted();
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
final Connection conn = connectionFactory.createConnection();
try {
conn.start();
final Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination queue = session.createQueue(testQueueName);
final Message toSend = session.createMessage();
toSend.setStringProperty("foo", "bar");
final MessageProducer producer = session.createProducer(queue);
producer.send(queue, toSend);
} finally {
conn.close();
}
}
@After
public void tearDown() throws Exception {
brokerService.stop();
brokerService.waitUntilStopped();
}
@Test
public void testQueueTempUsageWhenSetExplicitly() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
queueViewMBeanName, QueueViewMBean.class, true);
// Check that by default the queue's temp limit is the same as the broker's.
BrokerView brokerView = brokerService.getAdminView();
long brokerTempLimit = brokerView.getTempLimit();
assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
// Change the queue's temp limit independently of the broker's setting and check the broker's limit does not
// change.
long queueTempLimit = brokerTempLimit + 111;
queueViewMBean.setTempUsageLimit(queueTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
assertEquals(brokerView.getTempLimit(), brokerTempLimit);
// Now increase the broker's temp limit. Since the queue's limit was explicitly changed it should remain
// unchanged.
long newBrokerTempLimit = brokerTempLimit + 555;
brokerView.setTempLimit(newBrokerTempLimit);
assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), queueTempLimit);
}
@Test
public void testQueueTempUsageWhenBrokerTempUsageUpdated() throws Exception {
ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + queue.getQueueName());
QueueViewMBean queueViewMBean = (QueueViewMBean) brokerService.getManagementContext().newProxyInstance(
queueViewMBeanName, QueueViewMBean.class, true);
// Check that by default the queue's temp limit is the same as the broker's.
BrokerView brokerView = brokerService.getAdminView();
long brokerTempLimit = brokerView.getTempLimit();
assertEquals(brokerTempLimit, queueViewMBean.getTempUsageLimit());
// Increase the broker's temp limit and check the queue's limit is updated to the same value.
long newBrokerTempLimit = brokerTempLimit + 555;
brokerView.setTempLimit(newBrokerTempLimit);
assertEquals(brokerView.getTempLimit(), newBrokerTempLimit);
assertEquals(queueViewMBean.getTempUsageLimit(), newBrokerTempLimit);
}
}