この記事は bouzuya's RxJS Advent Calendar 2015 の 4 日目かつ RxJS Advent Calendar 2015 の 4 日目です。
はじめに
今日は ReactiveX の Operators の Transform について RxJS の API ドキュメントやソースコードを見ていきます。
また RxJS 4.0.7 を対象にしています。
Observable の変換
Observable.prototype.map
Array.prototype.map
でもおなじみのものです。動きについては説明は不要でしょう。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.map(value => value * 2)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted
せっかくなのでソースコードを眺めてみましょう。
observableProto.select = observableProto.map = function (selector, thisArg) {
var selectorFn = isFunction(selector) ? bindCallback(selector, thisArg, 3) : function () { return selector; },
source = this;
return new AnonymousObservable(function (o) {
var i = 0;
return source.subscribe(function (x) {
try {
var result = selector(x, i++, source);
} catch (e) {
return observer.onError(e);
}
o.onNext(result);
}, function (e) { o.onError(e); }, function () { o.onCompleted(); });
}, source);
};
AnonymousObservable
という Observable
を継承したクラスの、インスタンスを返しています。
ところで selectorFn
が使われていないですね。バグっぽいです。→ このソースコードはドキュメントからリンクが貼られている /src/core/linq/observable/select.js
ですが、ビルドスクリプトを確認すれば分かりますが、このソースコードはもう使われていません (おまけにバグがあります) 。現在は /src/core/perf/operators/map.js
が使われているようです。
Pull Request 送ろうとしたところで気づいたのですが、このまま記録しておきます。
いきなり脱線しましたが、続けます。
Observable.prototype.scan
- ReactiveX - Scan operator
Observable.prototype.scan
API DocumentObservable.prototype.scan
Source Code
Observable.prototype.scan
は Array.prototype.reduce
のように fold っぽい動きをします。ただ Observable.prototype.reduce
とは違って途中の値も onNext
で流します。
ちょうどいい記事があったので、こちらをどうぞ Rxにおけるscanとreduceの違い - Qiita 。
個人的な使用頻度としては reduce
より scan
のほうが高いです。
Observable.prototype.flatMap
- ReactiveX - FlatMap operator
Observable.prototype.flatMap
API DocumentObservable.prototype.flatMap
Source Code
Observable.prototype.flatMap
は map
したものを flat にします。flat という名前からは入れ子になった Array を flat にするイメージを浮かべるでしょう。例えば [[1, 2, 3], [4, 5, 6]]
が [1, 2, 3, 4, 5, 6]
になるイメージです。これを Rx という前提で考えると Observable.from([Observable.from([1, 2, 3]), Observable.from([4, 5, 6])])
は Observable.from([1, 2, 3, 4, 5, 6])
になるでしょう。イメージはつかめたはずです。……あとは例を見てみましょう。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.flatMap(value => Observable.just(value * 2))
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 2
// onNext: 4
// onNext: 6
// onCompleted
flatMap
で返した Observable
から流れてくる値が流れてきているのが分かります。この例なら flatMap
である必要はまったくないのですが、例えば、これが非同期処理ならどうでしょう。複数の HTTP リクエストの戻り値をひとつの Observable
にまとめることができるわけです。
また、ソースコードを見れば分かるのですが、flatMap
は簡単に書くと次のようなものです。
class Observable {
// ...
flatMap(selector) {
return this.source.map(selector).mergeAll();
}
}
map
と mergeAll
とを組み合わせたものです。 Observable.prototype.mergeAll
はまた別の機会に紹介しますが、複数の Observable
を merge ( flat に ) する、そんな動きです。
個人的には、この flatMap
は map
や scan
と並んでよく使う Operator です。
Observable.prototype.groupBy
- ReactiveX - GroupBy operator
Observable.prototype.groupBy
API DocumentObservable.prototype.groupBy
Source Code
Observable.prototype.groupBy
は流れてくる値を keySelector
でグループ分けした Observable に変換します。
import { Observable } from 'rx';
Observable
.from([1, 2, 3, 4, 5, 6])
.groupBy(value => value % 2 == 0)
.flatMap(observable => observable.toArray())
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 1,3,5
// onNext: 2,4,6
// onCompleted
keySelector
で偶数 / 奇数 (true
/ false
) に分けています。flatMap
(さっそく使っています) で Observable
の Observable
(Observable<Observable<number>>
のイメージ) を、 Array
の Observable
(Observable<Array<number>>
のイメージ) に変換してまとめています。
Observable.prototype.window
- ReactiveX - Window operator
Observable.prototype.window
API DocumentObservable.prototype.window
Source Code
Observable.prototype.window
は Observable.prototype.groupBy
と同様にグループ化して Observable
の Observable
を返すのですが、groupBy
とは異なり流れてくる値以外でグループ化します。
import { Observable } from 'rx';
Observable
// 100 ms ごとに値を流す (延々と続く)
.interval(100)
// return した Observable が値を流すタイミング (300ms) で window を区切る
.window(() => Observable.timer(300))
// 延々と続くと困るので最初の 3 回を取る
.take(3)
.flatMap(observable => observable.toArray())
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log(`onCompleted`)
);
// onNext: 0,1,2
// onNext: 3,4,5,6
// onNext: 7,8,9,10
// onCompleted
引数によって動きがかなり違うのでドキュメントを参照してください。
Observable.prototype.buffer
- ReactiveX - Buffer operator
Observable.prototype.buffer
API DocumentObservable.prototype.buffer
Source Code
Observable.prototype.buffer
は Observable.prototype.window
と似たような引数を取ってグループ化しますが、Observable
の Observable
ではなく、 Array
の Observable
を返します。
サンプルを見るよりもソースコードを読んだ方が分かりやすいと思います。
observableProto.buffer = function () {
return this.window.apply(this, arguments)
.flatMap(toArray);
};
window
して flatMap(toArray)
しています。つまり、さきほどの window
の例のように、区切ってまとめた値をほしい場合は buffer
を使えばいいわけです。さきほどの例だと延々続いてしまうのでアレですが……。
おわりに
今日は Observable
を変換する Operator を見てみました。map
/ scan
/ flatMap
は特によく使うものなので最初に覚えると良さそうです。