14
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

フィーリングで覚えるRxJava入門

Posted at

概要

「RxJavaをどう使えばいいか」について、サンプルケースを元に覚える記事です。
ReactiveExtensionsおよびRxJavaの真面目な(原理から説明するような)解説については、文末の参考資料をご覧ください。

例1:コールバックの使い勝手を良くしたい

 Javaで非同期処理を行う際、しばしば「コールバック」というテクニックが用いられます。
 具体的には、メソッドの引数に特定のinterfaceを継承したインスタンスXを指定することで、「何かあった際にXのメソッドを叩く」ことが可能になります。
 (※別にinterfaceじゃなくてclassを継承してもいいですが、多重継承できないJavaの都合から、コールバックを指定する用としてはinterfaceがよく用いられます。また、1つのコールバックインターフェースに対応した実装クラスを1つしか作らない場合、インターフェースの定義を継承したクラス内に定義することもあります)

// コールバックのインターフェース
public interface ICallback {
    void onResponse(String data);
}

// コールバックを利用するメソッド
public class SampleApi {
    void sendRequest(ICallback callback) {
        callback.onResponse("test");  // 入れる値の例
    }
}

 こうした、コールバックを伴う関数を利用する際は、「コールバックのインターフェース」を継承したクラスXを用意し、そちらを「コールバックを利用するメソッド」に食わせることで実現します。また、コールバックの戻り値はX内でどう処理するかを決めます。

// コールバックを利用するメソッドを利用するクラス
public class SampleClass {
    // コールバックのインターフェースを継承したクラス
    class TestCallback implements ICallback {
        @Override
        void onResponse(String data) {
            mResult = data;  // 利用方法の例
        }
    }

    private String mResult = "";
    private ICallback mCallback = new TestCallback();
    private SampleApi mSampleApi = new SampleApi();

    public sampleFunc() {
        mSampleApi.sendRequest(mCallback);  // ここで呼び出す
    }
}

実際にはSampleApi型にシングルトンパターンを適用したり、コールバックをSampleApi型にDIしたりするかと思いますが、どちらにしてもコールバックと呼び出し先のクラスが密結合しているといった問題は避けられません。

そこでRxJavaを使うと、ObservableEmitterを通じてコールバックのデータ受け渡しが整理できます。

// コールバックのインターフェースを継承したクラス
class TestCallback implements ICallback {
    private ObservableEmitter<String> emitter;

    // EmitterをDIするためのメソッド
    public setEmitter(ObservableEmitter<String> emitter) {
        this.emitter = emitter;  // ここでDIしている
    }

    @Override
    void onResponse(String data) {
        // レスポンスの情報をonNextを通じてemitterに流し込む
        emitter.onNext(data);

        // 流し込みが完了したらonCompleteする
        emitter.onComplete();
    }
}

// コールバックを利用するメソッドを利用するクラス
public class SampleClass {
    private String mResult = "";
    private SampleApi mSampleApi = new SampleApi();

    public sampleFunc() {
        // 一本のObservableを作成する
        // (Javaはラムダ式については型推論するため、これはObservable<String>型になる)
        Observable.create(emitter -> {
            // コールバックを作成し、emitterを登録した後にAPIを実行する
            // これにより、onResponseが発火し、onNextが実行された際に次のステップに進める
            ICallback mCallback = new TestCallback();
            mCallback.setEmitter(emitter);
            mSampleApi.sendRequest(mCallback);
        // subcribe以降にObservableのメソッドチェーンで受け渡されたデータについての最終処理を記述する
        }).subcribe(data -> {
            // ここでdataはString型であることに注意(ObservableEmitter<String>からonNextで流されたデータ)
            mResult = data;
        });
    }
}

一見するとDIするためのコードがTestCallbackに増えたようにしか見えませんが、これにより

  • SampleClassに依存せずSampleApi#sendRequestできる
  • コールバックを要請するメソッドとコールバック結果が戻る場所が近いので分かりやすい
  • 一度Observableにしてしまえば様々な加工が行いやすい

などの利点が生まれます。言うなればObservableEmitterは伝令役であり、「ObservableEmitterからデータがonNextで流れてきた際に以降の処理を実行する」といった動きになります。

例2:重い処理を並行処理する

例2-1:指定したスレッドへ割り振り

 「ある処理が重いので別スレッドで動かしたい」という要請は、非同期処理じゃない流れでもよくあります。
 特にGUIアプリケーション(スマホ含む)の場合、UIスレッドがUI描画以外の処理に時間を取られすぎると画面がフリーズしてしまいます。
 これがWindowsだと「画面がフリーズした」だけで済みますが(UX的には好ましくない)、例えばAndroidだと「アプリケーションが応答していません」といったダイアログが表示され、ユーザーがアプリをその場で終了させるかもしれないといった可能性も考えられます。
 更に、Android9.0になると**アプリケーションが応答しないとシステム側で強制終了する**ようにできるので、この対策はもはや義務と言ってもいいでしょう。

 そこで登場するのが「別のスレッドで動かす」といった概念です。ワーカースレッドとも言われるそれは、例えばAndroidだとThreadHandlerAsyncTaskIntentServiceを利用して使用することになります。
 ところがRxJavaを使うと、「処理を特定のスレッドに割り振る」といった操作を簡単に行うことができます。

