Help us understand the problem. What is going on with this article?

KotlinでAndroidのRxJavaをちょっと便利に

More than 3 years have passed since last update.

これはKotlinアドベントカレンダー2015 17日目の記事です。

TL;DR

  • Kotlinの拡張関数でRxJavaがいろいろ便利になるよね
  • RxKotlinというのはRxJava用の便利な拡張関数を集めたライブラリだよ
  • RxJavaのCompositeSubscriptionがめんどくさい問題はDelegationでスッキリ解決できるよ
  • ここで紹介したDelegationを使った実装をまとめてライブラリとして公開してみたよ。詳細はこっちの記事を読んでね。

はじめに

RxJavaはコールバックを多用するライブラリです。Java7以前のバージョンではラムダ式は利用できずに、無名クラスでの実装となってしまうために、記述が冗長となってしまい、Kotlinの利用を考えた方も多いのではないでしょうか?
そこで、Kotlinだからこそできる、RxJavaの便利な利用方法を紹介してみたいと思います。

まずは拡張関数

まず、RxJavaを便利に使おうとして思いつくのは拡張関数ではないでしょうか?
Kotlinの拡張関数の機能を使えば、便利な機能はObservableのメソッドとしてガンガン追加して、メソッドチェーンでの記述ができるようになります。

例えば、

文字列などが空だった場合にfilterするfilterNotEmpty()を以下のように定義すると

filterNotEmpty
fun <T> Observable<T>.filterNotEmpty(): Observable<T> = filter {
    when (it) {
        is String -> !StringUtils.isEmpty(it)
        is List<*> -> it.count() > 0
        else -> it != null
    }
}

そのままObservableの拡張として以下のように書けてしまいます。

Kotlin
fun test(){
    Observable.from(arrayOf("a", "b", "", "d"))
            .filterNotNull()
            .subscribe { 
                // itの型はString
                // "a", "b", "d" のみがやってくる
            }

}

あとは例えば、observeOnをmainThreadで行うためのobserveOnMainThread()なども定義しておくと地味に便利です。

observeOnMainThread()
fun <T> Observable<T>.observeOnMainThread() = observeOn(AndroidSchedulers.mainThread())

以前の記事で扱ったRxJavaを使った通信中にProgressダイアログを出す処理などもJavaでは

再利用可能なProgressダイアログ表示Observable
    void onClick() {
        usingProgressDialog()
                .flatMap(new Func1<Void, Observable<String>>() {
                    @Override
                    public Observable<String> call(Void aVoid) {
                        // flatMapでloadData()を実行
                        return loadData();
                    }
                })
                .subscribe();
    }

のように、始めにProgressDialogを出しつつVoidをemitするObsevableを定義して、それから実際に行いたい処理をflatMapで続けるような処理を書く必要がありましたが、Kotlinでは

withProgressDialog
fun <T> Observable<T>.withProgressDialog(): Observable<T> =
        Observable.using({
            showProgressDialog()
        }, { this }, {
            dismissProgressDialog()
        })

のように実装することで

loadData中にprogressDialogを表示する
fun onClick() = loadData().withProgressDialog().subscribe()

のように、メソッドチェインの中で、より簡単にprogressDialog表示の機能を追加することができます。
もっともこの場合にはprogressDialogの表示を実行するスレッドに気を配る必要があります。

あとは以下のように、共通のエラーハンドリングの処理を定義したり、

RetryPolicy
fun <T> Observable<T>.myErrorRetry() = retry { i, throwable -> 
    when(throwable){
        is MyApiError -> i < 5
        else -> false
    }
}

他にもアイデア次第で便利な拡張関数を定義することができると思います。

RxKotlin

上記のような拡張関数を使って一般的に使われる処理を集めたライブラリがRxKotlinです。単純なObservableの拡張関数の他に

toObservable()
public fun <T> Array<out T>.toObservable() : Observable<T> = Observable.from(this)

のようにArrayに対してtoObservable()というメソッドを追加してObservableの生成を容易にしたり、

BehaviourSubject()
public fun <T> BehaviourSubject() : BehaviorSubject<T> = BehaviorSubject.create()

create()などのstatic methodを使わずにコンストラクタっぽく扱えるようにしたりする関数が定義されています。

CompositeSubscriptionをどうにかしたい!

CompositeSubscriptionがめんどくさい問題

RxJavaでは、あるObservableにsubscribe()した場合、それをunsubscribe()する必要があります。ストリームがonComplete()onError()で終了した場合は自動的にunsubscribeされますが、それ以外ではRxJavaを利用するclassでCompositeSubscriptionがオブジェクトを持ち、そこにsubscribe()後のsubscriptionオブジェクトを登録し、あるタイミングで一括にunsubscribe()するような処理が一般的です。

例えば、あるActivityで通信処理をRxJavaで実装した場合、画面が終了した場合などのことを考えて、onPause()でそれをunsubscribe()するコードをrxt4aライブラリを使って書こうとすると

class SomeActivity : Activity() {

    val compositeSubscription = AndroidCompositeSubscription()


    fun onClick() {
        loadData()
                .lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // Do something
                }
    }

    override fun onPause() {
        super.onPause()
        compositeSubscription.unsubscribe()
    }
}

のようになります。このAndroidCompositeSubscriptionを毎回定義するのもめんどくさいですし、CompositeSubscriptionに登録するための.lift(new OperatorAddToCompositeSubscription<String>(compositeSubscription))もなかなかめんどくさいですね。

