概要

RxJavaで書かれたソースを初めて見ると、処理の流れが追いづらく、
本来理解したいロジックの部分の理解に入れないことが多々あります。
(少なくとも私はそうでした...)

本記事は、RxJavaの主要クラスと主要メソッドである、

  • Completable
  • Single
  • Observable
  • subscribeOn
  • observeOn
  • map
  • flagMap

を、実際にコードを動かしながら処理を追うことで、
読者の皆様を『読める!RxJava』にします。

対象

  • RxJavaって何、美味しいの?
  • RxJavaで書かれている処理を見たけど理解不能だった
  • 使いたいライブラリがリアクティブで困ってます
  • 『RxJavaでググったけど...』

observable.png

『こんな図ばっか出てきて意味分からない!!』
なあなたに送ります。

※この図自体が悪いわけでは無いですが、初学者向けでは無いです。
(英語だし...)

実行環境

  • Java8 (以上)
  • RxJava 2.2.0-SNAPSHOT

gradleの場合は以下を記述するだけでOKです。

build.gradle
repositories {
    mavenCentral()
    maven { url 'https://oss.jfrog.org/libs-snapshot' }
}

dependencies {
    compile 'io.reactivex.rxjava2:rxjava:2.2.0-SNAPSHOT'
}

それ以外の方は以下を参照してください。
https://github.com/ReactiveX/RxJava

尚、前提知識として、ラムダを利用します。
ラムダが不安な方は以下等で習得してください。
https://qiita.com/sanotyan1202/items/64593e8e981e8d6439d3

サンプルコード

サンプルコードは以下にあります。
そのまま動くようになっているので、是非実際に動かしてみてください。

https://github.com/youyanntoto/RxJavaSample

尚、本編はサンプルコードを中心に説明していきます。

本編

サンプルコードの処理内容について

Main.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
        RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
        RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

        System.out.println(tokyoWeather);
        System.out.println(yokohamaWeather);
        System.out.println(nagoyaWeather);

        System.out.println("*** main end ***");
    }
RestUtil.java
    public static Weather getWeather(Place place) {
        Weather weather;

        // 場所ごとに適当に天気を取得
        switch (place) {
            case TOKYO:
                weather = Weather.SUNNY;
                break;
            case YOKOHAMA:
                weather = Weather.RAINY;
                break;
            case NAGOYA:
                weather = Weather.WINDY;
                break;
            default:
                weather = Weather.SUNNY;
                break;
        }

        try {
            // 通信処理時間として500 ~ 999msかかるようにする
            Thread.sleep(new Random().nextInt(500) + 500);
        } catch (InterruptedException e) {
            // nop
        }

        return weather;
    }

Mainクラスを見てください。
Rest通信(RestUtil#getWeather(RestUtil.Place))を行って、
指定地域の天気(RestUtil.Weather)を取得する処理を行います。

RestUtilクラスの中を見て頂ければ分かりますが、通信処理自体はダミーになっています。

RestUtil.getWeatherはそれぞれ500 - 999ms時間がかかるようになっています。
なので、Main.javaを実行すると、500 - 999ms × 3の1500 - 3000ms程度かかります。

それでは早速上記コードをリアクティブにしてい行きましょう。

その1. 完了を伝える "Completable"

サンプルコード

CompletableSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Completable.create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

            System.out.println(tokyoWeather);
            System.out.println(yokohamaWeather);
            System.out.println(nagoyaWeather);
            emitter.onComplete();

        }).subscribe(() -> {
            System.out.println("Complete!!");

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        });

        System.out.println("*** main end ***");
    }

解説

まず、Completable.createの引数(のラムダ内)の処理が実行されます。
中の処理自体は一番初めのMain.javaの処理と同様です。

次に、emitter.onComplete()
を呼びだすことで、subscribeの第一引数が実行されます。

ちなみに、subscribeの第二引数はemitter.onError()を呼び出した際、
もしくは、Completable.createの引数のラムダ内でエラーをthrowした際に実行されます。
そこで渡されるthrowableがエラーの際に発生したエラーオブジェクトになります。

まとめると、
Completable.createの引数の処理
→ subscribeの第一引数(onSuccess)の処理 or 第二引数(onError)の処理
という流れで処理が実行されます。

その2. 値を一つ渡す "Single"

サンプルコード

SingleSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Single.<List<RestUtil.Weather>>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onSuccess(List.of(tokyoWeather, yokohamaWeather, nagoyaWeather));

        }).subscribe(weathers -> {
            System.out.println("Complete!!");
            for (RestUtil.Weather weather : weathers) {
                System.out.println(weather);
            }

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        });

        System.out.println("*** main end ***");
    }

解説

その1のCompletable同様に
Single.createの引数の処理
→ subscribeの第一引数(onSuccess)の処理 or 第二引数(onError)の処理
という流れで処理が実行されます。

違う箇所は、天気を出力している場所です。

Singleでは後続の処理に一つだけ値を渡すことが出来るので、
それぞれの地域の天気をリストにして
emitter.onSuccess(T)
を呼びだすことで、subscribeの第一引数の処理に渡しています。
尚、そこで渡される引数の型をSingle#createの際のジェネリクスで指定しています。

JavaScriptでPromiseを扱ったことがある方は、イメージが近いと思います。

その3. (番外編) "Stream"

サンプルコード

StreamSample.java
/**
 * sample for Stream
 */
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
        RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
        RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);

        Stream.of(List.of(tokyoWeather, yokohamaWeather, nagoyaWeather))
                .forEach(weather -> {
                    System.out.println(weather + "");
                });

        System.out.println("*** main end ***");
    }

