Skip to content
Commits on Source (5)
......@@ -15,21 +15,23 @@ A High Performance Inter-Thread Messaging Library
## Changelog
### 3.3.11
### 3.4.2
- Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
### 3.3.10
### 3.4.1
- Fix race condition in BatchEventProcessor between run() and halt().
- Fix race between run() and halt() on BatchEventProcessor.
### 3.3.9
### 3.4.0
- Change SleepingWaitStrategy to use a parkNanos(100).
- Drop support for JDK6, support JDK7 and above only.
- Add `ThreadHints.onSpinWait` to all busy spins within Disruptor.
- Increase default sleep time for LockSupport.parkNanos to prevent busy spinning.
### 3.3.8
- Revert belt and braces WaitStategy signalling.
- Revert belt and braces WaitStrategy signalling.
### 3.3.7
......
......@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks 'build'
group = 'com.lmax'
version = new Version(major: 3, minor: 3, revision: 11)
version = new Version(major: 3, minor: 4, revision: 2)
ext {
fullName = 'Disruptor Framework'
......@@ -61,8 +61,8 @@ idea.module {
scopes.TEST.plus += [ configurations.perfCompile ]
}
sourceCompatibility = 1.6
targetCompatibility = 1.6
sourceCompatibility = 1.7
targetCompatibility = 1.7
compileJava {
......@@ -175,7 +175,7 @@ task perfJar(type: Jar) {
}
task wrapper(type: Wrapper) {
gradleVersion = '4.2'
gradleVersion = '4.3'
}
class Version {
......
disruptor (3.4.2-1) unstable; urgency=medium
* Team upload.
* New upstream version 3.4.2
- Fix race condition in BatchEventProcessor with 3 or more
starting/halting concurrently.
- Fix race between run() and halt() on BatchEventProcessor.
- Drop support for JDK6, support JDK7 and above only.
- Add ThreadHints.onSpinWait to all busy spins within Disruptor.
- Increase default sleep time for LockSupport.parkNanos to prevent
busy spinning.
- Removes the Histogram class
* Set target=1.9 and specify "-XDignore.symbol.file" in debian/pom.xml
to address FTBFS with openjdk-10. (Closes: #902317)
* Enable tests; add build-dep on junit4
-- tony mancill <tmancill@debian.org> Sun, 24 Jun 2018 22:13:04 -0700
disruptor (3.3.11-1) unstable; urgency=medium
* Team upload.
......
......@@ -4,7 +4,7 @@ Priority: optional
Maintainer: Debian Java Maintainers <pkg-java-maintainers@lists.alioth.debian.org>
Uploaders: Emmanuel Bourg <ebourg@apache.org>,
tony mancill <tmancill@debian.org>
Build-Depends: debhelper (>= 11), default-jdk, maven-debian-helper (>= 1.5)
Build-Depends: debhelper (>= 11), default-jdk, maven-debian-helper (>= 1.5), junit4
Standards-Version: 4.1.4
Vcs-Git: https://salsa.debian.org/java-team/disruptor.git
Vcs-Browser: https://salsa.debian.org/java-team/disruptor
......
junit junit * * * *
org.jmock jmock-junit4 * * * *
org.jmock jmock-legacy * * * *
......@@ -2,8 +2,4 @@
# For example:
# maven.test.skip=true
maven.test.skip=true
# Required for building on platforms older than Jessie
maven.compiler.source=1.5
maven.compiler.target=1.5
maven.test.skip=false
junit junit jar s/4\..*/4.x/ * *
......@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.3.11</version>
<version>3.4.2</version>
<name>Disruptor Framework</name>
<description>Disruptor - Concurrent Programming Framework</description>
<url>http://lmax-exchange.github.com/disruptor</url>
......@@ -34,4 +34,20 @@
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<verbose>true</verbose>
<fork>true</fork>
<target>1.9</target>
<compilerArgs>
<arg>-XDignore.symbol.file</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
......@@ -15,3 +15,7 @@ echo "Running OnHeap..."
$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
echo "Done"
echo "Running Sliced OnHeap..."
$BIN $ARGS $GCARGS -Xloggc:custom-gc.log $CPATH -Dsliced=true com.lmax.disruptor.offheap.OneToOneOnHeapThroughputTest
echo "Done"
......@@ -30,6 +30,7 @@ public final class AggregateEventHandler<T>
*
* @param eventHandlers to be called in sequence.
*/
@SafeVarargs
public AggregateEventHandler(final EventHandler<T>... eventHandlers)
{
this.eventHandlers = eventHandlers;
......
......@@ -19,6 +19,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.lmax.disruptor.util.ThreadHints;
/**
* Blocking strategy that uses a lock and condition variable for {@link EventProcessor}s waiting on a barrier.
* <p>
......@@ -54,6 +56,7 @@ public final class BlockingWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
......
......@@ -16,6 +16,8 @@
package com.lmax.disruptor;
import com.lmax.disruptor.util.ThreadHints;
/**
* Busy Spin strategy that uses a busy spin loop for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier.
* <p>
......@@ -34,6 +36,7 @@ public final class BusySpinWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
......
......@@ -24,7 +24,12 @@ package com.lmax.disruptor;
public interface EventHandler<T>
{
/**
* Called when a publisher has published an event to the {@link RingBuffer}
* Called when a publisher has published an event to the {@link RingBuffer}. The {@link BatchEventProcessor} will
* read messages from the {@link RingBuffer} in batches, where a batch is all of the events available to be
* processed without having to wait for any new event to arrive. This can be useful for event handlers that need
* to do slower operations like I/O as they can group together the data from multiple events into a single
* operation. Implementations should ensure that the operation is always performed when endOfBatch is true as
* the time between that message an the next one is inderminate.
*
* @param event published to the {@link RingBuffer}
* @param sequence of the event being processed
......
......@@ -16,7 +16,10 @@
package com.lmax.disruptor;
/**
* EventProcessors waitFor events to become available for consumption from the {@link RingBuffer}
* An EventProcessor needs to be an implementation of a runnable that will poll for events from the {@link RingBuffer}
* using the appropriate wait strategy. It is unlikely that you will need to implement this interface yourself.
* Look at using the {@link EventHandler} interface along with the pre-supplied BatchEventProcessor in the first
* instance.
* <p>
* An EventProcessor will generally be associated with a Thread for execution.
*/
......
package com.lmax.disruptor;
/**
* Created by barkerm on 13/06/17.
*/
public class Foo
{
int a;
int b;
short c;
short d;
@Override
public boolean equals(Object o)
{
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Foo foo = (Foo) o;
if (a != foo.a) return false;
if (b != foo.b) return false;
if (c != foo.c) return false;
return d == foo.d;
}
@Override
public int hashCode()
{
int result = a;
result = 31 * result + b;
result = 31 * result + (int) c;
result = 31 * result + (int) d;
return result;
}
}
......@@ -20,6 +20,8 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import com.lmax.disruptor.util.ThreadHints;
/**
* Variation of the {@link BlockingWaitStrategy} that attempts to elide conditional wake-ups when
* the lock is uncontended. Shows performance improvements on microbenchmarks. However this
......@@ -66,6 +68,7 @@ public final class LiteBlockingWaitStrategy implements WaitStrategy
while ((availableSequence = dependentSequence.get()) < sequence)
{
barrier.checkAlert();
ThreadHints.onSpinWait();
}
return availableSequence;
......
......@@ -60,9 +60,9 @@ public final class SequenceGroup extends Sequence
public void set(final long value)
{
final Sequence[] sequences = this.sequences;
for (int i = 0, size = sequences.length; i < size; i++)
for (Sequence sequence : sequences)
{
sequences[i].set(value);
sequence.set(value);
}
}
......
......@@ -19,12 +19,14 @@ import java.util.concurrent.locks.LockSupport;
/**
* Sleeping strategy that initially spins, then uses a Thread.yield(), and
* eventually sleep (<code>LockSupport.parkNanos(1)</code>) for the minimum
* eventually sleep (<code>LockSupport.parkNanos(n)</code>) for the minimum
* number of nanos the OS and JVM will allow while the
* {@link com.lmax.disruptor.EventProcessor}s are waiting on a barrier.
* <p>
* This strategy is a good compromise between performance and CPU resource.
* Latency spikes can occur after quiet periods.
* Latency spikes can occur after quiet periods. It will also reduce the impact
* on the producing thread as it will not need signal any conditional variables
* to wake up the event handling thread.
*/
public final class SleepingWaitStrategy implements WaitStrategy
{
......
......@@ -45,6 +45,7 @@ public final class WorkerPool<T>
* @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
* @param workHandlers to distribute the work load across.
*/
@SafeVarargs
public WorkerPool(
final RingBuffer<T> ringBuffer,
final SequenceBarrier sequenceBarrier,
......@@ -57,7 +58,7 @@ public final class WorkerPool<T>
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor<T>(
workProcessors[i] = new WorkProcessor<>(
ringBuffer,
sequenceBarrier,
workHandlers[i],
......@@ -75,6 +76,7 @@ public final class WorkerPool<T>
* @param exceptionHandler to callback when an error occurs which is not handled by the {@link WorkHandler}s.
* @param workHandlers to distribute the work load across.
*/
@SafeVarargs
public WorkerPool(
final EventFactory<T> eventFactory,
final ExceptionHandler<? super T> exceptionHandler,
......@@ -87,7 +89,7 @@ public final class WorkerPool<T>
for (int i = 0; i < numWorkers; i++)
{
workProcessors[i] = new WorkProcessor<T>(
workProcessors[i] = new WorkProcessor<>(
ringBuffer,
barrier,
workHandlers[i],
......
......@@ -20,7 +20,8 @@ package com.lmax.disruptor;
* Yielding strategy that uses a Thread.yield() for {@link com.lmax.disruptor.EventProcessor}s waiting on a barrier
* after an initially spinning.
* <p>
* This strategy is a good compromise between performance and CPU resource without incurring significant latency spikes.
* This strategy will use 100% CPU, but will more readily give up the CPU than a busy spin strategy if other threads
* require CPU resource.
*/
public final class YieldingWaitStrategy implements WaitStrategy
{
......