Skip to content

Commits on Source 7

......@@ -26,3 +26,4 @@ test-results
test-tmp
*.class
gradle.properties
*.orig
......@@ -39,3 +39,5 @@ akarnokd | David Karnok, akarnokd@gmail.com
egetman | Evgeniy Getman, getman.eugene@gmail.com
patriknw | Patrik Nordwall, patrik.nordwall@gmail.com, Lightbend Inc
angelsanz | Ángel Sanz, angelsanz@users.noreply.github.com
shenghaiyang | 盛海洋, shenghaiyang@aliyun.com
kiiadi | Kyle Thomson, kylthoms@amazon.com, Amazon.com
......@@ -8,12 +8,12 @@ The latest release is available on Maven Central as
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
</dependency>
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
<scope>test</scope>
</dependency>
```
......@@ -80,10 +80,11 @@ followed by a possibly unbounded number of `onNext` signals (as requested by `Su
| <a name="term_terminal_state">Terminal state</a> | For a Publisher: When `onComplete` or `onError` has been signalled. For a Subscriber: When an `onComplete` or `onError` has been received.|
| <a name="term_nop">NOP</a> | Execution that has no detectable effect to the calling thread, and can as such safely be called any number of times.|
| <a name="term_ext_sync">External synchronization</a> | Access coordination for thread safety purposes implemented outside of the constructs defined in this specification, using techniques such as, but not limited to, `atomics`, `monitors`, or `locks`. |
| <a name="term_thread-safe">Thread-safe</a> | Can be safely invoked synchronously, or asychronously, without requiring external synchronization to ensure program correctness. |
### SPECIFICATION
#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/api/src/main/java/org/reactivestreams/Publisher.java))
#### 1. Publisher ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Publisher.java))
```java
public interface Publisher<T> {
......@@ -97,7 +98,7 @@ public interface Publisher<T> {
| [:bulb:](#1.1 "1.1 explained") | *The intent of this rule is to make it clear that Publishers cannot signal more elements than Subscribers have requested. There’s an implicit, but important, consequence to this rule: Since demand can only be fulfilled after it has been received, there’s a happens-before relationship between requesting elements and receiving elements.* |
| <a name="1.2">2</a> | A `Publisher` MAY signal fewer `onNext` than requested and terminate the `Subscription` by calling `onComplete` or `onError`. |
| [:bulb:](#1.2 "1.2 explained") | *The intent of this rule is to make it clear that a Publisher cannot guarantee that it will be able to produce the number of elements requested; it simply might not be able to produce them all; it may be in a failed state; it may be empty or otherwise already completed.* |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a `thread-safe` manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
| <a name="1.3">3</a> | `onSubscribe`, `onNext`, `onError` and `onComplete` signaled to a `Subscriber` MUST be signaled in a [thread-safe](#term_thread-safe) manner—and if performed by multiple threads—use [external synchronization](#term_ext_sync). |
| [:bulb:](#1.3 "1.3 explained") | *The intent of this rule is to make it clear that [external synchronization](#term_ext_sync) must be employed if the Publisher intends to send signals from multiple/different threads.* |
| <a name="1.4">4</a> | If a `Publisher` fails it MUST signal an `onError`. |
| [:bulb:](#1.4 "1.4 explained") | *The intent of this rule is to make it clear that a Publisher is responsible for notifying its Subscribers if it detects that it cannot proceed—Subscribers must be given a chance to clean up resources or otherwise deal with the Publisher´s failures.* |
......@@ -108,7 +109,7 @@ public interface Publisher<T> {
| <a name="1.7">7</a> | Once a [terminal state](#term_terminal_state) has been signaled (`onError`, `onComplete`) it is REQUIRED that no further signals occur. |
| [:bulb:](#1.7 "1.7 explained") | *The intent of this rule is to make sure that onError and onComplete are the final states of an interaction between a Publisher and Subscriber pair.* |
| <a name="1.8">8</a> | If a `Subscription` is cancelled its `Subscriber` MUST eventually stop being signaled. |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for *eventually* is because signals can have propagation delay due to being asynchronous.* |
| [:bulb:](#1.8 "1.8 explained") | *The intent of this rule is to make sure that Publishers respect a Subscriber’s request to cancel a Subscription when Subscription.cancel() has been called. The reason for **eventually** is because signals can have propagation delay due to being asynchronous.* |
| <a name="1.9">9</a> | `Publisher.subscribe` MUST call `onSubscribe` on the provided `Subscriber` prior to any other signals to that `Subscriber` and MUST [return normally](#term_return_normally), except when the provided `Subscriber` is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way to signal failure (or reject the `Subscriber`) is by calling `onError` (after calling `onSubscribe`). |
| [:bulb:](#1.9 "1.9 explained") | *The intent of this rule is to make sure that `onSubscribe` is always signalled before any of the other signals, so that initialization logic can be executed by the Subscriber when the signal is received. Also `onSubscribe` MUST only be called at most once, [see [2.12](#2.12)]. If the supplied `Subscriber` is `null`, there is nowhere else to signal this but to the caller, which means a `java.lang.NullPointerException` must be thrown. Examples of possible situations: A stateful Publisher can be overwhelmed, bounded by a finite number of underlying resources, exhausted, or in a [terminal state](#term_terminal_state).* |
| <a name="1.10">10</a> | `Publisher.subscribe` MAY be called as many times as wanted but MUST be with a different `Subscriber` each time [see [2.12](#2.12)]. |
......@@ -116,7 +117,7 @@ public interface Publisher<T> {
| <a name="1.11">11</a> | A `Publisher` MAY support multiple `Subscriber`s and decides whether each `Subscription` is unicast or multicast. |
| [:bulb:](#1.11 "1.11 explained") | *The intent of this rule is to give Publisher implementations the flexibility to decide how many, if any, Subscribers they will support, and how elements are going to be distributed.* |
#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/api/src/main/java/org/reactivestreams/Subscriber.java))
#### 2. Subscriber ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscriber.java))
```java
public interface Subscriber<T> {
......@@ -132,7 +133,7 @@ public interface Subscriber<T> {
| <a name="2.1">1</a> | A `Subscriber` MUST signal demand via `Subscription.request(long n)` to receive `onNext` signals. |
| [:bulb:](#2.1 "2.1 explained") | *The intent of this rule is to establish that it is the responsibility of the Subscriber to signal when, and how many, elements it is able and willing to receive.* |
| <a name="2.2">2</a> | If a `Subscriber` suspects that its processing of signals will negatively impact its `Publisher`´s responsivity, it is RECOMMENDED that it asynchronously dispatches its signals. |
| [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from CPU cycles.* |
| [:bulb:](#2.2 "2.2 explained") | *The intent of this rule is that a Subscriber should [not obstruct](#term_non-obstructing) the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.* |
| <a name="2.3">3</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST NOT call any methods on the `Subscription` or the `Publisher`. |
| [:bulb:](#2.3 "2.3 explained") | *The intent of this rule is to prevent cycles and race-conditions—between Publisher, Subscription and Subscriber—during the processing of completion signals.* |
| <a name="2.4">4</a> | `Subscriber.onComplete()` and `Subscriber.onError(Throwable t)` MUST consider the Subscription cancelled after having received the signal. |
......@@ -144,7 +145,7 @@ public interface Subscriber<T> {
| <a name="2.7">7</a> | A `Subscriber` MUST ensure that all calls on its `Subscription` take place from the same thread or provide for respective [external synchronization](#term_ext_sync). |
| [:bulb:](#2.7 "2.7 explained") | *The intent of this rule is to establish that [external synchronization](#term_ext_sync) must be added if a Subscriber will be using a Subscription concurrently by two or more threads.* |
| <a name="2.8">8</a> | A `Subscriber` MUST be prepared to receive one or more `onNext` signals after having called `Subscription.cancel()` if there are still requested elements pending [see [3.12](#3.12)]. `Subscription.cancel()` does not guarantee to perform the underlying cleaning operations immediately. |
| [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` the Publisher seeing that.* |
| [:bulb:](#2.8 "2.8 explained") | *The intent of this rule is to highlight that there may be a delay between calling `cancel` and the Publisher observing that cancellation.* |
| <a name="2.9">9</a> | A `Subscriber` MUST be prepared to receive an `onComplete` signal with or without a preceding `Subscription.request(long n)` call. |
| [:bulb:](#2.9 "2.9 explained") | *The intent of this rule is to establish that completion is unrelated to the demand flow—this allows for streams which complete early, and obviates the need to *poll* for completion.* |
| <a name="2.10">10</a> | A `Subscriber` MUST be prepared to receive an `onError` signal with or without a preceding `Subscription.request(long n)` call. |
......@@ -156,7 +157,7 @@ public interface Subscriber<T> {
| <a name="2.13">13</a> | Calling `onSubscribe`, `onNext`, `onError` or `onComplete` MUST [return normally](#term_return_normally) except when any provided parameter is `null` in which case it MUST throw a `java.lang.NullPointerException` to the caller, for all other situations the only legal way for a `Subscriber` to signal failure is by cancelling its `Subscription`. In the case that this rule is violated, any associated `Subscription` to the `Subscriber` MUST be considered as cancelled, and the caller MUST raise this error condition in a fashion that is adequate for the runtime environment. |
| [:bulb:](#2.13 "2.13 explained") | *The intent of this rule is to establish the semantics for the methods of Subscriber and what the Publisher is allowed to do in which case this rule is violated. «Raise this error condition in a fashion that is adequate for the runtime environment» could mean logging the error—or otherwise make someone or something aware of the situation—as the error cannot be signalled to the faulty Subscriber.* |
#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/api/src/main/java/org/reactivestreams/Subscription.java))
#### 3. Subscription ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Subscription.java))
```java
public interface Subscription {
......@@ -172,10 +173,10 @@ public interface Subscription {
| <a name="3.2">2</a> | The `Subscription` MUST allow the `Subscriber` to call `Subscription.request` synchronously from within `onNext` or `onSubscribe`. |
| [:bulb:](#3.2 "3.2 explained") | *The intent of this rule is to make it clear that implementations of `request` must be reentrant, to avoid stack overflows in the case of mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). This implies that Publishers can be `synchronous`, i.e. signalling `onNext`´s on the thread which calls `request`.* |
| <a name="3.3">3</a> | `Subscription.request` MUST place an upper bound on possible synchronous recursion between `Publisher` and `Subscriber`. |
| [:bulb:](#3.3 "3.3 explained") | *The intent of this rule is to complement [see [3.2](#3.2)] by placing an upper limit on the mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). Implementations are RECOMMENDED to limit this mutual recursion to a depth of `1` (ONE)—for the sake of conserving stack space. An example for undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, as it otherwise will result in blowing the calling Thread´s stack.* |
| [:bulb:](#3.3 "3.3 explained") | *The intent of this rule is to complement [see [3.2](#3.2)] by placing an upper limit on the mutual recursion between `request` and `onNext` (and eventually `onComplete` / `onError`). Implementations are RECOMMENDED to limit this mutual recursion to a depth of `1` (ONE)—for the sake of conserving stack space. An example for undesirable synchronous, open recursion would be Subscriber.onNext -> Subscription.request -> Subscriber.onNext -> …, as it otherwise will result in blowing the calling thread´s stack.* |
| <a name="3.4">4</a> | `Subscription.request` SHOULD respect the responsivity of its caller by returning in a timely manner. |
| [:bulb:](#3.4 "3.4 explained") | *The intent of this rule is to establish that `request` is intended to be a [non-obstructing](#term_non-obstructing) method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution.* |
| <a name="3.5">5</a> | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be thread-safe. |
| <a name="3.5">5</a> | `Subscription.cancel` MUST respect the responsivity of its caller by returning in a timely manner, MUST be idempotent and MUST be [thread-safe](#term_thread-safe). |
| [:bulb:](#3.5 "3.5 explained") | *The intent of this rule is to establish that `cancel` is intended to be a [non-obstructing](#term_non-obstructing) method, and should be as quick to execute as possible on the calling thread, so avoid heavy computations and other things that would stall the caller´s thread of execution. Furthermore, it is also important that it is possible to call it multiple times without any adverse effects.* |
| <a name="3.6">6</a> | After the `Subscription` is cancelled, additional `Subscription.request(long n)` MUST be [NOPs](#term_nop). |
| [:bulb:](#3.6 "3.6 explained") | *The intent of this rule is to establish a causal relationship between cancellation of a subscription and the subsequent non-operation of requesting more elements.* |
......@@ -204,7 +205,7 @@ public interface Subscription {
A `Subscription` is shared by exactly one `Publisher` and one `Subscriber` for the purpose of mediating the data exchange between this pair. This is the reason why the `subscribe()` method does not return the created `Subscription`, but instead returns `void`; the `Subscription` is only passed to the `Subscriber` via the `onSubscribe` callback.
#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/api/src/main/java/org/reactivestreams/Processor.java))
#### 4.Processor ([Code](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/api/src/main/java/org/reactivestreams/Processor.java))
```java
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
......@@ -218,7 +219,7 @@ public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
| <a name="4.2">2</a> | A `Processor` MAY choose to recover an `onError` signal. If it chooses to do so, it MUST consider the `Subscription` cancelled, otherwise it MUST propagate the `onError` signal to its Subscribers immediately. |
| [:bulb:](#4.2 "4.2 explained") | *The intent of this rule is to inform that it’s possible for implementations to be more than simple transformations.* |
While not mandated, it can be a good idea to cancel a `Processors` upstream `Subscription` when/if its last `Subscriber` cancels their `Subscription`,
While not mandated, it can be a good idea to cancel a `Processor`´s upstream `Subscription` when/if its last `Subscriber` cancels their `Subscription`,
to let the cancellation signal propagate upstream.
### Asynchronous vs Synchronous Processing ###
......
# Release notes for Reactive Streams
---
# Version 1.0.2 released on 2017-12-19
## Announcement:
We—the Reactive Streams community—are pleased to announce the immediate availability of `Reactive Streams 1.0.2`. This update to `Reactive Streams` brings the following improvements over `1.0.1`.
## Highlights:
- Specification
+ Glossary term added for `Thread-safe`
+ No breaking/semantical changes
+ Rule [clarifications](#specification-clarifications-102)
- Interfaces
+ No changes
- Technology Compatibility Kit (TCK)
+ Improved [coverage](#tck-alterations-102)
* Supports Publishers/Processors which do [coordinated emission](http://www.reactive-streams.org/reactive-streams-tck-1.0.2-javadoc/org/reactivestreams/tck/PublisherVerification.html#doesCoordinatedEmission--).
+ Improved JavaDoc
- Examples
+ New example [RangePublisher](http://www.reactive-streams.org/reactive-streams-examples-1.0.2-javadoc/org/reactivestreams/example/unicast/RangePublisher.html)
- Artifacts
+ NEW! [Flow adapters](#flow-adapters)
+ NEW! [Flow TCK](#flow-tck)
+ Java 9 [Automatic-Module-Name](#automatic-module-name) added for all artifacts
## Specification clarifications 1.0.2
## Subscriber Rule 2
**1.0.1:** The intent of this rule is that a Subscriber should not obstruct the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from CPU cycles.
**1.0.2:** The intent of this rule is that a Subscriber should not obstruct the progress of the Publisher from an execution point-of-view. In other words, the Subscriber should not starve the Publisher from receiving CPU cycles.
## Subscriber Rule 8
**1.0.1:** The intent of this rule is to highlight that there may be a delay between calling `cancel` the Publisher seeing that.
**1.0.2** The intent of this rule is to highlight that there may be a delay between calling `cancel` and the Publisher observing that cancellation.
## Flow adapters
An adapter library has been created to convert `org.reactivestreams` to `java.util.concurrent.Flow` and vice versa. Read more about it [here](http://www.reactive-streams.org/reactive-streams-flow-adapters-1.0.2-javadoc)
~~~xml
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-flow-adapters</artifactId>
<version>1.0.2</version>
</dependency>
~~~
## Flow TCK
A TCK artifact has been created to allow for direct TCK verification of `java.util.concurrent.Flow` implementations. Read more about it [here](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.2/tck-flow/README.md)
~~~xml
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.2</version>
</dependency>
~~~
## Automatic Module Name
* `org.reactivestreams:reactive-streams` => `org.reactivestreams`
* `org.reactivestreams:reactive-streams-examples` => `org.reactivestreams.examples`
* `org.reactivestreams:reactive-streams-tck` => `org.reactivestreams.tck`
* `org.reactivestreams:reactive-streams-flow-adapters` => `org.reactivestreams.flowadapters`
* `org.reactivestreams:reactive-streams-tck-flow` => `org.reactivestreams.tckflow`
## TCK alterations 1.0.2
- Added support for Publisher verification of Publishers who do coordinated emission, i.e. where elements only are emitted after all current Subscribers have signalled demand. ([#284](https://github.com/reactive-streams/reactive-streams-jvm/issues/284))
- The `SubscriberWhiteboxVerification` has been given more user friendly error messages in the case where the user forgets to call `registerOnSubscribe`. (#416)[https://github.com/reactive-streams/reactive-streams-jvm/pull/416]
## Contributors
+ Roland Kuhn [(@rkuhn)](https://github.com/rkuhn)
+ Ben Christensen [(@benjchristensen)](https://github.com/benjchristensen)
+ Viktor Klang [(@viktorklang)](https://github.com/viktorklang)
+ Stephane Maldini [(@smaldini)](https://github.com/smaldini)
+ Stanislav Savulchik [(@savulchik)](https://github.com/savulchik)
+ Konrad Malawski [(@ktoso)](https://github.com/ktoso)
+ Slim Ouertani [(@ouertani)](https://github.com/ouertani)
+ Martynas Mickevičius [(@2m)](https://github.com/2m)
+ Luke Daley [(@ldaley)](https://github.com/ldaley)
+ Colin Godsey [(@colinrgodsey)](https://github.com/colinrgodsey)
+ David Moten [(@davidmoten)](https://github.com/davidmoten)
+ Brian Topping [(@briantopping)](https://github.com/briantopping)
+ Rossen Stoyanchev [(@rstoyanchev)](https://github.com/rstoyanchev)
+ Björn Hamels [(@BjornHamels)](https://github.com/BjornHamels)
+ Jake Wharton [(@JakeWharton)](https://github.com/JakeWharton)
+ Anthony Vanelverdinghe[(@anthonyvdotbe)](https://github.com/anthonyvdotbe)
+ Kazuhiro Sera [(@seratch)](https://github.com/seratch)
+ Dávid Karnok [(@akarnokd)](https://github.com/akarnokd)
+ Evgeniy Getman [(@egetman)](https://github.com/egetman)
+ Ángel Sanz [(@angelsanz)](https://github.com/angelsanz)
+ (new) shenghaiyang [(@shenghaiyang)](https://github.com/shenghaiyang)
+ (new) Kyle Thomson [(@kiiadi)](https://github.com/kiiadi)
---
# Version 1.0.1 released on 2017-08-09
......@@ -20,16 +123,16 @@ When JDK9 ships, `Reactive Streams` will publish a compatibility/conversion libr
+ A new [Glossary](https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.1/README.md#glossary) section
+ Description of the intent behind every single rule
+ No breaking semantical changes
+ Multiple rule [clarifications](#specification-clarifications)
+ Multiple rule [clarifications](#specification-clarifications-1.0.1)
- Interfaces
+ No changes
+ Improved JavaDoc
- Technology Compatibility Kit (TCK)
+ Improved coverage
+ Improved JavaDoc
+ Multiple test [alterations](#tck-alterations)
+ Multiple test [alterations](#tck-alterations-1.0.1)
## Specification clarifications
## Specification clarifications 1.0.1
## Publisher Rule 1
......@@ -145,7 +248,7 @@ When JDK9 ships, `Reactive Streams` will publish a compatibility/conversion libr
---
## TCK alterations
## TCK alterations 1.0.1
- Fixed potential resource leaks in partially consuming Publisher tests ([#375](https://github.com/reactive-streams/reactive-streams-jvm/issues/375))
- Fixed potential resource leaks in partially emitting Subscriber tests ([#372](https://github.com/reactive-streams/reactive-streams-jvm/issues/372), [#373](https://github.com/reactive-streams/reactive-streams-jvm/issues/373))
......
description = "reactive-streams"
jar {
manifest {
attributes('Automatic-Module-Name': 'org.reactivestreams')
}
}
......@@ -3,7 +3,7 @@ subprojects {
apply plugin: "osgi"
group = "org.reactivestreams"
version = "1.0.1"
version = "1.0.2"
sourceCompatibility = 1.6
targetCompatibility = 1.6
......@@ -42,11 +42,15 @@ subprojects {
instructionReplace "Bundle-Vendor", "Reactive Streams SIG"
instructionReplace "Bundle-Description", "Reactive Streams API"
instructionReplace "Bundle-DocURL", "http://reactive-streams.org"
instructionReplace "Bundle-Version", "1.0.1"
instructionReplace "Bundle-Version", "1.0.2"
}
}
if (name in ["reactive-streams", "reactive-streams-tck", "reactive-streams-examples"]) {
if (name in ["reactive-streams",
"reactive-streams-tck",
"reactive-streams-tck-flow",
"reactive-streams-examples",
"reactive-streams-flow-adapters"]) {
apply plugin: "maven"
apply plugin: "signing"
......
reactive-streams (1.0.2-1) unstable; urgency=medium
* New upstream release
- Updated the pom
* Standards-Version updated to 4.2.1
* Replaced orig-tar.sh with the Files-Excluded field in debian/copyright
* Use salsa.debian.org Vcs-* URLs
-- Emmanuel Bourg <ebourg@apache.org> Mon, 19 Nov 2018 10:50:31 +0100
reactive-streams (1.0.1-1) unstable; urgency=medium
* New upstream release
......
......@@ -8,9 +8,9 @@ Build-Depends:
default-jdk,
gradle-debian-helper,
maven-repo-helper
Standards-Version: 4.1.0
Vcs-Git: https://anonscm.debian.org/git/pkg-java/reactive-streams.git
Vcs-Browser: https://anonscm.debian.org/cgit/pkg-java/reactive-streams.git
Standards-Version: 4.2.1
Vcs-Git: https://salsa.debian.org/java-team/reactive-streams.git
Vcs-Browser: https://salsa.debian.org/java-team/reactive-streams
Homepage: http://www.reactive-streams.org
Package: libreactive-streams-java
......
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: reactive-streams
Source: http://www.reactive-streams.org/
Files-Excluded: .travis.yml
gradlew*
gradle/wrapper/*
Files: *
Copyright: 2014-2015, Reactive Streams SIG
......
#!/bin/sh -e
# called by uscan with '--upstream-version' <version> <file>
PACKAGE=$(dpkg-parsechangelog -S Source)
VERSION=$2
DIR=${PACKAGE}-${VERSION}
TAR=../${PACKAGE}_${VERSION}.orig.tar.xz
mkdir $DIR
tar -xf $3 --strip-components=1 -C $DIR
rm $3
XZ_OPT=--best tar cJvf $TAR \
--exclude '.travis.yml' \
--exclude 'gradlew*' \
--exclude 'gradle/wrapper/*' \
$DIR
rm -rf $DIR
......@@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.1</version>
<version>1.0.2</version>
<name>reactive-streams</name>
<description>A Protocol for Asynchronous Non-Blocking Data Sequence</description>
<url>http://www.reactive-streams.org/</url>
......
......@@ -7,6 +7,3 @@ export JAVA_HOME := /usr/lib/jvm/default-java
override_dh_auto_build:
dh_auto_build -- :reactive-streams:jar
get-orig-source:
uscan --download-current-version --rename --force-download
version=3
https://github.com/reactive-streams/reactive-streams-jvm/tags .*/v([\d\.]+).tar.gz debian debian/orig-tar.sh
version=4
opts=repack,compression=xz \
https://github.com/reactive-streams/reactive-streams-jvm/tags .*/v([\d\.]+).tar.gz
......@@ -3,4 +3,11 @@ dependencies {
compile project(':reactive-streams')
testCompile project(':reactive-streams-tck')
}
jar {
manifest {
attributes('Automatic-Module-Name': 'org.reactivestreams.examples')
}
}
test.useTestNG()
/************************************************************************
* Licensed under Public Domain (CC0) *
* *
* To the extent possible under law, the person who associated CC0 with *
* this code has waived all copyright and related or neighboring *
* rights to this code. *
* *
* You should have received a copy of the CC0 legalcode along with this *
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
************************************************************************/
package org.reactivestreams.example.unicast;
import org.reactivestreams.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* A synchronous implementation of the {@link Publisher} that can
* be subscribed to multiple times and each individual subscription
* will receive range of monotonically increasing integer values on demand.
*/
public final class RangePublisher implements Publisher<Integer> {
/** The starting value of the range. */
final int start;
/** The number of items to emit. */
final int count;
/**
* Constructs a RangePublisher instance with the given start and count values
* that yields a sequence of [start, start + count).
* @param start the starting value of the range
* @param count the number of items to emit
*/
public RangePublisher(int start, int count) {
this.start = start;
this.count = count;
}
@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
// As per rule 1.11, we have decided to support multiple subscribers
// in a unicast configuration for this `Publisher` implementation.
// As per rule 1.09, we need to throw a `java.lang.NullPointerException`
// if the `Subscriber` is `null`
if (subscriber == null) throw null;
// As per 2.13, this method must return normally (i.e. not throw).
try {
subscriber.onSubscribe(new RangeSubscription(subscriber, start, start + count));
} catch (Throwable ex) {
new IllegalStateException(subscriber + " violated the Reactive Streams rule 2.13 " +
"by throwing an exception from onSubscribe.", ex)
// When onSubscribe fails this way, we don't know what state the
// subscriber is thus calling onError may cause more crashes.
.printStackTrace();
}
}
/**
* A Subscription implementation that holds the current downstream
* requested amount and responds to the downstream's request() and
* cancel() calls.
*/
static final class RangeSubscription
// We are using this `AtomicLong` to make sure that this `Subscription`
// doesn't run concurrently with itself, which would violate rule 1.3
// among others (no concurrent notifications).
// The atomic transition from 0L to N > 0L will ensure this.
extends AtomicLong implements Subscription {
private static final long serialVersionUID = -9000845542177067735L;
/** The Subscriber we are emitting integer values to. */
final Subscriber<? super Integer> downstream;
/** The end index (exclusive). */
final int end;
/**
* The current index and within the [start, start + count) range that
* will be emitted as downstream.onNext().
*/
int index;
/**
* Indicates the emission should stop.
*/
volatile boolean cancelled;
/**
* Holds onto the IllegalArgumentException (containing the offending stacktrace)
* indicating there was a non-positive request() call from the downstream.
*/
volatile Throwable invalidRequest;
/**
* Constructs a stateful RangeSubscription that emits signals to the given
* downstream from an integer range of [start, end).
* @param downstream the Subscriber receiving the integer values and the completion signal.
* @param start the first integer value emitted, start of the range
* @param end the end of the range, exclusive
*/
RangeSubscription(Subscriber<? super Integer> downstream, int start, int end) {
this.downstream = downstream;
this.index = start;
this.end = end;
}
// This method will register inbound demand from our `Subscriber` and
// validate it against rule 3.9 and rule 3.17
@Override
public void request(long n) {
// Non-positive requests should be honored with IllegalArgumentException
if (n <= 0L) {
invalidRequest = new IllegalArgumentException("§3.9: non-positive requests are not allowed!");
n = 1;
}
// Downstream requests are cumulative and may come from any thread
for (;;) {
long requested = get();
long update = requested + n;
// As governed by rule 3.17, when demand overflows `Long.MAX_VALUE`
// we treat the signalled demand as "effectively unbounded"
if (update < 0L) {
update = Long.MAX_VALUE;
}
// atomically update the current requested amount
if (compareAndSet(requested, update)) {
// if there was no prior request amount, we start the emission loop
if (requested == 0L) {
emit(update);
}
break;
}
}
}
// This handles cancellation requests, and is idempotent, thread-safe and not
// synchronously performing heavy computations as specified in rule 3.5
@Override
public void cancel() {
// Indicate to the emission loop it should stop.
cancelled = true;
}
void emit(long currentRequested) {
// Load fields to avoid re-reading them from memory due to volatile accesses in the loop.
Subscriber<? super Integer> downstream = this.downstream;
int index = this.index;
int end = this.end;
int emitted = 0;
try {
for (; ; ) {
// Check if there was an invalid request and then report its exception
// as mandated by rule 3.9. The stacktrace in it should
// help locate the faulty logic in the Subscriber.
Throwable invalidRequest = this.invalidRequest;
if (invalidRequest != null) {
// When we signal onError, the subscription must be considered as cancelled, as per rule 1.6
cancelled = true;
downstream.onError(invalidRequest);
return;
}
// Loop while the index hasn't reached the end and we haven't
// emitted all that's been requested
while (index != end && emitted != currentRequested) {
// to make sure that we follow rule 1.8, 3.6 and 3.7
// We stop if cancellation was requested.
if (cancelled) {
return;
}
downstream.onNext(index);
// Increment the index for the next possible emission.
index++;
// Increment the emitted count to prevent overflowing the downstream.
emitted++;
}
// If the index reached the end, we complete the downstream.
if (index == end) {
// to make sure that we follow rule 1.8, 3.6 and 3.7
// Unless cancellation was requested by the last onNext.
if (!cancelled) {
// We need to consider this `Subscription` as cancelled as per rule 1.6
// Note, however, that this state is not observable from the outside
// world and since we leave the loop with requested > 0L, any
// further request() will never trigger the loop.
cancelled = true;
downstream.onComplete();
}
return;
}
// Did the requested amount change while we were looping?
long freshRequested = get();
if (freshRequested == currentRequested) {
// Save where the loop has left off: the next value to be emitted
this.index = index;
// Atomically subtract the previously requested (also emitted) amount
currentRequested = addAndGet(-currentRequested);
// If there was no new request in between get() and addAndGet(), we simply quit
// The next 0 to N transition in request() will trigger the next emission loop.
if (currentRequested == 0L) {
break;
}
// Looks like there were more async requests, reset the emitted count and continue.
emitted = 0;
} else {
// Yes, avoid the atomic subtraction and resume.
// emitted != currentRequest in this case and index
// still points to the next value to be emitted
currentRequested = freshRequested;
}
}
} catch (Throwable ex) {
// We can only get here if `onNext`, `onError` or `onComplete` threw, and they
// are not allowed to according to 2.13, so we can only cancel and log here.
// If `onError` throws an exception, this is a spec violation according to rule 1.9,
// and all we can do is to log it.
// Make sure that we are cancelled, since we cannot do anything else
// since the `Subscriber` is faulty.
cancelled = true;
// We can't report the failure to onError as the Subscriber is unreliable.
(new IllegalStateException(downstream + " violated the Reactive Streams rule 2.13 by " +
"throwing an exception from onNext, onError or onComplete.", ex))
.printStackTrace();
}
}
}
}
/************************************************************************
* Licensed under Public Domain (CC0) *
* *
* To the extent possible under law, the person who associated CC0 with *
* this code has waived all copyright and related or neighboring *
* rights to this code. *
* *
* You should have received a copy of the CC0 legalcode along with this *
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
************************************************************************/
package org.reactivestreams.example.unicast;
import org.reactivestreams.Publisher;
import org.reactivestreams.example.unicast.RangePublisher;
import org.reactivestreams.tck.*;
public class RangePublisherTest extends PublisherVerification<Integer> {
public RangePublisherTest() {
super(new TestEnvironment(50, 50));
}
@Override
public Publisher<Integer> createPublisher(long elements) {
return new RangePublisher(1, (int)elements);
}
@Override
public Publisher<Integer> createFailedPublisher() {
return null;
}
}
/bin
\ No newline at end of file
description = 'reactive-streams-flow-adapters'
dependencies {
compile project(':reactive-streams')
testCompile project(':reactive-streams-tck')
testCompile group: 'org.testng', name: 'testng', version: '5.14.10'
}
jar {
manifest {
attributes('Automatic-Module-Name': 'org.reactivestreams.flowadapters')
}
}
test.useTestNG()
javadoc {
options.links("https://docs.oracle.com/javase/9/docs/api")
}
\ No newline at end of file
/************************************************************************
* Licensed under Public Domain (CC0) *
* *
* To the extent possible under law, the person who associated CC0 with *
* this code has waived all copyright and related or neighboring *
* rights to this code. *
* *
* You should have received a copy of the CC0 legalcode along with this *
* work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.*
************************************************************************/
package org.reactivestreams;
import java.util.concurrent.Flow;
/**
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
*/
public final class FlowAdapters {
/** Utility class. */
private FlowAdapters() {
throw new IllegalStateException("No instances!");
}
/**
* Converts a Flow Publisher into a Reactive Streams Publisher.
* @param <T> the element type
* @param flowPublisher the source Flow Publisher to convert
* @return the equivalent Reactive Streams Publisher
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Publisher<T> toPublisher(
Flow.Publisher<? extends T> flowPublisher) {
if (flowPublisher == null) {
throw new NullPointerException("flowPublisher");
}
if (flowPublisher instanceof FlowPublisherFromReactive) {
return (org.reactivestreams.Publisher<T>)(((FlowPublisherFromReactive<T>)flowPublisher).reactiveStreams);
}
if (flowPublisher instanceof org.reactivestreams.Publisher) {
return (org.reactivestreams.Publisher<T>)flowPublisher;
}
return new ReactivePublisherFromFlow<T>(flowPublisher);
}
/**
* Converts a Reactive Streams Publisher into a Flow Publisher.
* @param <T> the element type
* @param reactiveStreamsPublisher the source Reactive Streams Publisher to convert
* @return the equivalent Flow Publisher
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Publisher<T> toFlowPublisher(
org.reactivestreams.Publisher<? extends T> reactiveStreamsPublisher
) {
if (reactiveStreamsPublisher == null) {
throw new NullPointerException("reactiveStreamsPublisher");
}
if (reactiveStreamsPublisher instanceof ReactivePublisherFromFlow) {
return (Flow.Publisher<T>)(((ReactivePublisherFromFlow<T>)reactiveStreamsPublisher).flow);
}
if (reactiveStreamsPublisher instanceof Flow.Publisher) {
return (Flow.Publisher<T>)reactiveStreamsPublisher;
}
return new FlowPublisherFromReactive<T>(reactiveStreamsPublisher);
}
/**
* Converts a Flow Processor into a Reactive Streams Processor.
* @param <T> the input value type
* @param <U> the output value type
* @param flowProcessor the source Flow Processor to convert
* @return the equivalent Reactive Streams Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> org.reactivestreams.Processor<T, U> toProcessor(
Flow.Processor<? super T, ? extends U> flowProcessor
) {
if (flowProcessor == null) {
throw new NullPointerException("flowProcessor");
}
if (flowProcessor instanceof FlowToReactiveProcessor) {
return (org.reactivestreams.Processor<T, U>)(((FlowToReactiveProcessor<T, U>)flowProcessor).reactiveStreams);
}
if (flowProcessor instanceof org.reactivestreams.Processor) {
return (org.reactivestreams.Processor<T, U>)flowProcessor;
}
return new ReactiveToFlowProcessor<T, U>(flowProcessor);
}
/**
* Converts a Reactive Streams Processor into a Flow Processor.
* @param <T> the input value type
* @param <U> the output value type
* @param reactiveStreamsProcessor the source Reactive Streams Processor to convert
* @return the equivalent Flow Processor
*/
@SuppressWarnings("unchecked")
public static <T, U> Flow.Processor<T, U> toFlowProcessor(
org.reactivestreams.Processor<? super T, ? extends U> reactiveStreamsProcessor
) {
if (reactiveStreamsProcessor == null) {
throw new NullPointerException("reactiveStreamsProcessor");
}
if (reactiveStreamsProcessor instanceof ReactiveToFlowProcessor) {
return (Flow.Processor<T, U>)(((ReactiveToFlowProcessor<T, U>)reactiveStreamsProcessor).flow);
}
if (reactiveStreamsProcessor instanceof Flow.Processor) {
return (Flow.Processor<T, U>)reactiveStreamsProcessor;
}
return new FlowToReactiveProcessor<T, U>(reactiveStreamsProcessor);
}
/**
* Converts a Reactive Streams Subscriber into a Flow Subscriber.
* @param <T> the input and output value type
* @param reactiveStreamsSubscriber the Reactive Streams Subscriber instance to convert
* @return the equivalent Flow Subscriber
*/
@SuppressWarnings("unchecked")
public static <T> Flow.Subscriber<T> toFlowSubscriber(org.reactivestreams.Subscriber<T> reactiveStreamsSubscriber) {
if (reactiveStreamsSubscriber == null) {
throw new NullPointerException("reactiveStreamsSubscriber");
}
if (reactiveStreamsSubscriber instanceof ReactiveToFlowSubscriber) {
return (Flow.Subscriber<T>)((ReactiveToFlowSubscriber<T>)reactiveStreamsSubscriber).flow;
}
if (reactiveStreamsSubscriber instanceof Flow.Subscriber) {
return (Flow.Subscriber<T>)reactiveStreamsSubscriber;
}
return new FlowToReactiveSubscriber<T>(reactiveStreamsSubscriber);
}
/**
* Converts a Flow Subscriber into a Reactive Streams Subscriber.
* @param <T> the input and output value type
* @param flowSubscriber the Flow Subscriber instance to convert
* @return the equivalent Reactive Streams Subscriber
*/
@SuppressWarnings("unchecked")
public static <T> org.reactivestreams.Subscriber<T> toSubscriber(Flow.Subscriber<T> flowSubscriber) {
if (flowSubscriber == null) {
throw new NullPointerException("flowSubscriber");
}
if (flowSubscriber instanceof FlowToReactiveSubscriber) {
return (org.reactivestreams.Subscriber<T>)((FlowToReactiveSubscriber<T>)flowSubscriber).reactiveStreams;
}
if (flowSubscriber instanceof org.reactivestreams.Subscriber) {
return (org.reactivestreams.Subscriber<T>)flowSubscriber;
}
return new ReactiveToFlowSubscriber<T>(flowSubscriber);
}
/**
* Wraps a Reactive Streams Subscription and converts the calls to a Flow Subscription.
*/
static final class FlowToReactiveSubscription implements Flow.Subscription {
final org.reactivestreams.Subscription reactiveStreams;
public FlowToReactiveSubscription(org.reactivestreams.Subscription reactive) {
this.reactiveStreams = reactive;
}
@Override
public void request(long n) {
reactiveStreams.request(n);
}
@Override
public void cancel() {
reactiveStreams.cancel();
}
}
/**
* Wraps a Flow Subscription and converts the calls to a Reactive Streams Subscription.
*/
static final class ReactiveToFlowSubscription implements org.reactivestreams.Subscription {
final Flow.Subscription flow;
public ReactiveToFlowSubscription(Flow.Subscription flow) {
this.flow = flow;
}
@Override
public void request(long n) {
flow.request(n);
}
@Override
public void cancel() {
flow.cancel();
}
}
/**
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the element type
*/
static final class FlowToReactiveSubscriber<T>
implements Flow.Subscriber<T> {
final org.reactivestreams.Subscriber<? super T> reactiveStreams;
public FlowToReactiveSubscriber(org.reactivestreams.Subscriber<? super T> reactive) {
this.reactiveStreams = reactive;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
}
@Override
public void onNext(T item) {
reactiveStreams.onNext(item);
}
@Override
public void onError(Throwable throwable) {
reactiveStreams.onError(throwable);
}
@Override
public void onComplete() {
reactiveStreams.onComplete();
}
}
/**
* Wraps a Reactive Streams Subscriber and forwards methods of the Flow Subscriber to it.
* @param <T> the element type
*/
static final class ReactiveToFlowSubscriber<T>
implements org.reactivestreams.Subscriber<T> {
final Flow.Subscriber<? super T> flow;
public ReactiveToFlowSubscriber(Flow.Subscriber<? super T> flow) {
this.flow = flow;
}
@Override
public void onSubscribe(org.reactivestreams.Subscription subscription) {
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
}
@Override
public void onNext(T item) {
flow.onNext(item);
}
@Override
public void onError(Throwable throwable) {
flow.onError(throwable);
}
@Override
public void onComplete() {
flow.onComplete();
}
}
/**
* Wraps a Flow Processor and forwards methods of the Reactive Streams Processor to it.
* @param <T> the input type
* @param <U> the output type
*/
static final class ReactiveToFlowProcessor<T, U>
implements org.reactivestreams.Processor<T, U> {
final Flow.Processor<? super T, ? extends U> flow;
public ReactiveToFlowProcessor(Flow.Processor<? super T, ? extends U> flow) {
this.flow = flow;
}
@Override
public void onSubscribe(org.reactivestreams.Subscription subscription) {
flow.onSubscribe((subscription == null) ? null : new FlowToReactiveSubscription(subscription));
}
@Override
public void onNext(T t) {
flow.onNext(t);
}
@Override
public void onError(Throwable t) {
flow.onError(t);
}
@Override
public void onComplete() {
flow.onComplete();
}
@Override
public void subscribe(org.reactivestreams.Subscriber<? super U> s) {
flow.subscribe((s == null) ? null : new FlowToReactiveSubscriber<U>(s));
}
}
/**
* Wraps a Reactive Streams Processor and forwards methods of the Flow Processor to it.
* @param <T> the input type
* @param <U> the output type
*/
static final class FlowToReactiveProcessor<T, U>
implements Flow.Processor<T, U> {
final org.reactivestreams.Processor<? super T, ? extends U> reactiveStreams;
public FlowToReactiveProcessor(org.reactivestreams.Processor<? super T, ? extends U> reactive) {
this.reactiveStreams = reactive;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
reactiveStreams.onSubscribe((subscription == null) ? null : new ReactiveToFlowSubscription(subscription));
}
@Override
public void onNext(T t) {
reactiveStreams.onNext(t);
}
@Override
public void onError(Throwable t) {
reactiveStreams.onError(t);
}
@Override
public void onComplete() {
reactiveStreams.onComplete();
}
@Override
public void subscribe(Flow.Subscriber<? super U> s) {
reactiveStreams.subscribe((s == null) ? null : new ReactiveToFlowSubscriber<U>(s));
}
}
/**
* Reactive Streams Publisher that wraps a Flow Publisher.
* @param <T> the element type
*/
static final class ReactivePublisherFromFlow<T> implements org.reactivestreams.Publisher<T> {
final Flow.Publisher<? extends T> flow;
public ReactivePublisherFromFlow(Flow.Publisher<? extends T> flowPublisher) {
this.flow = flowPublisher;
}
@Override
public void subscribe(org.reactivestreams.Subscriber<? super T> reactive) {
flow.subscribe((reactive == null) ? null : new FlowToReactiveSubscriber<T>(reactive));
}
}
/**
* Flow Publisher that wraps a Reactive Streams Publisher.
* @param <T> the element type
*/
static final class FlowPublisherFromReactive<T> implements Flow.Publisher<T> {
final org.reactivestreams.Publisher<? extends T> reactiveStreams;
public FlowPublisherFromReactive(org.reactivestreams.Publisher<? extends T> reactivePublisher) {
this.reactiveStreams = reactivePublisher;
}
@Override
public void subscribe(Flow.Subscriber<? super T> flow) {
reactiveStreams.subscribe((flow == null) ? null : new ReactiveToFlowSubscriber<T>(flow));
}
}
}