Skip to content
Commits on Source (9)
<img src="https://github.com/conversant/disruptor/blob/master/src/main/resources/ConversantDisruptorLogo.png?raw=true">
# Conversant ConcurrentQueue and Disruptor BlockingQueue
# Conversant ConcurrentQueue, Disruptor BlockingQueue and ConcurrentStack
Disruptor is the highest performing intra-thread transfer mechanism available in Java. Conversant Disruptor is the highest performing implementation of this type of ring buffer queue because it has almost no overhead and it exploits a particularly simple design.
Disruptor is the highest performing intra-thread transfer mechanism available in Java. Conversant Disruptor is the highest performing implementation of this type of ring buffer because it has almost no overhead and it exploits a particularly simple design.
<table>
<td><img src="https://github.com/conversant/disruptor/blob/master/benchmark/benchmark.jpg?raw=true"></td><tr>
<caption><strong>2017 Conversant Disruptor - Still the World's Fastest</strong></caption>
</table>
# Getting Started
Simply run the maven build to build and use the package.
Run the maven build to build and use the package.
```$ mvn -U clean package```
# Conversant Disruptor is on Maven Central
Maven Java 8 users can incorporate Conversant Disruptor the usual way:
For Java 8, Include Conversant Disruptor from Maven Central:
```
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.10</version>
<version>1.2.14</version>
<classifier>jdk8</classifier>
</dependency>
```
OR
For Java 10:
```
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.10</version>
<version>1.2.14</version>
<classifier>jdk10</classifier>
</dependency>
```
Java 7 is also supported
Or through the default Java 8 jar
```
<dependency>
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<version>1.2.14</version>
</dependency>
```
Java 7 is only supported in 1.2.10 and below.
```
<dependency>
......@@ -43,9 +59,10 @@ Java 7 is also supported
</dependency>
```
Java 9 is no longer supported.
## Discussion Forum
Conversant Disruptor has a google group so you can follow releases and changes:
https://groups.google.com/forum/#!forum/conversant-disruptor
......@@ -8,3 +8,4 @@
1.2.8 - Padding changes
1.2.9 - bug fixes
1.2.10 - major point release, cleanup, adding MPMC, @Contended annotation
1.2.11 - Java 8 specific performance improvements, Java 7 no longer supported
Conversant 31.148
MPMC 52.401
LMAX 61.129
LinkedTransferQueue 125.511
ConcurrentStack 169.317
ArrayBlockingQueue 376.906
reset
set terminal jpeg
set output "benchmark.jpg"
set style fill solid 1.00 border 0
set style histogram
set style data histogram
set xtics rotate by -45
set grid ytics linestyle 1
set xlabel "Conversant Disruptor vs Competition (Intel Xeon - Broadwell)" font "bold"
set ylabel "time (ms)" font "bold"
plot "performance.dat" using 2:xtic(1) ti "1M Transactions" linecolor rgb "#0066FF"
conversant-disruptor (1.2.15-1) unstable; urgency=medium
* New upstream release
- Refreshed the patches
* Standards-Version updated to 4.4.0
* Use salsa.debian.org Vcs-* URLs
-- Emmanuel Bourg <ebourg@apache.org> Fri, 13 Sep 2019 09:36:15 +0200
conversant-disruptor (1.2.11-1) unstable; urgency=medium
* Team upload.
......
......@@ -5,9 +5,9 @@ Maintainer: Debian Java Maintainers <pkg-java-maintainers@lists.alioth.debian.or
Uploaders: Emmanuel Bourg <ebourg@apache.org>
Build-Depends: debhelper (>= 11), default-jdk, maven-debian-helper (>= 2.1)
Build-Depends-Indep: libslf4j-java, junit4
Standards-Version: 4.1.3
Vcs-Git: https://anonscm.debian.org/git/pkg-java/conversant-disruptor.git
Vcs-Browser: https://anonscm.debian.org/cgit/pkg-java/conversant-disruptor.git
Standards-Version: 4.4.0
Vcs-Git: https://salsa.debian.org/java-team/conversant-disruptor.git
Vcs-Browser: https://salsa.debian.org/java-team/conversant-disruptor
Homepage: https://github.com/conversant/disruptor
Package: libconversant-disruptor-java
......
From: Markus Koschany <apo@debian.org>
Date: Thu, 22 Mar 2018 14:23:56 +0100
Subject: java9
---
pom.xml | 3 ++-
.../com/conversantmedia/util/concurrent/AbstractWaitingCondition.java | 4 ++--
.../java/com/conversantmedia/util/concurrent/MPMCConcurrentQueue.java | 2 +-
.../conversantmedia/util/concurrent/MultithreadConcurrentQueue.java | 4 ++--
.../com/conversantmedia/util/concurrent/PushPullConcurrentQueue.java | 4 ++--
5 files changed, 9 insertions(+), 8 deletions(-)
diff --git a/pom.xml b/pom.xml
index 8f6345b..e270a84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -61,7 +61,7 @@
</prerequisites>
<properties>
- <jdk.version>1.8</jdk.version>
+ <jdk.version>1.9</jdk.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<snapshots.repo.id>ossrh</snapshots.repo.id>
@@ -81,6 +81,7 @@
<target>${jdk.version}</target>
<encoding>${project.build.sourceEncoding}</encoding>
<!-- <executable>/usr/lib/jvm/jdk-9/bin/javac</executable> -->
+ <compilerArgs><arg>--add-exports</arg><arg>java.base/jdk.internal.vm.annotation=ALL-UNNAMED</arg></compilerArgs>
</configuration>
</plugin>
<plugin>
diff --git a/src/main/java/com/conversantmedia/util/concurrent/AbstractWaitingCondition.java b/src/main/java/com/conversantmedia/util/concurrent/AbstractWaitingCondition.java
index ff243de..29ea8ca 100644
--- a/src/main/java/com/conversantmedia/util/concurrent/AbstractWaitingCondition.java
+++ b/src/main/java/com/conversantmedia/util/concurrent/AbstractWaitingCondition.java
@@ -45,10 +45,10 @@ abstract class AbstractWaitingCondition implements Condition {
private final LongAdder waitCount = new LongAdder();
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
private final AtomicReferenceArray<Thread> waiter = new AtomicReferenceArray<>(MAX_WAITERS+2*CACHE_LINE_REFS);
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
private long waitCache = 0L;
/**
diff --git a/src/main/java/com/conversantmedia/util/concurrent/MPMCConcurrentQueue.java b/src/main/java/com/conversantmedia/util/concurrent/MPMCConcurrentQueue.java
index d4f62ac..525e4eb 100644
--- a/src/main/java/com/conversantmedia/util/concurrent/MPMCConcurrentQueue.java
+++ b/src/main/java/com/conversantmedia/util/concurrent/MPMCConcurrentQueue.java
@@ -167,7 +167,7 @@ class MPMCConcurrentQueue<E> implements ConcurrentQueue<E> {
public long p1, p2, p3, p4, p5, p6, p7;
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
R entry;
public long a1, a2, a3, a4, a5, a6, a7, a8;
diff --git a/src/main/java/com/conversantmedia/util/concurrent/MultithreadConcurrentQueue.java b/src/main/java/com/conversantmedia/util/concurrent/MultithreadConcurrentQueue.java
index 76573e1..3fe30c2 100644
--- a/src/main/java/com/conversantmedia/util/concurrent/MultithreadConcurrentQueue.java
+++ b/src/main/java/com/conversantmedia/util/concurrent/MultithreadConcurrentQueue.java
@@ -59,7 +59,7 @@ public class MultithreadConcurrentQueue<E> implements ConcurrentQueue<E> {
// use the value in the L1 cache rather than reading from memory when possible
long p1, p2, p3, p4, p5, p6, p7;
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
long tailCache = 0L;
long a1, a2, a3, a4, a5, a6, a7, a8;
@@ -67,7 +67,7 @@ public class MultithreadConcurrentQueue<E> implements ConcurrentQueue<E> {
final E[] buffer;
long r1, r2, r3, r4, r5, r6, r7;
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
long headCache = 0L;
long c1, c2, c3, c4, c5, c6, c7, c8;
diff --git a/src/main/java/com/conversantmedia/util/concurrent/PushPullConcurrentQueue.java b/src/main/java/com/conversantmedia/util/concurrent/PushPullConcurrentQueue.java
index 7baa563..694f4b7 100644
--- a/src/main/java/com/conversantmedia/util/concurrent/PushPullConcurrentQueue.java
+++ b/src/main/java/com/conversantmedia/util/concurrent/PushPullConcurrentQueue.java
@@ -41,14 +41,14 @@ public class PushPullConcurrentQueue<E> implements ConcurrentQueue<E> {
final LongAdder tail = new LongAdder();
long p1, p2, p3, p4, p5, p6, p7;
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
long tailCache = 0L;
long a1, a2, a3, a4, a5, a6, a7, a8;
final E[] buffer;
long r1, r2, r3, r4, r5, r6, r7;
- @sun.misc.Contended
+ @jdk.internal.vm.annotation.Contended
long headCache = 0L;
long c1, c2, c3, c4, c5, c6, c7, c8;
From: Markus Koschany <apo@debian.org>
Date: Sun, 15 Oct 2017 19:24:25 +0200
Subject: remove jdkClassifier
---
pom.xml | 3 ---
1 file changed, 3 deletions(-)
diff --git a/pom.xml b/pom.xml
index 5b5aa0a..8f6345b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -146,9 +146,6 @@
@@ -157,9 +157,6 @@
</plugin>
<plugin>
<artifactId>maven-jar-plugin</artifactId>
......
remove-jdkClassifier.patch
java9.patch
......@@ -2,6 +2,3 @@
%:
dh $@
get-orig-source:
uscan --download-current-version --force-download --rename --repack --compression xz
version=3
version=4
opts="repack,compression=xz" \
https://github.com/conversant/disruptor/tags .*/([\d\.]+)\.tar\.gz
......@@ -5,10 +5,10 @@
<groupId>com.conversantmedia</groupId>
<artifactId>disruptor</artifactId>
<packaging>jar</packaging>
<version>1.2.11</version>
<version>1.2.15</version>
<name>${project.groupId}:${project.artifactId}</name>
<url>https://github.com/conversant/disruptor</url>
<description>Conversant Disruptor - very low latency Java BlockingQueue</description>
<description>Conversant Disruptor - very high throughput Java BlockingQueue</description>
<inceptionYear>2012</inceptionYear>
......@@ -21,8 +21,7 @@
<developer>
<name>John Cairns</name>
<email>john@2ad.com</email>
<organization>Conversant, Inc.</organization>
<organizationUrl>http://engineering.conversantmedia.com</organizationUrl>
<organizationUrl>https://github.com/jac18281828</organizationUrl>
</developer>
</developers>
......@@ -70,6 +69,18 @@
</properties>
<build>
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
</resources>
<testResources>
<testResource>
<directory>src/test/resources</directory>
</testResource>
</testResources>
<pluginManagement>
<plugins>
<plugin>
......@@ -176,7 +187,21 @@
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.8</version>
<version>1.11</version>
<executions>
<execution>
<id>add-integration-test-source</id>
<phase>generate-test-sources</phase>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>src/it/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-site-plugin</artifactId>
......@@ -213,6 +238,16 @@
<profiles>
<!-- Java 7 is no longer supported as of release 1.2.11 -->
<!-- Java 9 is no longer supported as of release 1.2.12 -->
<profile>
<id>default</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<properties>
<jdkClassifier></jdkClassifier>
</properties>
</profile>
<profile>
<id>jdk8</id>
<activation>
......@@ -223,12 +258,12 @@
</properties>
</profile>
<profile>
<id>jdk9</id>
<id>jdk10</id>
<activation>
<jdk>1.9</jdk>
<jdk>10</jdk>
</activation>
<properties>
<jdkClassifier>jdk9</jdkClassifier>
<jdkClassifier>jdk10</jdkClassifier>
</properties>
</profile>
</profiles>
......
......@@ -20,13 +20,13 @@ package com.conversantmedia.util.concurrent;
* #L%
*/
import com.conversantmedia.util.concurrent.ConcurrentQueue;
import org.junit.Ignore;
import com.conversantmedia.util.estimation.Percentile;
/**
* Created by jcairns on 5/29/14.
*/
@Ignore
public class ConcurrentQueuePerformanceTest {
// increase this number for a legit performance test
......
package com.conversantmedia.util.concurrent;
import com.conversantmedia.util.collection.Stack;
import com.conversantmedia.util.concurrent.ConcurrentStack;
import org.junit.Assert;
import org.junit.Test;
......
package com.conversantmedia.util.concurrent;
import com.conversantmedia.util.collection.Stack;
import com.conversantmedia.util.concurrent.ConcurrentStack;
import org.junit.Ignore;
import org.junit.Test;
......
package com.conversantmedia.util.concurrent;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
......@@ -11,7 +13,6 @@ import java.util.concurrent.TimeUnit;
/**
* Created by jcairns on 2/23/16.
*/
@Ignore // stress test
public class DisruptorFairSchedulingTest {
private static final int NTHREAD = 4*Runtime.getRuntime().availableProcessors();
......
package com.conversantmedia.util.concurrent;
import com.conversantmedia.util.concurrent.DisruptorBlockingQueue;
import com.conversantmedia.util.concurrent.SpinPolicy;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
......@@ -14,7 +16,6 @@ import java.util.concurrent.TimeUnit;
/**
* Created by jcairns on 2/23/16.
*/
@Ignore // stress test
public class DisruptorOfferPollStressTest {
private static final int QUEUE_SZ = 1024;
......