リアクティブプログラミングライブラリ「RxJS」を仕事で使う機会があった。その調査もかねて本記事を書こうと思う。
まず、RxJSはリアクティブプログラミングを可能にするライブラリである。これにより、非同期処理やデータストリームの操作を簡潔に記述できる。本記事では、RxJSの主要な概念と使い方を書こうと思う。
Observable
Observableは、データストリーム(つまり流れ)を表現するRxJSの基本的な要素。クリックイベントやAPIレスポンスなど、非同期データを扱う場合に利用。
特徴
- データが流れる「発信者」
- 「push型」でデータを提供
- 遅延実行(
subscribe
するまで実行されない)
sample Code
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.complete();
});
observable.subscribe({
next: value => console.log(value),
complete: () => console.log('完了')
});
// 出力: 1, 2, 完了
Observer
Observerは、Observableから送信されるデータを受けとる購読者です。subscribe
メソッドを利用して接続し、データを処理する。
特徴
- データ(
next
)、エラー(error
)、完了(complete
)の3つのメソッドを実装。 - Observableの動作を受け取り、処理する役割を持つ。
sample code
const observer = {
next: (value: any) => console.log(`値: ${value}`),
error: (err: any) => console.error(`エラー: ${err}`),
complete: () => console.log('完了')
};
observable.subscribe(observer);
Operators
Operatorsは、Observableに対して変換やフィルタリングなどの処理を行う関数。
主な種類
- Transformation:データを変換(
map
やmergeMap
など) - Filtering:条件に合うデータだけを流す(
filter
やtake
など) - Combination:複数のObservableを結合する(
merge
やcombineLatest
など)
sample code(filter , mapの例)
import { of } from 'rxjs';
import { map, filter } from 'rxjs/operators';
const observable = of(1, 2, 3, 4, 5).pipe(
filter(x => x % 2 === 0), // 偶数のみ
map(x => x * 10) // 10倍
);
observable.subscribe(value => console.log(value));
// 出力: 20, 40
sample code(combineLatestの例)
combineLatestは、複数のObservableを監視し、それぞれの最新値を組み合わせて新しいObservableを生成する演算子。リアルタイムで複数のデータの流れを統合する際に便利。
import { combineLatest, of } from 'rxjs';
import { delay } from 'rxjs/operators';
const observable1 = of(1, 2, 3).pipe(delay(1000));
const observable2 = of('A', 'B', 'C').pipe(delay(500));
combineLatest([observable1, observable2]).subscribe(
([value1, value2]) => console.log(`値: ${value1}, ${value2}`)
);
// 出力: 値: 3, C
Subjects
Subjectsは、ObservableとObserverの両方の性質を持ったオブジェクト。複数のObserverが同じデータを提供するときに使う。
特徴
- データのブロードキャストが可能
- イベントエミッタのように動作(データの発火者?)
sample code
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(value => console.log(`Observer1: ${value}`));
subject.subscribe(value => console.log(`Observer2: ${value}`));
subject.next(10); // Observer1とObserver2に値を送信
BehaviorSubject
BehaviorSubjectは、最新の値を保持し、新しく購読したObserverにもその最新値を即座に通知する。通常のSubjectと異なり、初期値を指定する必要がある。
特徴
- 常に最新の値を保持
- 新しい購読者が追加されると、直近の値が即時に提供される。
-
next
メソッドで値を更新可能
sample code
import { BehaviorSubject } from 'rxjs';
const behaviorSubject = new BehaviorSubject<number>(0);
behaviorSubject.subscribe(value => console.log(`Observer1: ${value}`));
behaviorSubject.next(1);
behaviorSubject.next(2);
behaviorSubject.subscribe(value => console.log(`Observer2: ${value}`));
behaviorSubject.next(3);
出力
Observer1: 0
Observer1: 1
Observer1: 2
Observer2: 2
Observer1: 3
Observer2: 3
Schedulers
Schedulersは、Observableの実行タイミングやスレッドの管理を制御する仕組み。RxJSの非同期処理の背後にあるエンジンのような役割。
主なスケジューラー
- Immediate Scheduler: 即座に実行
- Async Scheduler: 次のイベントループで実行
- AnimationFrame Scheduler: アニメーションフレームで実行
sample code
import { asyncScheduler } from 'rxjs';
import { observeOn } from 'rxjs/operators';
const observable = of(1, 2, 3).pipe(
observeOn(asyncScheduler)
);
console.log('開始');
observable.subscribe(value => console.log(value));
console.log('終了');
// 出力順: 開始, 終了, 1, 2, 3
Hot vs Cold Observables
Observableには、データの提供方法に基づいてHotとColdの2種類がある。
Cold Observable
- Cold Observableは、
subscribe
するたびに、新しいデータストリームを生成。 - 例:
of
,fromEvent
など
Hot Observable
- Hot Observableはすでに発生しているデータストリームを共有。
- 例:
Subject
Marble Diagram
RxJSの挙動を視覚的に表現するツール。時間軸上でデータストリームの操作を表現し、オペレーターの動作を直感的に理解できるようにする。
例:map
(入力ストリームを変換)、merge
(複数のストリームを統合)