概要
この記事はReactive Streamsと、JDK Flow APIを調べたときのメモになります。
Flow API
(java.util.concurrent.Flow) はJDK 9([JEP 266] (http://openjdk.java.net/jeps/266))で導入されたAPIで、`Reactive Streams Special Interest Group` (SIG)というワーキンググループが作成した仕様([Reactive Streams] (https://www.reactive-streams.org/))に対応しています。
この仕様に対応しているJVMのライブラリには[Akka Streams] (https://akka.io/)(Lightbend, Inc.)、[ReactiveX/RxJava] (http://reactivex.io/)などがあり、Spring WebFluxで使われている[Project Reactor] (https://projectreactor.io/)(Pivotal Software, Inc.)も対応しています。
環境
- Windows 10 Professional 1909
- OpenJDK 13.0.1
参考
- [Reactive Streams] (https://www.reactive-streams.org/)
- [The Reactive Manifesto] (https://www.reactivemanifesto.org/)
- [リアクティブ宣言] (https://www.reactivemanifesto.org/ja)
- [Reactive Programming with JDK 9 Flow API] (https://community.oracle.com/docs/DOC-1006738)
Reactive Streamsについて
Reactive Streams とは
下記は、[Reactive Streams] (https://www.reactive-streams.org/)の冒頭の一文から引用しました。(日本語訳はGoogle翻訳です。)
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Reactive Streamsは、ノンブロッキングバックプレッシャーを伴う非同期ストリーム処理の標準を提供するイニシアチブです。 これには、ランタイム環境(JVMおよびJavaScript)とネットワークプロトコルを対象とした取り組みが含まれます。
この文中にある『ノンブロッキングバックプレッシャーを伴う非同期ストリーム処理 (asynchronous stream processing with non-blocking back pressure)』という一文がReactive Streamsの特徴を端的に表しています。以下に用語集からそれぞれの用語の説明を引用しました。
ノンブロッキング (non blocking) とは
[ノンブロッキング] (https://www.reactivemanifesto.org/ja/glossary#Non-Blocking)
API は、リソースが利用可能ならアクセスさせ、そうでなければ直ちに返って、リソースが現時点では利用できなかったり、操作が開始されて未だ完了していないことを呼び出し元へ伝える。リソースに対するノンブロッキング API では、呼び出し元は、リソースが利用可能になるまでブロックして待つ代わりに他の仕事をすることができる。
バックプレッシャー (back pressure) とは
[バック・プレッシャー] (https://www.reactivemanifesto.org/ja/glossary#Back-Pressure)
過負荷状態のコンポーネントが壊滅的にクラッシュしたり、制御無くメッセージを損失することは許されない。処理が追いつかなくなっていて、かつクラッシュすることも許されないならば、コンポーネントは上流のコンポーネント群に自身が過負荷状態であることを伝えて負荷を減らしてもらうべきだ。このバック・プレッシャー (back-pressure) と呼ばれる仕組みは、過負荷の下でシステムを崩壊させず緩やかに応答を続ける重要なフィードバック機構だ。
非同期 (asynchronous) とは
[非同期] (https://www.reactivemanifesto.org/ja/glossary#Asynchronous)
リアクティブ宣言の文脈では、「クライアントからサービスへ送信されたリクエストが、送信後の任意の時点で処理されること」を意味する。送信先のサービス内でのリクエスト処理の実行を直接クライアントが観測したり、それに対して同期を取ることはできない。
[Reactive Streams Specification for the JVM] (https://github.com/reactive-streams/reactive-streams-jvm/blob/v1.0.3/README.md)
SIGが作成したJVM向けの仕様は、2019年11月現在version 1.0.3まで更新されています。
- version 1.0.0 : 2015年4月30日
- version 1.0.1 : 2017年8月9日
- version 1.0.2 : 2017年12月19日
- version 1.0.3 : 2019年8月23日
成果物
Mavenの成果物として以下のものがありますが、これらは仕様、TCK (Technology Compatibility Kit)、実装例であるため通常のプロジェクトでは直接使用せず、Akka StreamsやReactiveX/RxJava、Reactorなどのライブラリを使用することになると思います。
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck</artifactId>
<version>1.0.3</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-tck-flow -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-tck-flow</artifactId>
<version>1.0.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.reactivestreams/reactive-streams-examples -->
<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams-examples</artifactId>
<version>1.0.3</version>
</dependency>
API Components
Reactive StreamsのJVM向け仕様 version 1.0.3で定義されているインターフェースは下記の4つです。
Publisher
Publisher
は、無制限または有限の順序付けられた要素(sequenced elements)のプロバイダ(つまり、データストリームの発行)で、Subscriberから(Subscriptionを通じて)要求を受け取ると要素を発行(publish)します。
public interface Publisher<T> {
public void subscribe(Subscriber<? super T> s);
}
メソッド | 説明 |
---|---|
subscribe | データのストリーミングを開始するようにPublisherに要求をするファクトリメソッド。新しいSubscription毎に複数回呼び出すことができる。 |
Subscriber
Subscriber
は、Publisherから購読(subscribe)した要素を消費(consume)します。このインターフェースのonXxxというメソッドはPublisherからのシグナルに対応しているコールバックメソッドになります。
public interface Subscriber<T> {
public void onSubscribe(Subscription s);
public void onNext(T t);
public void onError(Throwable t);
public void onComplete();
}
メソッド | 説明 |
---|---|
onSubscribe | Publisher#subscribeを呼び出した後に実行される。Subscriberは引数で受け取ったSubscription を使ってデータを要求、またはキャンセルを行う。 |
onNext | Subscription#requestを呼び出した後に実行される。 |
onError | Publisherのデータ送信が失敗したときに実行される。 |
onComplete | Publisherのデータ送信が正常に終了したときに実行される。(キャンセル含む) |
Subscription
Subscription
は、PublisherとそのPublisherを購読(subscribe)するSubscriberを1対1で表します。SubscriberはSubscriptionのメソッドを介してPublisherへデータ送信またはキャンセルを要求します。
public interface Subscription {
public void request(long n);
public void cancel();
}
メソッド | 説明 |
---|---|
request | データを送信するようにPublisherへ要求する。 |
cancel | データの送信を停止しリソースをクリーンアップするようにPublisherへ要求する。 |
Processor
Processor
は、SubscriberとPublisherの両方の機能を持つコンポーネントです。Processorは始端のPublisherと終端のSubscriberの中間に位置しますが、1つだけでなく複数のProcessorを連結して配置することも可能です。
public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
Processorは必ずしも必要というわけではなく、必要がなければ下図のようにPublisherとSubscriberが直接連携をします。
+-----------+ +------------+
| | <-subscribe- | |
| Publisher | | Subscriber |
| | <--request-- | |
+-----------+ +------------+
下図は2つのProcessor(A,B)を連結して配置したときのイメージです。
このように中間にProcessorが必要となる状況とは、データストリーム上の途中でフィルタリングやデータ変換を行いたい場合です。
+-----------+ +-----------+ +-----------+ +------------+
| | <-subscribe- | | <-subscribe- | | <-subscribe- | |
| Publisher | | Processor | | Processor | | Subscriber |
| | <--request-- | (A) | <--request-- | (B) | <--request-- | |
+-----------+ +-----------+ +-----------+ +------------+
実装例
実装例がGitHub([reactive-streams/reactive-streams-jvm] (https://github.com/reactive-streams/reactive-streams-jvm))にあります。
下記はPublisher実装例の1つのAsyncIterablePublisherクラスを使ったデモプログラムです。
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.reactivestreams.example.unicast.AsyncIterablePublisher;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
List<Integer> elements = IntStream.rangeClosed(1, 20).boxed().collect(Collectors.toList());
ExecutorService executor = Executors.newFixedThreadPool(3);
AsyncIterablePublisher<Integer> pub = new AsyncIterablePublisher<>(elements, executor);
MySub mySub1 = new MySub("sub_1");
MySub mySub2 = new MySub("sub_2");
MySub mySub3 = new MySub("sub_3");
log.info("start");
// Publisher#subscribeを呼び出すと
// SubscriberのonSubscribeメソッドがコールバックされる
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("end");
try {
// 非同期処理のため処理が終了するまで30秒間待機する
TimeUnit.SECONDS.sleep(30);
executor.shutdown();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub implements Subscriber<Integer> {
private final String name;
private Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Subscription s) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = s;
// サブスクライブが完了したらデータを発行するようにPublisherへ要求する
// onSubscribeメソッド内でrequestすることでサブスクライブ完了と同時にデータ発行が始まる
s.request(1);
}
@Override
public void onNext(Integer integer) {
// Publisherからデータ発行が行われるとonNextメソッドがコールバックされる
log.info("({}) onNext:[{}] item:{}", getId(), name, integer);
// このメソッド内でデータ処理を行う
// なんらかのデータ処理を行う
// 次のデータを発行するようにPublisherへ要求する
s.request(1);
// もしくはキャンセルする
//s.cancel();
}
@Override
public void onError(Throwable t) {
// Publisherのデータ発行にエラーが発生するとコールバックされる
log.info("onError:[{}]", name);
}
@Override
public void onComplete() {
// Publisherのデータ発行が完了(若しくはキャンセル)するとコールバックされる
log.info("({}) onComplete:[{}]", getId(), name);
}
}
}
実行結果
[main] INFO Demo - start
[main] INFO Demo - end
[pool-1-thread-2] INFO Demo - (15) onSubscribe:[sub_2]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:2
[pool-1-thread-3] INFO Demo - (16) onSubscribe:[sub_3]
[pool-1-thread-1] INFO Demo - (14) onSubscribe:[sub_1]
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:3
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:1
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:4
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:2
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:2
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:5
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:3
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:3
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:4
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:4
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:7
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:5
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:5
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:8
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:6
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:6
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:9
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:7
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:11
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:9
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:8
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:12
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:10
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:9
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:10
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:12
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:13
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:19
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:14
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_2] item:20
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:15
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_2]
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:16
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:11
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:12
[pool-1-thread-3] INFO Demo - (16) onNext:[sub_3] item:13
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:17
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:14
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:18
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:15
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:16
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:17
[pool-1-thread-1] INFO Demo - (14) onNext:[sub_1] item:20
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:18
[pool-1-thread-1] INFO Demo - (14) onComplete:[sub_1]
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:19
[pool-1-thread-2] INFO Demo - (15) onNext:[sub_3] item:20
[pool-1-thread-2] INFO Demo - (15) onComplete:[sub_3]
JDK Flow API
[java.util.concurrent.Flow] (https://docs.oracle.com/javase/jp/13/docs/api/java.base/java/util/concurrent/Flow.html)
FlowクラスにはReactive Streamsの仕様に対応する4つのインターフェースが宣言されています。リアクティブストリームに対応したアプリケーションの開発では、これらのインターフェースを実装する必要があります。
public final class Flow {
@FunctionalInterface
public static interface Publisher<T> {
public void subscribe(Subscriber<? super T> subscriber);
}
public static interface Subscriber<T> {
public void onSubscribe(Subscription subscription);
public void onNext(T item);
public void onError(Throwable throwable);
public void onComplete();
}
public static interface Subscription {
public void request(long n);
public void cancel();
}
public static interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
}
[SubmissionPublisher<T>] (https://docs.oracle.com/javase/jp/13/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html)
なお、PublisherについてはSubmissionPublisher<T> という実装クラスがあり、これをそのまま使用するか継承して独自処理を実装するという使い方ができます。
コンストラクタ
コンストラクタ |
---|
SubmissionPublisher() |
SubmissionPublisher(Executor executor, int maxBufferCapacity) |
SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler) |
- maxBufferCapacityは2のべき乗に丸められます。
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
// 省略
}
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(ForkJoinPool.commonPool(), 8)) {
// 省略
}
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 8, (subscriber, throwable) -> {
})) {
// 省略
}
データ発行
SubmissionPublisherクラスにはデータを発行(publish)するメソッドにsubmit
とoffer
があります。
データ発行メソッド |
---|
public int submit(T item) |
public int offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
public int offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop) |
submit
submitはデータ送信できるまでブロックします。
int lag = pub.submit(value);
if (lag < 0) {
// submitではドロップは発生しない
} else {
// 最大遅延の推定値(送信されたがまだ消費されていないアイテムの数)
}
offer
offerはデータ送信をブロックせず、送信できなかった場合の処理(再送するかしないか等)を実行できます。
この例では再送せずにデータをドロップします。
int lag = offer(item, (subscriber, value) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; // 再送しない
});
if (lag < 0) {
// ドロップ数
} else {
// 最大遅延の推定値(送信されたがまだ消費されていないアイテムの数)
}
offer
タイムアウト時間を指定することもできます。この例では送信できなかった場合、1秒まで待機します。
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; // 再送しない
});
if (lag < 0) {
// ドロップ数
} else {
// 最大遅延の推定値(送信されたがまだ消費されていないアイテムの数)
}
実装例
下記はSubmissionPublisherクラスを使ったデモプログラムです。
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
@Slf4j
public class Demo {
public static void main(String ... args) {
log.info("start");
MySub<Integer> mySub1 = new MySub<>("sub_1");
MySub<Integer> mySub2 = new MySub<>("sub_2");
MySub<Integer> mySub3 = new MySub<>("sub_3");
ExecutorService executor = Executors.newFixedThreadPool(3);
try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>(executor, 256)) {
pub.subscribe(mySub1);
pub.subscribe(mySub2);
pub.subscribe(mySub3);
log.info("NumberOfSubscribers:{}", pub.getNumberOfSubscribers());
log.info("MaxBufferCapacity:{}", pub.getMaxBufferCapacity());
IntStream.rangeClosed(1, 100000).forEach(value -> {
log.info("publish:{} estimateMinimumDemand:{} estimateMaximumLag:{}", value, pub.estimateMinimumDemand(), pub.estimateMaximumLag());
int lag = pub.offer(value, 1, TimeUnit.SECONDS, (subscriber, integer) -> {
log.info("publish offer on drop:{}", integer);
subscriber.onError(new RuntimeException("drop item:[" + integer + "]"));
return false; // 再送しない
});
if (lag < 0) {
// ドロップ数
log.info("drops:{}", lag * -1);
} else {
// 最大遅延の推定値(送信されたがまだ消費されていないアイテムの数)
log.info("lag:{}", lag);
}
});
}
log.info("end");
try {
TimeUnit.SECONDS.sleep(10);
mySub1.result();
mySub2.result();
mySub3.result();
if (!executor.isShutdown()) {
log.info("shutdown");
executor.shutdown();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
static class MySub<Integer> implements Flow.Subscriber<Integer> {
private final String name;
private AtomicInteger success = new AtomicInteger(0);
private AtomicInteger error = new AtomicInteger(0);
private Flow.Subscription s;
public MySub(String name) {
this.name = name;
}
private Long getId() {
return Thread.currentThread().getId();
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
log.info("({}) onSubscribe:[{}]", getId(), name);
this.s = subscription;
s.request(1);
}
@Override
public void onNext(Integer item) {
log.info("({}) onNext:[{}] item:{}", getId(), name, item);
success.incrementAndGet();
s.request(1);
}
@Override
public void onError(Throwable throwable) {
log.info("({}) onError:[{}]", getId(), name);
error.incrementAndGet();
}
@Override
public void onComplete() {
log.info("({}) onComplete:[{}]", getId(), name);
}
public void result() {
log.info("result:[{}] success:{} error:{}", name, success.get(), error.get());
}
}
}