Help us understand the problem. What is going on with this article?

RxJava+RetrofitでAPI通信周りを実装するうえで最低限の知識を30分で詰め込む

More than 1 year has passed since last update.

前提

いま参加しているプロジェクトのネットワーク通信部分を大幅に書き換える必要があり、次のような理由からRxJavaの採用を検討した。

  • 複数のAPI呼び出しを並行でやったり待ち合わせて逐次処理したり使い分ける必要がある
  • 複雑な一連の呼び出しをリトライやエラーハンドリングを考慮して書くことが簡単にできそう
  • 各社の採用実績から思いとどまる理由がない
  • 筆者はRxJava(1)を業務で使ったことがある

RxJavaで書き直した結果これらの要求を非常に簡単に満たすことができたが、不慣れなメンバーには「いつ非同期処理が開始されたのか分かりにくい」ことが判明した。

本ドキュメントは元々社内Qiita Teamに書き始めたものであるが、世の中の同じようなチームにも益があるかも知れないと考え公開することにした。

なお、現在RxJavaはバージョン2なのでとくに断りなくRxJava2(2017年6月時点の最新版)を前提として書いている。またJava8のラムダ式を使っている。Kotlinは弊社プロジェクトに取り込めていないので使っていない。

また、これは予防線であるが、「最低限の知識をパパッと詰め込む」という点に注力しているので、割り切って説明を省いている部分が多い。(たとえば後述するFuture(Promise)はObservableの一面にしかすぎないかもしれないが、あえて言い切ることで単純化を試みた)
理解の取っ掛かりになりさえすれば本望。

Observableとは何か

Observableとは Future(Promise) である。まだ完了していない計算結果へのプロキシオブジェクトである。
また、Future同士をつなぎ合わせてパイプライン化(要するに逐次処理)できる。未来に起こることを順番に繋げられるということ。

「まだ完了していない」というのがポイントで、値や値を変換するための手続きを (将来確定する) という文脈につつんだまま受け渡しできる。したがって、Observableオブジェクトを引数や返り値で受け渡ししてもその時点では何も起きていないんだということを理解する必要がある。

しつこいけどObservableそれ自体を引き渡したり連結したりしてもその時点ではまだ何も起こっていない。Observableは将来なにかが起こるという設計図にすぎない。そこからデータをくれと言ったときに(後述するが、RxJavaの場合は subscribe メソッド呼び出し)はじめてデータが流れてくる。

これは受け売りであるが、Observableは水道の配管の図面のようなものだ。水がどうながれるか定めているが、そこに水はまだ流れていない。蛇口を捻った瞬間に実際の中身が流れ、指示されたとおり形を変え、目的の場所にたどり着く。

Observable

Retrofitで初めてのObservable

Observableの作り方を覚えるのはもう少し後でよい。まずはとりあえずObservableを使ってみよう。
RetrofitはJavaやAndroidで利用できるRESTクライアントである。これを使って簡単にObservableを試すことができる。

Retrofit準備

インストール。

def retrofit_version = '2.3.0'
compile "com.squareup.retrofit2:retrofit:${retrofit_version}"

次のように使う。

1) BASE URLを定めてRetrofitインスタンスを作る。

Retrofit retrofit = new Retrofit.Builder()
    .baseUrl("https://api.github.com/")
    .build();

2) REST APIに合わせてアノテーションを使って宣言的にAPI呼び出しを設定する。

public interface GitHubService {
  @GET("users/{user}/repos")
  Call<List<Repo>> listRepos(@Path("user") String user);
}

3) 先のRetrofitインスタンスを使って呼び出す。

GitHubService service = retrofit.create(GitHubService.class);
Call<List<Repo>> repos = service.listRepos("srym");

// synchronous call
Response<List<Repo>> response = repos.execute();

// asynchronous call
repos.enqueue( /* callback here */ );

見ての通り execute は同期呼び出し、 enqueue は非同期呼び出しである。

上のコードに出てくる retrofit2.Call<T> はソースを見てもらえば分かるが okhttp3.Call とほとんど同じで型引数を与えたようなものだ。OkHttp に馴染みのある方はすぐに使い方が想像つくだろう。

で、Retrofitは CallAdapter という仕組みでAPI呼び出しの返り値をObservableとして受け取ることができる。次のようにする。

1) インストール

// CallAdapter for RxJava
compile "com.squareup.retrofit2:adapter-rxjava2:${retrofit_version}"
// GSON converter
compile "com.squareup.retrofit2:converter-gson:${retrofit_version}"

