この記事は、Replace AsyncTask and AsyncTaskLoader with rx.Observable – RxJava Android Patternsの翻訳です。多分に意訳が含まれます。あと、そんなに RxJava を使ったことがないせいもあって、若干怪しい記述があるかもしれません…そして、個人的には AsyncTask はやればできる子だと思っています :p
はしがき
RxJava に関して、はじめの導入に関する記事は至る所で投稿されている。中には Android での事例を取り扱っているものもある。一方で、導入に至るまでの記事について聞いたことはあっても、実際の Android プロジェクトの中にある課題を解決するにあたり、どのように使っていくか、なぜそれを使うのかについて理解していない人が居るようにも思う。このシリーズのなかでは、RxJava を設計の軸に置いた幾つかのプロジェクトで見てきたパターンについて深めていこうと思う。
はじめに、Android 開発者の多くが経験するよくある辛みポイントについて議論し、RxJava でどのようにそのポイントを乗り越えるかを紹介する。そこから、より発展的で最適な解決方法へと議論を移していく。このシリーズ中、いろいろな開発者から、RxJava を用いて同じような課題に取り組んだ経験から、私が得た経験とどこが違ってどこが同じかという意見を頂きたい。
Problem 1: Background Task
Android 開発者が最初に出会うチャレンジのひとつに、いかにして非同期処理とそのコールバックを効果的に実行するか、というものがある。Web サービスからデータを取り出してきて、という要求は頻繁に目にするだろう。読者の中には、そのようなことに取り組んできた人たちも居るだろうが、おそらくこのように言うだろう「それのどこがチャレンジングなんだ。AsyncTask
を立ち上げさえすれば、あとはそいつに任せれば、面倒は全部引き受けてくれて良いではないか」。このように言うということは、複雑すぎる方法に慣れてしまっており、より整理された方法があり、またきちんと対応するべき特殊な問題に対応していないことに気づいていないということで、これを見直すのにはちょうどよい機会なのではなかろうか。
Default Solution: AsyncTask
AsyncTask
は Android に標準で組み込まれているフレームワークで、時間を要する簡単な処理を、UI をブロックせずに実行するためのものである(注:最近では、AsyncTaskLoader
が、データを読み込むというより特化した用途のものが出てきた。これについては後ほど議論する)。
ぱっと見、簡単に使えそうな気がする。あなたは、バックグラウンドスレッドで動作するコードを書き、その後 UI スレッドで動作するコードを別に書き、そうすれば、勝手にスレッドを超えてデータのやり取りをしてくれるようになる。
private class CallWebServiceTask extends AsyncTask<String, Result, Void> {
protected Result doInBackground(String... someData) {
Result result = webService.doSomething(someData);
return result;
}
protected void onPostExecute(Result result) {
if (result.isSuccess() {
resultText.setText("It worked!");
}
}
}
AsyncTask
には、その細部に諸悪の根源がある。詳しく見てみよう。
Error handling
最初の問題は、次のような簡単な使い方に見られる。
「何か間違ったら何が起こるのだろうか?」
残念なことに、これを解決できる銀の弾丸のようなものはなく、結局開発者たちは、AsyncTask
を継承して、doInBackground()
の処理を try-catch
で囲み、結果と例外のペアを持つオブジェクトを返して、その結果に応じて、onSuccess()
とかonError()
とか独自に定義したメソッドを呼び出すようにしている(私は、例外の参照を残しておいて、それをonPostExecute()
でチェックするような実装も見たことがある)。
これで少しはマシになるが、独自に定義したものの分だけ余分にコードを書く羽目になるので、このようなコードは余分に時間を取らせるし、おそらくすべての開発者やプロジェクトにとって有用とは限らない。
Activity/Fragment lifecycle
他にも問題はある。
「AsyncTask が走っている最中に、バックボタンで画面を戻ったり、デバイスを回転させたりしたらどうなるんだ?」
ああ、投げっぱなしジャーマンを決めているのなら何の問題もないだろう。しかし、その結果を UI に反映しなければいけないとしたら?何も考えずに居ると、NullPointerException が飛んできてクラッシュする。なぜなら、結果を反映する頃には Activity は存在しなくなっており、null
なものにアクセスしようとしてしまうからだ。
繰り返すが、AsyncTask
はそれほど役に立たない。開発者としては、タスクへの参照が保持されることを保証しておきたいし、Activity が死んだらタスクはキャンセルされるようにしておきたいし、結果を受け取った時の処理を書くonPostExecute()
では Activity が正当な状態にあることを保証しておきたい。
そうすると、綺麗でメンテナンスしやすい方法でコードを書きたいのに、いろいろ雑多なものを持ち込んでしまう。
Caching on rotation (or otherwise)
もしユーザが、画面回転をしただけで同じ画面に滞在した場合はどうだろうか。非同期処理をキャンセルしてしまったら、この場合はもう一度非同期処理をする羽目になり、効率が悪い。あるいは、非同期処理が何らの状態をどこかで冪等な方法で変えている一方で、非同期処理の結果を UI への反映のために知りたいから非同期処理を再度実行したくないということもあるだろう。
非同期にデータの読み取りだけをするのなら、AsyncTaskLoader
が使える。しかしこれもまた多くのボイラプレートなコードをもたらしてしまう。しかもエラーハンドリングのフレームワークがないし、Activity を超えたキャッシュの仕組みもない。より変なクセもある。
Composing multiple web service calls
さて、事のあらましを明らかにしてうまく動くようにできたとして、私達はさらに、ネットワークアクセスをいくつか連続して行い、それぞれ直前の呼び出しに依存するようにしたい。あるいは、いくつかのネットワークアクセスを、パフォーマンスを上げるために並列化し、最後にそれを一つに統合してから UI に返したいとしたらどうするだろう。申し訳ないが、もう一度言わせていただくと、このようなケースでもAsyncTask
はやはり役に立たない。
このようなことを実現しようと思うと、以前に挙げた問題がどれも、大した複雑度や辛さではないいように思えてくる。より複雑なスレッドモデルをいい感じに取り扱う必要があるのだから。呼び出しをチェーン上につなぐには、それぞれを分割することでコールバック地獄へと陥ったり、呼び出しを同期的に一つのバックグラウンドにまとめて実行した結果、他の似たような状況でも、処理をまとめるコードを別に重複して書いていかなければならなくなったりする(この辺複雑でうまく訳せたか自信がない…)。
そして処理を並列に動かす場合、独自のExecutor
を用意してどこでも使えるようにしなければならない。AsyncTask
はデフォルトでは並列に動かないからこのようにするのだ(注:Android 2.x系は並列で動く)。そして、複数のスレッドの動きをうまくまとめるには、より複雑な同期化のパターンを、CountDownLatch や Thread、Executor や Future を用いて自分で何とかしなければならなくなる。
Testability
最後に、コードのユニットテストを考えた時、AsyncTask
はまたしても役に立ってはくれない。AsyncTask
のテストは、普通ではする必要のない工夫なしには難易度が高く、多くの場合脆くてメンテナンスがしづらい。これに関して、うまく乗り越える方法を示している記事がある。
Better Solution: RxJava’s Observable
幸運にも、これまで議論したすべての問題には、素晴らしい解決策が用意されていて、それは RxAndroid に詰め込まれている。詳しく見てみよう。
Error handling
さて、これまで見てきたAsyncTask
と同等のことをするコードを、Observable を用いて書いてみるとする(もし Retrofit を使っていたら、Observable 型の戻り値を取り扱うことができ、非同期にすべき仕事はスレッドプールで管理してくれる。そして、あなたはそれ以上のことはする必要がない)。
webService.doSomething(someData)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
result -> resultText.setText("It worked!")),
e -> handleError(e)
);
おそらく、何もしなくても、我々はすでに成功と失敗の両方の場合についてハンドリングしていることに気がつくだろう。これはAsyncTask
ではなしえなかったことだ。そして、私達はずっと少ない量のコードを書いている。私達がすべき追加の仕事は、Observer に対してメインスレッドで結果をハンドリングするよう宣言することだけだ。この違いによって、私達はより良い方へ進んでいくのだ。そして、もしwebService
がバックグラウンドのスレッドで処理するようになっていない場合、ここで非同期に処理するよう宣言することが出来る。.subscribeOn(...)
がそれをしてくれるのだ(注:これらの例は Java8 のラムダ式を前提に書かれている。これは現在の私の意見だが、得られるものはリスクよりも大きく、私達はプロジェクトでこれを使うようにしている)。
Activity/Fragment lifecycle
さて、RxAndroid を使って以前に述べたようなライフサイクルの問題に対峙してみよう。ここでは、mainThread() スケジューラを指定しないようにしたい(そう、RxAndroid ならはじめからそうなっている)。このようなコードになるはずだ。
AppObservable.bindFragment(this, webService.doSomething(someData))
.subscribe(
result -> resultText.setText("It worked!")),
e -> handleError(e)
);
特に私がしていることとしては、ヘルパメソッドを作ってベースとなるFragment
に置き、すこしシンプルにしていることだ。RxFragment
を見れば理解していただけるだろう。
bind(webService.doSomething(someData))
.subscribe(
result -> resultText.setText("It worked!")),
e -> handleError(e)
);
AppObservable.bindFragment()
を用いることで、一連の呼び出しのつながりの間に、Fragment や Activity が正当な状態でなくなった場合に、onNext()
を実行してしまうことを防いでくれるようになる。
もし不正な状態のまま実行しようとした場合は、subscribe
していたものは解除され、処理は止まる。これによって、NPE のリスクはなくなって、クラッシュしないようになる。一つ注意しておくと、Fragment や Activity をこのままにしておくと、一時的か永続的か、それをリークさせてしまう。これは議題にある Observable の振る舞いに依存する。だから、bind()
のなかでは、LifeCycleObservable
のメカニズムにフックして、Fragment が死んだら自動でsubscribe
を解除するようにしている。これの良い点は、一度そのように作ってしまえば、使う側は二度と気にすることがなくなるという点にある。
そう、つまり、これは最初にあげたふたつの大きな問題についてケアしてくれるのだ。しかし、次の問題でこそ、RxJava の真価が発揮される。
Composing multiple web service calls
ここでは、詳細を細かく見ていくつもりはない。それはとても深遠なトピックであり、Observable を使えば、複雑なことを簡単に、分かりやすい形式でできるようになるからだ。このサンプルでは、それぞれ依存しあう Web Service の呼び出しを数珠つなぎにしており、ふたつ目の呼び出しのまとまりをスレッドプールから取り出した複数のスレッドで並列実行し、その結果を統合、ソートして、Observer に返している。filter も良い例なのでここで使っている。これらすべてのロジックと最後のまとめは、文字通り5行のコードで宣言されている。さて、どうなっているかというと…
public Observable<List<CityWeather>> getWeatherForLargeUsCapitals() {
return cityDirectory.getUsCapitals()
.flatMap(cityList -> Observable.from(cityList))
.filter(city -> city.getPopulation() > 500,000)
.flatMap(city -> weatherService.getCurrentWeather(city)) //each runs in parallel
.toSortedList((cw1,cw2) -> cw1.getCityName().compare(cw2.getCityName()));
}
Caching on rotation (or otherwise)
さて、これはデータの"読み込み"の話なのだが、もしかすると、我々はそのデータをキャッシュして、デバイスの回転ですべての web service の呼び出しをトリガしないようにしたくなるかもしれない。我々はこれにうまく対応することが出来る。そう、AsyncSubject
やBehaviorSubject
を使えばね。これらは、subscribe
されるならばいつも、最後の結果をもう一度返してくれる(BehaviorSubject
をsubscribe
することは、最終的な結果やキャッシュされたデータを読み取るだけでなく、データが更新されたことの通知を受け取ることにも役に立ってくれる)。これについては後ほど、Observable
を event bus のスタイルで使うことを議論するところで話をしよう。
我々は新しいデータを Activity の生成時に得て、キャッシュされたものはデバイスの回転時にだけ得られるようにしたくなる。これをするには、invalidate()
メソッドを定義し、savedInstanceState == null
の時に呼べばよいだけだ。そうすれば、BehaviorSubject
はリセットされ、新しいObservable
の連続の処理が始まる。
public void onCreate() {
super.onCreate()
if (savedInstanceState == null) {
weatherManager.invalidate(); //invalidate cache on fresh start
}
}
public void onViewCreated() {
super.onViewCreated()
bind(weatherManager.getWeatherForLargeUsCapitals()).subscribe(this);
}
public void invalidate() {
weatherSubject = null;
}
public Observable<List<CityWeather>> getWeatherForLargeUsCapitals() {
if (weatherSubject == null) {
weatherSubject = BehaviorSubject.create();
cityDirectory.getUsCapitals()
.flatMap(cityList -> Observable.from(cityList))
.filter(city -> city.getPopulation() > 500,000)
.flatMap(city -> weatherService.getCurrentWeather(city))
.toSortedList((city1,city2) -> city1.name.compare(city2.name))
.subscribe(weatherSubject);
}
return weatherSubject;
}
このすごいところは、Loader とちがって、読み込みの結果をキャッシュし、それを Activity や Service を超えて扱うことが出来る点だ。invalidate()
の呼び出しをonCreate()
から消し、Manager
オブジェクトにいつ新しい天気のデータを返すようにすべきかを決めさせよう。おそらくタイマーによって、あるいはユーザが移動した時、あるいは……。どのようなときに、ということは問題ではない。あなたはついに、いつ、そしてどのようにキャッシュを invalidate してリロードすべきかについて制御できるようになったのだ。さらに、Fragment や Manager オブジェクトのインタフェースは、キャッシュ戦略が変わろうとも変化しない。どんなときも、List<WeatherData>
のObserver
であるから。
Testability
テストについてが、最後の明らかにすべき議題だ(テスト中、web service をモックのものにすり替えることに関しては無視しよう。これはシンプルで、それらの依存性の注入をする標準のパターンはおそらくすでに実践されているだろう)。
幸運にも、Observable は、非同期なメソッドを同期的なメソッドに変えるための単純なメソッドを持っている。すべきことのすべては、.toBlocking()
を呼ぶことだ。上記のコードをテストする例を見てみよう。
List results = getWeatherForLargeUsCapitals().toBlocking().first();
assertEquals(12, results.size());
それだけだ。我々はもはや、スレッドをスリープさせたり、Future や CountDownLatch を使った珍妙なテストを書いたりするような、諸刃の剣を使う必要はない。テストはシンプルに、わかりやすく、メンテナンスしやすくなったのだ。
Conclusion
以上。AsyncTask
やAsyncTaskLoader
の機能をrx.Observableのインタフェースへ置き換えることで、私達は、より強力で分かりやすい実装をコードに表現できるようになった。ハッピーRx!そして、今後も Android でよくある問題に RxJava Observable を使った解決の方法を提示していきたい。