もちろん、RxLifeCycleを使用して適切な状況でUnsubscribeするというのも手ですが、RxLifeCycleを使った場合は必ずRxActivityというクラス継承しなければならない、などの制約があります。

さて、これをKotlinで便利な感じに解決するにはどうしたらよいでしょうか?
なんとなく.lift(new OperatorAddToCompositeSubscription<String>(compositeSubscription))は拡張関数でなんとかなりそうな感じがしますが、その前にCompositeSubscriptionの定義をなんとかしたいですね。

そこでDelegation!

KotlinにはDelegationという機能があります。Delegationの説明は、Kotlinアドベントカレンダー2015の二日目のみんな大好きKotlinのDelegationについての記事が参考になります。

簡単に言うと、あるクラスから別のクラスにメソッドの実装を委譲する仕組みです。
例えば

MyListクラスのListインターフェイスの処理をArrayListに委譲する
class  MyList<T> : List<T> by ArrayList<T>() { }

と書くと

MyListからListのcontains()メソッドを呼び出す
MyList<String>().contains("hoge") // => false

MyListのインスタンスではListインターフェイスのメソッドを呼び出せてしまうのです。

この仕組みを先ほどのCompositeSubscriptionを持ったActivityに適用してみましょう。

まずcompositeSubscriptionを持つインターフェイスをとその実装を定義します。

AutoUnsubscribable
interface AutoUnsubscribable {
    val compositeSubscription: AndroidCompositeSubscription
    fun unsubscribe()
}

class AutoUnsubscribableImpl : AutoUnsubscribable{
    override val compositeSubscription = AndroidCompositeSubscription()
    override fun unsubscribe() = compositeSubscription.unsubscribe()
}

そしてこれを先程のActivityでclass delegateしてあげます

AutoUnsubscribableのDelegationを適用
class SomeActivity : Activity(), AutoUnsubscribable by AutoUnsubscribableImpl() {
    fun onClick() {
        loadData()
                .lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // Do something
                }

    }

    override fun onPause() {
        super.onPause()
        unsubscribe()
    }
}


これでActivityの実装からcompositeSubscriptionの定義を取り除くことができました。
また、onPause()で呼び出していた、compositeSubscription.unsubscribe()unsubscribe()のみを呼び出せばよいのでかなりスッキリです。

次は、.lift(OperatorAddToCompositeSubscription<String>(compositeSubscription))の部分ですね。これを拡張関数として実装していきます。先程のAutoUnsubscribable()のインターフェイスに拡張関数の定義を追加していきます。

AutoUnsubscribableにObservableの拡張関数を追加
interface AutoUnsubscribable {
    val compositeSubscription: AndroidCompositeSubscription
    fun <T> Observable<T>.autoUnsubscribe():Observable<T>
    fun unsubscribe()
}

class AutoUnsubscribableImpl : AutoUnsubscribable{
    override fun <T> Observable<T>.autoUnsubscribe():Observable<T> {
        return this.lift(OperatorAddToCompositeSubscription<T>(compositeSubscription))
    }
    override val compositeSubscription = AndroidCompositeSubscription()
    override fun unsubscribe() = compositeSubscription.unsubscribe()

}

ここではautoUnsubscribe()という関数をObservableの拡張関数としてインターフェイスに定義し、実装クラスで実装しています。
ちなみにインターフェイスで定義した拡張関数はそのインターフェイスを実装したクラス内でのみ有効になります。

さてその結果もとのActivityはどうなったかというと

AutoUnsubscribableをDelegateしたActivity
class MainActivity : Activity(), AutoUnsubscribable by AutoUnsubscribableImpl() {

    fun onClick() {
        loadData()
                .autoUnsubscribe()
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe {
                    // Do something
                }

    }

    override fun onPause() {
        super.onPause()
        unsubscribe()
    }
}

のように元のcompositeSubscriptionは跡形もなくスッキリ!
実装AutoUnsubscribeをDelegationしたクラスからはcompositeSubscriptionは参照されることはないので、インターフェイスから消してしまいましょう。

AutoUnsubscribable
interface AutoUnsubscribable {
    fun <T> Observable<T>.autoUnsubscribe():Observable<T>
    fun unsubscribe()
}

class AutoUnsubscribableImpl : AutoUnsubscribable{
    override fun <T> Observable<T>.autoUnsubscribe():Observable<T> {
        return this.lift(OperatorAddToCompositeSubscription<T>(compositeSubscription))
    }
    private val compositeSubscription = AndroidCompositeSubscription()
    override fun unsubscribe() = compositeSubscription.unsubscribe()

}

これでcompositeSubscriptionのボイラープレートからおさらばです!
Kotlin便利ですね!

ちなみにここで紹介した実装をライブラリとして公開しました。詳しくはこちらの記事を読んでください。

おわりに

ここではKotlinの拡張関数、Delegationを用いてRxJavaのボイラーブレート的な実装を減らしていく試みを紹介しました。これらのテクニックはRxJavaにかぎらず様々なところに適用できると思うので、みなさん

Kotlinアドベントカレンダー2015
明日は@magie_poohさんです

sansan
法人向け名刺管理サービスSansan及び個人向け名刺管理サービスEightを企画・開発・販売するベンチャー
http://jp.corp-sansan.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした