概要
Reactive Extensions 実装の1つである Reactor Core にはトレーニングマテリアル "Lite Rx API Hands-on" が用意されていると知りましたので、早速やってみました。
"Lite Rx API Hands-on" とは
"Eclipse Collections Kata" のように、JUnit の穴開きコードが用意されていて、それをテストが通るように修正していくことでプログラミングの学習ができる、というものです。JUnit を動かせる環境であればどこでも実施できます。題材は Reactor Core ですが、インタフェイスのよく似ている RxJava の練習にもなるのではないでしょうか。
なお、Reactor Core は Java SE 8で追加されたクラス(主に関数型インタフェイス)を参照しているため、 Java SE 8以上でないと動作しません。
Reactor Coreとは
Reactive Extensions 実装の1つです。RxJava に似たインタフェイスながら、より洗練された命名や設計となっています。詳しくは 「Reactor Core 2.5: もう一つのJava向けReactive Extensions実装」 にまとめられていますので、そちらをご覧ください。
どうでもいいことを書くと、"Reactor Core" で画像検索をすると、心を揺さぶられる壮大な構造物の画像が上位に出ます。
Reactive Extensions とは
Reactive Streams を命令型言語のパラダイムで利用しやすくするためのライブラリだそうです。あえて書くほどのことではありませんが、 Reactive Extensions の略が Rx です。
Reactive Extensions や Reactive Streams についての説明は 「Reactive Webアプリケーション - そしてSpring 5へ」がわかりやすいので、まだお読みでなければ、ご一読をお勧めします。
注意
メソッド名の表記
Class#method は static メソッドを、Class.method はインスタンスメソッドを、それぞれ示します。例えば、Flux#just
は Flux クラスの static メソッドである just を指します。
実行環境
前述の通り、このハンズオンを実施する際は Java SE 8 のコードが動作する環境を用意してください。
OS | Windows10 |
---|---|
Java SE | 1.8.0_91 |
Gradle | 2.12 |
ソースコードの取得
"Lite Rx API Hands-on" は GitHub 上で公開されています。まず GitHub のリポジトリ からソースコードを取得します。zipでダウンロードしてもよいですし、今後プロジェクトに貢献する可能性があるならフォークしてもよいでしょう。
Gradle 対応
Gradle でのビルドに対応していなかったので、まずそこから始めます。幸い、複雑な設定はなかったので、テキストエディタで置換する程度で作れました。下記に示します。
apply plugin: 'java'
def jdkVersion = '1.8'
sourceCompatibility = jdkVersion
targetCompatibility = jdkVersion
repositories {
mavenCentral()
}
dependencies {
compile 'io.projectreactor:reactor-core:3.0.0.RC1'
compile 'ch.qos.logback:logback-classic:1.1.3'
testCompile 'junit:junit:4.12'
testCompile 'io.reactivex:rxjava:1.1.5'
}
確認
test を gradle から実行すると、すべて失敗します。
$ gradle test
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
io.pivotal.literx.Part01CreateFlux > fromValues FAILED
java.lang.NullPointerException at Part01CreateFlux.java:38
......中略......
io.pivotal.literx.Part09BlockingToReactive > fastPublisherSlowSubscriber FAILED
java.lang.NullPointerException at Part09BlockingToReactive.java:62
33 tests completed, 33 failed
:test FAILED
FAILURE: Build failed with an exception.
* What went wrong:
Execution failed for task ':test'.
> There were failing tests. See the report at: lite-rx-api-hands-on/build/reports/tests/index.html
* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug option to get more log output.
BUILD FAILED
Total time: 7.434 secs
テストが実装されていないから当然です。下記のようなレポートの HTML ファイルが build/reports/tests/index.html に生成されていれば、gradle の設定がちゃんとできていることの確認となります。
では、順に解いていきます。以下、解答を書いてありますので、未挑戦の方はご覧にならないことをお勧めします。なお、模範解答はこちらのリポジトリから参照可能です。どうしてもわからない問題があるなら覗いてみましょう。
Part1 CreateFlux
Learn how to create Flux instances.
まず Flux の生成方法を学びます。 Flux は RxJava の Observable に当たる、複数の要素を持つ Publisher です。
Return an empty Flux
そのままなので特に言うことはありません。
Flux<String> emptyFlux() {
return Flux.just();
}
Return a Flux that contains 2 values "foo" and "bar" without using an array or a collection
そのままなので特に言うことはありません。Flux#just は可変長引数をサポートしています。
Flux<String> fooBarFluxFromValues() {
return Flux.just("foo", "bar");
}
おまけ1
あまり意味はありませんが、下記2つでもテストはパスします。
return Flux.fromArray(new String[]{"foo", "bar"});
return Flux.fromIterable(Arrays.asList("foo", "bar"));
おまけ2
Reactor Core では要素1つの Flux (乱暴な表現)として Mono が用意されています。それを2つ concatWith でつなげるやり方でもテストは通ります。
return Mono.just("foo").concatWith(Mono.just("bar"));
Create a Flux from a List that contains 2 values "foo" and "bar"
先ほどの Flux#fromIterable でよさそうです。
Flux<String> fooBarFluxFromList() {
return Flux.fromIterable(Arrays.asList("foo", "bar"));
}
Create a Flux that emits an IllegalStateException
Flux#error というのがあるので、それを使います。
Flux<String> errorFlux() {
return Flux.error(new IllegalStateException());
}
Create a Flux that never terminates
Flux#never を使います。
Flux<String> neverTerminatedFlux() {
return Flux.never();
}
Create a Flux that emits an increasing value each 100ms
100ms 待機する Flux を Flux#intervalMillis で生成します。
Flux<Long> counter() {
return Flux.intervalMillis(100L);
}
Part2 CreateMono
Learn how to create Mono instances.
次は単一の要素を持つ Mono のオブジェクト生成について学びます。 Mono は RxJava の Single に当たるものです。 Flux との連携がスムーズに実行できる実装になっています。
Return an empty Mono
そのまますぎて何も書くことはありません。
Mono<String> emptyMono() {
return Mono.empty();
}
Return a Mono that contains a "foo" value
呼び出し元テストは fromValue メソッドですが、just を使います。
Mono<String> fooMono() {
return Mono.just("foo");
}
Create a Mono that emits an IllegalStateException
Flux の時と大差ありません。
Mono<String> errorMono() {
return Mono.error(new IllegalStateException());
}
Part3 Transform
Learn how to transform values.
要素の値を変換する方法を学びます。ReactiveUserRepository のオブジェクト repository が用意されているので、これを使います。さっきまでに比べると大分難しくなります。
Capitalize the user username, firstname and lastname
各要素の変更をするには map を使います。第0要素の User を持つ Mono が引数で渡されるので、それの各要素を UpperCase に変更し、新しいUser オブジェクトに詰めます。
Mono<User> capitalizeOne(final Mono<User> mono) {
return mono.map(person -> new User(
person.getUsername().toUpperCase(),
person.getFirstname().toUpperCase(),
person.getLastname().toUpperCase())
);
}
Capitalize the users username, firstName and lastName
先ほどの Mono と同じ処理を Flux でやります。
Flux<User> capitalizeMany(final Flux<User> flux) {
return flux.map(user -> new User(
user.getUsername().toUpperCase(),
user.getFirstname().toUpperCase(),
user.getLastname().toUpperCase()
)
);
}
Capitalize the users username, firstName and lastName using asyncCapitalizeUser()
すでに User を渡すと Capitalize された User を持つ Mono を返す asyncCapitalizeUser メソッドが用意されています。あとはそれを flatMap 中で呼ぶだけです。
Flux<User> asyncCapitalizeMany(final Flux<User> flux) {
return flux.flatMap(user -> asyncCapitalizeUser(user));
}
Part4 Merge
Learn how to merge flux.
複数の Flux を合成する処理について学習します。
Merge flux1 and flux2 values with interleave
Flux.mergeWith を使って、2つの Flux を merge するだけの簡単なお仕事です。
Flux<User> mergeFluxWithInterleave(final Flux<User> flux1, final Flux<User> flux2) {
return Flux.merge(flux1, flux2);
}
Flux<User> mergeFluxWithInterleave(final Flux<User> flux1, final Flux<User> flux2) {
return flux1.mergeWith(flux2);
}
Merge flux1 and flux2 values with no interleave (flux1 values, and then flux2 values)
Flux 同士の結合には merge のほかに concat もあります。
Flux<User> mergeFluxWithNoInterleave(final Flux<User> flux1, final Flux<User> flux2) {
return Flux.concat(flux1, flux2);
}
違いは下記の通りです。「非同期や並列処理にも役立つRxJavaの使い方」で説明されていますので、詳細はそちらの記事でご確認ください。
method name | desription |
---|---|
merge | 結合後全体で順序を並べ替える |
concat | 引数で渡された順に結合 |
Create a Flux containing the values of the 2 Mono
mono1のandでmono2をつなげる方法、Flux の concatメソッドでつなげる方法、2つやり方がありました。
Flux<User> createFluxFromMultipleMono(final Mono<User> mono1, final Mono<User> mono2) {
return mono1.concatWith(mono2);
}
Flux<User> createFluxFromMultipleMono(final Mono<User> mono1, final Mono<User> mono2) {
return Flux.concat(mono1, mono2);
}
Part5 Request
Learn how to control the demand.
Reactor Core のテストをする際に役立つ TestSubscriber の使い方を少し学びます。
Create a TestSubscriber that requests initially no value
TestSubscriber<User> createSubscriber(final Flux<User> flux) {
return TestSubscriber.subscribe(flux, 0);
}
Request one value
void requestOne(final TestSubscriber<User> testSubscriber) {
testSubscriber.request(1L);
}
Return a Flux with all users stored in the repository that prints automatically logs for all Reactive Streams signals
Flux.logを呼びます。
Flux<User> fluxWithLog() {
final Logger logger = Loggers.getLogger(getClass());
return repository.findAll().log();
}
Return a Flux with all users stored in the repository that prints "Starring:" on subscribe, "firstname lastname" for all values and "The end!" on complete
doOnSubscribe で subscribe 直後に1回だけ実行する処理、 doOnNext で全要素に対する処理、doOnComplete で終了時に1回だけ実行する処理を実装します。
return repository.findAll()
.doOnSubscribe(sub -> System.out.println("Starring:"))
.doOnNext(user -> System.out.println(
String.format("%s %s", user.getFirstname(), user.getLastname())))
.doOnComplete(() -> System.out.println("The end!"));
Starring:
Skyler White
Jesse Pinkman
Walter White
Saul Goodman
The end!
Part6 OtherOperations
Learn how to use various other operators.
さまざまな operator の使い方について学習します。
Create a Flux of user from Flux of username, firstname and lastname.
zip で Tuple3 にまとめ、それをさらに map で User オブジェクトに変換します。
Flux<User> userFluxFromStringFlux(
final Flux<String> usernameFlux,
final Flux<String> firstnameFlux,
final Flux<String> lastnameFlux
) {
return Flux.zip(usernameFlux, firstnameFlux, lastnameFlux)
.map(tuple3 -> new User(tuple3.t1, tuple3.t2, tuple3.t3));
}
return the mono which returns faster its value
複数の Mono を受け取り、最も早い Mono を返す Mono#first メソッドを使います。
Mono<User> useFastestMono(final Mono<User> mono1, final Mono<User> mono2) {
return Mono.first(mono1, mono2);
}
return the flux which returns faster the first value
Flux にも Flux#firstEmitting メソッドがあります。
Flux<User> useFastestFlux(final Flux<User> flux1, final Flux<User> flux2) {
return Flux.firstEmitting(flux1, flux2);
}
Convert the input Flux to a Mono that represents the complete signal of the flux
Mono<Void> を返す Flux.then を使います。
Mono<Void> endOfFlux(final Flux<User> flux) {
return flux.then();
}
Return a Mono<User> containing Saul when an error occurs in the input Mono, else do not change the input Mono.
エラー発生時は User.SAUL を、それ以外の時は渡した Mono を、それぞれ返します。 Mono.otherwise を使って実装します。
Mono<User> betterCallSaulForBogusMono(final Mono<User> mono) {
return mono.otherwise(e -> Mono.just(User.SAUL));
}
Return a Flux containing Saul when an error occurs in the input Flux, else do not change the input Flux.
そのものずばりな Flux.onErrorReturn というメソッドがあります。
Flux<User> betterCallSaulForBogusFlux(final Flux<User> flux) {
return flux.onErrorReturn(User.SAUL);
}
長くなってきたので、休憩を入れるのであればこの辺がちょうどよいです。
Part7 ReactiveToBlocking
Learn how to turn Reactive API to blocking one.
Mono や Flux をブロッキングな集合に変換する方法を学びます。
Return the user contained in that Mono
Mono.block メソッドを使うと、その Mono オブジェクトが保有する単一のオブジェクトを取得できます。
User monoToValue(final Mono<User> mono) {
return mono.block();
}
Return the users contained in that Flux
Flux.toIterable メソッドを使うと、その Flux オブジェクトが保有するオブジェクトの一覧を Iterable で取得できます。
Iterable<User> fluxToValues(final Flux<User> flux) {
return flux.toIterable();
}
Part8 Conversion
Learn how to convert from/to Java 8+ CompletableFuture, RxJava Observable/Single and Reactor Stream.
Mono and Flux already implements Reactive Streams interfaces so they are natively
Reactive Streams compliant + there are Mono.from(Publisher) and Flux.from(Publisher) factory methods.
Reactor Core と RxJava 、あるいは CompletableFuture との変換について学習します。
Convert Flux to RxJava Observable thanks to a Reactor converter
Reactor Core では Flux & Mono と RxJava のオブジェクトを相互変換する RxJava1Adapter というものが用意されています。
Observable<User> fromFluxToObservable(final Flux<User> flux) {
return RxJava1Adapter.publisherToObservable(flux);
}
一応、Observable#from と Flux.toIterable を組み合わせて使っても同じことはできます。
Observable<User> fromFluxToObservable(final Flux<User> flux) {
return Observable.from(flux.toIterable());
}
Convert RxJava Observable to Flux thanks to a Reactor converter
RxJava1Adapter#observableToFlux を使えば一発で変換できます。
Flux<User> fromObservableToFlux(final Observable<User> observable) {
return RxJava1Adapter.observableToFlux(observable);
}
Convert Mono to RxJava Single thanks to a Reactor converter
RxJava1Adapter#publisherToSingle を使います。
Single<User> fromMonoToSingle(final Mono<User> mono) {
return RxJava1Adapter.publisherToSingle(mono);
}
Convert RxJava Single to Mono thanks to a Reactor converter
RxJava1Adapter#singleToMono を使います。
Mono<User> fromSingleToMono(final Single<User> single) {
return RxJava1Adapter.singleToMono(single);
}
Convert Mono to Java 8+ CompletableFuture thanks to a Reactor converter
Mono.toFuture を使えば変換できます。
CompletableFuture<User> fromMonoToCompletableFuture(final Mono<User> mono) {
return mono.toFuture();
}
Convert Java 8+ CompletableFuture to Mono thanks to a Reactor converter
Mono#fromFuture で変換できます。
Mono<User> fromCompletableFutureToMono(final CompletableFuture<User> future) {
return Mono.fromFuture(future);
}
Part9 BlockingToReactive
Learn how to call blocking code from Reactive one with adapted concurrency strategy for blocking code that produces or receives data.
For those who know RxJava:
- RxJava subscribeOn = Reactor subscribeOn
- RxJava observeOn = Reactor publishOn
この章は一際難しいです。
Create a Flux for reading all users from the blocking repository, and run it with an elastic scheduler
BlockingRepository はメソッドの呼び出し回数を記録していて、1回でもメソッドを使うと呼び出し元の assertEquals(0, repository.getCallCount());
が通らなくなってしまいます。それを回避するために Flux#defer を使用するようです。この問題は API ドキュメント(もしくはライブラリのソースコード)を読む力が試されています。
Flux<User> blockingRepositoryToFlux(final BlockingRepository<User> repository) {
return Flux.defer(() -> Flux.fromIterable(repository.findAll()))
.subscribeOn(Schedulers.elastic());
}
Insert users contained in the Flux parameter in the blocking repository using a parallel scheduler
注意すべきは subscribeOn ではなく publishOn を使っている点です。前者だと処理が始まりません。
Mono<Void> fluxToBlockingRepository(final Flux<User> flux, final BlockingRepository<User> repository) {
return flux.publishOn(Schedulers.parallel())
.doOnNext(user -> repository.save(user))
.then();
}
Return a valid Mono of user for null input and non null input user (hint: Reactive Streams does not accept null values)
前2つと難しさに差があります。この問題は Mono#justOrEmpty を使うだけでした。
Mono<User> nullAwareUserToMono(final User user) {
return Mono.justOrEmpty(user);
}
終了
上記ですべての問題を終えました。最後に $ gradle test
を実行すると自己満足に浸ることができます。
$ gradle test
:compileJava UP-TO-DATE
:processResources UP-TO-DATE
:classes UP-TO-DATE
:compileTestJava
:processTestResources UP-TO-DATE
:testClasses
:test
BUILD SUCCESSFUL
Total time: 15.729 secs
おまけ:Logger
今回のハンズオンでは直接使用しませんでしたが、Reactor Core には独自の Logger が用意されています。2系と3系でインスタンスの取得方法が異なります。下記は3系での例です。
Logger logger = Loggers.getLogger(getClass());