この記事は bouzuya's RxJS Advent Calendar 2015 の 6 日目かつ RxJS Advent Calendar 2015 の 6 日目です。
はじめに
今日は ReactiveX の Operators の Combining について RxJS の API ドキュメントやソースコードを見ていきます。
また RxJS 4.0.7 を対象にしています。
Observable の結合 (1)
Observable.merge
可変長引数で observable
を取ります。各 Observable
に値が流れてくるたびに、それをそのまま流します。
import { Observable } from 'rx';
Observable
.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
ちなみに名前の似ている Observable.prototype.mergeAll
は動きが違います。
ただ、Observable.merge
のソースコードを見ると、実装は Observable.from([...]).mergeAll()
だと分かります (実際には Observable.from
ではなく Observable.ofWithScheduler(Scheduler.immediate, ...)
なのですが、「Observable の生成」で of
に触れておらず、Scheduler の説明もまだなので仮に Observable.from
で説明します) 。
つまり、次のコードはほとんど同じです (厳密には異なります)。
Observable
.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)
Observable
.from([
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
])
.mergeAll()
Observable.prototype.merge
- ReactiveX - Merge operator
Observable.prototype.merge
API DocumentObservable.prototype.merge
Source Code
引数が Observable
である (実装は number
でない) ときは Observable.merge(...)
と同じで、引数が number
のときは並行性の最大を制限します。
引数によって挙動が大幅に違うあたりが RxJS らしいと感じます。merge
の引数の違いについては別で Qiita 記事を書きました。『RxJS 4 の merge の引数の差異 (concurrent 指定) と RxJS 5 での実装 - Qiita』
以下は等価のコードを並べたものです。
import { Observable, Scheduler } from 'rx';
const source1 = Observable
.ofWithScheduler(
Scheduler.immediate,
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)
.mergeAll();
const source2 = Observable
.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
);
const source3 = Observable
.from([1, 2, 3])
.merge(Observable.from(['A', 'B']));
const subscribe = (source) =>
source.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
subscribe(source1);
subscribe(source2);
subscribe(source3);
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
Observable.prototype.startWith
- ReactiveX - StartWith operator
Observable.prototype.startWith
API DocumentObservable.prototype.startWith
Source Code
与えた引数を流してから、その後に現在の observable の値を流すような動きです。Observable.prototype.startWith
は prepend で、 Observable.prototype.concat
は append のイメージですね。実装は Observable.from([..., this]).concat(...)
を期待したのですが、Enumerable.of(...).concat()
のようです。
import { Observable } from 'rx';
Observable
.from([1, 2, 3])
.startWith(0)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0
// onNext: 1
// onNext: 2
// onNext: 3
// onCompleted
後述の combineLatest
のために必ず値を流すようにしたい場合などにも使えます。
Observable.combineLatest
- ReactiveX - CombineLatest operator
Observable.combineLatest
API DocumentObservable.combineLatest
Source Code
可変長引数で observable
を取り、最後の引数で resultSelector
を取ります。各 Observable
に値が流れてくるたびに、各 Observable
の直近の値が入った Array
を resultSelector
に渡してくれて、その戻り値が次へと流れる Observable
を返します。
この日本語の説明ではきっと「何を言っているのか わからねーと思うが」状態だと思うのですが、ぼくもこれで誤読ないように説明できている自信がまったくありません。これに限らず Rx のメソッドの中でも複雑なものは言葉で説明してもよく分からないものが多いのでソースコードを読むか試すかしたほうがいいです。
ちなみに combineLatest
に限れば ReactiveX の combineLatest の marble diagram を見ると、とても分かりやすいとぼくは思います。
import { Observable } from 'rx';
// -0-1-2-3->
const observable1 = Observable
.interval(100)
.take(4)
.map((_, index) => index);
// --A---B-->
const observable2 = Observable
.interval(150)
.take(2)
.map((_, index) => 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'[index]);
// -0-1-2-3->
// --A---B-->
// --xx-xxx->
Observable
.combineLatest(
observable1,
observable2,
(value1, value2) => value1 + value2
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0A
// onNext: 1A
// onNext: 2A
// onNext: 2B
// onNext: 3B
// onCompleted
注意しておきたいのが各 Observable
の最初の値が流れてこないと、次へ値を流さないという点。直近の値が出揃ってはじめて次に流れるということです。何も流さない Observable.never
を使ってみるとこんな風になります。
import { Observable } from 'rx';
const observable1 = Observable.never();
const observable2 = Observable
.interval(150)
.take(2)
.map((_, index) => 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'[index]);
Observable
.combineLatest(
observable1,
observable2,
(value1, value2) => value1 + value2
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// (no output)
Observable.zip
Observable.zip
は zip です (適当) 。複数の Observable
で流れた要素をペアにして流します。combineLatest
は各 Observable
に値が流れるたびに値を流したのに対して、zip
はすべての Observable
が値を流すまで値を流しません。combineLatest
は最初の値で各 Observable
の値が出揃うのを待つように、zip
は各値で各 Observable
の値が出揃うのを待つようです。
また実装としては、最初の引数 (Observable
) の Observable.prototype.zip
に残りの引数を渡しているだけです。
import { Observable } from 'rx';
Observable
.zip(
Observable.from([0, 1, 2]),
Observable.from([3, 4, 5]),
Observable.from([6, 7, 8])
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0,3,6
// onNext: 1,4,7
// onNext: 2,5,8
// onCompleted
Observable.prototype.zip
前述のとおり、Observable.zip
とほとんど同じ挙動なので割愛します。
Observable.prototype.switch
- ReactiveX - Switch operator
Observable.prototype.switch
API DocumentObservable.prototype.switch
Source Code
Observable.prototype.switch
は mergeAll
(flatMap
) のように Observable
の Observalbe
を flat にします。mergeAll
のようにすべての Observable
の値を merge するのとは違い、次の Observable
へ次々と switch していきます。例を示します。
import { Observable } from 'rx';
Observable
.from([
Observable.from([0, 1, 2]),
Observable.from([3, 4, 5]),
Observable.from([6, 7, 8])
])
.switch()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0
// onNext: 3
// onNext: 6
// onNext: 7
// onNext: 8
// onCompleted
もちろん 2 個目の Observable のタイミングを遅らせると [0, 1, 2]
がすべて流れてきます。
ちなみに switch
の公式の例は相当にわかりづらいので腹が立ちます。
おわりに
今日は Observable
を結合する Operator を見てみました。まだリスト的には終わっていないのですが、時間の都合でここで区切ります。