Angularで登場してきたのでざっくり学習。厳密な理論ではなくて、使い所勘所まとめです。
v6ベースです。
TL;DR
ゆるふわにまとめると、
- RxJSは、流れてくるデータのイテレイティブな処理の記述をシンプルにすることが出来るもの
- 使い所は、EventEmitterやPromiseに似てる
- 勘所的には、非同期的な配列をイメージすると良さそう
基本
-
Observer
- データの処理の仕方を記述したもの、データの処理方法
- next, error, completeの3つが定義できる。それぞれPromiseのresolve, reject, (finaly callback)に相当する。
- ようは値を受け取って、処理をして、返すもの
- nextだけ定義するパターンが多い
- Observableのsubscribe()の引数に匿名関数わたして定義しちゃうのが多いっぽい
-
Observable
- データの流れを抽象化したもの
- ファクトリーメソッドで、
of(1,2,3)
,from([1,2,3])
といったように作れる- (この場合、1,2,3のプリミティブが順に流れていく)
-
pipe
メソッドでOperatorという便利関数をチェーンできる、 -
subscribe()
メソッドにobserverを登録すると、流れたデータを次々にobserverで処理する - だいたい外部ライブラリ呼び出した結果に、Observableが返ってくる
-
Operator
- Observableに繋ぐ便利関数
- データの流れ方を整理、変更できちゃう、すごい
-
Subscription
-
subscribe()
の返り値 -
.unsubscribe()
メソッドをもつ。データが流れてくることをやめる。それだけ。
-
-
Subject
- Observableの一種で、値を複数のObserverにマルチキャストできるやつ
- subscribeで複数のobserverを登録でき、一気にマルチキャストする
- しかも、Observerにもなれる(値を受け取れる)
- 他のObservableのsubscribeに渡してもOK
- Observableの一種で、値を複数のObserverにマルチキャストできるやつ
-
Schedular
- スケジュール、つまりタイミングを制御してくれるやつ
概念がややこしいが、大事なのは Observableで、
- Observableにoperatorをpipeしていける
- Observableは、最後に、
.subscribe()
で値の処理=値の受け取り側を指定する - その指定されるやつがObserverである
をまず最初に理解してしまうのが早い。
具体的にコードで見てみる。
Ovservable
const observable = from([1,2,3]) // 1
.pipe( // 2
map((n) => n*2) // 3
).subscribe((n)=> console.log(`value: ${n}`));// 4
-
from
はObservableを生成するファクトリーメソッド(Creation Operator)。1,2,3のvalueが順に発生して流れている。 -
pipe
メソッドにより、Operatorを追加している。 - ここでは、
map
により、渡ってきた値をイテレイティブに処理して返している。返り値は再びObservable。 -
subscribe()
により、Observerを登録。関数を指定すれば、そのままObserverのnextとして取り扱われる。
以下、リファレンス的に細部の説明をしていく。
pipe()
Observableに生えている基本的なメソッド。
observableInstance.pipe(operator())
みたいな感じでpipeを使ってOperatorをつなげていく。
Pipable Operatorは純粋な関数であり、新しいObservableインスタンスが生成される。
Observableに対して、複数のPipable Operatorをつなげることができる。
observableInstance.pipe(
op1(),
op2(),
op3(),
op3(),
)
もちろん、OperatorがObservableを返すので
observableInstance.pipe(op1()).pipe(op2())
とも書ける(もちろん、ちょっと意味は違う)
Operators
Observableを作ったり、加工したりできるやつら。
Creation Operator
Observableを作る基本的なファクトリーメソッド。
-
of(1,2,3)
- 引数がそのまま流れてくる
-
from([1,2,3])
- 配列を与えると、それがそのまま流れてくる
-
ajax
- Using ajax() to fetch the response object that is being returned from API.
- HTTP RequestのResponseをObservableにしてくれる
-
interval(1000)
- intervalごとにシーケンシャルな数字を流してくれる
-
range(1,10)
- 指定した範囲の数字を流してくれる
Join Creation Operators
Observableまとめ隊。複数のObservableから1つのObservableへ。
merge(observable1, observable2,...concurrent)
- 複数のObservableをまとめて1つのObservableにしてくれるすごいやつ
- [1,2,3] + [a,b,c] ==> [1,2,a,b,3,c]
- なお、concurrentで並列度を決めることが出来る
- 並列度2にしておいて、3このObservableを与えると、最後の1つは、先に与えた2つが終了したら流れてくる
- [1,2,3] + [a,b,c] + [d,e,f] ==> [1,2,a,b,3,c,d,e,f]
- [d,e,f]は後回しになった
concat(observable1, observable2)
- "Concatenates multiple Observables together by sequentially emitting their values, one Observable after the other."
- 複数のObservableをまとめて1つのObservableにしてくれるが、完了するまで待つやつ
- 順番は登録した順
- [1,2,3] + [a,b,c] ==> [1,2,3,a,b,c]
zip(observable1, observable2)
- 複数のObservableをまとめて、各入力Observableの値をひとまとめにしたObservableにしてくれる
- [1,2,3] + [a,b,c] ==> [[1,a],[2,b],[3,c]]みたいな感じ
conbineLatest(observable1, observable2)
- 複数のObservableをまとめて、それぞれの最新の値を組合せた値を流すObservableにしてくれる
- [1,2,3] + [a,b,c] ==> [[1,a], [2,a], [2,b], [3,b], [3,c]] みたいな。(うーん分かりづらいので公式見てね
race(observable1, observable2)
- 複数のObservableのうち、一番最初に値が流れてきたObservableのみ返す。文字通りレース。
forkJoin([observable1, observable2])
- 渡すのはObservableをValueにもつObjectでも可
- 複数のObservableが流れ終わった後に、最後の値を組合せた配列またはオブジェクトを流すObservableを返す
Transformation Operators
合体変形し隊。
map
- 配列のmapと同じ、流れてくる値を順番に処理する
- たぶん一番良く出てくる
pluck
- オブジェクトから値を取り出す
-
{ message: 'ok' )
からpluck('message')
でok
だけ取り出せる
scan((acc, one) => acc + one, seed)
- 配列のreduceっぽい処理ができる(ただし、reduceは別にある)
- accumulatorに指定した関数で、流れてくる値を次々と処理した結果のObservableを作れる
mergeMap
- "Projects each source value to an Observable which is merged in the output Observable."
- 引数側の関数内に指定したObservable(=内部Observable)に対し、元のObservable(=ソースObservable)を投影してくれる
- 2つのObservableを合成し、時系列順に流れるObservableを作り出してくれる
- 当たり前だが内部Observableがきちんと終了しないと、無限に増え続ける
concatMap
- "Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next."
- mergeMapに似て、こちらもソースObservableを捨てることなく、2つのObservableを合成してくれる
- ただし、時系列は、内部Observableが完了したら、次のソースObservableの値を処理する…といった仕組みになる
- ソースObservableの値は保持して、待っている形になる
- ということは、内部Observableが無限に終了しなかったり、内部Observableが終了する前にソースObservableから値が届き続けると、無限にバッファを食ってしぬことに…
switchMap
- "Projects each source value to an Observable which is merged in the output Observable, emitting values only from the most recently projected Observable."
- mergeMapと違い、ソースObservableからの値が到着したら内部Observableを発火する
- ソースから新たな値が来た場合には、内部Observableは途中でも終了する。このとき、内部Observableの値は捨てられる(ソースObservable優先)
exhaustMap
- "Projects each source value to an Observable which is merged in the output Observable only if the previous projected Observable has completed."
- switchMapと同様、ソースObservableからの値が到着したら内部Observableを発火する
- ただし、switchMapと逆で、内部Observableが終了するまではソースObservableの値を捨てていく(内部Observable優先)
- ソースObservableの値が捨てられる可能性があることに注意
- やっぱり内部Observableが終了しないと、無限に終わらない
buffer
- bufferに指定したObservableが発火するまでは、オリジナルのObservableを溜め込んでおき、発火したら配列として値を流すObservableを作ってくれる
- 他にも一定数カウントごとに発火、秒数毎に発火、Observableをトリガーにしてトグルするものなど亜種が結構ある
window
- "Branch out the source Observable values as a nested Observable whenever
windowBoundaries
emits." - window側に指定したObservableから値が流れるたびに、branchを作成する
Filter Operator
Observableから流れてくる値に対してフィルタリングしていくやつ。
- take
- 一番多く見るフィルター
- 指定した数だけ値が届くと、Observableを完了する
- filter
- 文字通り、配列のfilterと同じ動きをするフィルター
- takeLast
- 最後からn番目までの値をまとめて取り出したObservableを作ってくれる
- first, last
- これも文字通り、最初 or 最後の値だけを返す
- といっても返すのはObservableなのに注意
- distinct
- ダブってたら吐き出さないやつ
- skip
- 指定した数だけ値をスキップする
- sample
- 内部Observableに値が届いたタイミングで、ソースObservableの直近の値を取り出して流してくれる
Join Operator
Observable-of-Observable、つまり入れ子になったObservableをよしなにしてくれる。flattenに似ている。
-
mergeAll
- Higher observableをflat化する
-
concatAll
- Higher observableをflat化する
- その他のconcat系と同様、内部Observableが完了次第、次のObservableが流れる
-
exhaust
- Higher observableをflat化する
- ただし、内部Observableが完了後、すでに別の内部Observableがスタートしていた場合、それは無視し、次の新しい内部Observableが発火するまで待機する
-
combileAll
- "Flattens an Observable-of-Observables by applying
combineLatest
when the Observable-of-Observables completes." - Higher observable完了時に、
combineLatest
を用いてflat化する - どういうときに使うのかな
- "Flattens an Observable-of-Observables by applying
Subjectと仲間たち
Observableでもあり、ObserverでもあるSubject。IObservable<T>
とIObserver<T>
を実装したもの。
Subjectのsubscribeにovserverを割り当てれば、マルチキャストしてくれる。
逆に、他のObservableのsubscribeに登録すれば、値を受け取ることが出来る。
import { Subject, from } from 'rxjs';
const subject = new Subject<number>(); // 1
subject.subscribe({ // 2-1
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({ // 2-2
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject); // 3
- Subjectを作成
- subscribeを2つ登録。それぞれ異なる処理が書いてある
- Observableでsubscribeする。こうすると、1,2,3が順番に流れる。
さらに、Subjectは、asObservable()
メソッドでObservable化することもできる(Observableインスタンスが返ってくる)
このとき、Subjectのnext(value)
で値を流すと、Observableインスタンスで受け取ることができる。
(Subjectを使って任意のタイミングで値を流せる、とも言えそう)
次のように使うっぽい
- あるロジックで変化があったら、値を流す(nextを使う)
- 他方、別のサービスでasObservable()で作成したObservableを利用し、値を受け取り、observerで処理する
いくつかSubjectには亜種がある。
-
BehaviorSubject
- 現在の値(直近の値)を保持する
- subscribeに新たなobserverが登録されると、この値を流してくれる
-
ReplaySubject
- 指定された個数の値を保持する
- subscribeに新たなobserverが登録されると、この値を次々に流してくれる
-
AsnycSubject
- 最後の値を保持する
- subjectが完了(complete)したら、subscribeに登録されているobserverに最後の値だけ流す
Schedular
Subscriptionの開始(データの流れ始め)と通知を制御するもの。文字通りスケジュールを制御する。
概念だけ理解しておけばいいやつ。
observeOn
operatorの一種。Docsには、 "Re-emits all notifications from source Observable with specified scheduler." と書いてある。
指定されたスケジューラを使用して、ソースObservableからのすべての通知を再発行できる。
引数は、(scheduler: SchedulerLike, delay: number = 0)
となっていて、第一引数 SchedularLike
がスケジューラ。
使い所の例:
import { interval } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const intervals = interval(10); // 1
intervals.pipe(
observeOn(animationFrameScheduler), // 2
)
.subscribe(val => {
someDiv.style.height = val + 'px';
});
ブラウザーの再描画の直前に、subscribeの値が呼び出される例。
- Creation Operator である
interval()
でObservableを作る
なお、interval() オペレータも、内部でAsyncSchedularを使っている -
animationFramesScheduler
スケジューラをobserveOn
で指定し、ブラウザ再描画の直前にスケジュールする
subscribeOn
同様のものにObservableのメソッド subscribeOn() がある。これは、subscribe()を指定されたスケジュールでsubscribeするもの。
import { of, merge, asyncScheduler } from 'rxjs';
import { subscribeOn } from 'rxjs/operators';
const a = of(1, 2, 3, 4).pipe(subscribeOn(asyncScheduler));
const b = of(5, 6, 7, 8, 9);
merge(a, b).subscribe(console.log);
もし、pipeでsubscribeOnオペレータを繋いでいない場合、of
オペレータは同期的に、つまり順番に発動するので、 1 2 3 4 5 6 7 8 9
と表示される。
subscribeOnオペレータをpipeしていると、of
オペレータで作られたa
Observableは非同期に、バックグラウンドで動作するため、 5 6 7 8 9 1 2 3 4
と表示される。
このとき、a
Observableを非同期化しますよ〜と指定した asyncScheduler
がスケジューラ。
Schedularの種類
-
AsyncScheduler
- "Schedules work with setInterval. Use this for time-based operations."
- は?どゆ意味?と思ったけど、ようはsetInterval()のように動いて、非同期処理になる
- 時間にまつわるOperatorの多くは、当然非同期で動作しているので、内部的にはこれを使ってる
- 逆に同期的=コードに書いてある順番に上から下へ、といった使い方は、RxJSの使い所としては少ないので、ほとんど非同期的なのでは
-
asapScheduler
- "Schedules on the micro task queue, which is the same queue used for promises. Basically after the current job, but before the next job. Use this for asynchronous conversions."
- マイクロタスクキューにスケジュールを加える
- マイクロタスクはここを参照、あまりよく知らなかった
- 非同期化するにあたり、
process.nextTick()
や Web workerなどを利用する
-
queueScheduler
- "Schedules on a queue in the current event frame (trampoline scheduler). Use this for iteration operations."
- 現在のイベントフレーム(trampoline scheduler)のキューにスケジュールを加える
- 繰り返し動作向け
-
animationFrameScheduler
- ブラウザの再描画直前にスケジュールされる
使い所
Creation Operatorなどで作るObservableを明示的に非同期にしたい場合には、 asyncScheduler
を指定することになるのだろう。
また、animationFrameScheduler
は、アニメーションをスムーズにするのに使うのが一般的なようだった。