この記事は bouzuya's RxJS Advent Calendar 2015 の 2 日目かつ RxJS Advent Calendar 2015 の 2 日目です。
はじめに
今日の内容は公式リポジトリにあるドキュメント Exploring The Major Concepts in RxJS とほとんど同じものです。
また RxJS 4.0.7 を対象にしています。
Obervable と Observer
RxJS における最も基本的なクラス Observable / Observer の概要を確認します。
Observable はデータソースを表すクラスで、名前どおり observe できる何かです。Observable からデータを受け取るには subscribe(observer) メソッドを呼び出します (メソッド名は observe ではありません) 。subscribe メソッドは引数に Observer のインスタンスを取ります。
Observer は Observable からのデータを受け取るためのクラスです。名前どおり流れてくるデータを observe するものです。onNext / onError / onCompleted が Observable により呼び出され (値が Push され) ます。それぞれ Stream の値・エラー・完了を通知します。
class Observable
subscribe: (observer) -> # ...
class Observer
onNext: (value) -> # ...
onError: (error) -> # ...
onCompleted: -> # ...
Disposable
もうひとつクラスを紹介します。Disposable クラスです。
Observable.prototype.subscribe は Disposable のインスタンスを返します。
Disposable クラスは dispose メソッドを持ちます。 connection の close などの後処理をできます。
class Disposable
dispose: ->
class Observable
subscribe: (observer) ->
# ... (returns Disposable)
実用上の注意
実際に RxJS を使う場合には Disposable や Observer を見ることはほとんどありません。
後処理が要らないケースも多いですし、subscribe Observer のコンストラクターと同様の引数を取り、内部で Observer を生成するので直接操作する必要はないからです。
{ Observable } = require 'rx'
Observable
.range(1, 3)
.subscribe (value) -> # onNext
console.log value
また Observable や Observer や Dispose を継承したクラスをつくることも、まずありません。独自の動きをする Observable については、そのためのメソッドが用意されています。これはまた別の機会に書きます。
ソースコードを眺める
最後にすこしだけソースコードを眺めてみましょう。
RxJS のモジュールは ES6 や Node.js のような仕組みではなく独自の concat を前提とした形になっています。ES6 class も使われていませんし、オレオレな util もあるので慣れるまでは読みづらいかもしれません。とりあえず今回は軽く眺めましょう。
Observable のソースコード
まずは Observable を眺めましょう。
この記事で紹介した subscribe や利便性のためなのか subscribeOnNext や subscribeOnError や subscribeOnCompleted などが並んでいます。
map や filter などの Operator については別ファイルで Observable.prototype (変数名 observableProto) に追加されます。このファイル分割は .NET Framework における拡張メソッドを定義しているイメージなのかもしれませんね。
次のコードは Observable.prototype.subscribe です。
observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
return this._subscribe(typeof oOrOnNext === 'object' ?
oOrOnNext :
observerCreate(oOrOnNext, onError, onCompleted));
};
Observable.prototype.subscribe は this._subscribe を呼び出すようになっています。そして this._subscribe は Observable を継承した各 operator やその共通実装である ObservableBase が実装しています。詳細は別の機会にしましょう。
Observer のソースコード
次に Observer を眺めましょう。
残念ながら onNext などは見当たりません。代わりに気になるコードがあります。次のコードは Observer.create です。これは Observer をつくるメソッドでしょう。
var observerCreate = Observer.create = function (onNext, onError, onCompleted) {
onNext || (onNext = noop);
onError || (onError = defaultError);
onCompleted || (onCompleted = noop);
return new AnonymousObserver(onNext, onError, onCompleted);
};
ここで代入されている変数 observerCreate は上記の Observable.prototype.subscribe で使われています。 Observable.prototype.subscribe を Observer 以外を渡して呼び出したときに Observer の実装として AnonymousObserver が使われるようです。
ちなみに、上記の理由から AnonymousObserver はエラー時のスタックトレースでよく見かけます (RxJS あるある) 。
では、次は AnonymousObserver を眺めましょう。
ここには next / error / completed はありますが onNext などはありません。AnonymousObserver.prototype.next が呼び出されたとき、コンストラクターで受け取った onNext を呼び出します。
var AnonymousObserver = Rx.AnonymousObserver = (function (__super__) {
// ...
}(AbstractObserver));
AnonymousObserver は AbstractObserver を継承 (?) しているようです。
次は、AbstractObserver を眺めましょう。
ここには onNext などの期待したメソッドが提供されています。次のコードは onNext です。onNext が呼び出されたときに this.next を呼び出しています。
AbstractObserver.prototype.onNext = function (value) {
!this.isStopped && this.next(value);
};
さきほどの AnonymousObserver.prototype.next はこの実装です。onNext はテンプレートメソッドパターンで、自身を継承したサブクラスに実装 (this.next) を任せます。これは Observable.prototype.subscribe と this._subscribe とも同じ構造ですね。
また AbstractObserver は Observer を継承 (?) しているみたいですね。型の階層としては Observer > AbstractObserver > AnonymousObserver のようです。おそらくインターフェース > 抽象クラス > 実装クラスといったところでしょう。抽象クラスの AbstractObserver が onNext を持っているのは効率など実装上の都合でしょう。
おわりに
今日は Observable / Observer のインタフェースや、その実装を軽く眺めました。