Reactive Extensionの調査中にReactor Core 2.5が面白いことに気がついたのでまとめ。
- JavaDoc
- GitHub
- A lite Rx API for the JVM by Sébastien Deleuze (Spring I/O 2016での発表資料)
Reactor Core (2.5) = Reactive Streams + Reactive Extensions(の一部)
Reactor(〜2.0)は効率的な非同期プログラミングのためのツールキットで、基本的な関数型(Java8以前に開発されたため)に始まって高効率なスケジューラー上に実装されたReactive StreamsやそのReactive Extensions(Rx)、非同期のネットワークライブラリ等を提供していました。
ところがReactor 2.5を開発するにあたってプロジェクトの構成をガラガラポンしたらしく、結果としてReactor Core(2.5)単体はReactive Streamsに準拠した軽量なRxライブラリに仕上がっています(依存性はreactive-streamsのみ)。
(RxJavaの主要コミッターであるDávid Karnok曰く現在のRxJavaは第二世代のリアクティブライブラリなのに対してReactor 2.5+は第四世代との解説)
Spring Frameworkの次期バージョン5ではこのReactorがSpring MVCのリアクティブ対応に用いられるようです。
現在のバージョンはマイルストーンリリース4で、利用するにはspring.ioのスナップショットリポジトリを参照する必要があります。
<repositories>
<repository>
<id>spring-snapshot</id>
<name>Spring Snapshot Repository</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
...
</repositories>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>2.5.0.M4</version>
</dependency>
...
</dependencies>
リファレンスマニュアル等が依然として書かれていないのがプロジェクトの進捗としては気になるところですが、今後充実すると期待したいところです。
Flux
とMono
: Reactive Streams準拠なObservable
RxのObservable
の概念を説明するのによく以下のような表が用いられます。
Single item | Multiple items | |
---|---|---|
同期 | T |
Iterable<T> |
非同期 | Future<T> |
Observable<T> |
つまりObservable
は非同期に出力されるデータの列を抽象するクラスです。
これに対して、Reactor Core(2.5)で提供されるクラスは以下のように対応します。
Single item | Multiple items | |
---|---|---|
Reactor core (非同期) | Mono<T> |
Flux<T> |
Observable
相当のFlux
に加えて、最大1個の出力を返すMono
が用意されているのがポイントです。共にRxで定義されているAPIのサブセットを実装しています。
最も重要な点として、Flux
もMono
も、Reactive StreamsのPublisher
を実装しています。
public abstract class Flux<T> implements Publisher<T>, Introspectable, Backpressurable{...}
public abstract class Mono<T> implements Publisher<T>, Backpressurable, Introspectable {...}
Flux
もMono
もPublisher
から直接作成出来るほか、flatMap
等の重要な操作の多くもFlux
等ではなくPublisher
を引数に用います。
public static <T> Flux<T> from(Publisher<? extends T> source) {...}
public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {...}
Reactor CoreはReactive Streamsの上にReactive ExtensionsのAPIを被せるラッパーのように見ることが出来るかと思います。
Mono
は便利
Mono
は最大1個の値を出力するPublisher
です。前掲の表でも対応していたJDKのFutureのように、単発の値を返す非同期処理の戻り値として使うと便利です。
Mono
はPromise風
最近は特にAndroidアプリの開発にRxJavaを導入する事例が多く報告されていますが、その理由の一つとしてObservable
をPromiseとして用いることでUI周りの非同期処理の書き方を統一したいという事情があるかと思います。
Mono
はこの用途に適しています。単一の出力値しか扱わないためObservable
よりもAPIがコンパクトであり、加えて他の言語のPromiseに多く用いられるthen
メソッドも提供しています。
Mono
を作る方法は複数ありますが、コールバックを用いた古典的な非同期処理はMono.create
でラップするのが簡単です。
class SampleService{
private final Timer timer = new Timer();
// タイマーを用いて1秒後に入力値+1を計算して出力
Mono<Integer> increment(int v){
return Mono.create(e -> timer.schedule(new TimerTask() {
@Override
public void run() { e.complete(v + 1); }
}, 1000));
}
}
Mono.create
はMonoEmitter
を引数とした関数からMono
を作成します。
コールバックの中でMonoEmitter
のcomplete(v)
を呼び出すことで値を下流のSubscriberに伝達します。
MonoEmitter
はキャンセル処理登録のsetCancellation(c)
や例外捕捉のfail(e)
といったメソッドも持ちます。
// より丁寧に実装したincrementメソッドの例
Mono<Integer> increment(int v) {
return Mono.create(e -> {
TimerTask t = new TimerTask() {
@Override
public void run() { e.complete(v + 1);}
};
e.setCancellation(t::cancel); // 下流でキャンセルされたとき
try {
timer.schedule(t, 1000);
}catch (Exception ex){
e.fail(ex); // タイマー開始時の例外を下流にerrorとして転送
}
});
}
このように定義された非同期処理はsubscribe
されて初めて実行されます。
SampleService service = new SampleService();
service.increment(0).subscribe(System.out::println);
System.out.println("Subscribed"); // 即座に"Subscribed"、1秒後に"1"を表示
現在のスレッドをブロックして同期的に値を得たいときは代わりにblock
を使います。
int result = service.increment(0).block();
System.out.println(result); // 1秒後に"1"を表示
またMono
をthen
で連鎖することで非同期処理を逐次実行することが出来ます。
service.increment(0)
.then(service::increment)
.then(service::increment)
.then(service::increment)
.map(v -> "Result: " + v)
.subscribe(System.out::println);
System.out.println("Subscribed"); // 即座に"Subscribed"、4秒後に"Result: 4"と表示
「Mono
のthen
」 = 「Mono
を返す関数をflatMap
」と殆ど等価だと思うのですが、JavaScript等のPromiseを読み慣れている場合はより読みやすいかと思います。
他方でMono
を返さない同期処理を繋ぎたいときにはthen
ではなくmap
を使わなければならないのは現状イマイチなところです。
複数のMono
の同期を取る関数も用意されています。
Mono.when( // 全てのMonoの完了を持つ
service.increment(0),
service.increment(0).then(service::increment),
service.increment(0).then(service::increment).then(service::increment)
).subscribe(System.out::println); // 3秒後に"1, 2, 3"を表示
RxJavaのObservable
をPromise代わりに用いる場合、例えば単一の値を返すObservable
をメソッドの返値として用いる運用を行うかと思います。
この場合、メソッドのシグネチャからだけでは返値が一つの値なのかストリームなのか判別が出来ません。
interface AsyncService {
Observable<Integer> action();
Observable<Integer> stream();
}
この点、ReactorではMono
とFlux
を使い分けることで区別出来ます。また後述の通り使い分けても相互運用はスムーズです。
interface AsyncService {
Mono<Integer> action();
Flux<Integer> stream();
}
Mono
はFlux
との相互運用が容易
RxJavaにも非同期に一個の値を出力するクラスとしてSingle
が定義されています。Single
のAPIはObservable
風ではありますがそれ自体はObservable
では無く、インターフェイスも共有していません。
そのため、例えばSingle
を返す非同期処理をObservable
に適用するには一旦Single
をObservable
に変換する手順が必要になります。
interface RxService {
Single<String> action(int v);
Observable<Integer> stream();
}
RxService rxService = ...;
rxService.stream().flatMap(v -> rxService.action(v).toObservable());
他方でFlux
とMono
は共にReactive StreamのPublisher
であり、そのAPIも多くは共通のインターフェイスであるPublisher
を引数に用いる事が出来ます。
ですので一方を他方に直接適用することが可能です。
interface RoService{
Mono<String> action(int v);
Flux<Integer> stream();
}
RoService roService = ...;
roService.stream().flatMap(roService::action);
単発の値を返す非同期処理にはMono
、ストリームにはFlux
と使い分けて、前者を後者にflatMap
するという書き方が多くなるかと思います。
Mono
は四文字
微妙に重要(個人的には)。
実際のところJDK8にも従来のFuture
の拡張としてPromise
風に使えるCompletableFuture
が導入されています。しかし17文字です。
例として、非同期にレスポンスを返すサービスのインターフェイスを定義してみましょう。
interface AsyncService{
CompletableFuture<Integer> action1();
CompletableFuture<Integer> action2();
CompletableFuture<Integer> action3();
CompletableFuture<Integer> action4();
Observable<Integer> stream();
}
返値のシグネチャを見ているだけで目が微妙にチカチカと・・・Mono
だとこうなります。
interface AsyncService{
Mono<Integer> action1();
Mono<Integer> action2();
Mono<Integer> action3();
Mono<Integer> action4();
Flux<Integer> stream();
}
実際、非同期メッセージングを用いるコードを書き始めると非同期レスポンスの型は頻繁に記述することになります。その度にCompletableFuture
と書くのと(命名者出てこいっ)Mono
4文字で済むのではコードの可読性にも差が出ると思います。
また上記の例ではストリームの返値としてObservable
とFlux
も比べてみました。Flux
も4文字で、しかもMono
と文字数が揃っているのが個人的には好ましいです。
Flux
は過不足無し
・・・と思います。RxJavaのObservable
を使い慣れていればFlux
も難なく使えるはずです。
Reactorが提供するのはRxのサブセットですが必要なAPIは過不足無くカバーしているとの開発サイドの主張です。
Under its new scope and close ties with Reactive Streams Commons, Reactor Core offers just enough Rx coverage to build reactive apps or libraries alike, e.g. Spring Reactive Web support.
ただしObservable
以上のメリットが現在あるかというとやや微妙です。
最大の違いはFlux
はReactive StreamsのPublisher
を実装しているという点ですが、現状Reactive Streams準拠のライブラリやクライアント等が少ない以上、Reactive Streamsとの互換性はメリットとしては実質あまり大きくないと思います。
現時点でのFlux
のメリットは専ら前述のMono
とインターフェイスを共有しているという設計の筋の良さ、それに伴う相互運用の良さになるかと思います。それと文字数。
というわけで、Androidアプリ開発への適用としては既に実績も支援ライブラリも豊富なRxJavaから移行する積極的な理由は少ないと思います(現状ではAndroid対応にはJava Time API周りの問題があるようです issue)。
ただ今回はサーバーサイドでSpringと共に使う事を目的として調査したのですが、他のSpringライブラリの対応が増えるようだと今後楽しみだと思います。