Skip to content
Commits on Source (5)
......@@ -41,3 +41,8 @@ patriknw | Patrik Nordwall, patrik.nordwall@gmail.com, Lightbend Inc
angelsanz | Ángel Sanz, angelsanz@users.noreply.github.com
shenghaiyang | 盛海洋, shenghaiyang@aliyun.com
kiiadi | Kyle Thomson, kylthoms@amazon.com, Amazon.com
jroper | James Roper, james@jazzy.id.au, Lightbend Inc.
olegdokuka | Oleh Dokuka, shadowgun@.i.ua, Netifi Inc.
Scottmitch | Scott Mitchell, scott_mitchell@apple.com, Apple Inc.
retronym | Jason Zaugg, jzaugg@gmail.com, Lightbend Inc.
......@@ -8,12 +8,12 @@ The latest release is available on Maven Central as
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
```
......@@ -79,12 +79,12 @@ followed by a possibly unbounded number of `onNext` signals (as requested by `Su
| <a name="term_non-obstructing">Non-obstructing</a> | Quality describing a method which is as quick to execute as possible—on the calling thread. This means, for example, avoids heavy computations and other things that would stall the caller´s thread of execution. |
| <a name="term_terminal_state">Terminal state</a> | For a Publisher: When `onComplete` or `onError` has been signalled. For a Subscriber: When an `onComplete` or `onError` has been received.|
| <a name="term_nop">NOP</a> | Execution that has no detectable effect to the calling thread, and can as such safely be called any number of times.|
| <a name="term_ext_sync">External synchronization</a> | Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`. |
| <a name="term_serially">Serial(ly)</a> | In the context of a [Signal](#term_signal), non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks. |
| <a name="term_thread-safe">Thread-safe</a> | Can be safely invoked synchronously, or asychronously, without requiring external synchronization to ensure program correctness. |
### SPECIFICATION
#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Publisher.java))
#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Publisher.java))
```java
public interface Publisher<T> {
......@@ -98,8 +98,8 @@ public interface Publisher<T> {
| [:bulb:](#1.1 "1.1 explained") | *The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.* |
| <a name="1.2">2</a> | A `Publisher` MAY signal fewer `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError`. |
| [:bulb:](#1.2 "1.2 explained") | *The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.* |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a [thread-safe](#term_thread-safe) manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to make it clear that [external synchronization](#term_ext_sync) must be employed if the Publisher intends to send signals from multiple/different threads.* |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled [serially](#term_serially). |
| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.* |
| <a name="1.4">4</a> | If a `Publisher` fails it MUST signal an `onError`. |
| [:bulb:](#1.4 "1.4 explained") | *The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.* |
| <a name="1.5">5</a> | If a `Publisher` terminates successfully (finite stream) it MUST signal an `onComplete`. |
......@@ -117,7 +117,7 @@ public interface Publisher<T> {
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| [:bulb:](#1.11 "1.11 explained") | *The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.* |
#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscriber.java))
#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Subscriber.java))
```java
public interface Subscriber<T> {
......@@ -131,7 +131,7 @@ public interface Subscriber<T> {
| ID | Rule |
| ------------------------- | ------------------------------------------------------------------------------------------------------ |
| <a name="2.1">1</a> | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals. |
| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.* |
| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.* |
| <a name="2.2">2</a> | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals. |
| [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.* |
| <a name="2.3">3</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher`. |
......@@ -139,11 +139,11 @@ public interface Subscriber<T> {
| <a name="2.4">4</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal. |
| [:bulb:](#2.4 "2.4 explained") | *The intent of this rule is to make sure that Subscribers respect a Publisher’s [terminal state](#term_terminal_state) signals. A Subscription is simply not valid anymore after an onComplete or onError signal has been received.* |
| <a name="2.5">5</a> | A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`. |
| [:bulb:](#2.5 "2.5 explained") | *The intent of this rule is to prevent that two, or more, separate Publishers from thinking that they can interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled.* |
| [:bulb:](#2.5 "2.5 explained") | *The intent of this rule is to prevent that two, or more, separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.* |
| <a name="2.6">6</a> | A `Subscriber` MUST call `Subscription.cancel()` if the `Subscription` is no longer needed. |
| [:bulb:](#2.6 "2.6 explained") | *The intent of this rule is to establish that Subscribers cannot just throw Subscriptions away when they are no longer needed, they have to call `cancel` so that resources held by that Subscription can be safely, and timely, reclaimed. An example of this would be a Subscriber which is only interested in a specific element, which would then cancel its Subscription to signal its completion to the Publisher.* |
| <a name="2.7">7</a> | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective [external synchronization](#term_ext_sync). |
| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to establish that [external synchronization](#term_ext_sync) must be added if a Subscriber will be using a Subscription concurrently by two or more threads.* |
| <a name="2.7">7</a> | A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed [serially](#term_serially). |
| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a happens-before relation between each of the calls is established.* |
| <a name="2.8">8</a> | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately. |
| [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` and the Publisher observing that cancellation.* |
| <a name="2.9">9</a> | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call. |
......@@ -157,7 +157,7 @@ public interface Subscriber<T> {
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST [return normally](#term_return_normally) except when any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
| [:bulb:](#2.13 "2.13 explained") | *The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do in which case this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error—or otherwise make someone or something aware of the situation—as the error cannot be signalled to the faulty Subscriber.* |
#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscription.java))
#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Subscription.java))
```java
public interface Subscription {
......@@ -205,7 +205,7 @@ public interface Subscription {
A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback.
#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Processor.java))
#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/api/src/main/java/org/reactivestreams/Processor.java))
```java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
......
# Release notes for Reactive Streams
# Version 1.0.3 released on 2019-08-23
## Announcement:
We—the Reactive Streams community—are pleased to announce the immediate availability of `Reactive Streams 1.0.3`. This update to `Reactive Streams` brings the following improvements over `1.0.2`.
## Highlights:
- Specification
+ Glossary term "External synchronization" has been [superseded](#Glossary)
+ No breaking/semantical changes
+ Rule [clarifications](#specification-clarifications-103-RC1)
- Interfaces
+ No changes
- Technology Compatibility Kit (TCK)
+ Improved [coverage](#tck-alterations-103-RC1)
+ Improved JavaDoc
- Examples
+ No changes
- Artifacts
+ FlowAdapters artifact removed, FlowAdapters moved into the core jar ([#424](https://github.com/reactive-streams/reactive-streams-jvm/issues/424))
## Specification clarifications 1.0.3
## Glossary term "External synchronization" replaced by "Serial(ly)"
**1.0.2:** Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`.
**1.0.3** In the context of a Signal, non-overlapping. In the context of the JVM, calls to methods on an object are serial if and only if there is a happens-before relationship between those calls (implying also that the calls do not overlap). When the calls are performed asynchronously, coordination to establish the happens-before relationship is to be implemented using techniques such as, but not limited to, atomics, monitors, or locks.
## Publisher Rule 3 (Rule and Intent clarified)
**1.0.2:** `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a thread-safe manner—and if performed by multiple threads—use external synchronization.
*The intent of this rule is to make it clear that external synchronization must be employed if the Publisher intends to send signals from multiple/different threads.*
**1.0.3:** `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled serially.
*The intent of this rule is to permit the signalling of signals (including from multiple threads) if and only if a happens-before relation between each of the signals is established.*
## Subscriber Rule 1 (Intent clarified)
**1.0.2:** A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals.
*The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.*
**1.0.3:** A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals.
*The intent of this rule is to establish that it is the responsibility of the Subscriber to decide when and how many elements it is able and willing to receive. To avoid signal reordering caused by reentrant Subscription methods, it is strongly RECOMMENDED for synchronous Subscriber implementations to invoke Subscription methods at the very end of any signal processing. It is RECOMMENDED that Subscribers request the upper limit of what they are able to process, as requesting only one element at a time results in an inherently inefficient "stop-and-wait" protocol.*
## Subscriber Rule 5 (Intent clarified)
**1.0.2:** A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`
*The intent of this rule is to prevent that two, or more, separate Publishers from thinking that they can interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled.*
**1.0.3:** A `Subscriber` MUST call `Subscription.cancel()` on the given `Subscription` after an `onSubscribe` signal if it already has an active `Subscription`
*The intent of this rule is to prevent that two, or more, separate Publishers from trying to interact with the same Subscriber. Enforcing this rule means that resource leaks are prevented since extra Subscriptions will be cancelled. Failure to conform to this rule may lead to violations of Publisher rule 1, amongst others. Such violations can lead to hard-to-diagnose bugs.*
## Subscriber Rule 7 (Rule and Intent clarified)
**1.0.2:** A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective external synchronization.
*The intent of this rule is to establish that external synchronization must be added if a Subscriber will be using a Subscription concurrently by two or more threads.*
**1.0.3:** A Subscriber MUST ensure that all calls on its Subscription's request and cancel methods are performed serially.
*The intent of this rule is to permit the calling of the request and cancel methods (including from multiple threads) if and only if a happens-before relation between each of the calls is established.*
## TCK alterations 1.0.3
- `PublisherVerification.optional_spec105_emptyStreamMustTerminateBySignallingOnComplete` fails if the publisher completes synchronously ([#422](https://github.com/reactive-streams/reactive-streams-jvm/issues/422))
- IdentityFlowProcessorVerification throws NPE when `createFailedFlowPublisher` returns null ([#425](https://github.com/reactive-streams/reactive-streams-jvm/issues/425))
- `required_spec208_mustBePreparedToReceiveOnNextSignalsAfterHavingCalledSubscriptionCancel` does not wait for request before invoking onNext ([#277](https://github.com/reactive-streams/reactive-streams-jvm/issues/277))
- Subscriber whitebox verification tests demand ([#280](https://github.com/reactive-streams/reactive-streams-jvm/issues/280))
- Incomplete documentation on stochastic tests in TCK ([#278](https://github.com/reactive-streams/reactive-streams-jvm/issues/278))
- TCK performance ([#446](https://github.com/reactive-streams/reactive-streams-jvm/issues/446))
- TCK: Receptacle#expectError timeout approach ([#451](https://github.com/reactive-streams/reactive-streams-jvm/issues/451))
## Contributors
+ Roland Kuhn [(@rkuhn)](https://github.com/rkuhn)
+ Ben Christensen [(@benjchristensen)](https://github.com/benjchristensen)
+ Viktor Klang [(@viktorklang)](https://github.com/viktorklang)
+ Stephane Maldini [(@smaldini)](https://github.com/smaldini)
+ Stanislav Savulchik [(@savulchik)](https://github.com/savulchik)
+ Konrad Malawski [(@ktoso)](https://github.com/ktoso)
+ Slim Ouertani [(@ouertani)](https://github.com/ouertani)
+ Martynas Mickevičius [(@2m)](https://github.com/2m)
+ Luke Daley [(@ldaley)](https://github.com/ldaley)
+ Colin Godsey [(@colinrgodsey)](https://github.com/colinrgodsey)
+ David Moten [(@davidmoten)](https://github.com/davidmoten)
+ Brian Topping [(@briantopping)](https://github.com/briantopping)
+ Rossen Stoyanchev [(@rstoyanchev)](https://github.com/rstoyanchev)
+ Björn Hamels [(@BjornHamels)](https://github.com/BjornHamels)
+ Jake Wharton [(@JakeWharton)](https://github.com/JakeWharton)
+ Anthony Vanelverdinghe[(@anthonyvdotbe)](https://github.com/anthonyvdotbe)
+ Kazuhiro Sera [(@seratch)](https://github.com/seratch)
+ Dávid Karnok [(@akarnokd)](https://github.com/akarnokd)
+ Evgeniy Getman [(@egetman)](https://github.com/egetman)
+ Ángel Sanz [(@angelsanz)](https://github.com/angelsanz)
+ shenghaiyang [(@shenghaiyang)](https://github.com/shenghaiyang)
+ Kyle Thomson [(@kiiadi)](https://github.com/kiiadi)
+ (new) James Roper [(@jroper)](https://github.com/jroper)
+ (new) Oleh Dokuka [(@olegdokuka)](https://github.com/olegdokuka)
+ (new) Scott Mitchell [(@Scottmitch)](https://github.com/Scottmitch)
---
# Version 1.0.2 released on 2017-12-19
......
description = "reactive-streams"
def jdkFlow = false
try {
Class.forName("java.util.concurrent.Flow")
jdkFlow = true
} catch (ClassNotFoundException cnfe) {
}
sourceSets {
main {
java {
if (jdkFlow)
srcDirs = ['src/main/java', 'src/main/java9']
else
srcDirs = ['src/main/java']
}
}
}
jar {
manifest {
......
......@@ -23,8 +23,8 @@ public interface Subscription {
/**
* No events will be sent by a {@link Publisher} until demand is signaled via this method.
* <p>
* It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE.
* An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded".
* It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more,
* it may be treated by the {@link Publisher} as "effectively unbounded".
* <p>
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled.
* <p>
......
......@@ -12,6 +12,7 @@
package org.reactivestreams;
import java.util.concurrent.Flow;
import static java.util.Objects.requireNonNull;
/**
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
......@@ -31,16 +32,16 @@ public final class FlowAdapters {
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toPublisher(
Flow.Publisher<? extends T> flowPublisher) {
if (flowPublisher == null) {
throw new NullPointerException("flowPublisher");
}
requireNonNull(flowPublisher, "flowPublisher");
final org.reactivestreams.Publisher<T> publisher;
if (flowPublisher instanceof FlowPublisherFromReactive) {
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
}
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
publisher = (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
} else if (flowPublisher instanceof org.reactivestreams.Publisher) {
publisher = (org.reactivestreams.Publisher<T>)flowPublisher;
} else {
publisher = new ReactivePublisherFromFlow<T>(flowPublisher);
}
return new ReactivePublisherFromFlow<T>(flowPublisher);
return publisher;
}
/**
......@@ -53,16 +54,16 @@ public final class FlowAdapters {
public static <T> Flow.Publisher<T> toFlowPublisher(
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
) {
if (reactiveStreamsPublisher == null) {
throw new NullPointerException("reactiveStreamsPublisher");
}
requireNonNull(reactiveStreamsPublisher, "reactiveStreamsPublisher");
final Flow.Publisher<T> flowPublisher;
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
}
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
return (Flow.Publisher<T>)reactiveStreamsPublisher;
flowPublisher = (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
} else if (reactiveStreamsPublisher instanceof Flow.Publisher) {
flowPublisher = (Flow.Publisher<T>)reactiveStreamsPublisher;
} else {
flowPublisher = new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
}
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
return flowPublisher;
}
/**
......@@ -76,16 +77,16 @@ public final class FlowAdapters {
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
Flow.Processor<? super T, ? extends U> flowProcessor
) {
if (flowProcessor == null) {
throw new NullPointerException("flowProcessor");
}
requireNonNull(flowProcessor, "flowProcessor");
final org.reactivestreams.Processor<T, U> processor;
if (flowProcessor instanceof FlowToReactiveProcessor) {
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
}
if (flowProcessor instanceof org.reactivestreams.Processor) {
return (org.reactivestreams.Processor<T, U>)flowProcessor;
processor = (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
} else if (flowProcessor instanceof org.reactivestreams.Processor) {
processor = (org.reactivestreams.Processor<T, U>)flowProcessor;
} else {
processor = new ReactiveToFlowProcessor<T, U>(flowProcessor);
}
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
return processor;
}
/**
......@@ -99,16 +100,16 @@ public final class FlowAdapters {
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
) {
if (reactiveStreamsProcessor == null) {
throw new NullPointerException("reactiveStreamsProcessor");
}
requireNonNull(reactiveStreamsProcessor, "reactiveStreamsProcessor");
final Flow.Processor<T, U> flowProcessor;
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
flowProcessor = (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
} else if (reactiveStreamsProcessor instanceof Flow.Processor) {
flowProcessor = (Flow.Processor<T, U>)reactiveStreamsProcessor;
} else {
flowProcessor = new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
}
if (reactiveStreamsProcessor instanceof Flow.Processor) {
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
}
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
return flowProcessor;
}
/**
......@@ -119,16 +120,16 @@ public final class FlowAdapters {
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
if (reactiveStreamsSubscriber == null) {
throw new NullPointerException("reactiveStreamsSubscriber");
}
requireNonNull(reactiveStreamsSubscriber, "reactiveStreamsSubscriber");
final Flow.Subscriber<T> flowSubscriber;
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
flowSubscriber = (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
} else if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
flowSubscriber = (Flow.Subscriber<T>)reactiveStreamsSubscriber;
} else {
flowSubscriber = new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
}
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
}
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
return flowSubscriber;
}
/**
......@@ -139,16 +140,16 @@ public final class FlowAdapters {
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
if (flowSubscriber == null) {
throw new NullPointerException("flowSubscriber");
}
requireNonNull(flowSubscriber, "flowSubscriber");
final org.reactivestreams.Subscriber<T> subscriber;
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
subscriber = (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
} else if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
subscriber = (org.reactivestreams.Subscriber<T>)flowSubscriber;
} else {
subscriber = new ReactiveToFlowSubscriber<T>(flowSubscriber);
}
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
}
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
return subscriber;
}
/**
......@@ -200,8 +201,7 @@ public final class FlowAdapters {
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the element type
*/
static final class FlowToReactiveSubscriber<T>
implements Flow.Subscriber<T> {
static final class FlowToReactiveSubscriber<T> implements Flow.Subscriber<T> {
final org.reactivestreams.Subscriber<? super T> reactiveStreams;
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
......@@ -231,11 +231,10 @@ public final class FlowAdapters {
}
/**
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* Wraps a Flow Subscriber and forwards methods of the Reactive Streams Subscriber to it.
* @param <T> the element type
*/
static final class ReactiveToFlowSubscriber<T>
implements org.reactivestreams.Subscriber<T> {
static final class ReactiveToFlowSubscriber<T> implements org.reactivestreams.Subscriber<T> {
final Flow.Subscriber<? super T> flow;
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
......@@ -269,8 +268,7 @@ public final class FlowAdapters {
* @param <T> the input type
* @param <U> the output type
*/
static final class ReactiveToFlowProcessor<T, U>
implements org.reactivestreams.Processor<T, U> {
static final class ReactiveToFlowProcessor<T, U> implements org.reactivestreams.Processor<T, U> {
final Flow.Processor<? super T, ? extends U> flow;
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
......@@ -308,8 +306,7 @@ public final class FlowAdapters {
* @param <T> the input type
* @param <U> the output type
*/
static final class FlowToReactiveProcessor<T, U>
implements Flow.Processor<T, U> {
static final class FlowToReactiveProcessor<T, U> implements Flow.Processor<T, U> {
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
......@@ -347,7 +344,6 @@ public final class FlowAdapters {
* @param <T> the element type
*/
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
final Flow.Publisher<? extends T> flow;
public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
......
......@@ -3,7 +3,7 @@ subprojects {
apply plugin: "osgi"
group = "org.reactivestreams"
version = "1.0.2"
version = "1.0.3"
sourceCompatibility = 1.6
targetCompatibility = 1.6
......@@ -42,17 +42,17 @@ subprojects {
instructionReplace "Bundle-Vendor", "Reactive Streams SIG"
instructionReplace "Bundle-Description", "Reactive Streams API"
instructionReplace "Bundle-DocURL", "http://reactive-streams.org"
instructionReplace "Bundle-Version", "1.0.2"
instructionReplace "Bundle-Version", "1.0.3"
}
}
if (name in ["reactive-streams",
"reactive-streams-tck",
"reactive-streams-tck-flow",
"reactive-streams-examples",
"reactive-streams-flow-adapters"]) {
"reactive-streams-examples"]) {
apply plugin: "maven"
apply plugin: "signing"
apply plugin: "maven-publish"
signing {
sign configurations.archives
......@@ -68,6 +68,17 @@ subprojects {
from javadoc
}
publishing {
publications {
mavenJava(MavenPublication) {
from components.java
}
}
repositories {
mavenLocal()
}
}
artifacts {
archives sourcesJar, javadocJar
}
......
reactive-streams (1.0.3-1) unstable; urgency=medium
* New upstream release
- Updated the pom
* Standards-Version updated to 4.5.0
-- Emmanuel Bourg <ebourg@apache.org> Sun, 26 Jan 2020 15:02:43 +0100
reactive-streams (1.0.2-1) unstable; urgency=medium
* New upstream release
......
......@@ -8,7 +8,7 @@ Build-Depends:
default-jdk,
gradle-debian-helper,
maven-repo-helper
Standards-Version: 4.2.1
Standards-Version: 4.5.0
Vcs-Git: https://salsa.debian.org/java-team/reactive-streams.git
Vcs-Browser: https://salsa.debian.org/java-team/reactive-streams
Homepage: http://www.reactive-streams.org
......
......@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<name>reactive-streams</name>
<description>A Protocol for Asynchronous Non-Blocking Data Sequence</description>
<url>http://www.reactive-streams.org/</url>
......
/bin
\ No newline at end of file
description = 'reactive-streams-flow-adapters'
dependencies {
compile project(':reactive-streams')
testCompile project(':reactive-streams-tck')
testCompile group: 'org.testng', name: 'testng', version: '5.14.10'
}
jar {
manifest {
attributes('Automatic-Module-Name': 'org.reactivestreams.flowadapters')
}
}
test.useTestNG()
javadoc {
options.links("https://docs.oracle.com/javase/9/docs/api")
}
\ No newline at end of file
......@@ -11,12 +11,12 @@ try {
Class.forName("java.util.concurrent.Flow")
jdkFlow = true
println(ANSI_GREEN + " INFO: ------------------ JDK9 classes detected ---------------------------------" + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [flow-adapters, tck-flow] in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: Java 9 Flow API found; Including [tck-flow] & FlowAdapters in build. " + ANSI_RESET)
println(ANSI_GREEN + " INFO: --------------------------------------------------------------------------" + ANSI_RESET)
} catch (Throwable ex) {
// Flow API not available
println(ANSI_RED + "WARNING: -------------------- JDK9 classes NOT detected -----------------------------" + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [flow-adapters, tck-flow] in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: Java 9 Flow API not found; Not including [tck-flow] & FlowAdapters in build." + ANSI_RESET)
println(ANSI_RED + "WARNING: In order to execute the complete test-suite run the build using JDK9+. " + ANSI_RESET)
println(ANSI_RED + "WARNING: ----------------------------------------------------------------------------" + ANSI_RESET)
}
......@@ -26,7 +26,6 @@ include ':reactive-streams-tck'
include ':reactive-streams-examples'
if (jdkFlow) {
include ':reactive-streams-flow-adapters'
include ':reactive-streams-tck-flow'
}
......@@ -34,6 +33,5 @@ project(':reactive-streams').projectDir = "$rootDir/api" as File
project(':reactive-streams-tck').projectDir = "$rootDir/tck" as File
project(':reactive-streams-examples').projectDir = "$rootDir/examples" as File
if (jdkFlow) {
project(':reactive-streams-flow-adapters').projectDir = "$rootDir/flow-adapters" as File
project(':reactive-streams-tck-flow').projectDir = "$rootDir/tck-flow" as File
}
......@@ -27,7 +27,7 @@ The TCK is provided as binary artifact on [Maven Central](http://search.maven.or
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.2</version>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
```
......@@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis
[Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped
by the Publisher.
Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``),
and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``).
While the latter defaults to the prior, it may be useful to tweak them independently when running on continious
integration servers (for example, keeping the no-signals timeout significantly lower).
Note that the TCK differentiates between timeouts for "waiting for a signal"
(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time"
(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the prior, it may be useful to tweak
them independently when running on continuous integration servers (for example, keeping the
no-signals timeout significantly lower). Another configuration option is the "poll timeout" which is
used whenever an operation has to poll for a `defaultTimeoutMillis` for a signal to appear (most
often errors), it can then poll and check using the `defaultPollTimeoutMillis`, for the expected
error, rather than blocking for the full default timeout.
In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either:
In order to configure these timeouts (for example when running on a slow continuous integration
machine), you can either:
**Use env variables** to set these timeouts, in which case the you can do:
```bash
export DEFAULT_TIMEOUT_MILLIS=100
export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100
export DEFAULT_POLL_TIMEOUT_MILLIS=20
export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300
```
......@@ -231,10 +237,11 @@ public class RangePublisherTest extends FlowPublisherVerification<Integer> {
public static final long DEFAULT_TIMEOUT_MILLIS = 100L;
public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L;
public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20L;
public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L;
public RangePublisherTest() {
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS);
}
// ...
......
......@@ -2,7 +2,6 @@ description = 'reactive-streams-tck-flow'
dependencies {
compile group: 'org.testng', name: 'testng', version:'5.14.10'
compile project(':reactive-streams-tck')
compile project(':reactive-streams-flow-adapters')
}
jar {
......
......@@ -60,7 +60,9 @@ public abstract class IdentityFlowProcessorVerification<T> extends IdentityProce
@Override
public final Publisher<T> createFailedPublisher() {
return FlowAdapters.toPublisher(createFailedFlowPublisher());
Flow.Publisher<T> failed = createFailedFlowPublisher();
if (failed == null) return null; // because `null` means "SKIP" in createFailedPublisher
else return FlowAdapters.toPublisher(failed);
}
}