7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Reactor Core 3.x のハンズオン "Lite Rx API Hands-on"をやってみる

Posted at

概要

Reactive Extensions 実装の1つである Reactor Core にはトレーニングマテリアル "Lite Rx API Hands-on" が用意されていると知りましたので、早速やってみました。

"Lite Rx API Hands-on" とは

"Eclipse Collections Kata" のように、JUnit の穴開きコードが用意されていて、それをテストが通るように修正していくことでプログラミングの学習ができる、というものです。JUnit を動かせる環境であればどこでも実施できます。題材は Reactor Core ですが、インタフェイスのよく似ている RxJava の練習にもなるのではないでしょうか。

なお、Reactor Core は Java SE 8で追加されたクラス(主に関数型インタフェイス)を参照しているため、 Java SE 8以上でないと動作しません。

Reactor Coreとは

Reactive Extensions 実装の1つです。RxJava に似たインタフェイスながら、より洗練された命名や設計となっています。詳しくは 「Reactor Core 2.5: もう一つのJava向けReactive Extensions実装」 にまとめられていますので、そちらをご覧ください。

どうでもいいことを書くと、"Reactor Core" で画像検索をすると、心を揺さぶられる壮大な構造物の画像が上位に出ます。

Reactive Extensions とは

Reactive Streams を命令型言語のパラダイムで利用しやすくするためのライブラリだそうです。あえて書くほどのことではありませんが、 Reactive Extensions の略が Rx です。

Reactive Extensions や Reactive Streams についての説明は 「Reactive Webアプリケーション - そしてSpring 5へ」がわかりやすいので、まだお読みでなければ、ご一読をお勧めします。


注意

メソッド名の表記

Class#method は static メソッドを、Class.method はインスタンスメソッドを、それぞれ示します。例えば、Flux#just は Flux クラスの static メソッドである just を指します。

実行環境

前述の通り、このハンズオンを実施する際は Java SE 8 のコードが動作する環境を用意してください。

OS Windows10
Java SE 1.8.0_91
Gradle 2.12

ソースコードの取得

"Lite Rx API Hands-on" は GitHub 上で公開されています。まず GitHub のリポジトリ からソースコードを取得します。zipでダウンロードしてもよいですし、今後プロジェクトに貢献する可能性があるならフォークしてもよいでしょう。

Gradle 対応

Gradle でのビルドに対応していなかったので、まずそこから始めます。幸い、複雑な設定はなかったので、テキストエディタで置換する程度で作れました。下記に示します。

build.gradle
apply plugin: 'java'

def jdkVersion      = '1.8'
sourceCompatibility = jdkVersion
targetCompatibility = jdkVersion

repositories {
    mavenCentral()
}

dependencies {
    compile 'io.projectreactor:reactor-core:3.0.0.RC1'
    compile 'ch.qos.logback:logback-classic:1.1.3'
    testCompile 'junit:junit:4.12'
    testCompile 'io.reactivex:rxjava:1.1.5'
}

確認

test を gradle から実行すると、すべて失敗します。

テスト実行
$ gradle test
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test

io.pivotal.literx.Part01CreateFlux > fromValues FAILED
    java.lang.NullPointerException at Part01CreateFlux.java:38

......中略......

io.pivotal.literx.Part09BlockingToReactive > fastPublisherSlowSubscriber FAILED
    java.lang.NullPointerException at Part09BlockingToReactive.java:62

33 tests completed, 33 failed
:test FAILED

FAILURE: Build failed with an exception.

* What went wrong:
Execution failed for task ':test'.
> There were failing tests. See the report at: lite-rx-api-hands-on/build/reports/tests/index.html

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.

BUILD FAILED

Total time: 7.434 secs

テストが実装されていないから当然です。下記のようなレポートの HTML ファイルが build/reports/tests/index.html に生成されていれば、gradle の設定がちゃんとできていることの確認となります。

スクリーンショット (108).png

では、順に解いていきます。以下、解答を書いてありますので、未挑戦の方はご覧にならないことをお勧めします。なお、模範解答はこちらのリポジトリから参照可能です。どうしてもわからない問題があるなら覗いてみましょう。


Part1 CreateFlux

Learn how to create Flux instances.

  1. Flux Javadoc
  2. TestSubscriber Javadoc

まず Flux の生成方法を学びます。 Flux は RxJava の Observable に当たる、複数の要素を持つ Publisher です。

Return an empty Flux

そのままなので特に言うことはありません。

Flux#just
Flux<String> emptyFlux() {
    return Flux.just();
}

Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection

そのままなので特に言うことはありません。Flux#just は可変長引数をサポートしています。

