概要
「RxJavaをどう使えばいいか」について、サンプルケースを元に覚える記事です。
ReactiveExtensionsおよびRxJavaの真面目な(原理から説明するような)解説については、文末の参考資料をご覧ください。
例1:コールバックの使い勝手を良くしたい
Javaで非同期処理を行う際、しばしば「コールバック」というテクニックが用いられます。
具体的には、メソッドの引数に特定のinterface
を継承したインスタンスXを指定することで、「何かあった際にXのメソッドを叩く」ことが可能になります。
(※別にinterface
じゃなくてclass
を継承してもいいですが、多重継承できないJavaの都合から、コールバックを指定する用としてはinterface
がよく用いられます。また、1つのコールバックインターフェースに対応した実装クラスを1つしか作らない場合、インターフェースの定義を継承したクラス内に定義することもあります)
// コールバックのインターフェース
public interface ICallback {
void onResponse(String data);
}
// コールバックを利用するメソッド
public class SampleApi {
void sendRequest(ICallback callback) {
callback.onResponse("test"); // 入れる値の例
}
}
こうした、コールバックを伴う関数を利用する際は、「コールバックのインターフェース」を継承したクラスXを用意し、そちらを「コールバックを利用するメソッド」に食わせることで実現します。また、コールバックの戻り値はX内でどう処理するかを決めます。
// コールバックを利用するメソッドを利用するクラス
public class SampleClass {
// コールバックのインターフェースを継承したクラス
class TestCallback implements ICallback {
@Override
void onResponse(String data) {
mResult = data; // 利用方法の例
}
}
private String mResult = "";
private ICallback mCallback = new TestCallback();
private SampleApi mSampleApi = new SampleApi();
public sampleFunc() {
mSampleApi.sendRequest(mCallback); // ここで呼び出す
}
}
実際にはSampleApi型にシングルトンパターンを適用したり、コールバックをSampleApi型にDIしたりするかと思いますが、どちらにしてもコールバックと呼び出し先のクラスが密結合しているといった問題は避けられません。
そこでRxJavaを使うと、ObservableEmitter
を通じてコールバックのデータ受け渡しが整理できます。
// コールバックのインターフェースを継承したクラス
class TestCallback implements ICallback {
private ObservableEmitter<String> emitter;
// EmitterをDIするためのメソッド
public setEmitter(ObservableEmitter<String> emitter) {
this.emitter = emitter; // ここでDIしている
}
@Override
void onResponse(String data) {
// レスポンスの情報をonNextを通じてemitterに流し込む
emitter.onNext(data);
// 流し込みが完了したらonCompleteする
emitter.onComplete();
}
}
// コールバックを利用するメソッドを利用するクラス
public class SampleClass {
private String mResult = "";
private SampleApi mSampleApi = new SampleApi();
public sampleFunc() {
// 一本のObservableを作成する
// (Javaはラムダ式については型推論するため、これはObservable<String>型になる)
Observable.create(emitter -> {
// コールバックを作成し、emitterを登録した後にAPIを実行する
// これにより、onResponseが発火し、onNextが実行された際に次のステップに進める
ICallback mCallback = new TestCallback();
mCallback.setEmitter(emitter);
mSampleApi.sendRequest(mCallback);
// subcribe以降にObservableのメソッドチェーンで受け渡されたデータについての最終処理を記述する
}).subcribe(data -> {
// ここでdataはString型であることに注意(ObservableEmitter<String>からonNextで流されたデータ)
mResult = data;
});
}
}
一見するとDIするためのコードがTestCallbackに増えたようにしか見えませんが、これにより
-
SampleClassに依存せず
SampleApi#sendRequest
できる - コールバックを要請するメソッドとコールバック結果が戻る場所が近いので分かりやすい
- 一度
Observable
にしてしまえば様々な加工が行いやすい
などの利点が生まれます。言うなればObservableEmitterは伝令役であり、「ObservableEmitterからデータがonNextで流れてきた際に以降の処理を実行する」といった動きになります。
例2:重い処理を並行処理する
例2-1:指定したスレッドへ割り振り
「ある処理が重いので別スレッドで動かしたい」という要請は、非同期処理じゃない流れでもよくあります。
特にGUIアプリケーション(スマホ含む)の場合、UIスレッドがUI描画以外の処理に時間を取られすぎると画面がフリーズしてしまいます。
これがWindowsだと「画面がフリーズした」だけで済みますが(UX的には好ましくない)、例えばAndroidだと「アプリケーションが応答していません」といったダイアログが表示され、ユーザーがアプリをその場で終了させるかもしれないといった可能性も考えられます。
更に、Android9.0になると**アプリケーションが応答しないとシステム側で強制終了する**ようにできるので、この対策はもはや義務と言ってもいいでしょう。
そこで登場するのが「別のスレッドで動かす」といった概念です。ワーカースレッドとも言われるそれは、例えばAndroidだとThread
やHandler
やAsyncTask
やIntentService
を利用して使用することになります。
ところがRxJavaを使うと、「処理を特定のスレッドに割り振る」といった操作を簡単に行うことができます。
Observable.create(emitter -> {処理1})
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.map(data -> {処理2})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(result -> 処理3});
上記コードの場合、
- 処理1は計算用のバックグラウンドスレッド
- 処理2はI/O操作用のバックグラウンドスレッド
- 処理3はメインスレッド(UIスレッド)
で実行されます。嫌がらせかと思うレベルで紛らわしいですが、
-
subscribeOn
の引数には最初のObservable内の処理に使用するスレッド -
observeOn
の引数にはそれ以降の処理に使用するスレッド
を指定できます(※observeOn
はスレッド切り替え用に何度も書けますが、subscribeOn
は最初の1つだけ有効になります)。subscribeOn
およびobserveOn
の中身に書けるスレッドとしては
名称 | 種別 | 意味 |
---|---|---|
Schedulers.computation() | バックグラウンド | 計算用 |
Schedulers.io() | バックグラウンド | I/O操作用 |
Schedulers.immediate() | メイン | Observable#subcribeを使用する現在のスレッド |
Schedulers.newThread() | バックグラウンド | 新規にスレッドを作成する |
AndroidSchedulers.mainThread() | ※ | UIスレッド |
などがあります。名前から察せられるように、AndroidSchedulers
はRxJavaというよりRxAndroid内に用意されているクラスですので、使用する場合はRxJavaだけでなくRxAndroidも導入しましょう(Android開発での話)。
例2-2:直列化
また、上記のように単一の処理1→処理1の結果を受け取って処理2→……と進む場合は実行順序に迷うことはありませんが、起点となるObservableやメソッドチェーンの途中のObservableで複数のデータを流す場合は注意が必要です。
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}).flatMap(data -> {func(data)})
.subscribe(result -> 処理3});
例えば上記のようなコードがあった際、funcメソッドはdataが1・2・3の場合で計3回呼ばれます。
ここでfuncがObservable
を返す……つまり非同期なコードだった場合、func(1)が完了する前にfunc(2)やfunc(3)が呼ばれる可能性があります。
それが嫌な場合、それぞれの非同期処理を別個のObservable
として用意し、concatWith
メソッドで直列に並べることで対処します。
Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
})
.toList()
.flatMap(list -> {
Observable<Integer> task = null;
for (int x :list) {
if (task == null) {
task = func(x);
} else {
task = task.concatWith(func(x));
}
}
return task;
})
.subscribe(result -> 処理3});
補足1:
mapとflatMapの違いですが、雑に言えば前者は同期処理のメソッドしか無い時、後者は他のObservableなメソッドも使う時。ちゃんとした使い分けについては参考資料を参照してください
補足2:
toList()できるのは、Observableの上流からonNext
で返ってくる個数が有限個の場合だけです。無限個返される可能性がある際の対策は参考資料を参照すること
例2-3:並列処理の同期
直列化とは逆に、「複数のデータが揃った際に次の処理を行う」ことをしたい場合があります。
例えば、「データAとデータBのObservableからの戻り値の合計を求める」処理は次のように書けます。
Observable<Integer> obsA = Observable.create(emitter -> {略}));
Observable<Integer> obsB = Observable.create(emitter -> {略}));
Observable<Integer> obsC = Observable.zip(obsA, obsB, (d1, d2) -> d1 + d2);
ここではzip
メソッドを使用しましたが、combineLatest
というメソッドもあります。違いとしては、
-
zip
は引数にした各Observableから1個づつ使用する -
combineLatest
は、どれかのonNext
が実行されるたび、他のデータは「最後に出力されたもの」が使用される
といった感じです。例えば0・2・4秒目の時点でobsAから1・3・5といった値が出力され、1・3・5秒目の時点でobsBから7・8・9といった値が出力される場合、obsCからは
-
zip
……各Observableから1個づつ使用するので、1+7=8・3+8=11・5+9=14が返ってくる -
combineLatest
……他のデータは「最後に出力されたもの」が使用されるので、1+7=8・3+7=10……5+9=14が返ってくる
タイミング | 0秒 | 1秒 | 2秒 | 3秒 | 4秒 | 5秒 |
---|---|---|---|---|---|---|
obsA | 1 | 3 | 5 | |||
obsB | 7 | 8 | 9 | |||
obsC(zip) | 8 | 11 | 14 | |||
obsC(combineLatest) | 8 | 10 | 11 | 13 | 14 |