この記事は bouzuya's RxJS Advent Calendar 2015 の 15 日目かつ RxJS Advent Calendar 2015 の 15 日目です。
はじめに
昨日までで ReactiveX の Operators By Category をざっと見たので、今日からはもうすこし実装を読んでみます。
今日は subscribe
の動きを改めて追う……つもりだったのですが、個人的に時間がないので、シンプルなコードとそれに必要な情報だけになるよう削ぎ落とした Observable
実装をしてみたいと思います。
Operator によって Observable
のチェーンをつくり、そこを subscribe
の呼び出しにより observer
が渡されていく部分は元のものをなるべく残すようにしたつもりです。
正確な実装については RxJS 4.0.7 を参照してください。
サンプルコードは以下のコマンドで試しています。
$ npm i rx babel-cli babel-preset-es2015
$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js
目指すコード
RxJS のシンプルなコードです。これに近いものを書いてみます。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(value => console.log(`onNext: ${value}`));
書いたコード
今回のコードは RxJS から Disposable
/ Scheduler
/ Observer
の onError
と onCompleted
/ 今回の実装に不要なコードを取り除く方針で進めます。一部、実装の都合で変更している部分があります。
Observable
まずは Observable
です。
- Observable > ObservableBase > FromObservable
- Observable > ObservableBase > MapObservable
今回は上記の他に Observer / MapObserver が出てきますが、これらは仮に置いているだけです。
今回は Observable
とその子クラスのつながりを見ることが目標です。
class Observable {
// `Observable.from`
// https://github.com/Reactive-Extensions/RxJS/blob/v4.0.7/src/core/perf/operators/from.js#L130
static from(array) {
return new FromObservable(array);
}
// `Observable.prototype.map`
// https://github.com/Reactive-Extensions/RxJS/blob/v4.0.7/src/core/perf/operators/map.js#L55
map(selector) {
return new MapObservable(this, selector);
}
subscribe(observer) {
return this._subscribe(observer);
}
_subscribe(observer) {
throw new Error('Not Implemented');
}
}
Observable.from
および Observable.prototype.map
および Observable.prototype.subscribe
が生えています。 _subscribe
は子クラスで実装されます。
map
の MapObservable
の生成に注意してください。自身 (this
) を渡しています。この形で自身を参照する後続の Observable
を返すことで Observable
のチェーンをつくります。
またコメントにも記載したとおり Observable.*
のほとんどは別のファイルで代入されます。
ObservableBase
次は ObservableBase
です。
- Observable > ObservableBase > FromObservable
- Observable > ObservableBase > MapObservable
中間の実装共有のためのクラスです。
class ObservableBase extends Observable {
// override
_subscribe(observer) {
this.subscribeCore(observer);
}
subscribeCore(observer) {
throw new Error('Not Implemented');
}
}
今回は例外処理もないので、特に何もしません。
FromObservable
次は FromObservable
です。
- Observable > ObservableBase > FromObservable
class FromObservable extends ObservableBase {
constructor(array) {
super();
this.array = array;
}
// override
subscribeCore(observer) {
this.array.forEach(value => observer.onNext(value));
}
}
実際には Array
ではなく Iterable
に変換して、それをスケジューラにしたがって走査します。
subscribeCore
を実装しています。FromObservable
は自身のもつ array
の各要素を受け取った observer
に流していきます。
MapObservable
次は MapObservable
です。
- Observable > ObservableBase > MapObservable
仮置きの Observer
を含みますが、気にしないでください。
class MapObservable extends ObservableBase {
constructor(source, selector) {
super();
this.source = source;
this.selector = selector;
}
// override
subscribeCore(observer) {
this.source.subscribe(new MapObserver(observer, this.selector));
}
}
// Observer は仮に置いているだけで、実際とは大きく異なる。
class Observer {
static create(onNext) {
return { onNext };
}
}
class MapObserver {
constructor(observer, selector) {
this.observer = observer;
this.selector = selector;
}
onNext(value) {
this.observer.onNext(this.selector(value));
}
}
FromObservable
同様に subscribeCore
を実装しています。MapObservable
は自身がデータを持つわけではないので、ソースとなる Observable
に「 Map
機能を持つ Observer
」を登録して、流れてくる値を受け取り変換して Observer
に渡します。
今回は「 Map
機能を持つ Observer
」を MapObserver
という名前にしてありますが、実際には「 MapObservable.InnerObserver
」として実装されています。
MapObserver
は前述の通り Observer
として onNext
で値を受け取り、変換して Observer
(後続の Observable
が登録したもの) に結果の値を渡しています。
つまり subscribe
は Observable
に observer
を登録していく処理です。
登録する observer
は Map
なら selector
を使って変換したりする、というわけです。
実行
class Observable {
static from(array) {
return new FromObservable(array);
}
map(selector) {
return new MapObservable(this, selector);
}
subscribe(observer) {
return this._subscribe(observer);
}
_subscribe(observer) {
throw new Error('Not Implemented');
}
}
class ObservableBase extends Observable {
// override
_subscribe(observer) {
this.subscribeCore(observer);
}
subscribeCore(observer) {
throw new Error('Not Implemented');
}
}
class FromObservable extends ObservableBase {
constructor(array) {
super();
this.array = array;
}
// override
subscribeCore(observer) {
this.array.forEach(value => observer.onNext(value));
}
}
class MapObservable extends ObservableBase {
constructor(source, selector) {
super();
this.source = source;
this.selector = selector;
}
// override
subscribeCore(observer) {
this.source.subscribe(new MapObserver(observer, this.selector));
}
}
class Observer {
static create(onNext) {
return { onNext };
}
}
class MapObserver {
constructor(observer, selector) {
this.observer = observer;
this.selector = selector;
}
onNext(value) {
this.observer.onNext(this.selector(value));
}
}
Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(Observer.create(value => console.log(`onNext: ${value}`)));
// onNext: 2
// onNext: 4
// onNext: 6
おわりに
RxJS の Observable をシンプルにして実装してみました。さすがに機能を削りすぎているので、これを Observable と呼ぶには無理はありますが……。