// Explicitly install RxJava
compile "io.reactivex.rxjava2:rxjava:2.1.1"
// for Android
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

2) RetrofitインスタンスにCallAdapterを設定

Retrofit retrofit = new Retrofit.Builder()
    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
    .addConverterFactory(GsonConverterFactory.create(gson))
    .baseUrl("https://api.github.com/")
    .build();

3) 返り値をObservableに変更

public interface GitHubService {
  @GET("users/{user}/repos")
  Observable<List<Repo>> listRepos(@Path("user") String user);
}

これだけだ。早速使おう。

Observableからデータを取り出す

Observableからデータを取り出すのは簡単だ。まず例示し、順次解説する。

Observable<List<Repo>> repos = service.listRepos("srym")
repos
    .subscribe(
        list -> doSomethingToList(list),
        throwable -> Timber.d(throwable.getMessage(), throwable),
        () -> Timber.d("complete")
    );

Observable#subscribe() メソッドを呼び出した瞬間(購読という)に非同期通信が走り、データが流れてくる。

最初の引数のラムダ式 list -> doSomethingToList(list) は成功パターンだ。ここに流れてくるのは List<Repo> の具象型である。好きに加工するがよろしい。

2つ目の引数のラムダ式 throwable -> Timber.d(throwable.getMessage(), throwable) は失敗パターンだ。任意のエラーハンドリングをするがよろしい。

3つ目の引数のラムダ式 () -> Timber.d("complete") はこのObservableがすべての処理を終えこれ以上なにも返してこないときに呼ばれる。ここでは特に何もしていない。

onNext, onError, onComplete

少しずつObservableの深い所に入っていく。

冒頭でObservableはFuture(Promise)だと言い切ったが、将来に渡ってデータを送り続けてくる可能性がある点が少し異なる。たとえばユーザのタッチイベントをObservableで表現した場合、ユーザがスマホをタッチしつづける限りタッチイベントはずっと飛んでくるだろうし終わりがない(言い換えるとこのObservableはアプリ実装者からはタイミングが読めないPUSH型といえる)。
また、API通信のように要求したタイミングで1回だけデータが飛んでくるというものもあるだろう(こちらはPULL型)。

Observableはそのどちらも表現できるように次の3つのイベントコールバックを用意している。

onNext

Observableを購読して、データが正常に送られてきたときに呼ばれる。 ラムダ式の引数にはこのObservableの持つ(プロキシする)データ T がひとつコールバックされる。先程のsubscribeの第1引数に相当。
onNextは、こちらが購読をやめるまでの間、このObservableがデータを放出しつづける限り呼ばれる。データのある限りと言っても良い。

onComplete

Observableを購読してonNextでデータが運ばれてくるが、もうこれ以上送り届けるデータがないときに呼ばれる。先程のsubscribeの第3引数に相当。
たとえばAPI通信の場合は、onNextが1度だけ呼ばれてHTTP通信の結果が届けられるとそれ以上届けるものがないのでつづけてonCompleteが1度呼ばれるだろう。

onError

何らかの理由でObservableの中で例外が上がった場合に呼ばれる。ラムダ式の引数にはその例外がひとつ渡される。先程のsubscribeの第2引数に相当。
例外の種類によってエラーハンドリングを変えたい場合は instanceOf 等で分岐する。
なお、onErrorとonCompleteは互い排他的なのでonErrorが呼ばれた場合はonCompleteが呼ばれることはないしその逆も同様である。

詳しくは The Observable Contract に書いてある。

Observableの種類

HTTP GETのような「成功パターンは必ずonNextにデータがひとつだけ運ばれて、その後即座にonCompleteが呼ばれる」ような性質のものは、onNextさえ受け取れれば成功と見做すことができる。
同様に、レスポンスとして返すものがないHTTP POSTのような通信はonNextで受け取るデータがなく、onCompleteかonErrorかどちらかの可能性しかない。

RxJavaにはこういった場合にうってつけのデータ型が用意されている。

Single

onSuccessかonErrorのどちらかが1回だけ呼ばれる。
HTTP GETのように成功してレスポンスを1度だけ受け取るか、または失敗するようなものに向いている。

Completable

onCompleteかonErrorのどちらかが1回だけ呼ばれる。
レスポンスが空のHTTP POSTなどに向いている。

Maybe

onSuccessかonCompleteかonErrorのどれか1つが1回だけ呼ばれる。
用途はObservableの作り次第だが、成功時はキャッシュの有無によってonSuccessとonCompleteを返しわけ、失敗時はonErrorというような場合に使えるかもしれない。

