はじめに
Angularを使っていると避けては通れないRxJSですが、「正直よくわからないまま使っている」という人も多いのではないでしょうか。僕もまさにその一人でした。
そこで本記事では、RxJSを理解するうえで最初に押さえておきたいキーワードを中心に、基本をシンプルに整理しています。
非同期データやイベントを「ストリーム」として扱う考え方をつかみ、Observable・Observer・Operator の基本的な使い方までざっくり理解できる内容です。
RxJSの概要
RxJSは非同期処理やイベントをストリームとして扱うためのライブラリです。
ここでいうストリームとは、「時間とともに次々と流れてくるデータの連続」のことを指します。
ストリームの具体例
例えば、次のようなものは「連続して発生するデータ」です。
- ボタンクリック:クリックのたびにイベントが流れてくる
- タイマー:1秒ごとに数字が流れてくる
- テキスト入力:入力するたびに文字が流れてくる
なぜRxJSが必要なのか
非同期処理(クリック・入力・タイマー・API など)は、「いつ値が来るかわからない」 という扱いにくさがあります。
これをそのまま管理しようとすると、
- 値が変わるタイミングを追いにくい
- コールバックやPromiseが増えて複雑になる
- 複数の非同期処理を組み合わせにくい
- イベントが連続すると制御が大変
といった問題が起きやすくなります。
RxJSを用いることで、
- 値を「流れ」として見通しよく扱える
- 加工・結合・フィルタリングなどの処理が簡潔に書ける
- 複雑な非同期の組み合わせもスッキリ整理できる
ということが可能となり、非同期処理全体をシンプルな構造で組み立てられるようになります。
RxJSの基本要素
RxJS を理解するために、まずはじめに押さえておきたいキーワードは以下の3つです。
- Observable
- Observer
- Operator
Observable/Observer
Observable
Observableは「値が流れてくる仕組みそのもの」で、RxJSの中心となる概念です。
- 値は複数回流せる
- 値が流れてくるタイミングはObservableが“作られたとき”ではなく、購読(subscribe)したとき
という特徴を持ちます。
Observer(購読者)
ObserverはObservableから流れてくる値を受け取ります。
- 「Observableから送られてくるデータに対して何をするか」を定義するオブジェクト
- Observableはsubscribeしない限り動き始めない =「購読」することでデータが届く
- 次の3つのメソッドを持つ
- next(value) : 新しい値を受け取る
- error(err) : エラー通知を受け取り、ストリームが終了する
- complete() : 終了した合図を受け取る
という特徴を持ちます。
Observableを作ってObserverで受け取る流れ
import { Observable } from 'rxjs';
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});
numbers$.subscribe({
next: value => console.log(value),
error: err => console.error('error:' + err),
complete: () => console.log('complete')
});
このコードはObservable(データの流れ)を作り、Observer(受け取る側)で購読するという、RxJSの最も基本的な動きを示しています。
1. Observableをインポート
import { Observable } from 'rxjs';
まずはRxJSライブラリからObservableをインポートします。
これで、Observableを使うことができるようになります。
2. Observableを作成
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});
この部分がObservable
-
new Observable<number>():「数値(number)を流すObservableを作る」 という意味 -
(subscriber => {}):Observable の“中身”(どう値が流れるか)を定義
subscriberはObservableの中で、値を購読者(Observer)へ流すための「送信役」です。
- 値を送る(next())
- エラーを送る(error())
- 完了を送る(complete())
ための3つのメソッドを持っています。
3. Obserber(購読者)を定義して購読
numbers$.subscribe({
next: value => console.log(value),
error: err => console.error('error:' + err),
complete: () => console.log('complete')
});
ここがObserver(値を受け取る側)です。
Observerには3つのメソッドを設定できます。
| ハンドラ | 役割 |
|---|---|
| next(value) | Observableが値を流すたびに呼ばれる |
| error(err) | エラーが発生したら呼ばれて終了 |
| complete() | 正常に完了したら呼ばれる |
1
2
complete
Operator
OperatorはObservable が流す値を変換・加工・制御するものです
例えば、
- 偶数だけ受け取る
- 値を3倍にして受け取る
- 〇〇を含む値のみを受け取る
という処理をOperatorを繋いで書くだけで実現できます。
以下はOperatorの例です。
| Operator | 役割 | 主な用途 | 簡単な例 |
|---|---|---|---|
| map | 値を変換する | 受け取った値に処理を加え、別の値として流す |
map(x => x * 2)(2倍にする) |
| filter | 条件に合う値だけを通す | 必要な値だけストリームを継続させる |
filter(x => x > 10)(10より大きい値だけ) |
| tap | 副作用を実行(値は変えない) | ログ出力、デバッグ、外部処理など | tap(x => console.log(x)) |
pipe
pipeは、Observableに対して複数の処理(Operator)を順番につなげて適用する仕組みのことです。
pipeの中にOperatorを並べて書くことで、Observableの流れるデータを変換したり加工したりできます。
また、pipe内部では必ずOperator関数を使う必要があります。
pipe(
map(x => x * 2),
filter(x => x > 10),
)
pipe(
console.log("aaa"), // これはoperatorではないため×
)
Observableに処理を挟む
import { Observable } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.next(4);
subscriber.complete();
});
numbers$
.pipe(
tap(value => console.log('tap(元の値):', value)),
filter(value => value % 2 === 0),
map(value => value * 10)
)
.subscribe({
next: value => console.log('next(加工後):', value),
error: err => console.error('error:' + err),
complete: () => console.log('complete')
});
Observableの流れに map / filter / tap などのOperatorを挟むことで、値を加工しながら受け取っています。
1. Operatorをインポート
import { Observable } from 'rxjs';
import { map, filter, tap } from 'rxjs/operators';
ここでは map / filter / tap の3つのOperatorをインポートしています。
2. Observableを作成
const numbers$ = new Observable<number>(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.next(4);
subscriber.complete();
});
ここは前と同じく「数値が順番に流れてくる」Observableです。
3. Operatorsで流れを加工する
numbers$
.pipe(
tap(value => console.log('tap(元の値):', value)), // 副作用:ログ出力
filter(value => value % 2 === 0), // 偶数だけ通す
map(value => value * 10) // 10倍に変換
)
pipeの中では、
- tap:値をそのままログに出す
- filter:偶数以外を除外
- map:値を10倍にして変換
が行われています。
つまり、1→2→3→4 が流れてくるとpipe内では以下の処理が行われます。
tap: 1(そのまま表示)
tap: 2
tap: 3
tap: 4
filter:偶数だけ通す → 2, 4
map:10倍にする → 20, 40
4. Observerで加工後の値を受け取る
.subscribe({
next: value => console.log('next(加工後):', value),
error: err => console.error('error:' + err),
complete: () => console.log('complete')
});
Operatorにより加工された後の値がnextに渡ってきます。
tap(元の値): 1
tap(元の値): 2
tap(元の値): 3
tap(元の値): 4
next(加工後): 20
next(加工後): 40
complete
終わりに
RxJSは最初こそ少し複雑に感じるかもしれませんが、Observableを中心とした考え方を理解すれば、非同期処理やイベントを整理して扱える強力なツールです。
今回紹介した Observable、Observer、Operator、pipe の基本を押さえるだけでも、非同期処理の見通しはぐっと良くなります。
さらに、値を流す側にも受け取る側にもなれるSubjectや、購読の管理に使うSubscriptionといった仕組みもあります。
次回はSubjectとSubscriptionについて詳しく解説していきます。
参考
RxJS公式ドキュメント
https://rxjs.dev/guide/overview