Flux#just
Flux<String> fooBarFluxFromValues() {
    return Flux.just("foo", "bar");
}

おまけ1

あまり意味はありませんが、下記2つでもテストはパスします。

Flux#fromArray
return Flux.fromArray(new String[]{"foo", "bar"});
Flux#fromIterable
return Flux.fromIterable(Arrays.asList("foo", "bar"));

おまけ2

Reactor Core では要素1つの Flux (乱暴な表現)として Mono が用意されています。それを2つ concatWith でつなげるやり方でもテストは通ります。

Mono#concatWith
return Mono.just("foo").concatWith(Mono.just("bar"));

Create a Flux from a List that contains 2 values "foo" and "bar"

先ほどの Flux#fromIterable でよさそうです。

Flux#fromIterable
Flux<String> fooBarFluxFromList() {
    return Flux.fromIterable(Arrays.asList("foo", "bar"));
}

Create a Flux that emits an IllegalStateException

Flux#error というのがあるので、それを使います。

Flux#error
Flux<String> errorFlux() {
    return Flux.error(new IllegalStateException());
}

Create a Flux that never terminates

Flux#never を使います。

Flux#never
Flux<String> neverTerminatedFlux() {
    return Flux.never();
}

Create a Flux that emits an increasing value each 100ms

100ms 待機する Flux を Flux#intervalMillis で生成します。

Flux#intervalMillis
Flux<Long> counter() {
    return Flux.intervalMillis(100L);
}

Part2 CreateMono

Learn how to create Mono instances.

  1. Mono Javadoc
  2. TestSubscriber Javadoc

次は単一の要素を持つ Mono のオブジェクト生成について学びます。 Mono は RxJava の Single に当たるものです。 Flux との連携がスムーズに実行できる実装になっています。

Return an empty Mono

そのまますぎて何も書くことはありません。

Mono#empty
Mono<String> emptyMono() {
    return Mono.empty();
}

Return a Mono that contains a "foo" value

呼び出し元テストは fromValue メソッドですが、just を使います。

Mono<String> fooMono() {
    return Mono.just("foo");
}

Create a Mono that emits an IllegalStateException

Flux の時と大差ありません。

Mono#error
Mono<String> errorMono() {
    return Mono.error(new IllegalStateException());
}

Part3 Transform

Learn how to transform values.

要素の値を変換する方法を学びます。ReactiveUserRepository のオブジェクト repository が用意されているので、これを使います。さっきまでに比べると大分難しくなります。

Capitalize the user username, firstname and lastname

各要素の変更をするには map を使います。第0要素の User を持つ Mono が引数で渡されるので、それの各要素を UpperCase に変更し、新しいUser オブジェクトに詰めます。

Mono.map
Mono<User> capitalizeOne(final Mono<User> mono) {
    return mono.map(person -> new User(
                person.getUsername().toUpperCase(),
                person.getFirstname().toUpperCase(),
                person.getLastname().toUpperCase())
            );
}

Capitalize the users username, firstName and lastName

先ほどの Mono と同じ処理を Flux でやります。

Flux.map
Flux<User> capitalizeMany(final Flux<User> flux) {
    return flux.map(user -> new User(
                user.getUsername().toUpperCase(),
                user.getFirstname().toUpperCase(),
                user.getLastname().toUpperCase()
            )
    );
}

Capitalize the users username, firstName and lastName using asyncCapitalizeUser()

すでに User を渡すと Capitalize された User を持つ Mono を返す asyncCapitalizeUser メソッドが用意されています。あとはそれを flatMap 中で呼ぶだけです。

Flux.flatMap
Flux<User> asyncCapitalizeMany(final Flux<User> flux) {
    return flux.flatMap(user -> asyncCapitalizeUser(user));
}

Part4 Merge

Learn how to merge flux.

複数の Flux を合成する処理について学習します。

Merge flux1 and flux2 values with interleave

Flux.mergeWith を使って、2つの Flux を merge するだけの簡単なお仕事です。

Flux#merge
Flux<User> mergeFluxWithInterleave(final Flux<User> flux1, final Flux<User> flux2) {
    return Flux.merge(flux1, flux2);
}
Flux.mergeWith
Flux<User> mergeFluxWithInterleave(final Flux<User> flux1, final Flux<User> flux2) {
    return flux1.mergeWith(flux2);
}

Merge flux1 and flux2 values with no interleave (flux1 values, and then flux2 values)

Flux 同士の結合には merge のほかに concat もあります。

Flux#concat
Flux<User> mergeFluxWithNoInterleave(final Flux<User> flux1, final Flux<User> flux2) {
    return Flux.concat(flux1, flux2);
}