参考: RxJava2.0 Observable, Single, Maybe, Completableの使い分けメモ

これまで例に出してきたAPI通信は次のようにSingleに書き換えた方が意図がわかりやすいだろう。
(後述するオペレータも、よりSingleに最適化されたものが提供されている)

public interface GitHubService {
  @GET("users/{user}/repos")
  Single<List<Repo>> listRepos(@Path("user") String user);
}

スケジューラ

Retrofitの Call<T> を紹介した時に同期呼び出しと非同期呼び出しの2つのマナーを紹介したが、Observableの場合はどうだろうか。Observableはスケジューラという仕組みで処理を実行するスレッドを指定する。

指定する対象は2箇所、「Observableがデータを生産する処理が行われるスレッド」と「処理した結果を購読者が受け取る(または加工する)スレッド」である。
まずは例をお目にかけよう。

service.listRepos("srym")
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(list -> Stream.of(list).forEach(repo -> Timber.d("repo: " + repo.getFullName())));

subscribeOn

subscribeOn() で指定するのがこのObservableがデータを生産するスレッドである。この例ではつまりAPI通信をするスレッドを指定するということになる。
Schedulers.io() は非同期のブロッキングI/Oに向いたスケジューラを指定している。つまりここではAPI通信を非同期に実行するという意味になる。

observeOn

observeOn() で指定するのがこのObservableが吐き出したデータを受け取って加工する場所を指定するスレッドだ。
observeOnで明示的に指定しない限り、デフォルトでは購読者のスレッドはObservable(データ生産者)が実行されるスレッドと同じになる。

この例では AndroidSchedulers.mainThread() としてAndroidのUIスレッドで結果を受け取っている。典型的なAndroidアプリでは非同期通信結果をもってUIを更新したりする。その際にUIスレッド以外でUIの更新ができないためだ。

subscribeOn, observeOn 補足

subscribeOn() はひとつのObservableに対し複数指定しても最初に指定したものしか使われない。

observeOn() は複数指定できる(指定する意味がある)。
次のように指定した場合、

// notice this is a pseudo code
.observeOn // 1
.filter
.observeOn // 2
.map
.observeOn // 3

1〜2の間にある filter オペレータは1で指定されたスケジューラが、2〜3の間にある map には2で指定されたスケジューラが、3以降のオペレータは3で指定されたスケジューラが使われる。

まだオペレータについて解説していないので読み飛ばして良い。あとで戻ってきて欲しい。

よく使うスケジューラ一覧

スケジューラ 用途
Schedulers.io() 非同期なブロッキングI/Oに向いたスケジューラ。内部的にはスレッドプールが作られワーカを再利用する。ネットワーク通信等に使われることが多いが、必要に応じてスレッドプールを大きくしていくのでOutOfMemoryErrorに注意する。
Schedulers.computation() CPUをガンガン使う計算に向いたスケジューラ。ブロックするI/O処理に向かない。ワーカ数は固定(デフォルトで Runtime#availableProcessors() と同数)なので枯渇するとブロックする点に注意。
Schedulers.newThread() タスクごとに新しいスレッドを立ち上げるスケジューラ。従ってioスケジューラ同様無制限にタスクが増えすぎてパフォーマンスの低下やOOMに注意する。
Schedulers.trampoline() FIFOでキューに積まれた順にタスクを実行するスケジューラ
AndroidSchedulers.mainThread() タスクをAndroidのメインスレッドで実行するスケジューラ

参考: Understanding RxJava Scheduler

オペレータ

オペレータとはObservableを作ったり加工したり変換したりするための命令である。
膨大なオペレータが存在するが、ここでは割り切って極々々々少数のみ紹介する。

just

justは T からObservableを作る。 T は10個まで指定可能。
10個以上のTは fromArray オペレータを使う。 List等から変換する fromIterable というのもある。

Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(name -> Timber.d("name: " + name));

map

mapはObservableの中身を T から U へ1対1に変換するオペレータ。どういうことか?
たとえば次の例では人名のStringを文字数のIntegerに変換する。

Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
    .subscribeOn(Schedulers.io())
    .map(name -> name.length()) // String::length
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(length -> Timber.d("length: " + length));

この例ではコンソールに length: 5, length: 3, length: 7 と出力される。

reduce

mapと対になるオペレータで、Observableの中身を畳み込む。次の例では人名の文字数を足し合わせる。

Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
    .subscribeOn(Schedulers.io())
    .map(name -> name.length()) // String::length
    .reduce((accum, length) -> accum + length)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(length -> Timber.d("total length: " + length));

この例では total length: 15 とコンソールに出力される。

ちなみに似たようなオペレータに scan がある。
まったく同じように scan((accum, length) -> accum + length) のように使うが、こちらは畳み込み関数が評価されるごとにonNextにデータが流れる。

今回の例では足し上げるごとに total length: 5, total length: 8, total length: 15 と出力される。

flatMap

独断と偏見で選ぶ、もっとも大切なオペレータ。

Observable<T>Observable<U> に変換するが、中身は1対1の変換に限らない。どういうことか?
例えば次の例はObservableの中にListそのものが要素として2つ入っている(いわばListの中に入ったList)。
ここではそれをぺったんこにしてListの中身がひとつづつonNextに流れてくるようなObserbableに変換している。

Observable<List<String>> listInList = Observable.just(
    Arrays.asList("Alice", "Bob", "Charlie"),
    Arrays.asList("Dave", "Ellen", "Frank")
);
listInList
    .subscribeOn(Schedulers.io())
    .flatMap(list -> Observable.fromIterable(list))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(name -> Timber.d("name: " + name));

元はListが2つ入ったObservableだったのが2つのListの要素がすべて入ったObservableに変換されている。
これはコンソールにAlice〜Frankまでの名前を順に出力する。
まさに flatten して map するイメージ。伝われ!(祈り)

もうひとつflatMapの重要な役割として、Observable同士の逐次実行がある。
次のように、「まずAPI Aを呼び出し、その結果をもってAPI Bを呼び出す」というような処理をひとまとめにするのにflatMapを使う。

service.apiA
    .subscribeOn(Schedulers.io())
    .flatMap(result -> service.apiB(result.getFoo()))
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result.getBar()));

