Skip to content
GitLab
Explore
Sign in
Register
Commits on Source (3)
New upstream version 3.3.9
· 7534ec40
Emmanuel Bourg
authored
Jun 04, 2018
7534ec40
New upstream version 3.3.10
· d19d76c0
Emmanuel Bourg
authored
Jun 04, 2018
d19d76c0
New upstream version 3.3.11
· 2b92f9b2
Emmanuel Bourg
authored
Jun 04, 2018
2b92f9b2
Hide whitespace changes
Inline
Side-by-side
README.md
View file @
2b92f9b2
...
...
@@ -15,6 +15,18 @@ A High Performance Inter-Thread Messaging Library
## Changelog
### 3.3.11
-
Fix race condition in BatchEventProcessor with 3 or more starting/halting concurrently.
### 3.3.10
-
Fix race condition in BatchEventProcessor between run() and halt().
### 3.3.9
-
Change SleepingWaitStrategy to use a parkNanos(100).
### 3.3.8
-
Revert belt and braces WaitStategy signalling.
...
...
build.gradle
View file @
2b92f9b2
...
...
@@ -24,7 +24,7 @@ apply plugin: 'idea'
defaultTasks
'build'
group
=
'com.lmax'
version
=
new
Version
(
major:
3
,
minor:
3
,
revision:
8
)
version
=
new
Version
(
major:
3
,
minor:
3
,
revision:
11
)
ext
{
fullName
=
'Disruptor Framework'
...
...
src/main/java/com/lmax/disruptor/BatchEventProcessor.java
View file @
2b92f9b2
...
...
@@ -15,7 +15,7 @@
*/
package
com.lmax.disruptor
;
import
java.util.concurrent.atomic.Atomic
Boolean
;
import
java.util.concurrent.atomic.Atomic
Integer
;
/**
...
...
@@ -30,7 +30,11 @@ import java.util.concurrent.atomic.AtomicBoolean;
public
final
class
BatchEventProcessor
<
T
>
implements
EventProcessor
{
private
final
AtomicBoolean
running
=
new
AtomicBoolean
(
false
);
private
static
final
int
IDLE
=
0
;
private
static
final
int
HALTED
=
IDLE
+
1
;
private
static
final
int
RUNNING
=
HALTED
+
1
;
private
final
AtomicInteger
running
=
new
AtomicInteger
(
IDLE
);
private
ExceptionHandler
<?
super
T
>
exceptionHandler
=
new
FatalExceptionHandler
();
private
final
DataProvider
<
T
>
dataProvider
;
private
final
SequenceBarrier
sequenceBarrier
;
...
...
@@ -62,9 +66,9 @@ public final class BatchEventProcessor<T>
}
batchStartAware
=
(
eventHandler
instanceof
BatchStartAware
)
?
(
BatchStartAware
)
eventHandler
:
null
;
(
eventHandler
instanceof
BatchStartAware
)
?
(
BatchStartAware
)
eventHandler
:
null
;
timeoutHandler
=
(
eventHandler
instanceof
TimeoutHandler
)
?
(
TimeoutHandler
)
eventHandler
:
null
;
(
eventHandler
instanceof
TimeoutHandler
)
?
(
TimeoutHandler
)
eventHandler
:
null
;
}
@Override
...
...
@@ -76,14 +80,14 @@ public final class BatchEventProcessor<T>
@Override
public
void
halt
()
{
running
.
set
(
false
);
running
.
set
(
HALTED
);
sequenceBarrier
.
alert
();
}
@Override
public
boolean
isRunning
()
{
return
running
.
get
();
return
running
.
get
()
!=
IDLE
;
}
/**
...
...
@@ -109,61 +113,88 @@ public final class BatchEventProcessor<T>
@Override
public
void
run
()
{
if
(
!
running
.
compareAndSet
(
false
,
true
))
if
(
running
.
compareAndSet
(
IDLE
,
RUNNING
))
{
throw
new
IllegalStateException
(
"Thread is already running"
);
}
sequenceBarrier
.
clearAlert
();
sequenceBarrier
.
clearAlert
();
notifyStart
();
notifyStart
();
try
{
if
(
running
.
get
()
==
RUNNING
)
{
processEvents
();
}
}
finally
{
notifyShutdown
();
running
.
set
(
IDLE
);
}
}
else
{
// This is a little bit of guess work. The running state could of changed to HALTED by
// this point. However, Java does not have compareAndExchange which is the only way
// to get it exactly correct.
if
(
running
.
get
()
==
RUNNING
)
{
throw
new
IllegalStateException
(
"Thread is already running"
);
}
else
{
earlyExit
();
}
}
}
private
void
processEvents
()
{
T
event
=
null
;
long
nextSequence
=
sequence
.
get
()
+
1L
;
try
while
(
true
)
{
while
(
true
)
try
{
try
{
final
long
availableSequence
=
sequenceBarrier
.
waitFor
(
nextSequence
);
if
(
batchStartAware
!=
null
)
{
batchStartAware
.
onBatchStart
(
availableSequence
-
nextSequence
+
1
);
}
while
(
nextSequence
<=
availableSequence
)
{
event
=
dataProvider
.
get
(
nextSequence
);
eventHandler
.
onEvent
(
event
,
nextSequence
,
nextSequence
==
availableSequence
);
nextSequence
++;
}
sequence
.
set
(
availableSequence
);
}
catch
(
final
TimeoutException
e
)
final
long
availableSequence
=
sequenceBarrier
.
waitFor
(
nextSequence
);
if
(
batchStartAware
!=
null
)
{
notifyTimeout
(
sequence
.
get
()
);
batchStartAware
.
onBatchStart
(
availableSequence
-
nextSequence
+
1
);
}
catch
(
final
AlertException
ex
)
while
(
nextSequence
<=
availableSequence
)
{
if
(!
running
.
get
())
{
break
;
}
event
=
dataProvider
.
get
(
nextSequence
);
eventHandler
.
onEvent
(
event
,
nextSequence
,
nextSequence
==
availableSequence
);
nextSequence
++;
}
catch
(
final
Throwable
ex
)
sequence
.
set
(
availableSequence
);
}
catch
(
final
TimeoutException
e
)
{
notifyTimeout
(
sequence
.
get
());
}
catch
(
final
AlertException
ex
)
{
if
(
running
.
get
()
!=
RUNNING
)
{
exceptionHandler
.
handleEventException
(
ex
,
nextSequence
,
event
);
sequence
.
set
(
nextSequence
);
nextSequence
++;
break
;
}
}
catch
(
final
Throwable
ex
)
{
exceptionHandler
.
handleEventException
(
ex
,
nextSequence
,
event
);
sequence
.
set
(
nextSequence
);
nextSequence
++;
}
}
finally
{
notifyShutdown
();
running
.
set
(
false
);
}
}
private
void
earlyExit
()
{
notifyStart
();
notifyShutdown
();
}
private
void
notifyTimeout
(
final
long
availableSequence
)
...
...
src/main/java/com/lmax/disruptor/SleepingWaitStrategy.java
View file @
2b92f9b2
...
...
@@ -29,23 +29,31 @@ import java.util.concurrent.locks.LockSupport;
public
final
class
SleepingWaitStrategy
implements
WaitStrategy
{
private
static
final
int
DEFAULT_RETRIES
=
200
;
private
static
final
long
DEFAULT_SLEEP
=
100
;
private
final
int
retries
;
private
final
long
sleepTimeNs
;
public
SleepingWaitStrategy
()
{
this
(
DEFAULT_RETRIES
);
this
(
DEFAULT_RETRIES
,
DEFAULT_SLEEP
);
}
public
SleepingWaitStrategy
(
int
retries
)
{
this
(
retries
,
DEFAULT_SLEEP
);
}
public
SleepingWaitStrategy
(
int
retries
,
long
sleepTimeNs
)
{
this
.
retries
=
retries
;
this
.
sleepTimeNs
=
sleepTimeNs
;
}
@Override
public
long
waitFor
(
final
long
sequence
,
Sequence
cursor
,
final
Sequence
dependentSequence
,
final
SequenceBarrier
barrier
)
throws
AlertException
,
InterruptedException
throws
AlertException
{
long
availableSequence
;
int
counter
=
retries
;
...
...
@@ -79,7 +87,7 @@ public final class SleepingWaitStrategy implements WaitStrategy
}
else
{
LockSupport
.
parkNanos
(
1L
);
LockSupport
.
parkNanos
(
sleepTimeNs
);
}
return
counter
;
...
...
src/test/java/com/lmax/disruptor/BatchEventProcessorTest.java
View file @
2b92f9b2
...
...
@@ -187,4 +187,91 @@ public final class BatchEventProcessorTest
assertEquals
(
Arrays
.
asList
(
3L
,
2L
,
1L
),
batchSizes
);
}
@Test
public
void
shouldAlwaysHalt
()
throws
InterruptedException
{
WaitStrategy
waitStrategy
=
new
BusySpinWaitStrategy
();
final
SingleProducerSequencer
sequencer
=
new
SingleProducerSequencer
(
8
,
waitStrategy
);
final
ProcessingSequenceBarrier
barrier
=
new
ProcessingSequenceBarrier
(
sequencer
,
waitStrategy
,
new
Sequence
(-
1
),
new
Sequence
[
0
]);
DataProvider
<
Object
>
dp
=
new
DataProvider
<
Object
>()
{
@Override
public
Object
get
(
long
sequence
)
{
return
null
;
}
};
final
LatchLifeCycleHandler
h1
=
new
LatchLifeCycleHandler
();
final
BatchEventProcessor
p1
=
new
BatchEventProcessor
<
Object
>(
dp
,
barrier
,
h1
);
Thread
t1
=
new
Thread
(
p1
);
p1
.
halt
();
t1
.
start
();
assertTrue
(
h1
.
awaitStart
(
2
,
TimeUnit
.
SECONDS
));
assertTrue
(
h1
.
awaitStop
(
2
,
TimeUnit
.
SECONDS
));
for
(
int
i
=
0
;
i
<
1000
;
i
++)
{
final
LatchLifeCycleHandler
h2
=
new
LatchLifeCycleHandler
();
final
BatchEventProcessor
p2
=
new
BatchEventProcessor
<
Object
>(
dp
,
barrier
,
h2
);
Thread
t2
=
new
Thread
(
p2
);
t2
.
start
();
p2
.
halt
();
assertTrue
(
h2
.
awaitStart
(
2
,
TimeUnit
.
SECONDS
));
assertTrue
(
h2
.
awaitStop
(
2
,
TimeUnit
.
SECONDS
));
}
for
(
int
i
=
0
;
i
<
1000
;
i
++)
{
final
LatchLifeCycleHandler
h2
=
new
LatchLifeCycleHandler
();
final
BatchEventProcessor
p2
=
new
BatchEventProcessor
<
Object
>(
dp
,
barrier
,
h2
);
Thread
t2
=
new
Thread
(
p2
);
t2
.
start
();
Thread
.
yield
();
p2
.
halt
();
assertTrue
(
h2
.
awaitStart
(
2
,
TimeUnit
.
SECONDS
));
assertTrue
(
h2
.
awaitStop
(
2
,
TimeUnit
.
SECONDS
));
}
}
private
static
class
LatchLifeCycleHandler
implements
EventHandler
<
Object
>,
LifecycleAware
{
private
final
CountDownLatch
startLatch
=
new
CountDownLatch
(
1
);
private
final
CountDownLatch
stopLatch
=
new
CountDownLatch
(
1
);
@Override
public
void
onEvent
(
Object
event
,
long
sequence
,
boolean
endOfBatch
)
throws
Exception
{
}
@Override
public
void
onStart
()
{
startLatch
.
countDown
();
}
@Override
public
void
onShutdown
()
{
stopLatch
.
countDown
();
}
public
boolean
awaitStart
(
long
time
,
TimeUnit
unit
)
throws
InterruptedException
{
return
startLatch
.
await
(
time
,
unit
);
}
public
boolean
awaitStop
(
long
time
,
TimeUnit
unit
)
throws
InterruptedException
{
return
stopLatch
.
await
(
time
,
unit
);
}
}
}