ReactiveExtensions
RxJS
Rx
RxJSDay 15

RxJS の Observable をシンプルにして実装する

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 15 日目かつ RxJS Advent Calendar 2015 の 15 日目です。


はじめに

昨日までで ReactiveX の Operators By Category をざっと見たので、今日からはもうすこし実装を読んでみます。

今日は subscribe の動きを改めて追う……つもりだったのですが、個人的に時間がないので、シンプルなコードとそれに必要な情報だけになるよう削ぎ落とした Observable 実装をしてみたいと思います。

Operator によって Observable のチェーンをつくり、そこを subscribe の呼び出しにより observer が渡されていく部分は元のものをなるべく残すようにしたつもりです。

正確な実装については RxJS 4.0.7 を参照してください。

サンプルコードは以下のコマンドで試しています。

$ npm i rx babel-cli babel-preset-es2015

$ $(npm bin)/babel --presets es2015 index.js -o out.js && node out.js


目指すコード

RxJS のシンプルなコードです。これに近いものを書いてみます。

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(value => console.log(`onNext: ${value}`));


書いたコード

今回のコードは RxJS から Disposable / Scheduler / ObserveronErroronCompleted / 今回の実装に不要なコードを取り除く方針で進めます。一部、実装の都合で変更している部分があります。


Observable

まずは Observable です。


  • Observable > ObservableBase > FromObservable

  • Observable > ObservableBase > MapObservable

今回は上記の他に Observer / MapObserver が出てきますが、これらは仮に置いているだけです。

今回は Observable とその子クラスのつながりを見ることが目標です。


class Observable {
// `Observable.from`
// https://github.com/Reactive-Extensions/RxJS/blob/v4.0.7/src/core/perf/operators/from.js#L130
static from(array) {
return new FromObservable(array);
}

// `Observable.prototype.map`
// https://github.com/Reactive-Extensions/RxJS/blob/v4.0.7/src/core/perf/operators/map.js#L55
map(selector) {
return new MapObservable(this, selector);
}

subscribe(observer) {
return this._subscribe(observer);
}

_subscribe(observer) {
throw new Error('Not Implemented');
}
}

Observable.from および Observable.prototype.map および Observable.prototype.subscribe が生えています。 _subscribe は子クラスで実装されます。

mapMapObservable の生成に注意してください。自身 (this) を渡しています。この形で自身を参照する後続の Observable を返すことで Observable のチェーンをつくります。

またコメントにも記載したとおり Observable.* のほとんどは別のファイルで代入されます。


ObservableBase

次は ObservableBase です。


  • Observable > ObservableBase > FromObservable

  • Observable > ObservableBase > MapObservable

中間の実装共有のためのクラスです。

class ObservableBase extends Observable {

// override
_subscribe(observer) {
this.subscribeCore(observer);
}

subscribeCore(observer) {
throw new Error('Not Implemented');
}
}

今回は例外処理もないので、特に何もしません。


FromObservable

次は FromObservable です。


  • Observable > ObservableBase > FromObservable

class FromObservable extends ObservableBase {

constructor(array) {
super();
this.array = array;
}

// override
subscribeCore(observer) {
this.array.forEach(value => observer.onNext(value));
}
}

実際には Array ではなく Iterable に変換して、それをスケジューラにしたがって走査します。

subscribeCore を実装しています。FromObservable は自身のもつ array の各要素を受け取った observer に流していきます。


MapObservable

次は MapObservable です。


  • Observable > ObservableBase > MapObservable

仮置きの Observer を含みますが、気にしないでください。

class MapObservable extends ObservableBase {

constructor(source, selector) {
super();
this.source = source;
this.selector = selector;
}

// override
subscribeCore(observer) {
this.source.subscribe(new MapObserver(observer, this.selector));
}
}

// Observer は仮に置いているだけで、実際とは大きく異なる。
class Observer {
static create(onNext) {
return { onNext };
}
}

class MapObserver {
constructor(observer, selector) {
this.observer = observer;
this.selector = selector;
}

onNext(value) {
this.observer.onNext(this.selector(value));
}
}

FromObservable 同様に subscribeCore を実装しています。MapObservable は自身がデータを持つわけではないので、ソースとなる Observable に「 Map 機能を持つ Observer 」を登録して、流れてくる値を受け取り変換して Observer に渡します。

今回は「 Map 機能を持つ Observer 」を MapObserver という名前にしてありますが、実際には「 MapObservable.InnerObserver 」として実装されています。

MapObserver は前述の通り Observer として onNext で値を受け取り、変換して Observer (後続の Observable が登録したもの) に結果の値を渡しています。

つまり subscribeObservableobserver を登録していく処理です。

登録する observerMap なら selector を使って変換したりする、というわけです。


実行


class Observable {
static from(array) {
return new FromObservable(array);
}

map(selector) {
return new MapObservable(this, selector);
}

subscribe(observer) {
return this._subscribe(observer);
}

_subscribe(observer) {
throw new Error('Not Implemented');
}
}

class ObservableBase extends Observable {
// override
_subscribe(observer) {
this.subscribeCore(observer);
}

subscribeCore(observer) {
throw new Error('Not Implemented');
}
}

class FromObservable extends ObservableBase {
constructor(array) {
super();
this.array = array;
}

// override
subscribeCore(observer) {
this.array.forEach(value => observer.onNext(value));
}
}

class MapObservable extends ObservableBase {
constructor(source, selector) {
super();
this.source = source;
this.selector = selector;
}

// override
subscribeCore(observer) {
this.source.subscribe(new MapObserver(observer, this.selector));
}
}

class Observer {
static create(onNext) {
return { onNext };
}
}

class MapObserver {
constructor(observer, selector) {
this.observer = observer;
this.selector = selector;
}

onNext(value) {
this.observer.onNext(this.selector(value));
}
}

Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(Observer.create(value => console.log(`onNext: ${value}`)));
// onNext: 2
// onNext: 4
// onNext: 6


おわりに

RxJS の Observable をシンプルにして実装してみました。さすがに機能を削りすぎているので、これを Observable と呼ぶには無理はありますが……。