ReactiveExtensions
RxJS
Rx
RxJSDay 2

RxJS の Observable / Observer の概要

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 2 日目かつ RxJS Advent Calendar 2015 の 2 日目です。


はじめに

今日の内容は公式リポジトリにあるドキュメント Exploring The Major Concepts in RxJS とほとんど同じものです。

また RxJS 4.0.7 を対象にしています。


Obervable と Observer

RxJS における最も基本的なクラス Observable / Observer の概要を確認します。

Observable はデータソースを表すクラスで、名前どおり observe できる何かです。Observable からデータを受け取るには subscribe(observer) メソッドを呼び出します (メソッド名は observe ではありません) 。subscribe メソッドは引数に Observer のインスタンスを取ります。

ObserverObservable からのデータを受け取るためのクラスです。名前どおり流れてくるデータを observe するものです。onNext / onError / onCompletedObservable により呼び出され (値が Push され) ます。それぞれ Stream の値・エラー・完了を通知します。

class Observable

subscribe: (observer) -> # ...

class Observer
onNext: (value) -> # ...
onError: (error) -> # ...
onCompleted: -> # ...


Disposable

もうひとつクラスを紹介します。Disposable クラスです。

Observable.prototype.subscribeDisposable のインスタンスを返します。

Disposable クラスは dispose メソッドを持ちます。 connection の close などの後処理をできます。

class Disposable

dispose: ->

class Observable
subscribe: (observer) ->
# ... (returns Disposable)


実用上の注意

実際に RxJS を使う場合には DisposableObserver を見ることはほとんどありません。

後処理が要らないケースも多いですし、subscribe Observer のコンストラクターと同様の引数を取り、内部で Observer を生成するので直接操作する必要はないからです。

{ Observable } = require 'rx'

Observable
.range(1, 3)
.subscribe (value) -> # onNext
console.log value

また ObservableObserverDispose を継承したクラスをつくることも、まずありません。独自の動きをする Observable については、そのためのメソッドが用意されています。これはまた別の機会に書きます。


ソースコードを眺める

最後にすこしだけソースコードを眺めてみましょう。

RxJS のモジュールは ES6 や Node.js のような仕組みではなく独自の concat を前提とした形になっています。ES6 class も使われていませんし、オレオレな util もあるので慣れるまでは読みづらいかもしれません。とりあえず今回は軽く眺めましょう。


Observable のソースコード

まずは Observable を眺めましょう。

この記事で紹介した subscribe や利便性のためなのか subscribeOnNextsubscribeOnErrorsubscribeOnCompleted などが並んでいます。

mapfilter などの Operator については別ファイルで Observable.prototype (変数名 observableProto) に追加されます。このファイル分割は .NET Framework における拡張メソッドを定義しているイメージなのかもしれませんね。

次のコードは Observable.prototype.subscribe です。


observable.js

    observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {

return this._subscribe(typeof oOrOnNext === 'object' ?
oOrOnNext :
observerCreate(oOrOnNext, onError, onCompleted));
};

Observable.prototype.subscribethis._subscribe を呼び出すようになっています。そして this._subscribeObservable を継承した各 operator やその共通実装である ObservableBase が実装しています。詳細は別の機会にしましょう。


Observer のソースコード

次に Observer を眺めましょう。

残念ながら onNext などは見当たりません。代わりに気になるコードがあります。次のコードは Observer.create です。これは Observer をつくるメソッドでしょう。


observer.js

  var observerCreate = Observer.create = function (onNext, onError, onCompleted) {

onNext || (onNext = noop);
onError || (onError = defaultError);
onCompleted || (onCompleted = noop);
return new AnonymousObserver(onNext, onError, onCompleted);
};

ここで代入されている変数 observerCreate は上記の Observable.prototype.subscribe で使われています。 Observable.prototype.subscribeObserver 以外を渡して呼び出したときに Observer の実装として AnonymousObserver が使われるようです。

ちなみに、上記の理由から AnonymousObserver はエラー時のスタックトレースでよく見かけます (RxJS あるある) 。

では、次は AnonymousObserver を眺めましょう。

ここには next / error / completed はありますが onNext などはありません。AnonymousObserver.prototype.next が呼び出されたとき、コンストラクターで受け取った onNext を呼び出します。


anonymousobserver.js

var AnonymousObserver = Rx.AnonymousObserver = (function (__super__) {

// ...
}(AbstractObserver));

AnonymousObserverAbstractObserver を継承 (?) しているようです。

次は、AbstractObserver を眺めましょう。

ここには onNext などの期待したメソッドが提供されています。次のコードは onNext です。onNext が呼び出されたときに this.next を呼び出しています。


abstractobserver.js

    AbstractObserver.prototype.onNext = function (value) {

!this.isStopped && this.next(value);
};

さきほどの AnonymousObserver.prototype.next はこの実装です。onNext はテンプレートメソッドパターンで、自身を継承したサブクラスに実装 (this.next) を任せます。これは Observable.prototype.subscribethis._subscribe とも同じ構造ですね。

また AbstractObserverObserver を継承 (?) しているみたいですね。型の階層としては Observer > AbstractObserver > AnonymousObserver のようです。おそらくインターフェース > 抽象クラス > 実装クラスといったところでしょう。抽象クラスの AbstractObserveronNext を持っているのは効率など実装上の都合でしょう。


おわりに

今日は Observable / Observer のインタフェースや、その実装を軽く眺めました。