45
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Reactor Core 2.5: もう一つのJava向けReactive Extensions実装

Last updated at Posted at 2016-06-19

Reactive Extensionの調査中にReactor Core 2.5が面白いことに気がついたのでまとめ。

Reactor Core (2.5) = Reactive Streams + Reactive Extensions(の一部)

Reactor(〜2.0)は効率的な非同期プログラミングのためのツールキットで、基本的な関数型(Java8以前に開発されたため)に始まって高効率なスケジューラー上に実装されたReactive StreamsやそのReactive Extensions(Rx)、非同期のネットワークライブラリ等を提供していました。

ところがReactor 2.5を開発するにあたってプロジェクトの構成をガラガラポンしたらしく、結果としてReactor Core(2.5)単体はReactive Streamsに準拠した軽量なRxライブラリに仕上がっています(依存性はreactive-streamsのみ)。

(RxJavaの主要コミッターであるDávid Karnok曰く現在のRxJavaは第二世代のリアクティブライブラリなのに対してReactor 2.5+は第四世代との解説)

Spring Frameworkの次期バージョン5ではこのReactorがSpring MVCのリアクティブ対応に用いられるようです。

現在のバージョンはマイルストーンリリース4で、利用するにはspring.ioのスナップショットリポジトリを参照する必要があります。

pom.xml
  <repositories>
    <repository>
      <id>spring-snapshot</id>
      <name>Spring Snapshot Repository</name>
      <url>https://repo.spring.io/snapshot</url>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
    ...
  </repositories>

  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-core</artifactId>
      <version>2.5.0.M4</version>
    </dependency>
    ...
  </dependencies>

リファレンスマニュアル等が依然として書かれていないのがプロジェクトの進捗としては気になるところですが、今後充実すると期待したいところです。

FluxMono: Reactive Streams準拠なObservable

RxのObservableの概念を説明するのによく以下のようなが用いられます。

Single item Multiple items
同期 T Iterable<T>
非同期 Future<T> Observable<T>

つまりObservableは非同期に出力されるデータの列を抽象するクラスです。

これに対して、Reactor Core(2.5)で提供されるクラスは以下のように対応します。

Single item Multiple items
Reactor core (非同期) Mono<T> Flux<T>

Observable相当のFluxに加えて、最大1個の出力を返すMonoが用意されているのがポイントです。共にRxで定義されているAPIのサブセットを実装しています。

最も重要な点として、FluxMonoも、Reactive StreamsのPublisherを実装しています。

public abstract class Flux<T> implements Publisher<T>, Introspectable, Backpressurable{...}

public abstract class Mono<T> implements Publisher<T>, Backpressurable, Introspectable {...}

FluxMonoPublisherから直接作成出来るほか、flatMap等の重要な操作の多くもFlux等ではなくPublisherを引数に用います。

public static <T> Flux<T> from(Publisher<? extends T> source) {...}

public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper) {...}

Reactor CoreはReactive Streamsの上にReactive ExtensionsのAPIを被せるラッパーのように見ることが出来るかと思います。

Monoは便利

Monoは最大1個の値を出力するPublisherです。前掲の表でも対応していたJDKのFutureのように、単発の値を返す非同期処理の戻り値として使うと便利です。

MonoはPromise風

最近は特にAndroidアプリの開発にRxJavaを導入する事例が多く報告されていますが、その理由の一つとしてObservableをPromiseとして用いることでUI周りの非同期処理の書き方を統一したいという事情があるかと思います。

Monoはこの用途に適しています。単一の出力値しか扱わないためObservableよりもAPIがコンパクトであり、加えて他の言語のPromiseに多く用いられるthenメソッドも提供しています。

Monoを作る方法は複数ありますが、コールバックを用いた古典的な非同期処理はMono.createでラップするのが簡単です。

class SampleService{

  private final Timer timer = new Timer();

  // タイマーを用いて1秒後に入力値+1を計算して出力
  Mono<Integer> increment(int v){

    return Mono.create(e -> timer.schedule(new TimerTask() {
      @Override
      public void run() { e.complete(v + 1); }
    }, 1000));
  }
}

Mono.createMonoEmitterを引数とした関数からMonoを作成します。
コールバックの中でMonoEmittercomplete(v)を呼び出すことで値を下流のSubscriberに伝達します。

MonoEmitterはキャンセル処理登録のsetCancellation(c)や例外捕捉のfail(e)といったメソッドも持ちます。

// より丁寧に実装したincrementメソッドの例
  Mono<Integer> increment(int v) {

    return Mono.create(e -> {
      
      TimerTask t = new TimerTask() {
        @Override
        public void run() { e.complete(v + 1);}
      };
      
      e.setCancellation(t::cancel); // 下流でキャンセルされたとき
      
      try {
        timer.schedule(t, 1000);
      }catch (Exception ex){
        e.fail(ex); // タイマー開始時の例外を下流にerrorとして転送
      }
    });
  }

このように定義された非同期処理はsubscribeされて初めて実行されます。

SampleService service = new SampleService();

service.increment(0).subscribe(System.out::println);

System.out.println("Subscribed"); // 即座に"Subscribed"、1秒後に"1"を表示

現在のスレッドをブロックして同期的に値を得たいときは代わりにblockを使います。