違いは下記の通りです。「非同期や並列処理にも役立つRxJavaの使い方」で説明されていますので、詳細はそちらの記事でご確認ください。

method name desription
merge 結合後全体で順序を並べ替える
concat 引数で渡された順に結合

Create a Flux containing the values of the 2 Mono

mono1のandでmono2をつなげる方法、Flux の concatメソッドでつなげる方法、2つやり方がありました。

Mono.concatWith
Flux<User> createFluxFromMultipleMono(final Mono<User> mono1, final Mono<User> mono2) {
    return mono1.concatWith(mono2);
}
Flux#concat
Flux<User> createFluxFromMultipleMono(final Mono<User> mono1, final Mono<User> mono2) {
    return Flux.concat(mono1, mono2);
}

Part5 Request

Learn how to control the demand.

Reactor Core のテストをする際に役立つ TestSubscriber の使い方を少し学びます。

Create a TestSubscriber that requests initially no value

TestSubscriber<User> createSubscriber(final Flux<User> flux) {
    return TestSubscriber.subscribe(flux, 0);
}

Request one value

void requestOne(final TestSubscriber<User> testSubscriber) {
    testSubscriber.request(1L);
}

Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals

Flux.logを呼びます。

Flux<User> fluxWithLog() {
    final Logger logger = Loggers.getLogger(getClass());
    return repository.findAll().log();
}

Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete

doOnSubscribe で subscribe 直後に1回だけ実行する処理、 doOnNext で全要素に対する処理、doOnComplete で終了時に1回だけ実行する処理を実装します。

return repository.findAll()
            .doOnSubscribe(sub -> System.out.println("Starring:"))
            .doOnNext(user -> System.out.println(
                   String.format("%s %s", user.getFirstname(), user.getLastname())))
            .doOnComplete(() -> System.out.println("The end!"));
実行結果
Starring:
Skyler White
Jesse Pinkman
Walter White
Saul Goodman
The end!

Part6 OtherOperations

Learn how to use various other operators.

さまざまな operator の使い方について学習します。

Create a Flux of user from Flux of username, firstname and lastname.

zip で Tuple3 にまとめ、それをさらに map で User オブジェクトに変換します。

Flux#zip&Flux.map
Flux<User> userFluxFromStringFlux(
    final Flux<String> usernameFlux,
    final Flux<String> firstnameFlux,
    final Flux<String> lastnameFlux
    ) {
    return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
               .map(tuple3 -> new User(tuple3.t1, tuple3.t2, tuple3.t3));
}

return the mono which returns faster its value

複数の Mono を受け取り、最も早い Mono を返す Mono#first メソッドを使います。

Mono#first
Mono<User> useFastestMono(final Mono<User> mono1, final Mono<User> mono2) {
    return Mono.first(mono1, mono2);
}

return the flux which returns faster the first value

Flux にも Flux#firstEmitting メソッドがあります。

Flux#firstEmitting
Flux<User> useFastestFlux(final Flux<User> flux1, final Flux<User> flux2) {
    return Flux.firstEmitting(flux1, flux2);
}

Convert the input Flux to a Mono that represents the complete signal of the flux

Mono<Void> を返す Flux.then を使います。

Flux.then
Mono<Void> endOfFlux(final Flux<User> flux) {
    return flux.then();
}

Return a Mono<User> containing Saul when an error occurs in the input Mono, else do not change the input Mono.

エラー発生時は User.SAUL を、それ以外の時は渡した Mono を、それぞれ返します。 Mono.otherwise を使って実装します。

Mono.otherwise
Mono<User> betterCallSaulForBogusMono(final Mono<User> mono) {
    return mono.otherwise(e -> Mono.just(User.SAUL));
}

Return a Flux containing Saul when an error occurs in the input Flux, else do not change the input Flux.

そのものずばりな Flux.onErrorReturn というメソッドがあります。

Flux.onErrorReturn
Flux<User> betterCallSaulForBogusFlux(final Flux<User> flux) {
    return flux.onErrorReturn(User.SAUL);
}

長くなってきたので、休憩を入れるのであればこの辺がちょうどよいです。

Part7 ReactiveToBlocking

Learn how to turn Reactive API to blocking one.

Mono や Flux をブロッキングな集合に変換する方法を学びます。

Return the user contained in that Mono

Mono.block メソッドを使うと、その Mono オブジェクトが保有する単一のオブジェクトを取得できます。

Mono.block
User monoToValue(final Mono<User> mono) {
    return mono.block();
}

Return the users contained in that Flux

Flux.toIterable メソッドを使うと、その Flux オブジェクトが保有するオブジェクトの一覧を Iterable で取得できます。

