16
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

RxJS の Operators (2) - Observable の変換

この記事は bouzuya's RxJS Advent Calendar 2015 の 4 日目かつ RxJS Advent Calendar 2015 の 4 日目です。

はじめに

今日は ReactiveX の Operators の Transform について RxJS の API ドキュメントやソースコードを見ていきます。

また RxJS 4.0.7 を対象にしています。

Observable の変換

Observable.prototype.map

Array.prototype.map でもおなじみのものです。動きについては説明は不要でしょう。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  .map(value => value * 2)
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log(`onCompleted`)
  );
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted

せっかくなのでソースコードを眺めてみましょう。

  observableProto.select = observableProto.map = function (selector, thisArg) {
    var selectorFn = isFunction(selector) ? bindCallback(selector, thisArg, 3) : function () { return selector; },
        source = this;
    return new AnonymousObservable(function (o) {
      var i = 0;
      return source.subscribe(function (x) {
        try {
          var result = selector(x, i++, source);
        } catch (e) {
          return observer.onError(e);
        }

        o.onNext(result);
      }, function (e) { o.onError(e); }, function () { o.onCompleted(); });
    }, source);
  };

AnonymousObservable という Observable を継承したクラスの、インスタンスを返しています。

ところで selectorFn が使われていないですね。バグっぽいです。→ このソースコードはドキュメントからリンクが貼られている /src/core/linq/observable/select.js ですが、ビルドスクリプトを確認すれば分かりますが、このソースコードはもう使われていません (おまけにバグがあります) 。現在は /src/core/perf/operators/map.js が使われているようです。

Pull Request 送ろうとしたところで気づいたのですが、このまま記録しておきます。

いきなり脱線しましたが、続けます。

Observable.prototype.scan

Observable.prototype.scanArray.prototype.reduce のように fold っぽい動きをします。ただ Observable.prototype.reduce とは違って途中の値も onNext で流します。

ちょうどいい記事があったので、こちらをどうぞ Rxにおけるscanとreduceの違い - Qiita

個人的な使用頻度としては reduce より scan のほうが高いです。

Observable.prototype.flatMap

Observable.prototype.flatMapmap したものを flat にします。flat という名前からは入れ子になった Array を flat にするイメージを浮かべるでしょう。例えば [[1, 2, 3], [4, 5, 6]][1, 2, 3, 4, 5, 6] になるイメージです。これを Rx という前提で考えると Observable.from([Observable.from([1, 2, 3]), Observable.from([4, 5, 6])])Observable.from([1, 2, 3, 4, 5, 6]) になるでしょう。イメージはつかめたはずです。……あとは例を見てみましょう。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3])
  .flatMap(value => Observable.just(value * 2))
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log(`onCompleted`)
  );
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted

flatMap で返した Observable から流れてくる値が流れてきているのが分かります。この例なら flatMap である必要はまったくないのですが、例えば、これが非同期処理ならどうでしょう。複数の HTTP リクエストの戻り値をひとつの Observable にまとめることができるわけです。

また、ソースコードを見れば分かるのですが、flatMap は簡単に書くと次のようなものです。

class Observable {
  // ...
  flatMap(selector) {
    return this.source.map(selector).mergeAll();
  }
}

mapmergeAll とを組み合わせたものです。 Observable.prototype.mergeAll はまた別の機会に紹介しますが、複数の Observable を merge ( flat に ) する、そんな動きです。

個人的には、この flatMapmapscan と並んでよく使う Operator です。

Observable.prototype.groupBy

Observable.prototype.groupBy は流れてくる値を keySelector でグループ分けした Observable に変換します。

import { Observable } from 'rx';

Observable
  .from([1, 2, 3, 4, 5, 6])
  .groupBy(value => value % 2 == 0)
  .flatMap(observable => observable.toArray()) 
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log(`onCompleted`)
  );
// onNext: 1,3,5
// onNext: 2,4,6
// onCompleted

keySelector で偶数 / 奇数 (true / false) に分けています。flatMap (さっそく使っています) で ObservableObservable (Observable<Observable<number>> のイメージ) を、 ArrayObservable (Observable<Array<number>> のイメージ) に変換してまとめています。

Observable.prototype.window

Observable.prototype.windowObservable.prototype.groupBy と同様にグループ化して ObservableObservable を返すのですが、groupBy とは異なり流れてくる値以外でグループ化します。

import { Observable } from 'rx';

Observable
  // 100 ms ごとに値を流す (延々と続く)
  .interval(100)
  // return した Observable が値を流すタイミング (300ms) で window を区切る
  .window(() => Observable.timer(300))
  // 延々と続くと困るので最初の 3 回を取る
  .take(3)
  .flatMap(observable => observable.toArray()) 
  .subscribe(
    value => console.log(`onNext: ${value}`),
    error => console.log(`onError: ${error}`),
    () => console.log(`onCompleted`)
  );
// onNext: 0,1,2
// onNext: 3,4,5,6
// onNext: 7,8,9,10
// onCompleted

引数によって動きがかなり違うのでドキュメントを参照してください。

Observable.prototype.buffer

Observable.prototype.bufferObservable.prototype.window と似たような引数を取ってグループ化しますが、ObservableObservable ではなく、 ArrayObservable を返します。

サンプルを見るよりもソースコードを読んだ方が分かりやすいと思います。

  observableProto.buffer = function () {
    return this.window.apply(this, arguments)
      .flatMap(toArray);
  };

window して flatMap(toArray) しています。つまり、さきほどの window の例のように、区切ってまとめた値をほしい場合は buffer を使えばいいわけです。さきほどの例だと延々続いてしまうのでアレですが……。

おわりに

今日は Observable を変換する Operator を見てみました。map / scan / flatMap は特によく使うものなので最初に覚えると良さそうです。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
16
Help us understand the problem. What are the problem?