int result = service.increment(0).block();
System.out.println(result); // 1秒後に"1"を表示

またMonothenで連鎖することで非同期処理を逐次実行することが出来ます。

service.increment(0)
  .then(service::increment)
  .then(service::increment)
  .then(service::increment)
  .map(v -> "Result: " + v)
  .subscribe(System.out::println);

System.out.println("Subscribed"); // 即座に"Subscribed"、4秒後に"Result: 4"と表示

Monothen」 = 「Monoを返す関数をflatMap」と殆ど等価だと思うのですが、JavaScript等のPromiseを読み慣れている場合はより読みやすいかと思います。

他方でMonoを返さない同期処理を繋ぎたいときにはthenではなくmapを使わなければならないのは現状イマイチなところです。

複数のMonoの同期を取る関数も用意されています。

Mono.when(                           // 全てのMonoの完了を持つ
  service.increment(0),
  service.increment(0).then(service::increment),
  service.increment(0).then(service::increment).then(service::increment)
).subscribe(System.out::println);    // 3秒後に"1, 2, 3"を表示

RxJavaのObservableをPromise代わりに用いる場合、例えば単一の値を返すObservableをメソッドの返値として用いる運用を行うかと思います。
この場合、メソッドのシグネチャからだけでは返値が一つの値なのかストリームなのか判別が出来ません。

interface AsyncService {
  Observable<Integer> action();
  Observable<Integer> stream();
}

この点、ReactorではMonoFluxを使い分けることで区別出来ます。また後述の通り使い分けても相互運用はスムーズです。

interface AsyncService {
  Mono<Integer> action();
  Flux<Integer> stream();
}

MonoFluxとの相互運用が容易

RxJavaにも非同期に一個の値を出力するクラスとしてSingleが定義されています。SingleのAPIはObservable風ではありますがそれ自体はObservableでは無く、インターフェイスも共有していません。
そのため、例えばSingleを返す非同期処理をObservableに適用するには一旦SingleObservableに変換する手順が必要になります。

interface RxService {
  Single<String> action(int v);
  Observable<Integer> stream();
}

RxService rxService = ...;
rxService.stream().flatMap(v -> rxService.action(v).toObservable());

他方でFluxMonoは共にReactive StreamのPublisherであり、そのAPIも多くは共通のインターフェイスであるPublisherを引数に用いる事が出来ます。
ですので一方を他方に直接適用することが可能です。

interface RoService{
  Mono<String> action(int v);
  Flux<Integer> stream();
}

RoService roService = ...;
roService.stream().flatMap(roService::action);

単発の値を返す非同期処理にはMono、ストリームにはFluxと使い分けて、前者を後者にflatMapするという書き方が多くなるかと思います。

Monoは四文字

微妙に重要(個人的には)。

実際のところJDK8にも従来のFutureの拡張としてPromise風に使えるCompletableFutureが導入されています。しかし17文字です。

例として、非同期にレスポンスを返すサービスのインターフェイスを定義してみましょう。

interface AsyncService{
  CompletableFuture<Integer> action1();
  CompletableFuture<Integer> action2();
  CompletableFuture<Integer> action3();
  CompletableFuture<Integer> action4();
  Observable<Integer> stream();
}

返値のシグネチャを見ているだけで目が微妙にチカチカと・・・Monoだとこうなります。

interface AsyncService{
  Mono<Integer> action1();
  Mono<Integer> action2();
  Mono<Integer> action3();
  Mono<Integer> action4();
  Flux<Integer> stream();
}

実際、非同期メッセージングを用いるコードを書き始めると非同期レスポンスの型は頻繁に記述することになります。その度にCompletableFutureと書くのと(命名者出てこいっ)Mono4文字で済むのではコードの可読性にも差が出ると思います。

また上記の例ではストリームの返値としてObservableFluxも比べてみました。Fluxも4文字で、しかもMonoと文字数が揃っているのが個人的には好ましいです。

Fluxは過不足無し

・・・と思います。RxJavaのObservableを使い慣れていればFluxも難なく使えるはずです。

Reactorが提供するのはRxのサブセットですが必要なAPIは過不足無くカバーしているとの開発サイドの主張です。

Under its new scope and close ties with Reactive Streams Commons, Reactor Core offers just enough Rx coverage to build reactive apps or libraries alike, e.g. Spring Reactive Web support.

ただしObservable以上のメリットが現在あるかというとやや微妙です。

最大の違いはFluxはReactive StreamsのPublisherを実装しているという点ですが、現状Reactive Streams準拠のライブラリやクライアント等が少ない以上、Reactive Streamsとの互換性はメリットとしては実質あまり大きくないと思います。

現時点でのFluxのメリットは専ら前述のMonoとインターフェイスを共有しているという設計の筋の良さ、それに伴う相互運用の良さになるかと思います。それと文字数。

というわけで、Androidアプリ開発への適用としては既に実績も支援ライブラリも豊富なRxJavaから移行する積極的な理由は少ないと思います(現状ではAndroid対応にはJava Time API周りの問題があるようです issue)。

ただ今回はサーバーサイドでSpringと共に使う事を目的として調査したのですが、他のSpringライブラリの対応が増えるようだと今後楽しみだと思います。

45
50
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45
50

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?