この記事は bouzuya's RxJS Advent Calendar 2015 の 4 日目かつ RxJS Advent Calendar 2015 の 4 日目です。
はじめに
今日は ReactiveX の Operators の Transform について RxJS の API ドキュメントやソースコードを見ていきます。
また RxJS 4.0.7 を対象にしています。
Observable の変換
Observable.prototype.map
Array.prototype.map でもおなじみのものです。動きについては説明は不要でしょう。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted
せっかくなのでソースコードを眺めてみましょう。
observableProto.select = observableProto.map = function (selector, thisArg) {
var selectorFn = isFunction(selector) ? bindCallback(selector, thisArg, 3) : function () { return selector; },
source = this;
return new AnonymousObservable(function (o) {
var i = 0;
return source.subscribe(function (x) {
try {
var result = selector(x, i++, source);
} catch (e) {
return observer.onError(e);
}
o.onNext(result);
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
};
AnonymousObservable という Observable を継承したクラスの、インスタンスを返しています。
ところで selectorFn が使われていないですね。バグっぽいです。→ このソースコードはドキュメントからリンクが貼られている /src/core/linq/observable/select.js ですが、ビルドスクリプトを確認すれば分かりますが、このソースコードはもう使われていません (おまけにバグがあります) 。現在は /src/core/perf/operators/map.js が使われているようです。
Pull Request 送ろうとしたところで気づいたのですが、このまま記録しておきます。
いきなり脱線しましたが、続けます。
Observable.prototype.scan
- ReactiveX - Scan operator
Observable.prototype.scanAPI DocumentObservable.prototype.scanSource Code
Observable.prototype.scan は Array.prototype.reduce のように fold っぽい動きをします。ただ Observable.prototype.reduce とは違って途中の値も onNext で流します。
ちょうどいい記事があったので、こちらをどうぞ Rxにおけるscanとreduceの違い - Qiita 。
個人的な使用頻度としては reduce より scan のほうが高いです。
Observable.prototype.flatMap
- ReactiveX - FlatMap operator
Observable.prototype.flatMapAPI DocumentObservable.prototype.flatMapSource Code
Observable.prototype.flatMap は map したものを flat にします。flat という名前からは入れ子になった Array を flat にするイメージを浮かべるでしょう。例えば [[1, 2, 3], [4, 5, 6]] が [1, 2, 3, 4, 5, 6] になるイメージです。これを Rx という前提で考えると Observable.from([Observable.from([1, 2, 3]), Observable.from([4, 5, 6])]) は Observable.from([1, 2, 3, 4, 5, 6]) になるでしょう。イメージはつかめたはずです。……あとは例を見てみましょう。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.flatMap(value => Observable.just(value * 2))
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted
flatMap で返した Observable から流れてくる値が流れてきているのが分かります。この例なら flatMap である必要はまったくないのですが、例えば、これが非同期処理ならどうでしょう。複数の HTTP リクエストの戻り値をひとつの Observable にまとめることができるわけです。
また、ソースコードを見れば分かるのですが、flatMap は簡単に書くと次のようなものです。
class Observable {
// ...
flatMap(selector) {
return this.source.map(selector).mergeAll();
}
}
map と mergeAll とを組み合わせたものです。 Observable.prototype.mergeAll はまた別の機会に紹介しますが、複数の Observable を merge ( flat に ) する、そんな動きです。
個人的には、この flatMap は map や scan と並んでよく使う Operator です。
Observable.prototype.groupBy
- ReactiveX - GroupBy operator
Observable.prototype.groupByAPI DocumentObservable.prototype.groupBySource Code
Observable.prototype.groupBy は流れてくる値を keySelector でグループ分けした Observable に変換します。
import { Observable } from 'rx';
Observable
.from([1, 2, 3, 4, 5, 6])
.groupBy(value => value % 2 == 0)
.flatMap(observable => observable.toArray())
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 1,3,5
// onNext: 2,4,6
// onCompleted
keySelector で偶数 / 奇数 (true / false) に分けています。flatMap (さっそく使っています) で Observable の Observable (Observable<Observable<number>> のイメージ) を、 Array の Observable (Observable<Array<number>> のイメージ) に変換してまとめています。
Observable.prototype.window
- ReactiveX - Window operator
Observable.prototype.windowAPI DocumentObservable.prototype.windowSource Code
Observable.prototype.window は Observable.prototype.groupBy と同様にグループ化して Observable の Observable を返すのですが、groupBy とは異なり流れてくる値以外でグループ化します。
import { Observable } from 'rx';
Observable
// 100 ms ごとに値を流す (延々と続く)
.interval(100)
// return した Observable が値を流すタイミング (300ms) で window を区切る
.window(() => Observable.timer(300))
// 延々と続くと困るので最初の 3 回を取る
.take(3)
.flatMap(observable => observable.toArray())
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 0,1,2
// onNext: 3,4,5,6
// onNext: 7,8,9,10
// onCompleted
引数によって動きがかなり違うのでドキュメントを参照してください。
Observable.prototype.buffer
- ReactiveX - Buffer operator
Observable.prototype.bufferAPI DocumentObservable.prototype.bufferSource Code
Observable.prototype.buffer は Observable.prototype.window と似たような引数を取ってグループ化しますが、Observable の Observable ではなく、 Array の Observable を返します。
サンプルを見るよりもソースコードを読んだ方が分かりやすいと思います。
observableProto.buffer = function () {
return this.window.apply(this, arguments)
.flatMap(toArray);
};
window して flatMap(toArray) しています。つまり、さきほどの window の例のように、区切ってまとめた値をほしい場合は buffer を使えばいいわけです。さきほどの例だと延々続いてしまうのでアレですが……。
おわりに
今日は Observable を変換する Operator を見てみました。map / scan / flatMap は特によく使うものなので最初に覚えると良さそうです。