解説

Observableを説明する前に、Streamを少し学びます。
Streamを既に理解している人は読み飛ばして下さい。

では、処理の流れですが、
まずはそれぞれの地区の天気を取得し、リストを生成、
それをStreamで流してそれぞれの地区の天気を出力しています。

値が順次流れてくる形が次に説明する、Observableに近いですので、
処理の流れをしっかり理解して下さい。

その4. 値を複数渡す "Observable"

サンプルコード

ObservableSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

解説

今までのCompletableやSingleと違い、
subscribeの引数が3つになっています。

処理の流れとしては
Observable.createの処理が最初に実行されます。
その後、emitter.onNext(T)が呼ばれる度にsubscribeの第一引数が実行されます。
最後に、emitter.onComplete()が呼ばれるとsubscribeの第三引数が実行されます。
第二引数は、従来と同様にエラーが発生した際に呼ばれる処理です。

まとめると、
Observable.createの処理
 → subscribeの第一引数(onNext)の処理 or 第二引数(onError)の処理
 → subscribeの第三引数(onComplete)の処理
という流れで処理が実行されます。

Observableは値が複数渡せるので、通信の都度値を出力出来ています。

その5. スレッドを指定する "subscribeOn", "observeOn"

サンプルコード

ThreadSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(weather -> {
                    System.out.println("Next!!");
                    System.out.println(weather);

                }, throwable -> {
                    System.out.println("Error!!");
                    throwable.printStackTrace();

                }, () -> {
                    System.out.println("Complete!!");

                });

        System.out.println("*** main end ***");
    }

解説

その4で解説したソースコードに
"subscribeOn", "observeOn"
が追加されています。

RxJavaのメリットはいくつかあるのですが、
そのうち一番大きなものがこのスレッド指定を行う処理が簡単に書けることです。

"subscribeOn"はObservable.create内の処理のスレッドを指定、
"observeOn"はsubscribe内の処理のスレッドを指定可能です。

上記のサンプルでは通信処理をIOスレッドで行い、
後続の処理(通信結果を利用する処理)は他のスレッドで実行という流れにしています。

よく使われるシーンとして、Android開発では、通信処理を別スレッドで行い、
後続の処理でViewを扱う処理をmainスレッドで行うというのが定石になっています。

その6. 値を変換する "map"

サンプルコード

ObservableMapSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Weather>create(emitter -> {
            RestUtil.Weather tokyoWeather = RestUtil.getWeather(RestUtil.Place.TOKYO);
            emitter.onNext(tokyoWeather);

            RestUtil.Weather yokohamaWeather = RestUtil.getWeather(RestUtil.Place.YOKOHAMA);
            emitter.onNext(yokohamaWeather);

            RestUtil.Weather nagoyaWeather = RestUtil.getWeather(RestUtil.Place.NAGOYA);
            emitter.onNext(nagoyaWeather);

            emitter.onComplete();

        }).map(weather -> {
            switch (weather) {
                case SUNNY:
                    return "happy day!!";
                case RAINY:
                    return "sad day...";
                case WINDY:
                default:
                    return "normal day";
            }

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

解説

その4のソースコードに新たなメソッドmapが追加されました。
実際に動かして貰えると分かりますが、
subscribeの第一引数に渡される値が変換されています。

変換しているのは見ての通り、mapに渡している処理です。

mapは流れてくる値を受け取り、変換後の値をreturn句で返すことで、
後続の処理に流す値を変換することが出来ます。

その7. 繋げる "flatMap"

サンプルコード

ObservableFlatMapSample.java
    public static void main(String[] args) {
        System.out.println("*** main start ***");

        Observable.<RestUtil.Place>create(emitter -> {
            emitter.onNext(RestUtil.Place.TOKYO);
            emitter.onNext(RestUtil.Place.YOKOHAMA);
            emitter.onNext(RestUtil.Place.NAGOYA);
            emitter.onComplete();

        }).flatMap(place -> {
            return Observable.create(emitter -> {
                RestUtil.Weather tokyoWeather = RestUtil.getWeather(place);
                emitter.onNext(tokyoWeather);
                emitter.onComplete();
            });

        }).subscribe(weather -> {
            System.out.println("Next!!");
            System.out.println(weather);

        }, throwable -> {
            System.out.println("Error!!");
            throwable.printStackTrace();

        }, () -> {
            System.out.println("Complete!!");

        });

        System.out.println("*** main end ***");
    }

解説

その4のソースコードに新たなメソッドflatMapが追加され、
一つ目のObservable.create内で渡す値が、天気から地域に変わっています。
そして、flatMap内にもう一つのObservableが生成されています。

処理の流れは、
一つ目のObservableの処理が実行され、
emitter.onNext(T)が呼ばれる度に
flatMap内でreturnされている二つ目のObservable内の処理が行われます。
つまり、flatMapが一つ目のObservableと二つ目のObservableを繋げています。

今回の例では通信箇所が二つ目のObservableのみなので、特にうまみはありませんが、
実際には、一つ目のObservableでDB処理を行い、
二つ目のObservableで通信処理を行うといった
別々に時間のかかる処理を行うシーンで使われます。

終わりに

以上で、『読める!RxJava』は終了です。

RxJavaの世界は広く、未だ成長を続けている分野なので、
今回の記事の内容だけで全てを網羅は出来ませんが、
基本は今回紹介しているクラスおよびメソッドの組み合わせや応用で理解できるはずです。
(少なくとも読めるという意味においては)

これからRxJavaの世界に飛び込んでいく人の少しでも手助けになれれば幸いです。