JavaScript
TypeScript
RxJS

RxJS入門#2図でわかるOperators


はじめに

rxjs6.3、typescript3.2で動作確認。

RxJS公式サイトからOperatorsのMarble diagramsを引っ張ってきて説明しています。

個人的に重要だと思うOperatorsのみ説明しています。

MultiCast Operators系はまた別でまとめたいと思います。


図の見かた

map.png

まず、図の見かたですが、map(オペレータ名)を挟んで上の矢印と値がオペレータ適用前のInput Observable、下がオペレータ適用後のOutput Observableと呼ばれるものです。

また、矢印は時間軸の流れを表しています。

オペレータはInput Observableを受け取ってOutput Observableを返す、その作用はどのタイミングで値を受け取るかによっても違ってくることが多いので、このような図となっています。

ちなみに、公式ではこの図はMarble diagramsと呼ばれています。


作成用オペレータ(Creation Operators)


from:イテレータからObservableを作成する

from.png

配列などイテラブルな値からObservableを作成します。

from([10, 20, 30]).subscribe(x => console.log(x));


of:可変長引数からObservableを作成する

of.png

可変長引数からObservableを作成します。

of(1, 2, 3).subscribe(x => console.log(x));


fromEvent:イベントからObservableを作成する

fromEvent.png

第一引数にDOM、第二引数にイベント名を渡し、イベント発火時にObservableが作成される。

fromEvent(document, 'click').subscribe(e => console.log(e));


interval:指定ミリ秒ごとに初期値0で1ずつ加算する値のObservableを作成する

interval.png

interval(1000).subscribe(x => console.log(x));


変換用オペレータ(Transformation Operators)


map:値を変換する

map.png

jsでもArrayに対して使われることの多いメソッドなので特に説明不要でわかる方も多いと思います。

引数に関数を取り、その関数で引数の値を変換して次に渡しています。

of(1, 2, 3)

.pipe(map(x => 10 * x))
.subscribe(x => console.log(x));

// 10
// 20
// 30


switchMap:ソースのObservableを引数のObservableに流す。被ったらキャンセルする

switchMap.png

第1のObservable(ここではof(1, 3, 5))をソースとして、第2のObservableに値を流して変換することができます。

第1のObservableの値は第2のObservableの値全てに流れます。

図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされ、次の値が処理されます(図だと3 * 10の3回目の処理は中断されています)。

of(1, 3, 5)

.pipe(switchMap(x => of(10 * x, 10 * x, 10 * x)))
.subscribe(x => console.log(x));

// 10
// 10
// 10
// 30
// 30
// (30) ← 本来はログに流れるが、次の値がやってきたら中断される
// 50
// 50
// 50

コードでは中断を表現できていませんが、それ以外はこんな感じです。


mergeMap:ソースのObservableを非同期的に引数のObservableに流す

mergeMap.png

第1のObservableをソースとして、第2のObservableに値を流して変換することができます。

第1のObservableの値は第2のObservableの値全てに流れます。

ここまでは上のswitchMapと一緒ですね。

switchMap、mergeMap、次に説明するconcatMapは第1のObservableを第2のObservableに流すというところまでは同じで、変換中に次の値が流れてきたときの処理だけが異なります。

mergeMapは、図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされず続けて行われ、非同期的に次の値も処理されます(図だと3 * 10の3回目の処理は中断されずに5 * 10 の1回目の処理の後に行われています)。

of(1, 3, 5)

.pipe(mergeMap(x => of(10 * x, 10 * x, 10 * x)))
.subscribe(x => console.log(x));

// 10
// 10
// 10
// 30
// 30
// (50) ← 本来は30、50の順に流れるが、
// (30) ← 仮に変換途中に次の値(5)が流れてきた場合は非同期に処理されるのでこうなる場合がある
// 50
// 50

上記のswitchMapと同じように、このObservableで表しても本来のmergeMapの非同期性を表せないのですが、わかりやすさのため図とあわせてログだけmergeMapの非同期性を表したものにしています。


concatMap:ソースのObservableを同期的に引数のObservableに流す

concatMap.png

mergeMapで見たように、第1のObservableをソースとして、第2のObservableに値を流して変換することができるところまではswitchMap、mergeMapと一緒です。

