Edited at

RxJava的なものを最小限に実装してコンポーネントの関係を理解する

More than 3 years have passed since last update.

社内のRxJava勉強会用の資料です。

軽く、ほんの軽くRxJavaのサブセットを20行ほどで実装してみました。


概要


  • RxJavaはPromise的なもので、連続した「イベントの発生 / 値の生成」を扱えるようにしたもの

  • Observableは「イベントが発生するかもしれない状態 / 値の生成中」という状態を表すオブジェクト

  • OnSubscribeはObservableの中身で、「イベントの発生 / 値の生成」をObservableに通知するためのオブジェクト

  • SubscriberはObservableから「発生したイベント / 生成された値」を受け取るコールバック


RxJavaの捉え方

まずRxJavaですが、いくつかの捉え方があります。


  • リスト処理の抽象化・ストリーム化


  • Optional的な何か


  • Promise的な何か

ここでは、Promiseと捉えて話を進めます。これはReactive Porn - steps to phantasienで紹介されていた解釈ですが(ここではFutureとされていますが、まあFutureとPromiseは同じようなものです)、ぼくはこれが一番しっくりきました。

上記エントリはこんな風に説明しています。Futureがリアクティブモデルの一つとしたうえで、Futureが持つ1:1の要求と応答(たとえばHTTP requestとそのresponse)だけでなく、1:Nの関係を扱えるようにしたのがRxというわけです。


Actor のモデルでは、ブロッキングに頼らない限り要求と応答の対を扱うコードが少し不自然になる。Future をつかうと Actor では見えにくかった要求と応答の関係がくっきり姿を見せる。ところが今度はその 1:1 対応が強すぎてストリーム処理がぎこちなくなる。

ストリームと要求応答の両方をうまく扱えるモデルが、もう一つのリアクティブ実装 Reactive Extension 略して Rx … 正確にはその焼き直しバージョン ReactiveX … である。Rx は Future に似ている。そして要求応答の対を 1:1 から 1:N に一般化することでストリームが苦手な Future の弱点を乗り越えている。


これはつまり、RxはHTTP request/responseのような「1つの値の生成」だけでなく、onClickなどのイベントやDBのカーソルから1行ずつ取ってくるという「連続したイベントの発生」をうまく扱えるようになったということです。なお、ここでは具体的にイメージしやすくすくするために「イベントの発生」と「値の生成」という二つの言葉を使っていますが、これはどちらも同じ意味です。


Promiseの概要

さて、promiseというのはこういうやつです。コードはJavaScript Promiseの本より引用しました。なおこのpromiseは、ES6で標準になる予定です。

function getURL(URL) {

return new Promise(function (resolve, reject) {
var req = new XMLHttpRequest();
req.open('GET', URL, true);
req.onload = function () {
if (req.status === 200) {
resolve(req.responseText);
} else {
reject(new Error(req.statusText));
}
};
req.onerror = function () {
reject(new Error(req.statusText));
};
req.send();
});
}

// 実行例
var URL = "http://httpbin.org/get";
getURL(URL).then(function onFulfilled(value){
console.log(value);
}).catch(function onRejected(error){
console.error(error);
});

ポイントは以下のとおり。



  • getURL()は普通の非同期処理の考え方でいうと、「非同期処理が終わった時に呼び出されるコールバック」を渡すはずが、そのかわりに「処理が始まったが終わっていない状態」をPromiseというオブジェクトにして返している

  • しかし当然状態の変化(非同期処理の完了)をPromiseオブジェクトに通知しなければいけない。それがresolveとrejectという引数で渡される関数


  • resolve()を呼び出すと、それが Promise#then() に渡されたコールバックを呼ぶ


  • reject()を呼び出すと、それが Promise#catch() に渡されたコールバックを呼ぶ

Promiseは「連続したイベントの発生 / 値の生成」には向きませんが、1:1の要求と応答をうまくPromiseというオブジェクトに閉じ込めて、扱いやすくしています。

まあpromiseはこんな感じの理解でいいです。


RxJavaの考え方と使い方

RxJavaをpromiseとみると、Observable<T> がPromiseオブジェクトに相当します。そして、JavaScriptのPromiseで関数オブジェクトであった部分がすべてクラスないしインターフェイスとなり、名前がつけられています。

役者は以下のとおり。



  • Observable<T> - Promiseに相当。T型の値を生成するための状態オブジェクト


  • Observable.OnSubscribe<T> - Observableが保持する「状態」の実体。T型の値を生成し、Subscriber<T> に渡す


  • Subscriber<T> - Observable<T> の生成した値を受け取るコールバック

コードはこんな感じ。


