Commit 83c9ec26 authored by Markus Koschany's avatar Markus Koschany

New upstream version 5.15.8

parent 50714d8e
......@@ -14,7 +14,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-all</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-amqp</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-blueprint</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-broker</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-camel</artifactId>
......
......@@ -17,18 +17,17 @@
package org.apache.activemq.camel.component;
import java.net.URISyntaxException;
import java.util.*;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.jms.Connection;
import org.apache.activemq.EnhancedConnection;
import org.apache.activemq.Service;
import org.apache.activemq.advisory.DestinationSource;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.camel.CamelContext;
import org.apache.camel.ComponentConfiguration;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.component.jms.JmsConfiguration;
import org.apache.camel.spi.EndpointCompleter;
import org.apache.camel.util.IntrospectionSupport;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
......@@ -37,23 +36,20 @@ import org.slf4j.LoggerFactory;
import org.springframework.jms.connection.SingleConnectionFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Connection;
/**
* The <a href="http://activemq.apache.org/camel/activemq.html">ActiveMQ Component</a>
*/
public class ActiveMQComponent extends JmsComponent implements EndpointCompleter {
public class ActiveMQComponent extends JmsComponent {
private final CopyOnWriteArrayList<SingleConnectionFactory> singleConnectionFactoryList =
new CopyOnWriteArrayList<SingleConnectionFactory>();
new CopyOnWriteArrayList<SingleConnectionFactory>();
private final CopyOnWriteArrayList<Service> pooledConnectionFactoryServiceList =
new CopyOnWriteArrayList<Service>();
new CopyOnWriteArrayList<Service>();
private static final transient Logger LOG = LoggerFactory.getLogger(ActiveMQComponent.class);
private boolean exposeAllQueues;
private CamelEndpointLoader endpointLoader;
private EnhancedConnection connection;
DestinationSource source;
boolean sourceInitialized = false;
/**
* Creates an <a href="http://camel.apache.org/activemq.html">ActiveMQ Component</a>
......@@ -103,14 +99,6 @@ public class ActiveMQComponent extends JmsComponent implements EndpointCompleter
}
}
/**
* @deprecated - use JmsComponent#setUsername(String)
* @see JmsComponent#setUsername(String)
*/
public void setUserName(String userName) {
setUsername(userName);
}
public void setTrustAllPackages(boolean trustAllPackages) {
if (getConfiguration() instanceof ActiveMQConfiguration) {
((ActiveMQConfiguration)getConfiguration()).setTrustAllPackages(trustAllPackages);
......@@ -258,34 +246,4 @@ public class ActiveMQComponent extends JmsComponent implements EndpointCompleter
return answer;
}
@Override
public List<String> completeEndpointPath(ComponentConfiguration componentConfiguration, String completionText) {
// try to initialize destination source only the first time
if (!sourceInitialized) {
createDestinationSource();
sourceInitialized = true;
}
ArrayList<String> answer = new ArrayList<String>();
if (source != null) {
Set candidates = source.getQueues();
String destinationName = completionText;
if (completionText.startsWith("topic:")) {
candidates = source.getTopics();
destinationName = completionText.substring(6);
} else if (completionText.startsWith("queue:")) {
destinationName = completionText.substring(6);
}
Iterator it = candidates.iterator();
while (it.hasNext()) {
ActiveMQDestination destination = (ActiveMQDestination) it.next();
if (destination.getPhysicalName().startsWith(destinationName)) {
answer.add(destination.getPhysicalName());
}
}
}
return answer;
}
}
......@@ -66,29 +66,6 @@ public class AutoExposeQueuesInCamelTest extends EmbeddedBrokerTestSupport {
assertEquals("Should have found an endpoint: "+ endpoints, 2, endpoints.size());
}
public void testCompleter() throws Exception {
Thread.sleep(1000);
List<String> result = component.completeEndpointPath(null, "foo");
assertThat(result, is(Arrays.asList("foo.bar")));
result = component.completeEndpointPath(null, "queue:foo");
assertThat(result, is(Arrays.asList("foo.bar")));
result = component.completeEndpointPath(null, "topic:ch");
assertThat(result, is(Arrays.asList("cheese")));
result = component.completeEndpointPath(null, "ch");
assertTrue(result.isEmpty());
result = component.completeEndpointPath(null, "queue:ch");
assertTrue(result.isEmpty());
result = component.completeEndpointPath(null, "topic:foo");
assertTrue(result.isEmpty());
broker.getAdminView().addQueue("runtime");
Thread.sleep(1000);
result = component.completeEndpointPath(null, "run");
assertThat(result, is(Arrays.asList("runtime")));
}
public <T> List<T> getEndpoints(CamelContext camelContext, Class<T> type) {
List<T> answer = new ArrayList<T>();
Collection<Endpoint> endpoints = camelContext.getEndpoints();
......
......@@ -24,7 +24,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-cf</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-client</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-console</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-http</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-itests-spring31</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-jaas</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-jdbc-store</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-jms-pool</artifactId>
......
......@@ -22,7 +22,7 @@
<parent>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-parent</artifactId>
<version>5.15.7</version>
<version>5.15.8</version>
</parent>
<artifactId>activemq-kahadb-store</artifactId>
......
......@@ -2346,7 +2346,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ListIndex<String, Location> subLocations;
// Transient data used to track which Messages are no longer needed.
final TreeMap<Long, Long> messageReferences = new TreeMap<>();
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
public void trackPendingAdd(Long seq) {
......@@ -2616,30 +2615,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
// Configure the message references index
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next();
SequenceSet pendingAcks = subscription.getValue();
if (pendingAcks != null && !pendingAcks.isEmpty()) {
Long lastPendingAck = pendingAcks.getTail().getLast();
for (Long sequenceId : pendingAcks) {
Long current = rc.messageReferences.get(sequenceId);
if (current == null) {
current = new Long(0);
}
// We always add a trailing empty entry for the next position to start from
// so we need to ensure we don't count that as a message reference on reload.
if (!sequenceId.equals(lastPendingAck)) {
current = current.longValue() + 1;
} else {
current = Long.valueOf(0L);
}
rc.messageReferences.put(sequenceId, current);
}
}
}
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
......@@ -2658,10 +2634,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
} else {
// update based on ackPositions for unmatched, last entry is always the next
if (!rc.messageReferences.isEmpty()) {
Long nextMessageId = (Long) rc.messageReferences.keySet().toArray()[rc.messageReferences.size() - 1];
rc.orderIndex.nextMessageId =
Math.max(rc.orderIndex.nextMessageId, nextMessageId);
Iterator<Entry<String, SequenceSet>> subscriptions = rc.ackPositions.iterator(tx);
while (subscriptions.hasNext()) {
Entry<String, SequenceSet> subscription = subscriptions.next();
SequenceSet pendingAcks = subscription.getValue();
if (pendingAcks != null && !pendingAcks.isEmpty()) {
for (Long sequenceId : pendingAcks) {
rc.orderIndex.nextMessageId = Math.max(rc.orderIndex.nextMessageId, sequenceId);
}
}
}
}
}
......@@ -2865,13 +2846,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sequences.add(messageSequence);
sd.ackPositions.put(tx, subscriptionKey, sequences);
}
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
}
// new sub is interested in potentially all existing messages
......@@ -2885,18 +2859,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
sd.ackPositions.put(tx, subscriptionKey, allOutstanding);
for (Long ackPosition : allOutstanding) {
Long count = sd.messageReferences.get(ackPosition);
// There might not be a reference if the ackLocation was the last
// one which is a placeholder for the next incoming message and
// no value was added to the message references table.
if (count != null) {
count = count.longValue() + 1;
sd.messageReferences.put(ackPosition, count);
}
}
}
// on a new message add, all existing subs are interested in this message
......@@ -2914,16 +2876,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
MessageKeys key = sd.orderIndex.get(tx, messageSequence);
incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey,
key.location.getSize());
Long count = sd.messageReferences.get(messageSequence);
if (count == null) {
count = Long.valueOf(0L);
}
count = count.longValue() + 1;
sd.messageReferences.put(messageSequence, count);
sd.messageReferences.put(messageSequence + 1, Long.valueOf(0L));
incrementAndAddSizeToStoreStat(kahaDest, subscriptionKey, key.location.getSize());
}
}
......@@ -2938,16 +2891,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
ArrayList<Long> unreferenced = new ArrayList<>();
for(Long sequenceId : sequences) {
Long references = sd.messageReferences.get(sequenceId);
if (references != null) {
references = references.longValue() - 1;
if (references.longValue() > 0) {
sd.messageReferences.put(sequenceId, references);
} else {
sd.messageReferences.remove(sequenceId);
unreferenced.add(sequenceId);
}
if(!isSequenceReferenced(tx, sd, sequenceId)) {
unreferenced.add(sequenceId);
}
}
......@@ -2967,6 +2912,16 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
private boolean isSequenceReferenced(final Transaction tx, final StoredDestination sd, final Long sequenceId) throws IOException {
for(String subscriptionKey : sd.subscriptionCache) {
SequenceSet sequence = sd.ackPositions.get(tx, subscriptionKey);
if (sequence != null && sequence.contains(sequenceId)) {
return true;
}
}
return false;
}
/**
* @param tx
* @param sd
......@@ -2993,17 +2948,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
key.location.getSize());
// Check if the message is reference by any other subscription.
Long count = sd.messageReferences.get(messageSequence);
if (count != null) {
long references = count.longValue() - 1;
if (references > 0) {
sd.messageReferences.put(messageSequence, Long.valueOf(references));
return;
} else {
sd.messageReferences.remove(messageSequence);
}
if (isSequenceReferenced(tx, sd, messageSequence)) {
return;
}
// Find all the entries that need to get deleted.
ArrayList<Entry<Long, MessageKeys>> deletes = new ArrayList<>();
sd.orderIndex.getDeleteList(tx, deletes, messageSequence);
......
......@@ -135,7 +135,10 @@ public class PageFile {
// Keeps track of free pages.
private final AtomicLong nextFreePageId = new AtomicLong();
private SequenceSet freeList = new SequenceSet();
private AtomicReference<SequenceSet> recoveredFreeList = new AtomicReference<SequenceSet>();
private AtomicReference<SequenceSet> trackingFreeDuringRecovery = new AtomicReference<SequenceSet>();
private final AtomicLong nextTxid = new AtomicLong();
// Persistent settings stored in the page file.
......@@ -146,8 +149,6 @@ public class PageFile {
private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f;
private boolean needsFreePageRecovery = false;
/**
* Use to keep track of updated pages which have not yet been committed.
*/
......@@ -412,7 +413,7 @@ public class PageFile {
} else {
LOG.debug(toString() + ", Recovering page file...");
nextTxid.set(redoRecoveryUpdates());
needsFreePageRecovery = true;
trackingFreeDuringRecovery.set(new SequenceSet());
}
if (writeFile.length() < PAGE_FILE_HEADER_SIZE) {
......@@ -424,20 +425,20 @@ public class PageFile {
storeMetaData();
getFreeFile().delete();
startWriter();
if (needsFreePageRecovery) {
asyncFreePageRecovery();
if (trackingFreeDuringRecovery.get() != null) {
asyncFreePageRecovery(nextFreePageId.get());
}
} else {
throw new IllegalStateException("Cannot load the page file when it is already loaded.");
}
}
private void asyncFreePageRecovery() {
private void asyncFreePageRecovery(final long lastRecoveryPage) {
Thread thread = new Thread("KahaDB Index Free Page Recovery") {
@Override
public void run() {
try {
recoverFreePages();
recoverFreePages(lastRecoveryPage);
} catch (Throwable e) {
if (loaded.get()) {
LOG.warn("Error recovering index free page list", e);
......@@ -450,7 +451,7 @@ public class PageFile {
thread.start();
}
private void recoverFreePages() throws Exception {
private void recoverFreePages(final long lastRecoveryPage) throws Exception {
LOG.info(toString() + ". Recovering pageFile free list due to prior unclean shutdown..");
SequenceSet newFreePages = new SequenceSet();
// need new pageFile instance to get unshared readFile
......@@ -459,6 +460,11 @@ public class PageFile {
try {
for (Iterator<Page> i = new Transaction(recoveryPageFile).iterator(true); i.hasNext(); ) {
Page page = i.next();
if (page.getPageId() >= lastRecoveryPage) {
break;
}
if (page.getType() == Page.PAGE_FREE_TYPE) {
newFreePages.add(page.getPageId());
}
......@@ -473,8 +479,6 @@ public class PageFile {
// allow flush (with index lock held) to merge eventually
recoveredFreeList.lazySet(newFreePages);
}
// all set for clean shutdown
needsFreePageRecovery = false;
}
private void loadForRecovery(long nextFreePageIdSnap) throws Exception {
......@@ -513,7 +517,7 @@ public class PageFile {
}
metaData.setLastTxId(nextTxid.get() - 1);
if (needsFreePageRecovery) {
if (trackingFreeDuringRecovery.get() != null) {
// async recovery incomplete, will have to try again
metaData.setCleanShutdown(false);
} else {
......@@ -562,14 +566,16 @@ public class PageFile {
throw new IOException("Page file already stopped: checkpointing is not allowed");
}
SequenceSet toMerge = recoveredFreeList.get();
if (toMerge != null) {
SequenceSet recovered = recoveredFreeList.get();
if (recovered != null) {
recoveredFreeList.lazySet(null);
Sequence seq = toMerge.getHead();
while (seq != null) {
freeList.add(seq);
seq = seq.getNext();
}
SequenceSet inUse = trackingFreeDuringRecovery.get();
recovered.remove(inUse);
freeList.merge(recovered);
// all set for clean shutdown
trackingFreeDuringRecovery.set(null);
inUse.clear();
}
// Setup a latch that gets notified when all buffered writes hits the disk.
......@@ -817,6 +823,9 @@ public class PageFile {
return toOffset(nextFreePageId.get());
}
public boolean isFreePage(long pageId) {
return freeList.contains(pageId);
}
/**
* @return the number of pages allocated in the PageFile
*/
......@@ -953,6 +962,11 @@ public class PageFile {
public void freePage(long pageId) {
freeList.add(pageId);
removeFromCache(pageId);
SequenceSet trackFreeDuringRecovery = trackingFreeDuringRecovery.get();
if (trackFreeDuringRecovery != null) {
trackFreeDuringRecovery.add(pageId);
}
}
@SuppressWarnings("unchecked")
......
......@@ -670,6 +670,8 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
} else {
freePages(allocateList);
}
size = 0;
}
......@@ -692,6 +694,8 @@ public class Transaction implements Iterable<Page> {
allocateList.clear();
writes.clear();
writeTransactionId = -1;
} else {
freePages(allocateList);
}
size = 0;
}
......
......@@ -94,6 +94,30 @@ public class SequenceSet extends LinkedNodeList<Sequence> implements Iterable<Lo
}
}
public void merge(SequenceSet sequenceSet) {
Sequence node = sequenceSet.getHead();
while (node != null) {
add(node);
node = node.getNext();
}
}
public void remove(SequenceSet sequenceSet) {
Sequence node = sequenceSet.getHead();
while (node != null) {
remove(node);
node = node.getNext();
}
}
public void remove(Sequence value) {
for(long i=value.first; i<value.last+1; i++) {
remove(i);
}
}
/**
*
* @param value
......
......@@ -175,7 +175,6 @@ public class KahaDBStoreOpenWireVersionTest {
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
entry.getValue().messageReferences.clear();
}
}
});
......
......@@ -57,11 +57,12 @@ public class BTreeIndexTest extends IndexTestSupport {
protected Index<String, Long> createIndex() throws Exception {
long id = tx.allocate().getPageId();
tx.commit();
BTreeIndex<String, Long> index = new BTreeIndex<String,Long>(pf, id);
index.setKeyMarshaller(StringMarshaller.INSTANCE);
index.setValueMarshaller(LongMarshaller.INSTANCE);
index.load(tx);
tx.commit();
return index;
}
......@@ -231,8 +232,6 @@ public class BTreeIndexTest extends IndexTestSupport {
this.index.load(tx);
long id = tx.allocate().getPageId();
tx.commit();
BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
sindex.setValueMarshaller(StringMarshaller.INSTANCE);
......@@ -273,7 +272,6 @@ public class BTreeIndexTest extends IndexTestSupport {
this.index.load(tx);
long id = tx.allocate().getPageId();
tx.commit();
BTreeIndex<String, String> sindex = new BTreeIndex<String,String>(pf, id);
sindex.setKeyMarshaller(StringMarshaller.INSTANCE);
......@@ -364,7 +362,6 @@ public class BTreeIndexTest extends IndexTestSupport {
pf.load();
tx = pf.tx();
long id = tx.allocate().getPageId();
tx.commit();
BTreeIndex<Long, HashSet<String>> test = new BTreeIndex<Long, HashSet<String>>(pf, id);
test.setKeyMarshaller(LongMarshaller.INSTANCE);
......
......@@ -27,12 +27,13 @@ public class HashIndexBenchMark extends IndexBenchmark {
Transaction tx = pf.tx();
long id = tx.allocate().getPageId();