RxJS とは
RxJS とは、リアクティブプログラミングを JavaScript で実現するためのライブラリのこと。
リアクティブプログラミングとは、データやイベントの変化を「流れ(ストリーム)」として扱い、自動的に処理を実行するプログラミング手法。変化に応じて画面や処理がリアルタイムに更新されるのが特徴。
なぜ必要なのか
Angular では、フォーム入力や HTTP 通信など多くの処理が非同期で行われる。RxJS を使えば、こうしたイベントやデータの流れとして一貫して扱え、コードをシンプルでわかりやすく保てる。Angular の多くの機能が RxJS に基づいて設計されているため、リアクティブな開発に不可欠なライブラリとなっている。
主要な概念
ここでは、「桃太郎」を例に RxJS の主要な概念について説明する。物語を少し改変し、おばあさんが川で桃を探しており、流れてくる桃は複数ある設定にする。
同じように桃太郎の例で RxJS を解説している記事があったので、パクらせてもらう。現在、その記事はなくなった?見つけられなかった?ので、リンクはないです...
Observable
データの生産者。時間の経過とともに発生する値やイベント(例:数値、クリックイベント、APIレスポンスなど)をストリームとして発行する。「桃太郎」で言うと、「川」が Observable のイメージ。時間の経過とともに、桃(データ)が1つずつ流れてくる。
const kawa$ = new Observable(subscriber => {
subscriber.next('小さな桃');
subscriber.next('普通の桃');
subscriber.next('大きな桃');
subscriber.complete(); // 川の流れ終了
});
Observer
データの消費者。Observable が発行する値を受け取り、処理を行う。「桃太郎」で言うと、川を見ていたおばあさんが流れてきた桃を拾って反応するイメージ。通常、以下の3つのコールバック関数を持つが、必須なのは next()
のみで、error()
と complete()
は任意。
- next(value):値が流れてきたとき
- error(err):エラーが発生したとき
- complete():すべての値の配信が終了したとき
const obaasan = {
next: momo => console.log('おばあさんが拾った桃:', momo),
error: err => console.error('腐った桃だった!:', err),
complete: () => console.log('今日はもう桃は流れてこないようだ')
};
Subscription
Observable と Observer をつなぐもの。「桃太郎」で言うと、おばあさんを川へ見張りに行かせるイメージ。 subscribe()
を呼ぶと、川に流れてくる桃(データ)を見張り始める。 返ってくる Subscription は見張りの「監視権」を持っていて、 unsubscribe()
を呼ぶと、「もう十分だ」と見張りをやめて帰ってくる(監視停止・リソース解放)ことができる。 また、subscribe()
には、Observer オブジェクトの代わりに、next()
のみの関数を直接渡すことも可能。
const kanshi = kawa$.subscribe(obaasan);
// 後で見張りをやめたいとき
kanshi.unsubscribe();
// nextだけ渡す簡単な書き方
const kanshi2 = kawa$.subscribe(momo => {
console.log('おばあさんが拾った桃:', momo);
});
Angular では ngOnDestroy()
で unsubscribe()
しないとメモリリークの原因になってしまう。
Operators
Observable に対して処理(変換、絞り込み、結合など)を行う関数群。「桃太郎」で言うと、流れてきた桃を選別する工程にあたる。例えば、「大きい桃だけ拾う」「熟した桃だけ残す」「桃に名前をつける」といった処理を途中に挟める。これを pipe()
の中に並べて書くことで、ストリームの中身を自在に加工できる。
kawa$
.pipe(
filter(momo => momo === '大きな桃'), // 大きな桃だけ拾う
map(momo => `美味しい ${momo}`), // 桃に「美味しい」をつける
tap(momo => console.log('桃を確認:', momo)) // 副作用として桃の情報をログ出力
)
.subscribe(obaasan);
Subject
Observable でもあり Observer でもある特別な存在。他の Observable から値を受け取るだけでなく、自分でも next()
を使って任意の値を発行できる。「桃太郎」で言うと、おばあさんが拾った桃を村人みんなに配っているイメージ。川から桃を受け取る Observer の顔も持ちつつ、村人に配る Observable の役割も果たす。
const obaasan = new Subject();
// 村人A
obaasan.subscribe({
next: momo => console.log('村人Aが受け取った:', momo)
});
// 村人B
obaasan.subscribe({
next: momo => console.log('村人Bが受け取った:', momo)
});
// おばあさんが川から桃を拾い、村人たちに配る
kawa$.subscribe(obaasan);
// おばあさんが新しい桃を見つけて、自分から村人に直接知らせる(Subjectのnextを使う)
obaasan.next('とても大きな桃');
Subjectの種類
Subject にはいくつかの派生型があり、それぞれ動作が異なる。
種類 | 特徴 | 桃太郎 |
---|---|---|
Subject |
通常の Subject(すぐに購読しないと値を逃す) | 桃をその場で配る、来た人にしか渡さない |
BehaviorSubject |
初期値があり、直近の値を新しい購読者に即座に渡す | 常に「最新の桃」を1つ置いておき、来た人に渡す |
ReplaySubject |
指定数または時間分の過去の値を新しい購読者に渡す | これまで拾った桃の記録をすべて見せる |
AsyncSubject |
完了時に最後の値だけを流す | 桃拾いが終わったら、最後に拾った桃を見せる |
Cold Observable と Hot Observable
Observable には大きく分けて Cold と Hot の2種類がある。
種類 | 特徴 | 桃太郎 |
---|---|---|
Cold Observable | 購読されるたびにストリームが最初から始まる | 川に行くたびに新しい桃が流れてくる(それぞれ別の川) |
Hot Observable | 共有された1つのストリームを皆で購読 | 1つの川をみんなで見ていて、遅れてきた人は既に流れた桃は見られない |
// Cold(購読ごとに新しくスタート)
const cold$ = new Observable(sub => {
console.log('桃を流し始める');
sub.next('桃1');
sub.complete();
});
cold$.subscribe(v => console.log('村人A:', v));
cold$.subscribe(v => console.log('村人B:', v));
// 桃を流し始める
// 村人A: 桃1
// 桃を流し始める
// 村人B: 桃1
// Hot(Subjectで共有)
const hot$ = new Subject();
hot$.subscribe(v => console.log('村人A:', v));
hot$.next('桃1');
hot$.subscribe(v => console.log('村人B:', v)); // 村人Bは桃1を見られない
hot$.next('桃2');
// 村人A: 桃1
// 村人A: 桃2
// 村人B: 桃2
Observable と Subject の使い分け
Observable はデータの生産者として使われ、主に非同期処理やAPI呼び出しなど、一人ひとりに独立した処理が必要な場面で適している。一方、Subject は複数の購読者に同時に値を配信したい場合や、外部から値を発行したい場合に利用される。Subject はイベント共有や状態管理、つまり状態をリアルタイムに通知するようなケースに向いている。たとえば、フォームの入力イベントを複数のコンポーネントで共有したい場合には Subject が便利。
代表的な Operator
RxJS には数多くの Operator があるが、ここでは特によく使われるものをいくつか紹介する。
Operator | 説明 | 桃太郎 |
---|---|---|
map() |
値を変換する | 「桃に名前をつける」 |
filter() |
条件に合う値だけ通す | 「大きな桃だけ拾う」 |
tap() |
副作用を実行(値は変更しない) | 「拾う前に桃の重さを測る」 |
take(n) |
最初の n 件だけ受け取る | 「最初の3個の桃だけ拾う」 |
debounceTime() |
一定時間値が変化しないと流す | 「一定時間桃が流れてこなかったら、それを拾う」 |
mergeMap() |
内部で非同期処理を展開し統合 | 「複数の川から同時に桃を拾う」 |
エラーハンドリング
ストリームで何か問題が起きたときは error()
コールバックや catchError()
オペレーターで対処できる。
kawa$
.pipe(
map(momo => {
if (momo === '腐った桃') throw new Error('腐ってる!');
return momo;
}),
catchError(err => {
console.error('おばあさんが叫んだ:', err.message);
return of('代わりの桃'); // 別の桃で代用
})
)
.subscribe(momo => console.log('拾った桃:', momo));
参考