なんでこの記事書いたのか
今開発中のプロダクトにおいて、RxJavaの導入をやってみたので、実際に使った箇所とその例、調べないとわからなかったことを載せておきました。
そう(retrolambdaのためにjdk8を投入)までして導入したかったメリットを話してくれ、サンプルコードがないとわからん、といった声を頂いているので、実際に何が解決されたのか、どんなコードで解決したのかということと、そのために勉強しなくてはならなかった点について書いています。
(追記)警告:差分作ってコード上では解決したんですが、この差分まだ「リリース」したわけじゃないので、その点だけご注意くださいmm 続報あり次第追記します。
追記:リリースして安定運用しています!最近まで監視に難がありましたがそれも修正しました。この記事の監視スニペットも更新済みです!
なぜRxJavaを導入したのか
次の課題をまとめて解決できるのがRxJavaだと思い、リファクタの勢いも相まって導入(勢いですみません・・・)。
- チーム内でAndroidの非同期処理やそのエラーハンドリングをするのが面倒だという話がずっと前から出ていた
- リスト操作やlambdaで書いたほうが可読性の高い箇所が見受けられていた
- リファクタで、データソースが変更された時の表示の更新をイベントによって実現しようという方針に決まった
Cookpadでは最初はリスト操作から導入したとのことです:Android開発でRxJavaをチームに導入した話。
が、当初リスト操作やLoader完了後の状態変更の配信だけに使用することにしていましたが、 @ainame にこのコードだとメリットが良くわからないと煽られたwのもあり、LoaderとEventBusで実装されていた箇所をRxJavaに置き換えることになりました。
RxJavaのキホン
Reactive Programmingの概念については参考記事を書くにとどめ、この記事では実際の使用例について扱います。
- 概念や取っ付き方についてはこのスライドがわかりやすいです:RxJava学習のベストプラクティスっぽいもの(スライド)
- 【翻訳】あなたが求めていたリアクティブプログラミング入門
- ReactiveXのintro
※自分はHaskellや論文に触れたことがないためFunctional Reactive Programmingについては理解がないことをご容赦ください。
共通する使い方だけ下記に記載しておきます。コードサンプルは続きをご覧ください。
- Observableからはitemが同期的or非同期に流れてくる(ストリーム)
- Observable.from(List)を使うと、Listの中身が1つずつ流れてくるObservableを作ることができる
- map()やfilter()を始めとするOperatorを使うとストリームに流れてくるitemを加工できる
- 原則subscribe()で処理を開始(またはobserve開始)し、コールバックに順次itemが流れてくる
- itemの到着を知らせるonNext()、ストリームの終了を伝えるonComplete()、エラー発生による終了を伝えるonError()がある
- 同期的に結果を得たい場合は、toBlocking()を使用する
- observeOn()にSchedulerを指定すると、subscribe()のコールバックを指定したスレッド(メインスレッドなど)で呼び出すことができる
- キャンセルしたい場合はunsubscribe()する(と、適切に実装されたObservableではActivity/Fragmentがリークしない)
メインのドキュメントはRxJavaのrepoのWikiを見るのが良さげでした。
挙動の確認はrxmarbles.comが多分一番便利です。ただしRxJSで書かれているので、名前が違うメソッドとかあるかもしれません。
現状の課題のRxJavaによる解決策
(ところどころarg -> process()
とかHoge::method
とか出てきますが、Java 8で使えるlambda表現なので、retrolambdaを持ち込まない人は適切なコールバックをnewで作ってください)
リスト操作がいまだ?にfor文問題
追記: RxJavaはただ単にループしているわけじゃない為、zip()など一部の処理が低速なようです。
リスト操作を便利にしたいという目的にはJava 8のStream APIなどの(小さな?)バックポートであるLightweight-Stream-APIをおすすめします。メソッド数は350程度です。
Java 8からlambdaとstream APIを使ってmapやfilterのような、Ruby、Python、underscore.js/EcmaScript5でおなじみの、forループを使わないリスト操作ができるようになりました。
// Java 7
List<String> otherUserNames = new ArrayList<>(users.size);
for (User user : users) {
if (user.id != selfUser.id) {
otherUserNames.add(user.getName());
}
}
これがJava 8では
// Java 8
List<String> otherUserNames = users.stream()
.filter(user -> user.id != selfUser.id)
.map(User::getName) // user -> user.getName() の短縮形、method referencesといいます
.collect(Collectors.toList());
newとかaddとかの手続き的な記述が消えて、リストに対する加工だけがコードに現れるので読みやすい!
しかしAndroidではJava 8が使えるのはずっと先になりそうな感じです・・。
それ、RxJavaで解決できるよ
// RxJava
List<String> otherUserNames = Observable.from(users)
.filter(user -> user.id != selfUser.id)
.map(User::getName)
.toList().toBlocking().single(); // 全部Listにまとめ、同期処理するよう指定し、1つしかない結果(List)を取り出す
非同期処理を前提としているので、同期処理に変えるためのtoBlocking()の呼び出しが必要でちょっと癖がありますが、そこさえクリアすれば十分実用的かなと思いました。
サンプルはこの記事にいくつか載っています:Java 8: No more loopsをRxJavaでやる(Androidの環境で)
非同期(バックグラウンド)処理が面倒くさい問題
Androidのフレームワークで(Viewに結果を反映するような)バックグラウンド処理を実行する際に使うパターンは主に下記の3つです。しかしながら、それぞれ問題点があります。
-
AsyncTask
- 実行中にActivity/Fragmentがdestroyされると、特別な考慮をしない限り参照がリークしたりクラッシュする
- 特にcancel時の扱いが難しい
- エラーハンドリングが面倒
-
AsyncTaskLoader
- 正しく実装するためのboilerplate(コピペ)が非常に難しい(以前コード読んだときはリンク先の公式サンプルも間違っているように見えた)
- destroyや画面回転のとき、Activity/Fragmentが非アクティブなときなど、条件ごとの挙動があり複雑
- Activity/Fragmentでしか使用できない
- パラメータがbundleでしか渡せず(意図はわかるが)素直じゃない
- エラーハンドリングが面倒
-
IntentService
- 結果を返却する方法がイベント(EventBusやLocalBroadcast)に限られる
- 作るごとにAndroidManifestへの追加が必要
それ、RxJavaで解決できるよ
解決策として、Promiseを導入する手段があります。
RxJavaのObservableはPromiseのように使用することができます。
observable
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> render(result));
例えば、REST clientライブラリのretrofitはRxJavaに対応していて、Observableを返却することができます。
public interface Client {
@GET("/users")
List<User> getUsers();
@GET("/users")
Observable<List<User>> users(); // retrofitの場合、最初からバックグラウンドスレッドで処理されます
}
これを使った実装例は下記のようなものになります。
public class UserListFragment extends Fragment {
private Subscription mSubscription;
private Client mClient;
...
public void onStart() {
super.onStart();
mSubscription = mClient.users()
.observeOn(AndroidSchedulers.mainThread()) // 結果の通知はUIスレッドで行います
.subscribe( // subscribeのタイミングで処理が開始され、コールバックに結果が通知されます
this::render, // 結果をrenderメソッドに渡します
error -> showErrorAlert(error), // エラー発生による終了時の処理を指定します(optional)
() -> showCompletedAlert()); // 正常終了時の処理を指定します (optional)
}
// ※mainThread()はRxAndroidの実装を参照のこと。
public void onStop() {
// viewがなくなる前には処理が終わってなくてもunsubscribe()してキャンセルし、コールバックへの参照を破棄します
// (Observableが正しく実装されていれば)参照が破棄されればActivity/Fragmentはリークしません
// どのライフサイクルでsubscribe/unsubscribeするかはまだベストプラクティスが定まってないです
mSubscription.unsubscribe();
super.onStop();
}
private void render(List<User> userList) {
...
}
...
}
このようにAsyncTaskやAsyncTaskLoader、IntentServiceを新たに設置しなくても、リークしないしシンプルなコードで書けます・・!
今回は画面回転時のキャッシュなどはしない割り切った実装ですが、できればその辺りにもtryしてみようと思います。
※PromiseとObservableの大きな違いは、複数の値を返却できる点です。
なお、今回自分で作った実装ではAndroidSchedulers.mainThread()はRxAndroidのものを利用せず、参考に実装しなおしたものを使っています。RxAndroidを使わなかった理由は後述します。
変更通知の流れが追いにくくなった問題
表示のたびにFragmentからModelを呼び出しているような場所に、変更通知イベントによる表示の更新を加えると、呼び出しの流れがやや複雑になってしまいます。また、EventBusなどを介すると、そのイベントがどこから飛んでくるのかが一目ではわからないという課題もあります。
┌────────┐
| Model2 | ──( save )───┐
└────────┘ ↓
┌──────┐ ┌───────────────────┐ ──( call )─→ ┌───────┐ ──( call )─→ ┌────────┐
| View |←─(render)──| Activity/Fragment | | Model | | DB/API |
└──────┘ └───────────────────┘ ←─(return)── └───────┘ ←─(return)── └────────┘
↑ ┌───────────┐ │
└─(event)── | Event Bus | ←─(post)──┘
└───────────┘
これをObserverパターンにすれば、変更内容が通知されるごとにrenderするだけで良くなり、またsubscribeするときにModelを参照しているので可読性が高まります。
┌────────┐
| Model2 | ──( save )───┐
└────────┘ ↓
┌──────┐ ┌───────────────────┐ ┌───────┐ ──( call )─→ ┌────────┐
| View |←─(render)──| Activity/Fragment | ←─(update)── | Model | | DB/API |
└──────┘ └───────────────────┘ └───────┘ ←─(return)── └────────┘
│ ↑
└ ─ ─ ─ (subscribe once) ─ ─ ─ ┘
それ、RxJavaで(ry
追記:元の実装だとタイミングによっては最新の状態を受け取れなくなってしまうのと、onBackpressureLatest()が効果がない状態だったので、replay(1)を使って書き直しました。
ObservableはonComplete()やonError()を呼びださなければ、いつ何度でもデータを流すことができます。
public class UserModel {
...
// 通知が来るたびにリクエストを走らせるObservableを作ります
private PublishSubject<Void> mUpdateNotificationPublisher = PublishSubject.create()
private Observable<List<User>> mUserUpdateObservable = mUpdateNotificationPublisher
.onBackpressureLatest() // 後続が「処理できますよ」と言っている数よりもたくさんきたら、
// 最新の1つだけをキューイングし残りを捨てる。
.flatMap(aVoid -> mClient.users(), 1) // 値が飛んできたらリクエストを投げるObservableを返す。
// flatMapは返されたObservableをsubscribe()して後続につなぐ。
// maxConcurrentに1を指定すると、同時に1リクエストだけ走るようになります。
.replay(1).refCount() // 複数回subscribeされても1つの結果を全員に配布する。
// replay(1)はsubscribe()された時に最新の1つのonNext()を渡す。
// refCount()は誰か1人でもsubscribe()してる時に上流の処理を実行する。
// ※publish()またはreplay()しないとsubscribe()した
// 回数分だけ(無駄に)並列にリクエストが走ってしまいます。
// 複数回走る理由はHot/Cold Observableの概念で後述します。
private void notifyUpdate() {
mUpdateNotificationPublisher.onNext(null);
}
public Observable<List<User>> observeUsers() {
return mUserUpdateObservable; // 何回叩かれても同じHot Observableをsubscribe()する
}
}
public class UserListFragment extends Fragment {
private Subscription mSubscription;
private UserModel mUserModel;
...
public void onStart() {
super.onStart();
mSubscription = mUserModel.observeUsers()
.observeOn(AndroidSchedulers.mainThread()) // 結果の通知はUIスレッドで行います
.subscribe(this::render) // subscribeで監視と初回のリクエストが開始され、変更があるたびにrenderが呼び出されます
}
public void onStop() {
mSubscription.unsubscribe();
super.onStop();
}
private void render(List<User> userList) {
...
}
...
}
Fragment側に一切複雑な処理を書くことなく、リアクティブ感にあふれる、変更が勝手に反映される実装が書けました・・!
※誰からでもpostできるようなイベントはEventBusを、特定の相手のイベントを見たい場合はObservableを、といった使い分けが(どこかで)提案されていました。
導入したくなったら
-
RxJava: 本体
- build.gradleに
compile 'io.reactivex:rxjava:x.xx.xx'
最新版は上のリンクで確認のこと
- build.gradleに
-
retrolambda: lambda
- Androidでもlambdaが使えるようになります
-
RxJavaDebug: デバッグ用
- 結局使ってないです
それなりに調べないとわからなかったこと
まずは実装例を載せてみたのですが、リスト操作以降は理解しておくべき事柄が増加します。
- リスト操作:ストリーム処理、Operator、toBlocking()
- 非同期処理:↑+subscribe()、unsubscribe()、Scheduler、(必要に応じて)Observableの自作方法
- 変更通知: ↑+Subject+できればBackpressure
これ以降は正直最初っから理解しようとするとしんどい領域かと思われるので、これらの扱いが簡単なリスト操作から始めたほうがよいと思います。非同期処理やってみるぜ・・!と思ったら、下記を順番にご覧ください。
unsubscribe()をするのが面倒~~、かつそれを何とかするRxAndroidは今使えない~~
追記:subscribe/unsbuscribeライフサイクルに関しては別のQiita記事(詳解RxJava)に今の見解を書きました
追記:RxLifecycleが元のRxAndroidのunsubscribe周りに近いですが、上記の記事のようにonStart()だけで十分だなぁと思ったので使っていません。
subscribe()で返却されるSubscriptionをunsubscribe()しないと、AsyncTaskのようにタスク完了までActivity/Fragmentがリークしてしまいます(一応onComplete()もしくはonError()の際に自動でunsubscribe()されます)。本来ではRxAndroidで対応できそうなんですが、大きくなりすぎてるから分割するぞ by Jake氏となっている通り、今あまり使えない感じです。
学習コストを抑えるためもかねて、onCreate()などのメソッドでsubscribeとunsubscribeを自分で叩くことにしました。自前でのSubscription管理を簡単にするには、CompositeSubscriptionにadd()して、あとで複数まとめてunsubscribe()します。
public class UserListFragment extends Fragment {
private CompositeSubscription mCompositeSubscription;
private Client mClient;
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
mCompositeSubscription = new CompositeSubscription();
Subscription subscription = mClient.users()
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> render(result));
mCompositeSubscription.add(subscription);
}
public void onDestroy() {
mCompositeSubscription.unsubscribe();
super.onDestroy();
}
...
}
更新処理を実行する場合など、subscribe()を何度も呼び出す場合があると思いますが、この場合タスクが完了するたびにいちいちCompositeSubscriptionからremove()しないとどんどん増えていってしまいます。そのため、subscribe()とunsubscribe()(or終了)の際に勝手にCompositeObservableのadd()とremove()を呼び出す仕組みを作りました。
https://gist.github.com/ypresto/accec4409654a1830f54
追記:これは必ずsubscribe()
の一番直前で呼び出さないと、うまくunsubscribeされない場合があるので注意が必要です。
mClient.users()
.observeOn(AndroidSchedulers.mainThread())
.lift(CompositeObservables.attachTo(mCompositeObservable)) // liftはカスタムのOperatorを使うためのものです
.subscribe(result -> render(result));
※CompositeSubscriptionは一度unsubscribeすると再利用できないので注意。attachTo()はmCompositeObservableを作りなおすたびに呼び出す必要があります。
http://gfx.hatenablog.com/entry/2015/06/08/091656
subscribeOn()、observeOn()とScheduler、スレッド切り替えとその有効範囲
追記:当時より理解が深まったので疑似コード付きの説明を別のQiita記事(詳解RxJava)に書きました
最初に理解するのに時間がかかったものの1つが、実行スレッドを切り替えるためのSchedulerの使い方です。
Schedulerを指定すると、ストリームの実行(onNext()、onComplete()、onError()などの呼び出し)のスレッドを変更することができます。
- subscribeOn(): subscribe()で実行される処理全体のスレッドを(ストリームの根っこ:ソースから)変更
- observeOn(): 呼び出した以降のストリームを別のスレッドで実行
subscribeOn()は、遅いメソッドをmap()やカスタムのObservableで呼び出す場合は指定する必要があります。ただし、一番ソースに近いsubscribeOn()だけが有効です。何度も叩くとスレッド生成分無駄とのこと。なお、retrofitは最初からスレッドが決まっているようで効果が無いみたいです。
observeOn()は、Viewの更新はMain Threadでしか呼べないので、非同期のリクエストなどを叩いた際はsubscribe()の直前で必ず呼び出す必要があります。今のところRxAndroidのAndroidSchedulers.mainThread()を参考に再実装するのが良さそうだと思います。 RxAndroidの1.0はAndroidSchedulersの実装だけを含むライブラリになったので、これを使うのが最適です。
なお、ドキュメントに記載のない限り、subscribeOn()の指定がなければsubscribe()したスレッドで実行されます。
※Observableを返却できるflatMap()を使った際がややこしいですが、返したObservableでsubscribeOn()を叩いていると、onNext()を呼んだObservableに対応するスレッドで続きの処理も呼ばれるそうです。 https://groups.google.com/d/msg/rxjava/hVFl4YCORDQ/F-KorYBmpV0J
同期的なメソッドをObservableにする方法
追記:下記の実装を(特に、遅い処理で)すると、subscriberがリークして、結果Activityとかがleakしてしまいます。詳しくは別のQiita記事に書く予定です
最もシンプルなやり方をすると下記のようになります。
Observable.create(new Observable.OnSubscribe<List<User>>() {
@Override
public void call(Subscriber<? super List<User>> subscriber) {
subscriber.onNext(mClient.getUsers());
subscriber.onCompleted();
}
});
が、実は処理が終わるまでsubscriber(=ActivityやFragmentの中のInner Classやlambda、もしくはそれをラップしたオブジェクト)への参照が維持されるため、AsyncTaskと同様のリークが発生します。AbstractOnSubscribeを使うと最初からunsubscribe()によるキャンセルなどがサポートされた状態になります。
// ※AbstractOnSubscribeはExperimentalです
Observable.create(AbstractOnSubscribe.create(new Action1<AbstractOnSubscribe.SubscriptionState<List<User>, Void>>() {
@Override
public void call(AbstractOnSubscribe.SubscriptionState<List<MediaFile>, Void> subscriptionState) {
subscriptionState.onNext(client.getUsers());
subscriptionState.onCompleted();
}
})).subscribeOn(Schedulers.io());
※rxjava-async-utilを使うとAsyncObservable.start()を使用することもできますが、値がキャッシュされたりするので今回は見送りました。
Hot/Cold Observableという概念とConnectable Observableがとっつきにくい問題
もう1つとっつきにくかったものが、HotとColdという概念があるということです。
private Observable mObservable = mClient.users().map(users -> heavyMethod(users));
...
mObservable.subscribe(users -> render(users));
mObservable.subscribe(users -> render(users));
mObservable.subscribe(users -> render(users));
最初はheavyMethod()の呼び出しは1回で済むと思ってしまいました。しかし、Observableは非同期なので、mObservableにはいかなる結果もキャッシュされておらず、heavyMethod()は3回呼ばれてしまいます。変更通知の実装例でshare()(publish().refCount()と同義)の呼び出しをしていたのは、分岐の根っこにして処理結果を共有するためです。このように、分岐の根っこになる特別なものをHot Observable、ならない通常のものをCold Observableと呼びます。
RxのHotとColdについてにて詳しく解説されています。
Connectable Observableはsubscribeされたらすぐに動作が始まるHot Observableを、全員のsubscribe()が終わるまで遅らせるための仕組みで、refCount()を使うと誰かがsubscribe()している時だけ開始された状態になります。下記のようなものがあります。
- publish(): subscribe()した先に以後届いたものを垂れ流すだけ(Multicast)
- replay(): subscribe()するたびに今まで届いていたitemを全部配信しなおす(最新のn件も可能)
なお、Hot Observableは開始されるとその手前までのストリームを代表してsubscribe()している状態になります。
Observableに自分で値を流すにはSubjectを使用する
Observable.from(List)だと最初に決めた値しか流すことができません。後から値を流したい場合はSubjectを使用しonNext()を呼びます。SubjectはObservableであると同時にコールバック(Subscriber)でもあります。
- PublishSubject
- PublishSubjectのonNext()を呼び出した時にsubscribe()したコールバックに同じ値を渡します
- 通知(イベント)の実装に便利
- BehaviorSubject
- subscribe()の際に最後にonNext()された値を流し、以降onNext()されるたびに値を流す
- 変更が起きる値を表現するのに便利
Backpressureという概念
(追記)Backpressureの仕組みについては別の記事を書きました。こちらも合わせてどうぞ。
Backpressureについてはまだ勉強中です。ReactiveXのintroにも記載があるように、Iterableはpull(next()で取ってくる)、Observableはpush(onNext()で渡ってくる)です。pushの場合、自分でpullしている場合に比べ、連打されて処理が追いつかない時の制御が難しくなります。これを制御するために、あとどのくらいならonNext()が叩かれても大丈夫かをソース側の方に伝える仕組みです。
UIで1文字入力ごとに処理呼び出す場合などに役立ちそうですが、そのような場合一定時間内のイベント数を制御するOperatorを使ってみてくださいと書かれています。