この記事は bouzuya's RxJS Advent Calendar 2015 の 22 日目かつ RxJS Advent Calendar 2015 の 22 日目です。
はじめに
昨日に続いて Observable.fromEvent
の実装を読んでみます。
内容としては Observable.fromEventPattern
の subscribe
時の挙動の確認と publish().refCount()
の効果を確認したものです。
実装を読む
Observable.fromEvent
の実装を読んでみます。
Node.js の EventEmitter
の場合は Observable.fromEventPattern
に丸投げしています。fromEvent
と fromEventPattern
は NodeList
などの複数要素に対応するかの違いのようなので、fromEventPattern
を読んでいきます。
Observable.fromEventPattern
は EventPatternObservable
を publish().refCount()
(share()
) したものを返します。
EventPatternObservable & EventPatternDisposable
EventPatternObservable
は subscribeCore
で event listener を登録します。subscribeCore
と subscribe
との関連については RxJS の Observable をシンプルにして実装する - Qiita のあたりで ObservableBase
と絡めて説明しているので、そちらを参照すると良いです。
createHandler
は event listener を生成します。これは observer.onNext()
を呼び出す関数です。生成された関数 (fn
) を this._add(fn)
で event listener として登録します。
event listener 登録処理である this._add
は fromEventPattern
の引数に渡された addHandler
です。fromEvent
にある function (h) { element.addListener(eventName, h); }
です。今回で言えば element
が EventEmitter
の instance で、 eventName
は 'data'
で、 h
は observer.onNext()
を呼び出す関数です。
EventPatternObservable
の subscribeCore
は Disposable
として EventPatternDisposable
を返します。
EventPatternDisposable
は引数として this._del
を取ります。これは fromEventPattern
の引数に渡された removeHandler
です。fromEvent
にある function (h) { element.removeListener(eventName, h); }
です。
EventPatternDisposable
は dispose
で this._del
を呼び出します。(this.isDisposed
が設定されていないのはバグでしょうね…… (ぼくの Pull Request が merge されて修正されました。)。ただ subscribeCore
を呼び出している _subscribe
は AutoDetachObserver
(および SingleAssignmentDisposable
) で wrap されているので 2 回呼び出されることはありませんが……)
まとめると EventPatternObservable
は、 subscribe
で addHandler
を呼び出して event listener を登録して、 dispose
で removeHandler
を呼び出す Disposable
を返します。
publish().refCount() (share())
Observable.fromEventPattern
は EventPatternObservable
を publish().refCount()
(share()
) したものを返します。
これは RxJS における ConnectableObservable を返す (Cold / Hot 変換) Operators - Qiita に書いているので、そちらを参照すると良いです。
publish()
は multicast(new Subject())
であり new ConnectableObservable(this, new Subject())
であるので、 EventPatternObservable
は ConnectableObservable
と Subject
とを組み合わせたものに処理されます。
publish()
は ConnectableObservable.prototype.connect
を呼び出すまで EventPatternObservable.prototype.subscribe
(event listener 登録) の呼び出しを遅延させるとともに、自身を subscribe
させる代わりに Subject
を subscribe
(Observer
の登録) させるということです。connect
呼び出し後には Subject
に値を流すことで Subject
を subscribe
している observers
に値を流します。
refCount()
を呼び出しているので、最初の呼び出しで connect
(ventPatternObservable.prototype.subscribe
) して最後の呼び出しで dispose
します。
次の例で言うと……
const event$ = Observable.fromEvent(/* ... */);
event$.subscribe(value => console.log(value));
event$.subscribe(value => console.log(value));
subscribe
で 2 つの observer
(onNext
のみ) を登録していますが、 event listener の登録は実は最初の 1 回だけです。
publish
により 2 回の subscribe
は EventPatternObservable
の subscribe
ではなく Subject
の subscribe
になります。
refCount
により 1 回目の subscribe
は暗黙のうちに ConnectableObservable
の connect
を呼び出します。これで EventPatternObservable
の subscribe
(event listener の登録) が呼び出されます。
event listener に伝えられた値は Subject
を通じて 2 つの observer
に伝えられます。 1 つの event listener と 2 つの observer です。
どこで Observable
が分岐しているのかを押さえておくことは RxJS でハマらないために重要です。どの Operator を使うとどこで分岐するのかを押さえるべきです。
おわりに
私的な時間の都合で、半端ですが終わります。この後に「もし share しなかったら」という内容を書きたかったのですが……。