この記事は bouzuya's RxJS Advent Calendar 2015 の 22 日目かつ RxJS Advent Calendar 2015 の 22 日目です。
はじめに
昨日に続いて Observable.fromEvent の実装を読んでみます。
内容としては Observable.fromEventPattern の subscribe 時の挙動の確認と publish().refCount() の効果を確認したものです。
実装を読む
Observable.fromEvent の実装を読んでみます。
Node.js の EventEmitter の場合は Observable.fromEventPattern に丸投げしています。fromEvent と fromEventPattern は NodeList などの複数要素に対応するかの違いのようなので、fromEventPattern を読んでいきます。
Observable.fromEventPattern は EventPatternObservable を publish().refCount() (share()) したものを返します。
EventPatternObservable & EventPatternDisposable
EventPatternObservable は subscribeCore で event listener を登録します。subscribeCore と subscribe との関連については RxJS の Observable をシンプルにして実装する - Qiita のあたりで ObservableBase と絡めて説明しているので、そちらを参照すると良いです。
createHandler は event listener を生成します。これは observer.onNext() を呼び出す関数です。生成された関数 (fn) を this._add(fn) で event listener として登録します。
event listener 登録処理である this._add は fromEventPattern の引数に渡された addHandler です。fromEvent にある function (h) { element.addListener(eventName, h); } です。今回で言えば element が EventEmitter の instance で、 eventName は 'data' で、 h は observer.onNext() を呼び出す関数です。
EventPatternObservable の subscribeCore は Disposable として EventPatternDisposable を返します。
EventPatternDisposable は引数として this._del を取ります。これは fromEventPattern の引数に渡された removeHandler です。fromEvent にある function (h) { element.removeListener(eventName, h); } です。
EventPatternDisposable は dispose で this._del を呼び出します。(this.isDisposed が設定されていないのはバグでしょうね…… (ぼくの Pull Request が merge されて修正されました。)。ただ subscribeCore を呼び出している _subscribe は AutoDetachObserver (および SingleAssignmentDisposable) で wrap されているので 2 回呼び出されることはありませんが……)
まとめると EventPatternObservable は、 subscribe で addHandler を呼び出して event listener を登録して、 dispose で removeHandler を呼び出す Disposable を返します。
publish().refCount() (share())
Observable.fromEventPattern は EventPatternObservable を publish().refCount() (share()) したものを返します。
これは RxJS における ConnectableObservable を返す (Cold / Hot 変換) Operators - Qiita に書いているので、そちらを参照すると良いです。
publish() は multicast(new Subject()) であり new ConnectableObservable(this, new Subject()) であるので、 EventPatternObservable は ConnectableObservable と Subject とを組み合わせたものに処理されます。
publish() は ConnectableObservable.prototype.connect を呼び出すまで EventPatternObservable.prototype.subscribe (event listener 登録) の呼び出しを遅延させるとともに、自身を subscribe させる代わりに Subject を subscribe (Observer の登録) させるということです。connect 呼び出し後には Subject に値を流すことで Subject を subscribe している observers に値を流します。
refCount() を呼び出しているので、最初の呼び出しで connect (ventPatternObservable.prototype.subscribe) して最後の呼び出しで dispose します。
次の例で言うと……
const event$ = Observable.fromEvent(/* ... */);
event$.subscribe(value => console.log(value));
event$.subscribe(value => console.log(value));
subscribe で 2 つの observer (onNext のみ) を登録していますが、 event listener の登録は実は最初の 1 回だけです。
publish により 2 回の subscribe は EventPatternObservable の subscribe ではなく Subject の subscribe になります。
refCount により 1 回目の subscribe は暗黙のうちに ConnectableObservable の connect を呼び出します。これで EventPatternObservable の subscribe (event listener の登録) が呼び出されます。
event listener に伝えられた値は Subject を通じて 2 つの observer に伝えられます。 1 つの event listener と 2 つの observer です。
どこで Observable が分岐しているのかを押さえておくことは RxJS でハマらないために重要です。どの Operator を使うとどこで分岐するのかを押さえるべきです。
おわりに
私的な時間の都合で、半端ですが終わります。この後に「もし share しなかったら」という内容を書きたかったのですが……。