// 何かを非同期に処理する
// postDelayed()は時間のかかる処理のエミュレーション
Observable<String> getSomethingDelayed(final String value) {
// new Promise(function (resolve, reject) { に相当
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(final Subscriber<String> subscriber) {
// subscriber.onNext(...) (とonComplete())がresolve()に相当する
// subscriber.onError(...)がreject()に相当する
Handler handler = new Handler();
handler.postDelayed(new Runnable() {
@Override
public void run() {
subscriber.onNext(value); // 値をsubscriberに渡す(複数回渡すことも可能)
subscriber.onComplete(); // 値の生成の完了
}
}, 1000);
}
});
}

// 使う側
void doit() {
// 非同期処理を包んだ「状態」オブジェクトを作る
Observable<String> observable = getSomethingDelayed("foo");

// observableが生成した値を受け取るコールバックを登録する
// subscribe()が内部のonSubscribe.call()呼び、実際に処理をはじめる
observable.subscribe(new Subscriber<String>() {
@Override
public void onNext(String value) {
// 生成された値を受け取った
Toast.makeText(MainActivity.this, "onNext: " + value, Toast.LENGTH_LONG).show();
}

@Override
public void onComplete() {
// 値の生成が完了した
Toast.makeText(MainActivity.this, "onComplete", Toast.LENGTH_LONG).show();
}

@Override
public void onError(Throwable e) {
// 何か問題が起きた
Toast.makeText(MainActivity.this, "onError", Toast.LENGTH_LONG).show();
}
});
}


RxJavaの最小限の実装

上記のサンプルコードが動く最低限の実装が以下になります(RxInTheBox)。

class Observable<T> {

static <U> Observable<U> create(OnSubscribe<U> onSubscribe) {
return new Observable<>(onSubscribe);
}

OnSubscribe<T> onSubscribe;

Observable(OnSubscribe<T> onSubscribe) {
this.onSubscribe = onSubscribe;
}

// 「値の生成中」から「値の生成」までを担うコールバック
interface OnSubscribe<T> {
void call(Subscriber<T> subscriber);
}

// observableによる「値の生成」を開始し、その結果をうけとる
void subscribe(Subscriber<T> subscriber) {
onSubscribe.call(subscriber);
}
}

// observableが生成する値を受け取るコールバック
interface Subscriber<T> {
void onNext(T value);
void onComplete();
void onError(Throwable e);
}


RxJavaの実際の実装

さて、RxJavaにおいて上記の役者たちが相当する実装がこちら。


class rx.Observable

9000行を超えるファイルでドン引きしますが、ほとんどがjavadocとファクトリメソッドです。そして今回の役者はRxInTheBoxで登場したものだけなので、そこに注目します。


Observable#create()

まずcreate()はRxInTheBoxとほとんど同じです。 hook.onCreate() はフック用のもので、デフォルトでは単に引数をそのまま返します。

public final static <T> Observable<T> create(OnSubscribe<T> f) {

return new Observable<T>(hook.onCreate(f));
}


interface rx.Observable.OnSubscribe

OnSubscribeはObservableのインナークラスで、実装はこれだけ。 Action1<T> というのは、「Tをうけとり値を返さない(void)コールバック」という意味です。インターフェイスの継承を展開するとRxInTheBoxのものとほとんど同じになりますが、生成する値が ? super T なので少しだけ柔軟になってます。

public static interface OnSubscribe<T> extends Action1<Subscriber<? super T>> {

}


Observable#subscribe()

subscribe() も前処理とエラーハンドリングを除くと実質 hook.onSubscribeStart(this, onSubscribe).call(subscriber) だけです。さらに hook.onSubscribeStart() はフックのためのオブジェクトで、何も設定していなければ単に onSubscribe を返します。つまり、subscribe() は実質的にはRxInTheBoxと同じ onSubscribe.call(subscriber) になります。subscribe() は他にもオーバーロードされたバリエーションがありますが、それらは使いやすくするためで本質的にはこのメソッドと変わりありません。

    // RxInTheBoxでは戻り値がvoidだったが、RxJavaではさらにSubscriptionというObservableを返して

// 次の処理に続けることができる。説明は省略。

public final Subscription subscribe(Subscriber<? super T> subscriber) {
/* 前半は引数のチェックとスタートのための前処理なので省略 */

try {
hook.onSubscribeStart(this, onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
} catch (Throwable e) {
/* エラー処理は省略 */
return subscription.unsubscribed();
}
}


class Subscriber

RxInTheBoxとかなり違うのはこちら。RxInTheBoxと違って実装のある抽象クラスになっています。これは、マルチスレッド下でのスケジューリングやunsubscribe、ストリーム的な処理などを扱うためです。

RxInTheBoxのSubscriberはむしろ、 rx.Observer にあたります。これの実装は以下のとおり。RxInTheBoxのSubscriberと同じですね。


public interface Observer<T> {
public abstract void onCompleted();
public abstract void onError(Throwable e);
public abstract void onNext(T t);
}


参考文献