39
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

RxJS の Observable / Observer の概要

この記事は bouzuya's RxJS Advent Calendar 2015 の 2 日目かつ RxJS Advent Calendar 2015 の 2 日目です。

はじめに

今日の内容は公式リポジトリにあるドキュメント Exploring The Major Concepts in RxJS とほとんど同じものです。

また RxJS 4.0.7 を対象にしています。

Obervable と Observer

RxJS における最も基本的なクラス Observable / Observer の概要を確認します。

Observable はデータソースを表すクラスで、名前どおり observe できる何かです。Observable からデータを受け取るには subscribe(observer) メソッドを呼び出します (メソッド名は observe ではありません) 。subscribe メソッドは引数に Observer のインスタンスを取ります。

ObserverObservable からのデータを受け取るためのクラスです。名前どおり流れてくるデータを observe するものです。onNext / onError / onCompletedObservable により呼び出され (値が Push され) ます。それぞれ Stream の値・エラー・完了を通知します。

class Observable
  subscribe: (observer) -> # ... 

class Observer
  onNext: (value) -> # ...
  onError: (error) -> # ...
  onCompleted: -> # ...

Disposable

もうひとつクラスを紹介します。Disposable クラスです。

Observable.prototype.subscribeDisposable のインスタンスを返します。

Disposable クラスは dispose メソッドを持ちます。 connection の close などの後処理をできます。

class Disposable
  dispose: ->

class Observable
  subscribe: (observer) ->
    # ... (returns Disposable)

実用上の注意

実際に RxJS を使う場合には DisposableObserver を見ることはほとんどありません。

後処理が要らないケースも多いですし、subscribe Observer のコンストラクターと同様の引数を取り、内部で Observer を生成するので直接操作する必要はないからです。

{ Observable } = require 'rx'

Observable
  .range(1, 3)
  .subscribe (value) -> # onNext
    console.log value

また ObservableObserverDispose を継承したクラスをつくることも、まずありません。独自の動きをする Observable については、そのためのメソッドが用意されています。これはまた別の機会に書きます。

ソースコードを眺める

最後にすこしだけソースコードを眺めてみましょう。

RxJS のモジュールは ES6 や Node.js のような仕組みではなく独自の concat を前提とした形になっています。ES6 class も使われていませんし、オレオレな util もあるので慣れるまでは読みづらいかもしれません。とりあえず今回は軽く眺めましょう。

Observable のソースコード

まずは Observable を眺めましょう。

この記事で紹介した subscribe や利便性のためなのか subscribeOnNextsubscribeOnErrorsubscribeOnCompleted などが並んでいます。

mapfilter などの Operator については別ファイルで Observable.prototype (変数名 observableProto) に追加されます。このファイル分割は .NET Framework における拡張メソッドを定義しているイメージなのかもしれませんね。

次のコードは Observable.prototype.subscribe です。

observable.js
    observableProto.subscribe = observableProto.forEach = function (oOrOnNext, onError, onCompleted) {
      return this._subscribe(typeof oOrOnNext === 'object' ?
        oOrOnNext :
        observerCreate(oOrOnNext, onError, onCompleted));
    };

Observable.prototype.subscribethis._subscribe を呼び出すようになっています。そして this._subscribeObservable を継承した各 operator やその共通実装である ObservableBase が実装しています。詳細は別の機会にしましょう。

Observer のソースコード

次に Observer を眺めましょう。

残念ながら onNext などは見当たりません。代わりに気になるコードがあります。次のコードは Observer.create です。これは Observer をつくるメソッドでしょう。

observer.js
  var observerCreate = Observer.create = function (onNext, onError, onCompleted) {
    onNext || (onNext = noop);
    onError || (onError = defaultError);
    onCompleted || (onCompleted = noop);
    return new AnonymousObserver(onNext, onError, onCompleted);
  };

ここで代入されている変数 observerCreate は上記の Observable.prototype.subscribe で使われています。 Observable.prototype.subscribeObserver 以外を渡して呼び出したときに Observer の実装として AnonymousObserver が使われるようです。

ちなみに、上記の理由から AnonymousObserver はエラー時のスタックトレースでよく見かけます (RxJS あるある) 。

では、次は AnonymousObserver を眺めましょう。

ここには next / error / completed はありますが onNext などはありません。AnonymousObserver.prototype.next が呼び出されたとき、コンストラクターで受け取った onNext を呼び出します。

anonymousobserver.js
var AnonymousObserver = Rx.AnonymousObserver = (function (__super__) {
  // ...
}(AbstractObserver));

AnonymousObserverAbstractObserver を継承 (?) しているようです。

次は、AbstractObserver を眺めましょう。

ここには onNext などの期待したメソッドが提供されています。次のコードは onNext です。onNext が呼び出されたときに this.next を呼び出しています。

abstractobserver.js
    AbstractObserver.prototype.onNext = function (value) {
      !this.isStopped && this.next(value);
    };

さきほどの AnonymousObserver.prototype.next はこの実装です。onNext はテンプレートメソッドパターンで、自身を継承したサブクラスに実装 (this.next) を任せます。これは Observable.prototype.subscribethis._subscribe とも同じ構造ですね。

また AbstractObserverObserver を継承 (?) しているみたいですね。型の階層としては Observer > AbstractObserver > AnonymousObserver のようです。おそらくインターフェース > 抽象クラス > 実装クラスといったところでしょう。抽象クラスの AbstractObserveronNext を持っているのは効率など実装上の都合でしょう。

おわりに

今日は Observable / Observer のインタフェースや、その実装を軽く眺めました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
39
Help us understand the problem. What are the problem?