業務でAngularを使い始め、RxJSが複雑だったため、主に使用しているオブジェクトを中心に整理したいと思います。
RxJSとは
Reactive Extensions for JavaScript
の略称で、JavaScriptでリアクティブプログラミングをするためのライブラリ。
RxJSを用いることで、JavaScriptの非同期処理やイベントに関する処理を簡単に書くことができる。
リアクティブプログラミングとは
時間の経過によって変化する値に対する操作を宣言的に記述するプログラミングパラダイム。
時間の経過によって変化する値を流す場所をストリームと呼ぶ。
このストリームにクリックイベントやHTTPデータなどの値が流れ、流れた値に対して加工や計算などの処理をしていく。
よくある例えとして、以下のようなもので説明される。
- ストリーム = 川
- ストリームを流れる値 = 魚
- 処理 = 大きな魚だけ捕まえる
余談ですが、RxJSの動物は魚のような見た目をしていますね。
RxJSで主に使用するオブジェクト
Observable
Observable
の前に前提となるデータの受け渡し方について説明する必要がある。
プルとプッシュ
データを作る側をプロデューサー、データを受け取る側をコンシューマーとした上で、データの受け渡し方には以下の2つがある。
-
プル
コンシューマーをプロデューサーからデータを取り出す方法。
データを受信するタイミングは受け取り側のコンシューマーが決定する。
呼び出すことで1つのデータを受け取る関数、複数のデータを順番に取り出すことができるイテレーターがプルに該当する。 -
プッシュ
プロデューサーがコンシューマーへデータを送信する方法。
データを送信するタイミングは作る側のプロデューサーが決定する。
1つのデータをコールバック関数であるコンシューマーに渡すPromise
、そして複数のデータを渡すことができるのがObservable
。
(なお、Observable
は単数のデータも渡すことが出来る)
表にすると以下のように分類できる。
プロデューサー | コンシューマー | 実装(単数・複数) | |
---|---|---|---|
プル | 要求されたときにデータを作る | データがいつ要求されるか決める | 関数・イテレーター |
プッシュ | データを作りいつ送るか決める | データを受け取ったら処理する | Promise・Observable |
上記から、Observable
は単数・複数の値を好きなタイミングでコンシューマーにプッシュすることができるオブジェクトといえる。
4つの処理段階
Observable
には以下4つの処理段階がある。
- 作成
- 購読
- 実行
- 破棄
作成
Observable
は、コンストラクタに1つのsubscribe
関数を渡して作成する。
以下の場合、1秒ごとにHello Worldをコンシューマーであるsubscriber
に送信するデータを作成する。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
setInterval(() => {
subscriber.next('Hello World')
}, 1000);
});
購読
購読することでObservable
のデータをどのように処理するかを定義する。
以下では送信されてきたデータをconsole.log
で画面に表示する。
observable.subscribe(x => console.log(x));
subscribe
すると、作成したObservable
のsubscriber
に設定した値(ここではHello World)が渡されて実行される。
実行
Observable
には以下3種類の実行タイプがある。
-
next
数値・文字列・オブジェクトなどの値を送信する -
error
JavaScriptエラー・例外を送信する -
complete
値を送信しない
next
はsubscriber
に送信するデータを表し、error
とcomplete
は実行中にどちらか一方だけが実行される。
error
とcomplete
実行後にはデータは送信されない。
以下の場合、1,2,3までは表示されるが途中でcompelte
を実行しているため、4は表示されない。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4);
});
observable.subscribe(x => console.log(x));
破棄
Observable
は無限に値を送り続けることができるが、不要になったら無駄なメモリリソースの浪費を避けるために止める必要がある。
Observable.subscribe()
を呼び出すとSubscripion
オブジェクトを返されるため、実行をキャンセルするにはunsubscribe()
を呼び出す。
以下の場合、1,2を表示した後に実行を取り消したため、setIntervabl
の処理は実行されない。
import { Observable } from 'rxjs';
const observable = new Observable(function subscribe(subscriber) {
subscriber.next(1)
subscriber.next(2)
setInterval(() => {
subscriber.next('Hello World');
}, 1000);
});
const a = observable.subscribe(x => console.log(x));
a.unsubscribe()
Observer
Observer
はnext
・error
・complete
の3つのコールバックを持ち、Observable
からの値を受け取るオブジェクト。
const observer: Observer<number> = {
next: (value) => console.log(`next:${value}`),
error: (err) => console.log(err),
complete: () => console.log("complete"),
}
コールバックは3つ揃っていなくても、Observable
の実行は可能。(以下の場合、error
が無くても実行されないだけ)
const observer = {
next: (value: number) => console.log(`next:${value}`),
complete: () => console.log("complete"),
};
またsubscribe
に関数を直接渡すと、Observer
オブジェクトを内部的に作成し、next
のコールバックになる。
observable.subscribe(x => console.log(x));
Operators
Operators
は複雑な非同期の処理を簡潔に処理することが出来るObservable
のメソッド。
Operators
には以下2種類の関数がある
Pipeable Operators
Pipeable Operators
は入力としてObservable
を受け取り、処理された新しいObservable
を返す関数。
オペレーターをチェーンすることでパイプライン処理をすることができる。
- ex) 渡されたObservableの値を
filter
で偶数に絞り、map
で2乗する
import { of, filter, map } from 'rxjs';
of(1, 2, 3, 4, 5)
.pipe(
filter(x => x % 2 == 0),
map(x => x * x))
.subscribe((x) => console.log(x));
// 4, 16
Creation Operators
スタンドアロンで新しいObservable
を作成するための関数。
- ex)
interval
で1秒ごとに0から連番する
import { interval } from 'rxjs';
const observable = interval(1000);
observable.subscribe(x => console.log(x));
// 0 1 2 3 4 5...
Subscription
Subscription
はObservable
の実行中にリソースを破棄するオブジェクト。
上記で記載した通り、unsubscribe()
でObservable
の実行をキャンセルする。
また、Subscription.add()
で別のサブスクリプションを追加することで、サブスクリプションを複数持つことができる。
その後、unsubscribe()
すると追加したサブスクリプションの実行もキャンセルできる。
import { interval } from 'rxjs';
const observable1 = interval(1000);
const observable2 = interval(2000);
let subscription1 = observable1.subscribe(x => console.log(`sub1:${x}`));
let subscription2 = observable2.subscribe(x => console.log(`sub2:${x}`));
subscription1.add(subscription2);
setInterval(() => {
subscription1.unsubscribe()
}, 3000);
//sub1:0
//sub1:1
//sub2:0
//sub1:2