zip

これも弊社で使っているというだけで取り上げるオペレータ。
zipは複数のObservableの完了を待ち合わせ、それを処理する関数に渡して変換するためのオペレータ。

Observable.zip(service.apiA, service.apiB,
    (resultA, resultB) -> doSomething(resultA, resultB))
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

この例では service.apiAservice.apiB の結果はそれぞれ (resultA, resultB) -> {} のように同数の引数をとるラムダ式に渡されるので好きなように加工する。
複数のAPI呼び出しを並行に走らせて、尚且つすべての成功結果を待ち合わせてUIを変更するといった用途で非常に役に立つ。

なお、上の例は

service.apiA.zipWith(service.apiB, (resultA, resultB) -> resultA + resultB)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

と等価である。

このようにObservable同士を結合するオペレータは他にも

  • merge
  • concat
  • combineLatest

等があるので適宜調べてみて欲しい。

filter

onNextに流すデータを条件式でフィルタするためのオペレータ。
次の例では文字数が3以下の名前をフィルタリングしている。

Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
    .subscribeOn(Schedulers.io())
    .filter(name -> name.length() > 3)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(length -> Timber.d("long name: " + length));

コンソールには Alice, Charlie のみが出力される。

retry

リトライしてくれる。 .retry(3) と書いておけば3回までonErrorを捕まえてリトライし、4回目でもエラーの場合にonErrorにエラー理由を出力して停止する。

他にもリトライ条件を渡す retry(Predicate) や、終了条件を満たすまでリトライし続ける retryUntil(BooleanSupplier) 、より細かいエラーハンドリングが可能な retryWhen 等のバリエーションがある。 (retryWhen 参考リンク)

その他のオペレータ

他にも数え切れないほど重要なオペレータがあるが何もかも紹介するわけにはいかないので下記公式サイトを参照して欲しい。

参考) ReactiveX - Operators

購読解除

Android技術者ならば、非同期に実行した Observable#subscribe が未完了のままユーザが画面を去るときのことが気になって仕方がないはずだ。
RxJavaにはこのために外部から購読を解除するための機構が備わっている。

Disposable

subscribeメソッドは返り値としてDisposableを返す。
Disposable#dispose() を呼ぶと以後のイベントは通知されて来なくなる。1

Disposable disposable = service.apiA
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

disposable.dispose();

Activity等のフィールドに持っておき onDestroy() 等で破棄するのが典型的な使い方だろう。

CompositeDisposable

複数のDisposableをまとめて購読を解除するためのCompositeDisposableも提供されている。
次のように使う。

CompositeDisposable compositeDisposable = new CompositeDisposable();

Disposable disposable = service.apiA
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));
compositeDisposable.add(disposable);

