これはKotlinアドベントカレンダー2015 17日目の記事です。
TL;DR
- Kotlinの拡張関数でRxJavaがいろいろ便利になるよね
- RxKotlinというのはRxJava用の便利な拡張関数を集めたライブラリだよ
- RxJavaのCompositeSubscriptionがめんどくさい問題はDelegationでスッキリ解決できるよ
- ここで紹介したDelegationを使った実装をまとめてライブラリとして公開してみたよ。詳細はこっちの記事を読んでね。
はじめに
RxJavaはコールバックを多用するライブラリです。Java7以前のバージョンではラムダ式は利用できずに、無名クラスでの実装となってしまうために、記述が冗長となってしまい、Kotlinの利用を考えた方も多いのではないでしょうか?
そこで、Kotlinだからこそできる、RxJavaの便利な利用方法を紹介してみたいと思います。
まずは拡張関数
まず、RxJavaを便利に使おうとして思いつくのは拡張関数ではないでしょうか?
Kotlinの拡張関数の機能を使えば、便利な機能はObservable
のメソッドとしてガンガン追加して、メソッドチェーンでの記述ができるようになります。
例えば、
文字列などが空だった場合にfilterするfilterNotEmpty()
を以下のように定義すると
fun <T> Observable<T>.filterNotEmpty(): Observable<T> = filter {
when (it) {
is String -> !StringUtils.isEmpty(it)
is List<*> -> it.count() > 0
else -> it != null
}
}
そのままObservableの拡張として以下のように書けてしまいます。
fun test(){
Observable.from(arrayOf("a", "b", "", "d"))
.filterNotNull()
.subscribe {
// itの型はString
// "a", "b", "d" のみがやってくる
}
}
あとは例えば、observeOn
をmainThreadで行うためのobserveOnMainThread()
なども定義しておくと地味に便利です。
fun <T> Observable<T>.observeOnMainThread() = observeOn(AndroidSchedulers.mainThread())
以前の記事で扱ったRxJavaを使った通信中にProgressダイアログを出す処理などもJavaでは
void onClick() {
usingProgressDialog()
.flatMap(new Func1<Void, Observable<String>>() {
@Override
public Observable<String> call(Void aVoid) {
// flatMapでloadData()を実行
return loadData();
}
})
.subscribe();
}
のように、始めにProgressDialogを出しつつVoid
をemitするObsevableを定義して、それから実際に行いたい処理をflatMap
で続けるような処理を書く必要がありましたが、Kotlinでは
fun <T> Observable<T>.withProgressDialog(): Observable<T> =
Observable.using({
showProgressDialog()
}, { this }, {
dismissProgressDialog()
})
のように実装することで
fun onClick() = loadData().withProgressDialog().subscribe()
のように、メソッドチェインの中で、より簡単にprogressDialog表示の機能を追加することができます。
もっともこの場合にはprogressDialogの表示を実行するスレッドに気を配る必要があります。
あとは以下のように、共通のエラーハンドリングの処理を定義したり、
fun <T> Observable<T>.myErrorRetry() = retry { i, throwable ->
when(throwable){
is MyApiError -> i < 5
else -> false
}
}
他にもアイデア次第で便利な拡張関数を定義することができると思います。
RxKotlin
上記のような拡張関数を使って一般的に使われる処理を集めたライブラリがRxKotlinです。単純なObservableの拡張関数の他に
public fun <T> Array<out T>.toObservable() : Observable<T> = Observable.from(this)
のようにArrayに対してtoObservable()
というメソッドを追加してObservableの生成を容易にしたり、
public fun <T> BehaviourSubject() : BehaviorSubject<T> = BehaviorSubject.create()
create()
などのstatic methodを使わずにコンストラクタっぽく扱えるようにしたりする関数が定義されています。
CompositeSubscriptionをどうにかしたい!
CompositeSubscriptionがめんどくさい問題
RxJavaでは、あるObservableにsubscribe()
した場合、それをunsubscribe()
する必要があります。ストリームがonComplete()
やonError()
で終了した場合は自動的にunsubscribeされますが、それ以外ではRxJavaを利用するclassでCompositeSubscriptionがオブジェクトを持ち、そこにsubscribe()
後のsubscriptionオブジェクトを登録し、あるタイミングで一括にunsubscribe()するような処理が一般的です。
例えば、あるActivityで通信処理をRxJavaで実装した場合、画面が終了した場合などのことを考えて、onPause()
でそれをunsubscribe()するコードをrxt4aライブラリを使って書こうとすると
class SomeActivity : Activity() {
val compositeSubscription = AndroidCompositeSubscription()
fun onClick() {
loadData()
.lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Do something
}
}
override fun onPause() {
super.onPause()
compositeSubscription.unsubscribe()
}
}
のようになります。このAndroidCompositeSubscription
を毎回定義するのもめんどくさいですし、CompositeSubscriptionに登録するための.lift(new OperatorAddToCompositeSubscription<String>(compositeSubscription))
もなかなかめんどくさいですね。
もちろん、RxLifeCycleを使用して適切な状況でUnsubscribeするというのも手ですが、RxLifeCycleを使った場合は必ずRxActivity
というクラス継承しなければならない、などの制約があります。
さて、これをKotlinで便利な感じに解決するにはどうしたらよいでしょうか?
なんとなく.lift(new OperatorAddToCompositeSubscription<String>(compositeSubscription))
は拡張関数でなんとかなりそうな感じがしますが、その前にCompositeSubscriptionの定義をなんとかしたいですね。
そこでDelegation!
KotlinにはDelegationという機能があります。Delegationの説明は、Kotlinアドベントカレンダー2015の二日目のみんな大好きKotlinのDelegationについての記事が参考になります。
簡単に言うと、あるクラスから別のクラスにメソッドの実装を委譲する仕組みです。
例えば
class MyList<T> : List<T> by ArrayList<T>() { }
と書くと
MyList<String>().contains("hoge") // => false
MyListのインスタンスではListインターフェイスのメソッドを呼び出せてしまうのです。
この仕組みを先ほどのCompositeSubscriptionを持ったActivityに適用してみましょう。
まずcompositeSubscription
を持つインターフェイスをとその実装を定義します。
interface AutoUnsubscribable {
val compositeSubscription: AndroidCompositeSubscription
fun unsubscribe()
}
class AutoUnsubscribableImpl : AutoUnsubscribable{
override val compositeSubscription = AndroidCompositeSubscription()
override fun unsubscribe() = compositeSubscription.unsubscribe()
}
そしてこれを先程のActivityでclass delegateしてあげます
class SomeActivity : Activity(), AutoUnsubscribable by AutoUnsubscribableImpl() {
fun onClick() {
loadData()
.lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Do something
}
}
override fun onPause() {
super.onPause()
unsubscribe()
}
}
これでActivityの実装からcompositeSubscription
の定義を取り除くことができました。
また、onPause()
で呼び出していた、compositeSubscription.unsubscribe()
もunsubscribe()
のみを呼び出せばよいのでかなりスッキリです。
次は、.lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))
の部分ですね。これを拡張関数として実装していきます。先程のAutoUnsubscribable()
のインターフェイスに拡張関数の定義を追加していきます。
interface AutoUnsubscribable {
val compositeSubscription: AndroidCompositeSubscription
fun <T> Observable<T>.autoUnsubscribe():Observable<T>
fun unsubscribe()
}
class AutoUnsubscribableImpl : AutoUnsubscribable{
override fun <T> Observable<T>.autoUnsubscribe():Observable<T> {
return this.lift(OperatorAddToCompositeSubscription<T>(compositeSubscription))
}
override val compositeSubscription = AndroidCompositeSubscription()
override fun unsubscribe() = compositeSubscription.unsubscribe()
}
ここではautoUnsubscribe()
という関数をObservableの拡張関数としてインターフェイスに定義し、実装クラスで実装しています。
ちなみにインターフェイスで定義した拡張関数はそのインターフェイスを実装したクラス内でのみ有効になります。
さてその結果もとのActivityはどうなったかというと
class MainActivity : Activity(), AutoUnsubscribable by AutoUnsubscribableImpl() {
fun onClick() {
loadData()
.autoUnsubscribe()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
// Do something
}
}
override fun onPause() {
super.onPause()
unsubscribe()
}
}
のように元のcompositeSubscription
は跡形もなくスッキリ!
実装AutoUnsubscribeをDelegationしたクラスからはcompositeSubscription
は参照されることはないので、インターフェイスから消してしまいましょう。
interface AutoUnsubscribable {
fun <T> Observable<T>.autoUnsubscribe():Observable<T>
fun unsubscribe()
}
class AutoUnsubscribableImpl : AutoUnsubscribable{
override fun <T> Observable<T>.autoUnsubscribe():Observable<T> {
return this.lift(OperatorAddToCompositeSubscription<T>(compositeSubscription))
}
private val compositeSubscription = AndroidCompositeSubscription()
override fun unsubscribe() = compositeSubscription.unsubscribe()
}
これでcompositeSubscription
のボイラープレートからおさらばです!
Kotlin便利ですね!
ちなみにここで紹介した実装をライブラリとして公開しました。詳しくはこちらの記事を読んでください。
おわりに
ここではKotlinの拡張関数、Delegationを用いてRxJavaのボイラーブレート的な実装を減らしていく試みを紹介しました。これらのテクニックはRxJavaにかぎらず様々なところに適用できると思うので、みなさん
Kotlinアドベントカレンダー2015
明日は@magie_poohさんです