Flux.toIterable
Iterable<User> fluxToValues(final Flux<User> flux) {
    return flux.toIterable();
}

Part8 Conversion

Learn how to convert from/to Java 8+ CompletableFuture, RxJava Observable/Single and Reactor Stream.

Mono and Flux already implements Reactive Streams interfaces so they are natively
Reactive Streams compliant + there are Mono.from(Publisher) and Flux.from(Publisher) factory methods.

Reactor Core と RxJava 、あるいは CompletableFuture との変換について学習します。

Convert Flux to RxJava Observable thanks to a Reactor converter

Reactor Core では Flux & Mono と RxJava のオブジェクトを相互変換する RxJava1Adapter というものが用意されています。

RxJava1Adapter#publisherToObservable
Observable<User> fromFluxToObservable(final Flux<User> flux) {
    return RxJava1Adapter.publisherToObservable(flux);
}

一応、Observable#from と Flux.toIterable を組み合わせて使っても同じことはできます。

Observable#from
Observable<User> fromFluxToObservable(final Flux<User> flux) {
    return Observable.from(flux.toIterable());
}

Convert RxJava Observable to Flux thanks to a Reactor converter

RxJava1Adapter#observableToFlux を使えば一発で変換できます。

RxJava1Adapter#observableToFlux
Flux<User> fromObservableToFlux(final Observable<User> observable) {
    return RxJava1Adapter.observableToFlux(observable);
}

Convert Mono to RxJava Single thanks to a Reactor converter

RxJava1Adapter#publisherToSingle を使います。

RxJava1Adapter#publisherToSingle
Single<User> fromMonoToSingle(final Mono<User> mono) {
    return RxJava1Adapter.publisherToSingle(mono);
}

Convert RxJava Single to Mono thanks to a Reactor converter

RxJava1Adapter#singleToMono を使います。

RxJava1Adapter#singleToMono
Mono<User> fromSingleToMono(final Single<User> single) {
    return RxJava1Adapter.singleToMono(single);
}

Convert Mono to Java 8+ CompletableFuture thanks to a Reactor converter

Mono.toFuture を使えば変換できます。

Mono.toFuture
CompletableFuture<User> fromMonoToCompletableFuture(final Mono<User> mono) {
    return mono.toFuture();
}

Convert Java 8+ CompletableFuture to Mono thanks to a Reactor converter

Mono#fromFuture で変換できます。

Mono#fromFuture
Mono<User> fromCompletableFutureToMono(final CompletableFuture<User> future) {
    return Mono.fromFuture(future);
}

Part9 BlockingToReactive

Learn how to call blocking code from Reactive one with adapted concurrency strategy for blocking code that produces or receives data.

For those who know RxJava:

  • RxJava subscribeOn = Reactor subscribeOn
  • RxJava observeOn = Reactor publishOn

この章は一際難しいです。

Create a Flux for reading all users from the blocking repository, and run it with an elastic scheduler

BlockingRepository はメソッドの呼び出し回数を記録していて、1回でもメソッドを使うと呼び出し元の assertEquals(0, repository.getCallCount());が通らなくなってしまいます。それを回避するために Flux#defer を使用するようです。この問題は API ドキュメント(もしくはライブラリのソースコード)を読む力が試されています。

Flux#defer
Flux<User> blockingRepositoryToFlux(final BlockingRepository<User> repository) {
    return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
               .subscribeOn(Schedulers.elastic());
}

Insert users contained in the Flux parameter in the blocking repository using a parallel scheduler

注意すべきは subscribeOn ではなく publishOn を使っている点です。前者だと処理が始まりません。

Flux.publishOn
Mono<Void> fluxToBlockingRepository(final Flux<User> flux, final BlockingRepository<User> repository) {
    return flux.publishOn(Schedulers.parallel())
               .doOnNext(user -> repository.save(user))
               .then();
}

Return a valid Mono of user for null input and non null input user (hint: Reactive Streams does not accept null values)

前2つと難しさに差があります。この問題は Mono#justOrEmpty を使うだけでした。

Mono#justOrEmpty
Mono<User> nullAwareUserToMono(final User user) {
    return Mono.justOrEmpty(user);
}

終了

上記ですべての問題を終えました。最後に $ gradle test を実行すると自己満足に浸ることができます。

実行結果
$ gradle test
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test

BUILD SUCCESSFUL

Total time: 15.729 secs

おまけ:Logger

今回のハンズオンでは直接使用しませんでしたが、Reactor Core には独自の Logger が用意されています。2系と3系でインスタンスの取得方法が異なります。下記は3系での例です。

Logger logger = Loggers.getLogger(getClass());

参考

7
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?