ReactiveExtensions
RxJS
Rx
RxJSDay 22

RxJS の Observable.fromEvent を読む

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 22 日目かつ RxJS Advent Calendar 2015 の 22 日目です。


はじめに

昨日に続いて Observable.fromEvent の実装を読んでみます。

内容としては Observable.fromEventPatternsubscribe 時の挙動の確認と publish().refCount() の効果を確認したものです。


実装を読む

Observable.fromEvent の実装を読んでみます。

Node.js の EventEmitter の場合は Observable.fromEventPattern に丸投げしています。fromEventfromEventPatternNodeList などの複数要素に対応するかの違いのようなので、fromEventPattern を読んでいきます。

Observable.fromEventPatternEventPatternObservablepublish().refCount() (share()) したものを返します。


EventPatternObservable & EventPatternDisposable

EventPatternObservablesubscribeCore で event listener を登録します。subscribeCoresubscribe との関連については RxJS の Observable をシンプルにして実装する - Qiita のあたりで ObservableBase と絡めて説明しているので、そちらを参照すると良いです。

createHandler は event listener を生成します。これは observer.onNext() を呼び出す関数です。生成された関数 (fn) を this._add(fn) で event listener として登録します。

event listener 登録処理である this._addfromEventPattern の引数に渡された addHandler です。fromEvent にある function (h) { element.addListener(eventName, h); } です。今回で言えば elementEventEmitter の instance で、 eventName'data' で、 hobserver.onNext() を呼び出す関数です。

EventPatternObservablesubscribeCoreDisposable として EventPatternDisposable を返します。

EventPatternDisposable は引数として this._del を取ります。これは fromEventPattern の引数に渡された removeHandler です。fromEvent にある function (h) { element.removeListener(eventName, h); } です。

EventPatternDisposabledisposethis._del を呼び出します。(this.isDisposed が設定されていないのはバグでしょうね…… (ぼくの Pull Request が merge されて修正されました。)。ただ subscribeCore を呼び出している _subscribeAutoDetachObserver (および SingleAssignmentDisposable) で wrap されているので 2 回呼び出されることはありませんが……)

まとめると EventPatternObservable は、 subscribeaddHandler を呼び出して event listener を登録して、 disposeremoveHandler を呼び出す Disposable を返します。


publish().refCount() (share())

Observable.fromEventPatternEventPatternObservablepublish().refCount() (share()) したものを返します。

これは RxJS における ConnectableObservable を返す (Cold / Hot 変換) Operators - Qiita に書いているので、そちらを参照すると良いです。

publish()multicast(new Subject()) であり new ConnectableObservable(this, new Subject()) であるので、 EventPatternObservableConnectableObservableSubject とを組み合わせたものに処理されます。

publish()ConnectableObservable.prototype.connect を呼び出すまで EventPatternObservable.prototype.subscribe (event listener 登録) の呼び出しを遅延させるとともに、自身を subscribe させる代わりに Subjectsubscribe (Observer の登録) させるということです。connect 呼び出し後には Subject に値を流すことで Subjectsubscribe している observers に値を流します。

refCount() を呼び出しているので、最初の呼び出しで connect (ventPatternObservable.prototype.subscribe) して最後の呼び出しで dispose します。

次の例で言うと……

const event$ = Observable.fromEvent(/* ... */);

event$.subscribe(value => console.log(value));
event$.subscribe(value => console.log(value));

subscribe で 2 つの observer (onNext のみ) を登録していますが、 event listener の登録は実は最初の 1 回だけです。

publish により 2 回の subscribeEventPatternObservablesubscribe ではなく Subjectsubscribe になります。

refCount により 1 回目の subscribe は暗黙のうちに ConnectableObservableconnect を呼び出します。これで EventPatternObservablesubscribe (event listener の登録) が呼び出されます。

event listener に伝えられた値は Subject を通じて 2 つの observer に伝えられます。 1 つの event listener と 2 つの observer です。

どこで Observable が分岐しているのかを押さえておくことは RxJS でハマらないために重要です。どの Operator を使うとどこで分岐するのかを押さえるべきです。


おわりに

私的な時間の都合で、半端ですが終わります。この後に「もし share しなかったら」という内容を書きたかったのですが……。