// non-reusable
compositeDisposable.dispose();
// reusable
compositeDisposable.clear();

こちらも典型的にはフィールドに持っておき、画面を去るタイミングでまとめて購読を解除する使い方が一般的だろう。

ちなみに、 CompositeDisposable#dispose してしまうと以後このCompositeDisposableは再利用できない(addしても即座にdisposeが呼ばれ、偽が返る)。したがって onResume/onPause で再利用するようなことができない。

この用途の場合、CompositeDisposable#clear が使える。
clearの場合はそのタイミングで積まれたDisposableはすべてdisposeするものの再びaddを受け付けることができる。

副作用

doOnXXX, doAfterXXX らのオペレータは、onNext, onComplete, onErrorなどの「前後」に何か処理をはさみたい場合(いわゆる副作用)に利用できるオペレータである。

これらのオペレータは値を返さないため、変換に使われるわけではない。
典型的にはロギング用途や、サーバから取得したデータをローカルストレージに保存しつつ取得したデータそのものはonNextに流すといった使われ方が考えられる。

doOnNext, doOnComplete, doOnError

それぞれ onNext, onComplete, onError などの対応するコールバックにデータが流れる前に呼ばれる。
例えば次の例では、文字数が3文字以下の名前をフィルタリングしてonNextに流す前に、確認の便宜のためにフィルタ前の名前をロギングしている。

Observable<String> names = Observable.just("Alice", "Bob", "Charlie");
names
    .subscribeOn(Schedulers.io())
    .doOnNext(name -> Timber.d("original name: " + name))
    .filter(name -> name.length() > 3)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(length -> Timber.d("filtered name: " + length));

前述したが、doOnNextはAPI越しに取得したデータをローカルストレージに保存するのに良いタイミングだ。

service.apiA
    .subscribeOn(Schedulers.io())
    .doOnNext(this::saveResult)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

なお、Singleの場合はonNextは1つと決まっていることからより適切な doOnSuccess という専用のオペレータが用意されている。

また、doOnErrorはonErrorの前に呼ばれるがエラーをキャッチしてonErrorを回避するような効果はない。
そういったことがしたい場合は後述するエラーハンドリングを参照して欲しい。

doAfterNext, doAfterComplete, doAfterError

データが onNext, onComplete, onError などの対応するコールバックに流れた後に呼ばれる以外はdoOnXXXと同様なので省略。

エラーハンドリング

Observableとそのチェインの途中で発生したエラーはonErrorに届くが、場合によってはエラーで停止せず代替データを返す等のエラーハンドリングをしたいケースもあるだろう。

onErrorReturnItem

エラー時にデフォルトのデータ T を返す。

service.apiA
    .subscribeOn(Schedulers.io())
    .onErrorReturnItem(defaultResult)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

onErrorReturn

エラー時に任意のハンドリングをして必要に応じてデフォルトのデータ T を返す。