Observable.create(emitter -> {処理1})
    .subscribeOn(Schedulers.computation())
    .observeOn(Schedulers.io())
    .map(data -> {処理2})
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> 処理3});

上記コードの場合、

  • 処理1は計算用のバックグラウンドスレッド
  • 処理2はI/O操作用のバックグラウンドスレッド
  • 処理3はメインスレッド(UIスレッド)

で実行されます。嫌がらせかと思うレベルで紛らわしいですが、

  • subscribeOnの引数には最初のObservable内の処理に使用するスレッド
  • observeOnの引数にはそれ以降の処理に使用するスレッド

を指定できます(※observeOnはスレッド切り替え用に何度も書けますが、subscribeOnは最初の1つだけ有効になります)。subscribeOnおよびobserveOnの中身に書けるスレッドとしては

名称 種別 意味
Schedulers.computation() バックグラウンド 計算用
Schedulers.io() バックグラウンド I/O操作用
Schedulers.immediate() メイン Observable#subcribeを使用する現在のスレッド
Schedulers.newThread() バックグラウンド 新規にスレッドを作成する
AndroidSchedulers.mainThread() UIスレッド

などがあります。名前から察せられるように、AndroidSchedulersはRxJavaというよりRxAndroid内に用意されているクラスですので、使用する場合はRxJavaだけでなくRxAndroidも導入しましょう(Android開発での話)。

例2-2:直列化

 また、上記のように単一の処理1→処理1の結果を受け取って処理2→……と進む場合は実行順序に迷うことはありませんが、起点となるObservableやメソッドチェーンの途中のObservableで複数のデータを流す場合は注意が必要です。

Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
}).flatMap(data -> {func(data)})
.subscribe(result -> 処理3});

 例えば上記のようなコードがあった際、funcメソッドはdataが1・2・3の場合で計3回呼ばれます。
 ここでfuncがObservableを返す……つまり非同期なコードだった場合、func(1)が完了する前にfunc(2)やfunc(3)が呼ばれる可能性があります。
 それが嫌な場合、それぞれの非同期処理を別個のObservableとして用意し、concatWithメソッドで直列に並べることで対処します。

Observable.create(emitter -> {
    emitter.onNext(1);
    emitter.onNext(2);
    emitter.onNext(3);
    emitter.onComplete();
})
.toList()
.flatMap(list -> {
    Observable<Integer> task = null;
    for (int x :list) {
        if (task == null) {
            task = func(x);
        } else {
            task = task.concatWith(func(x));
        }
    }
    return task;
})
.subscribe(result -> 処理3});

補足1:
 mapとflatMapの違いですが、雑に言えば前者は同期処理のメソッドしか無い時、後者は他のObservableなメソッドも使う時。ちゃんとした使い分けについては参考資料を参照してください
補足2:
 toList()できるのは、Observableの上流からonNextで返ってくる個数が有限個の場合だけです。無限個返される可能性がある際の対策は参考資料を参照すること

例2-3:並列処理の同期

 直列化とは逆に、「複数のデータが揃った際に次の処理を行う」ことをしたい場合があります。
 例えば、「データAとデータBのObservableからの戻り値の合計を求める」処理は次のように書けます。

Observable<Integer> obsA = Observable.create(emitter -> {}));
Observable<Integer> obsB = Observable.create(emitter -> {}));
Observable<Integer> obsC = Observable.zip(obsA, obsB, (d1, d2) -> d1 + d2);

ここではzipメソッドを使用しましたが、combineLatestというメソッドもあります。違いとしては、

  • zipは引数にした各Observableから1個づつ使用する
  • combineLatestは、どれかのonNextが実行されるたび、他のデータは「最後に出力されたもの」が使用される

といった感じです。例えば0・2・4秒目の時点でobsAから1・3・5といった値が出力され、1・3・5秒目の時点でobsBから7・8・9といった値が出力される場合、obsCからは

  • zip……各Observableから1個づつ使用するので、1+7=8・3+8=11・5+9=14が返ってくる
  • combineLatest……他のデータは「最後に出力されたもの」が使用されるので、1+7=8・3+7=10……5+9=14が返ってくる
タイミング 0秒 1秒 2秒 3秒 4秒 5秒
obsA 1 3 5
obsB 7 8 9
obsC(zip) 8 11 14
obsC(combineLatest) 8 10 11 13 14

参考資料

14
24
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
24

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?