この記事は bouzuya's RxJS Advent Calendar 2015 の 2 日目かつ RxJS Advent Calendar 2015 の 2 日目です。
はじめに
今日の内容は公式リポジトリにあるドキュメント Exploring The Major Concepts in RxJS とほとんど同じものです。
また RxJS 4.0.7 を対象にしています。
Obervable と Observer
RxJS における最も基本的なクラス Observable
/ Observer
の概要を確認します。
Observable
はデータソースを表すクラスで、名前どおり observe できる何かです。Observable
からデータを受け取るには subscribe(observer)
メソッドを呼び出します (メソッド名は observe
ではありません) 。subscribe
メソッドは引数に Observer
のインスタンスを取ります。
Observer
は Observable
からのデータを受け取るためのクラスです。名前どおり流れてくるデータを observe するものです。onNext
/ onError
/ onCompleted
が Observable
により呼び出され (値が Push され) ます。それぞれ Stream の値・エラー・完了を通知します。
class Observable
subscribe: (observer) -> # ...
class Observer
onNext: (value) -> # ...
onError: (error) -> # ...
onCompleted: -> # ...
Disposable
もうひとつクラスを紹介します。Disposable
クラスです。
Observable.prototype.subscribe
は Disposable
のインスタンスを返します。
Disposable
クラスは dispose
メソッドを持ちます。 connection の close などの後処理をできます。
class Disposable
dispose: ->
class Observable
subscribe: (observer) ->
# ... (returns Disposable)
実用上の注意
実際に RxJS を使う場合には Disposable
や Observer
を見ることはほとんどありません。
後処理が要らないケースも多いですし、subscribe
Observer
のコンストラクターと同様の引数を取り、内部で Observer
を生成するので直接操作する必要はないからです。
{ Observable } = require 'rx'
Observable
.range(1, 3)
.subscribe (value) -> # onNext
console.log value
また Observable
や Observer
や Dispose
を継承したクラスをつくることも、まずありません。独自の動きをする Observable については、そのためのメソッドが用意されています。これはまた別の機会に書きます。
ソースコードを眺める
最後にすこしだけソースコードを眺めてみましょう。
RxJS のモジュールは ES6 や Node.js のような仕組みではなく独自の concat を前提とした形になっています。ES6 class も使われていませんし、オレオレな util もあるので慣れるまでは読みづらいかもしれません。とりあえず今回は軽く眺めましょう。
Observable のソースコード
まずは Observable
を眺めましょう。
この記事で紹介した subscribe
や利便性のためなのか subscribeOnNext
や subscribeOnError
や subscribeOnCompleted
などが並んでいます。
map
や filter
などの Operator については別ファイルで Observable.prototype
(変数名 observableProto
) に追加されます。このファイル分割は .NET Framework における拡張メソッドを定義しているイメージなのかもしれませんね。
次のコードは Observable.prototype.subscribe
です。
observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
return this._subscribe(typeof oOrOnNext === 'object' ?
oOrOnNext :
observerCreate(oOrOnNext, onError, onCompleted));
};
Observable.prototype.subscribe
は this._subscribe
を呼び出すようになっています。そして this._subscribe
は Observable
を継承した各 operator やその共通実装である ObservableBase
が実装しています。詳細は別の機会にしましょう。
Observer のソースコード
次に Observer
を眺めましょう。
残念ながら onNext
などは見当たりません。代わりに気になるコードがあります。次のコードは Observer.create
です。これは Observer
をつくるメソッドでしょう。
var observerCreate = Observer.create = function (onNext, onError, onCompleted) {
onNext || (onNext = noop);
onError || (onError = defaultError);
onCompleted || (onCompleted = noop);
return new AnonymousObserver(onNext, onError, onCompleted);
};
ここで代入されている変数 observerCreate
は上記の Observable.prototype.subscribe
で使われています。 Observable.prototype.subscribe
を Observer
以外を渡して呼び出したときに Observer
の実装として AnonymousObserver
が使われるようです。
ちなみに、上記の理由から AnonymousObserver
はエラー時のスタックトレースでよく見かけます (RxJS あるある) 。
では、次は AnonymousObserver
を眺めましょう。
ここには next
/ error
/ completed
はありますが onNext
などはありません。AnonymousObserver.prototype.next
が呼び出されたとき、コンストラクターで受け取った onNext
を呼び出します。
var AnonymousObserver = Rx.AnonymousObserver = (function (__super__) {
// ...
}(AbstractObserver));
AnonymousObserver
は AbstractObserver
を継承 (?) しているようです。
次は、AbstractObserver
を眺めましょう。
ここには onNext
などの期待したメソッドが提供されています。次のコードは onNext
です。onNext
が呼び出されたときに this.next
を呼び出しています。
AbstractObserver.prototype.onNext = function (value) {
!this.isStopped && this.next(value);
};
さきほどの AnonymousObserver.prototype.next
はこの実装です。onNext
はテンプレートメソッドパターンで、自身を継承したサブクラスに実装 (this.next
) を任せます。これは Observable.prototype.subscribe
と this._subscribe
とも同じ構造ですね。
また AbstractObserver
は Observer
を継承 (?) しているみたいですね。型の階層としては Observer
> AbstractObserver
> AnonymousObserver
のようです。おそらくインターフェース > 抽象クラス > 実装クラスといったところでしょう。抽象クラスの AbstractObserver
が onNext
を持っているのは効率など実装上の都合でしょう。
おわりに
今日は Observable
/ Observer
のインタフェースや、その実装を軽く眺めました。