service.apiA
    .subscribeOn(Schedulers.io())
    .onErrorReturn(throwable -> {
        if (throwable instanceof IllegalStateException) {
            return defaultResult;
        }
        throw new RuntimeException(throwable);
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(result -> Timber.d("result: " + result));

onErrorReturnItemがそのまま T を返すのに対し、こちらではラムダ式で任意の処理や条件分岐を挟むことができる。

onErrorResumeNext

エラー時に任意のハンドリングをして Observable<T> を返す。どういうことか?
たとえばデフォルトではローカルストレージからデータを読み出すが、ローカルにデータがない場合のみサーバに問い合わせて返し、以後はローカルキャッシュを返すというような非常に実践的なビジネスロジックが簡単に構築できる。

Single<MyData> getMyData(String id) {
  return getFromLocal(id)
      .onErrorResumeNext(
          getFromRemote(id)
              .doOnSuccess(this::saveToLocal)
      );
}

この例では getFromLocal にデータがない場合に getFromRemote で取得して利用者に返しつつ、前述の doOnSuccess でローカルにキャッシュを保存している。
getMyData メソッドの利用者はデータがローカルにあるかリモートにあるか気にすることなく取得することができる。

同期呼び出し

冒頭でObservableはまだ完了していない計算結果へのプロキシだと述べた。
データはsubscribeした瞬間流れ始めるが、あくまで結果はonNext等のコールバックで受け取るものであり、同期呼び出しではない。2

ただ、 Observable<T> から強制的に同期的に T を取り出すためのAPIは用意されている。

blockingFirst, blockingLast, blockingIterable

それぞれ、Observableの最初の要素、最後の要素、Iterableとしてすべての要素を同期的に取り出せる。
返り値はもちろん直接TまたはIterable<T>となる。

MyData myData = service.apiA
                       .blockingFirst();

なお、Singleの場合は要素がひとつなので blockingGet() というAPIが用意されている。

テスト

Observableのテストは一般的にやりづらい。
この理由は、同期呼び出しの項で述べたように、subscribe メソッドで購読を開始してもonNextやonErrorは非同期にコールバックされるためだ。2

たとえば次のテストケースはmap内で必ず IllegalStateException が上がるが、実行するとテストケースは成功してしまう。

@Test
public void invalidTestCase() throws Exception {
    githubRepository.listRepos("srym")
            .subscribeOn(Schedulers.io())
            .map(list -> {
                // always throws Exception
                if (true) throw new IllegalStateException();
                return list;
            })
            .subscribe(list -> assertThat(list).isNotNull().isNotEmpty());
}

これは assertThat(list).isNotNull().isNotEmpty() が評価される前にこのメソッドは最後まで走ってしまうのでJUnit的には成功と見做すためだ。

TestObserver

RxJavaにはTestObserverというテストに便利な購読者が用意されている。
TestObserverはObservableやSingleの test() メソッドで作成することができる。

List<Repo> repos = githubRepository.listRepos("srym")
        .subscribeOn(Schedulers.io())
        .test() // get TestObserver
        .await() // wait until onComplete or onError
        .assertNoErrors()  // confirm no error
        .assertComplete()  // confirm complete
        .values().get(0);  // fetch data
assertThat(repos).isNotEmpty();

TestObserver#await で、データソースであるObservableがonComplete/onErrorになるまで待たせることができる。

また、TestObserverには assertComplete() / assertNotCompleteassertError(Throwable) / assertNoErrors のような各種アサーションメソッドが生えており、正しく完了したのか失敗したのかを簡単に検証できる。

Observableから排出されるデータでonNextを通ったものはすべて記録されており、 values() で同期的に取り出すことができる。
同期的に取り出してしまいさえすればあとは通常のAssertionが可能である。

参考1) RxJava のテスト(1): TestSubscriber, test(), TestScheduler
参考2) RxJava のテスト(2): RxJavaHooks, RxAndroidPlugins

ObservableとFlowable

ここまで敢えて触れてこなかったが、Observableにはほとんど同じような機能とオペレータを提供し、対になるFlowableというものがある。
両者の違いはすばり「バックプレッシャー」の有無である。

バックプレッシャー

バックプレッシャーとは、データを送る側(生産者)とデータを受け取る側(消費者)との間で、著しく消費スピードに差がある場合のバッファリングや次のデータの要求のための仕組みである。

なんと、ここでは詳しく触れない!Retrofitが返すのはObservableだけだからだ。
しかしながら、FlowableはReactive Streamsという仕様に則っており、これに準拠している限りにおいてその他の準拠API(Java 9 Flow API, Akka Streams, etc...)と相互に連携可能らしい。

このまとめを卒業した人は是非調べてみていただきたい。

参考) What's New in RxJava 2
参考) What's different in 2.0

これらの参考リンクには自分でFlowableまたはObservableを作るときに、どのケースでどちらを使うべきかの指針が書いてある。

まとめ

RxJavaのObservableは概念から包括的に理解しようとすると馴染みのないメンバーはつまづきがちという経験から、敢えて現場で利用するRetrofitから逆算して使う部分のみの説明にとどめた。

このまとめは従ってかなり雑なまとめである感は否めないが、ここを出発点として 公式サイト を読みながら理解を深めて欲しい。

最後に、詳しい内容は原点にあたるにしくはないのであるが、日本語で手っ取り早く体系的な理解を得るには次の書籍が参考になった。

参考書籍) RxJavaリアクティブプログラミング

適宜加筆修正する。また、編集リクエストを歓迎する。

以上。


  1. ちなみに「通知されて来なくなる」ことと「非同期通信のキャンセルまで行ってくれる」ことは別問題である。Retrofitで RxJava2CallAdapter を使ってObservableに変換している場合、非同期通信の Call<T> は内部的には CallEnqueueObservable<T> に変換され、dispose() 時に call.cancel() される。結果的に retrofit2.Call#canel() から okhttp3.Call#cancel() にdelegateされるので、Retrofitで作ったObservableはdisposeすると非同期処理もキャンセルされると言える。 

  2. 正確にはスケジューラ次第 

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした