RxJava Advent Calendar 2017
昨日は @toastkidjp さんの「RxPermissions で Runtime Permission の処理をする」でした。3日目は @guignol さんの 「RxAndroidとRxSwingのScheduler」です。Scheduler の実装に関する興味深いお話です。
どなたも2日目に立候補されていなかったので、もうとっくに過ぎてしまいましたが、私が記事を書いてみます。
概要
勉強をする上で「これは何の役に立つのか?」を知っておくことはモチベーション向上につながります。
「RxJava はどういうことをできるライブラリなのか?」ということを知った上で勉強を始めるやり方もありかなと思い、RxJava を「非同期処理を書きやすくする Java ライブラリ」と考えて説明をしてみます。
対象
RxJava に興味はあるが、まだ使ったことがない人(特にAndroid アプリ開発者)
この記事で取り上げないこと
- リアクティブプログラミング
- Flowable
- Back Pressure
- Disposable
- 複数のストリームの結合
- Processor
- Reactor
この記事で書くこと
非同期処理を書きやすくするライブラリとしての RxJava の説明
おことわり
- Lambda 式を用います
- サンプルコードは Java で記載します。関数型インターフェイスを多用するライブラリなので Lambda 式は使えた方が良いです
RxJava は従来のものと何が違うか?
- Stream API 風にメソッドチェインで処理をつなげて書ける
- 複数の非同期処理の待ち合わせを容易に書ける
- 一部処理の別スレッド実行を簡単に書ける……小さい単位で処理スレッドを変えることができる
- 遅延実行が可能
導入
RxJava はライブラリとして提供されており、Maven や Gradle を利用して導入することが可能です。
要件
下記のバージョン以降である必要があります。
Name | Version |
---|---|
Java | 1.6- |
Android | 2.3- |
Android アプリ開発でよく利用されている RxJava ですが、 Android 限定ということはなく、通常の Java や Kotlin のアプリケーションでも利用することが可能です。
なお、RxJava2 は必要な関数型インターフェイスを独自で定義して持っているため、 Java 8 以降の java.util.function パッケージの関数型インターフェイス(Function, Consumer 等)には依存せず、それらとの互換性もありません。
依存の追加
build.gradle に1行追加するだけです。この記事を書いている時点では2.1.7が最新でした。
dependencies {
implementation 'io.reactivex.rxjava2:rxjava:2.1.7'
Android アプリ開発の場合はもう1行、RxAndroid も追加しておくと便利です。というより、これがないと RxJava を入れる意味があまりありません。
主に Android のメインスレッドで処理を実行させるための Scheduler(処理の実行スレッドを指定するためのもの) を使うのが目的です。
最新版は 2.0.1 です。
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
Lambda 式を使う場合は Retrolambda や Android Gradle Plugin 3.0 の導入が必要ですが、ここでは省略します。
ライセンス
RxJava, RxAndroid ともに Apache License 2.0 でライセンスされています。用いる場合はアプリ内でライセンス表記が必要です。Android アプリの場合は com.google.gms:oss-licenses を使うと楽でしょう。
com.google.gms:oss-licenses を使ってオープンソースライセンスを表示する
レシピ
RxJava 入門の記事で Single(1つの値か完了かエラーを通知) と Completable(完了かエラーを通知) を使っている記事はそんなにないのかなと思ったので、その2つを使った例を紹介します。
RxJava の得意とする、細かいレベルでの処理の非同期化は、主に Single/Maybe/Completable を用いることで実現できると思いますし、実際の開発でも私はそれらを用いる方が多いです。
ボトルネックになっているメソッドを非同期処理にする
ExecutorService
従来の ExecutorService を使った処理だとこのように書けます。
final ExecutorService executor = Executors.newSingleThreadExecutor();
executor.execute(() -> bottleneckMethod());
executor.shutdown();
RxJava
これを RxJava で書くと下記の通りです。
Completable.fromAction(() -> bottleneckMethod())
.subscribeOn(Schedulers.newThread())
.subscribe();
処理の流れ
- 非同期で処理したい bottleneckMethod() を実行する Action(引数なしで戻り値なしの操作をする関数型インターフェイス)を引数に、Completable のインスタンスを取得
- subscribeOn で実行スレッドを指定
- subscribe で処理を実行……このメソッドを呼ぶまでは、上記の処理は一切実行されません(遅延実行)
subscribeOn は一連の処理全体の実行スレッドを指定するメソッドです。上記のコードでは、新しいスレッドを生成してそこで処理を実行させる、という指定になっています。
遅延実行
subscribe() を呼び出すまでは一切の処理が実行されない、というのが RxJava の特徴です。例えば下記のコードの場合
Completable completable = Completable.fromAction(() -> bottleneckMethod())
.subscribeOn(Schedulers.newThread());
System.out.println("start");
completable.subscribe();
実行結果は下記の通りです。
start
RxNewThreadScheduler-1 bottle neck
end.
subscribe()されるよりも前に実行された println メソッドの結果が先に標準出力に出ています。
非同期処理の結果を用いる
ExecutorService
同じように従来の API で書いてみましょう。
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Future awaitable = executor.submit(() -> anyReturnBottleneckMethod());
executor.shutdown();
try {
System.out.println(awaitable.get(2, TimeUnit.SECONDS));
} catch (InterruptedException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
RxJava
同じ処理を RxJava を使って書くと下記の通りです。
Single.create(emitter -> emitter.onSuccess(anyReturnBottleneckMethod()))
.subscribeOn(Schedulers.newThread())
.subscribe(System.out::println, Throwable::printStackTrace);
処理結果を使う必要があるので、値を通知する Single を使います。subscribe メソッドの第2引数で、エラーが発生した時の処理を定義することが可能です。
ちなみに、create ではなく fromCallable を使って書くこともできます。Callable とは java.util.concurrent.Callable です。Java 8 で追加された Supplier と同じで値を遅延発生させる関数型インターフェイスで、Java の 1.5 で追加されていたので流用したのだと思われます。その一方で Runnable は使わず Action という関数型インターフェイスを別途定義しているのでよくわからないですが……
Single.fromCallable(() -> anyReturnBottleneckMethod())
RxJava はこうした、非同期処理の待ち合わせのコードを書きたい時に力を貸してくれます。
重要なのは解決したい課題に合った適切な道具を使うことです。
ExecutorService は大きめのタスクを並列実行させるコードを書くのに適した APIで、今回のような処理の待ち合わせは書けないこともないが得意ではない、というだけです。
ネットワーク通信の結果を使って UI を更新する
例えば、networkRequest() を I/O スレッドで実行し、その結果を TextView で表示したい場合は下記の通りに書けます。
Single.create(emitter -> networkRequest())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(textView::setText)
subscribeOn と observeOn
どちらも処理の実行スレッドを指定するためのメソッドです。observeOn は次のメソッドからの実行スレッドを変更します。observeOn での指定は subscribeOn での指定よりも優先されます。
先ほどのコードだと下記の通り実行スレッドが変化します。
Single.create(emitter -> networkRequest()) // I/O スレッド
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(textView::setText) // メインスレッド
これの何がうれしいのか?
そう思われるかもしれません。
Android では重要で、「ネットワーク通信はバックグラウンドスレッドでしなければいけない」「View の更新はメインスレッドでなければならない」という制約が存在し、それに違反すると実行時例外が発生してアプリがクラッシュします。
その制約を回避するために Handler & Looper 等様々な仕組みが存在しています。それらを用いても同じことはできますが、RxJava (と RxAndroid)を用いればよりわかりやすいコードを書くことができます。
導入前に考慮すること
- 学習コスト
- メソッド数
1. 学習コスト
ライトに使う分には Stream API + α程度の知識で十分ですので、そう勉強もいりませんが、単純にかなり巨大なライブラリですので、完全に知り尽くすには相当の時間がかかります。
Androidではありませんが、RxSwift の学習コスト+MVVMの導入コストと Clean Architecture の導入コストを天秤にかけて、Rx の学習コストがネックになって後者を選択した、という話を会社で聞いたことがあります。
各言語での Rx 系の実装は似通っているらしいので、別の言語で使っていた開発者がスムーズに入ってこられるメリットが生まれるかもしれませんが、逆に Rx に不慣れな人が入りにくくなることも考えておく必要があるでしょう。
2. メソッド数
2017年末現在の RxJava2 を Android アプリに入れると、メソッド数が1万弱増えます。決して少なくはない数字で、プロジェクトによっては制限を超えてしまって MultiDex 化を強いられることもあります。現に私が仕事で開発しているアプリではそうなりました。
一度導入すると、(あまりの便利さに多用しすぎて)外すのがなかなか難しくなりますので、代替の手段もよく検討してみた方が良いです。Kotlin を使っているのであれば Coroutine は確実に調べておきましょう(そういう私はまったく調べていませんが)。
おわりに
RxJava を「非同期処理を書きやすくするライブラリ」として紹介してみました。細かい単位での実行スレッド指定が可能な RxJava は、フレームワークに実行スレッドの制約が存在する Android アプリ開発者から広く受け入れられています。
重要なのは解決したい課題に合った道具を用いることです。ExecutorService は大きめの処理を並列実行させるための道具であり、今回取り上げたような小さい単位での非同期化には RxJava が比較的向いています。
参考
書籍
『RxJava リアクティブプログラミング』(翔泳社)……発売から多少時間が経ってしまいましたが、今の時点だと日本語で書かれた書籍では最も詳細でわかりやすいです。
リンク
- RxJava(GitHub)
- RxJava Wiki
- RxJava cheat sheet……開発PCの壁紙にしたり、壁に貼っておいたりすると良さそうです