ReactiveExtensions
RxJS
Rx
RxJSDay 6

RxJS の Operators (4) - Observable の結合 (1)

More than 3 years have passed since last update.

この記事は bouzuya's RxJS Advent Calendar 2015 の 6 日目かつ RxJS Advent Calendar 2015 の 6 日目です。


はじめに

今日は ReactiveX の Operators の Combining について RxJS の API ドキュメントやソースコードを見ていきます。

また RxJS 4.0.7 を対象にしています。


Observable の結合 (1)


Observable.merge

可変長引数で observable を取ります。各 Observable に値が流れてくるたびに、それをそのまま流します。

import { Observable } from 'rx';

Observable
.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted

ちなみに名前の似ている Observable.prototype.mergeAll は動きが違います。

ただ、Observable.merge のソースコードを見ると、実装は Observable.from([...]).mergeAll() だと分かります (実際には Observable.from ではなく Observable.ofWithScheduler(Scheduler.immediate, ...) なのですが、「Observable の生成」で of に触れておらず、Scheduler の説明もまだなので仮に Observable.from で説明します) 。

つまり、次のコードはほとんど同じです (厳密には異なります)。

Observable

.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)

Observable

.from([
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
])
.mergeAll()


Observable.prototype.merge

引数が Observable である (実装は number でない) ときは Observable.merge(...) と同じで、引数が number のときは並行性の最大を制限します。

引数によって挙動が大幅に違うあたりが RxJS らしいと感じます。merge の引数の違いについては別で Qiita 記事を書きました。『RxJS 4 の merge の引数の差異 (concurrent 指定) と RxJS 5 での実装 - Qiita

以下は等価のコードを並べたものです。

import { Observable, Scheduler } from 'rx';

const source1 = Observable
.ofWithScheduler(
Scheduler.immediate,
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
)
.mergeAll();
const source2 = Observable
.merge(
Observable.from([1, 2, 3]),
Observable.from(['A', 'B'])
);
const source3 = Observable
.from([1, 2, 3])
.merge(Observable.from(['A', 'B']));

const subscribe = (source) =>
source.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);

subscribe(source1);
subscribe(source2);
subscribe(source3);
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted
// onNext: 1
// onNext: A
// onNext: 2
// onNext: B
// onNext: 3
// onCompleted


Observable.prototype.startWith

与えた引数を流してから、その後に現在の observable の値を流すような動きです。Observable.prototype.startWith は prepend で、 Observable.prototype.concat は append のイメージですね。実装は Observable.from([..., this]).concat(...) を期待したのですが、Enumerable.of(...).concat() のようです。

import { Observable } from 'rx';

Observable
.from([1, 2, 3])
.startWith(0)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0
// onNext: 1
// onNext: 2
// onNext: 3
// onCompleted

後述の combineLatest のために必ず値を流すようにしたい場合などにも使えます。


Observable.combineLatest

可変長引数で observable を取り、最後の引数で resultSelector を取ります。各 Observable に値が流れてくるたびに、各 Observable の直近の値が入った ArrayresultSelector に渡してくれて、その戻り値が次へと流れる Observable を返します。

この日本語の説明ではきっと「何を言っているのか わからねーと思うが」状態だと思うのですが、ぼくもこれで誤読ないように説明できている自信がまったくありません。これに限らず Rx のメソッドの中でも複雑なものは言葉で説明してもよく分からないものが多いのでソースコードを読むか試すかしたほうがいいです。

ちなみに combineLatest に限れば ReactiveX の combineLatest の marble diagram を見ると、とても分かりやすいとぼくは思います。

import { Observable } from 'rx';

// -0-1-2-3->
const observable1 = Observable
.interval(100)
.take(4)
.map((_, index) => index);
// --A---B-->
const observable2 = Observable
.interval(150)
.take(2)
.map((_, index) => 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'[index]);

// -0-1-2-3->
// --A---B-->
// --xx-xxx->
Observable
.combineLatest(
observable1,
observable2,
(value1, value2) => value1 + value2
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0A
// onNext: 1A
// onNext: 2A
// onNext: 2B
// onNext: 3B
// onCompleted

注意しておきたいのが各 Observable の最初の値が流れてこないと、次へ値を流さないという点。直近の値が出揃ってはじめて次に流れるということです。何も流さない Observable.never を使ってみるとこんな風になります。

import { Observable } from 'rx';

const observable1 = Observable.never();
const observable2 = Observable
.interval(150)
.take(2)
.map((_, index) => 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'[index]);

Observable
.combineLatest(
observable1,
observable2,
(value1, value2) => value1 + value2
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// (no output)


Observable.zip

Observable.zip は zip です (適当) 。複数の Observable で流れた要素をペアにして流します。combineLatest は各 Observable に値が流れるたびに値を流したのに対して、zip はすべての Observable が値を流すまで値を流しません。combineLatest は最初の値で各 Observable の値が出揃うのを待つように、zip は各値で各 Observable の値が出揃うのを待つようです。

また実装としては、最初の引数 (Observable) の Observable.prototype.zip に残りの引数を渡しているだけです。

import { Observable } from 'rx';

Observable
.zip(
Observable.from([0, 1, 2]),
Observable.from([3, 4, 5]),
Observable.from([6, 7, 8])
)
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0,3,6
// onNext: 1,4,7
// onNext: 2,5,8
// onCompleted


Observable.prototype.zip

前述のとおり、Observable.zip とほとんど同じ挙動なので割愛します。


Observable.prototype.switch

Observable.prototype.switchmergeAll (flatMap) のように ObservableObservalbe を flat にします。mergeAll のようにすべての Observable の値を merge するのとは違い、次の Observable へ次々と switch していきます。例を示します。

import { Observable } from 'rx';

Observable
.from([
Observable.from([0, 1, 2]),
Observable.from([3, 4, 5]),
Observable.from([6, 7, 8])
])
.switch()
.subscribe(
value => console.log(`onNext: ${value}`),
error => console.log(`onError: ${error}`),
() => console.log('onCompleted')
);
// onNext: 0
// onNext: 3
// onNext: 6
// onNext: 7
// onNext: 8
// onCompleted

もちろん 2 個目の Observable のタイミングを遅らせると [0, 1, 2] がすべて流れてきます。

ちなみに switch の公式の例は相当にわかりづらいので腹が立ちます。


おわりに

今日は Observable を結合する Operator を見てみました。まだリスト的には終わっていないのですが、時間の都合でここで区切ります。