concatMapは、図の方のように変換の途中で次の値が流れてきた場合は、その途中の変換はキャンセルされず続けて行われ、同期的に次の値が処理されます(図だと3 * 10の処理の途中に、次の値である5を受け取れる状態になっていますが、3 * 10の処理がすべて終わった後に5 * 10が行われています)。

of(1, 3, 5)

.pipe(concatMap(x => of(10 * x, 10 * x, 10 * x)))
.subscribe(x => console.log(x));

// 10
// 10
// 10
// 30
// 30
// 30
// 50
// 50
// 50

同期的に処理されるので、タイミングがどうあれログに流れる順番は必ずこうなる。

switchMap、mergeMap、concatMapをまとめると次の表のようになります。

オペレータ名
次の値が来たとき

switchMap
現在の変換処理はキャンセルされ、次の値の変換処理が開始される

mergeMap
現在の変換処理は続けて行われ、次の値の変換処理が非同期的に開始される

concatMap
現在の変換処理は続けて行われ、次の値の変換処理が同期的に開始される


フィルター用オペレータ(Filtering Operators)


filter:trueを返すObservableのみ取得する

filter.png

Arrayライクなfilter同様、trueを返す値のみ取得するようフィルターします。

interval(1000)

.pipe(filter(x => x % 2 === 1))
.subscribe(x => console.log(x));

// 1
// 3
// 5
// ......


take:指定数のみObservableを取得する

take.png

interval(1000)

.pipe(take(5))
.subscribe(x => console.log(x));

// 0
// 1
// 2
// 3
// 4


takeUntil:引数のObservableが流れるまで取得する

takeUntil.png

第1のObservableをソースに、引数の第2のObservableが流れたらsubscribe関数はcompleteを発行します。

第2のObservableの値はなんでもいいです。もし、第2のObservableが流れなければ第1のObservableはすべて流れるので、takeUntilはないのと一緒です。

interval(1000)

.pipe(takeUntil(fromEvent(document, 'click')))
.subscribe(x => console.log(x));

// 0
// 1
// 2
// -- clickするまで流れる --


distinctUntilChanged:連続した重複を排除する

distinctUntilChanged.png

連続した重複だけ排除したい場合に使用するのがdistinctUntilChangedです。

値がオブジェクトで一部のプロパティの重複だけ感知したい場合は引数の無名関数で指定できます。

なお、連続した重複だけでなく、すべての重複を排除して一意なものだけ取り出す場合はdistinctオペレータを使用します。

of(1, 1, 2, 2, 2, 1, 1, 2, 3, 3, 4).pipe(

distinctUntilChanged(),
)
.subscribe(x => console.log(x));

// 1
// 2
// 1
// 2
// 3
// 4


throttleTime:指定ミリ秒間のObservableを間引く。

throttleTime.png

引数に指定したミリ秒以内のObservableはひとつしか流れないようにフィルターされます。

連続したクリックを除外したいときなどに使用します。

fromEvent(document, 'click')

.pipe(throttleTime(1000))
.subscribe(x => console.log(x));


debounceTime:指定ミリ秒間のObservableを間引き、指定ミリ秒後にObservableを返す

debounceTime.png

引数に指定したミリ秒以内のObservableはひとつしか流れないようにフィルターされます。

throttleTimeと違い、指定ミリ秒後に遅れてObservableが流れます。

fromEvent(document, 'click')

.pipe(debounceTime(1000))
.subscribe(x => console.log(x));


接続用オペレータ(Join Operators)


startWith:Observableの最初の値をあとづけする

startWith.png

Observableの初期値をつけたいときに使用する。

of('a', 'b', 'c')

.pipe(startWith('s'))
.subscribe(x => console.log(x));

// s
// a
// b
// c


withLatestFrom:引数のObservableの最新の値を伴わせる

withLatestFrom.png

下の例ではclick時にだけログが流れ、イベントオブジェクトとともにその時点のintervalが作成したObservableの値も流れる。

fromEvent(document, 'click')

.pipe(withLatestFrom(interval(1000)))
.subscribe(x => console.log(x));

// [MouseEvent, 1] // 1秒後にクリック
// [MouseEvent, 3] // 3秒後にクリック
// [MouseEvent, 7] // 7秒後にクリック