この記事は、LeapMind Advent Calendar 2019 13 日目の記事です。
概要
今回は、RxJSのコード読めない問題と太刀打ちすべく、「Rxなんて怖くないよ、ほら、それっぽいものならこんな簡単に実装できるよ」っていうのが伝わって欲しくてこの記事を書いてみました。伝わるかな…(コード多いし)。
本記事ではRxJSのようなリアクティブプログラミングを可能にするライブラリの超簡易版を作ってみようと思います。
とはいえ、今回はRxJSの仕様を厳密には追わずに、一部の機能について似たような挙動をするものを気楽に作ることにします。実装方法やメソッド名などは違う部分も多々ありますが、(私が思う)分かりやすさ重視で、パフォーマンスは一旦気にせずに実装してみます。ソースコードは最後のリンク集に貼っています。
リアクティブプログラミングがどんなものか・Webフロントエンド開発などにおいてどのように役に立つのかは、以前こちらの記事に書いたので、ご興味があればご覧ください。
今回作るもの一覧
- RxJSの
Observable
のようなクラスの簡易版 - 新しくObservableを作る方法
-
interval
(指定したミリ秒単位の間隔で発火するObservableを作る関数。ストップウォッチっぽいもの)
-
- 既存のObservableを加工して新たにObservableを作る方法
- 一部のオペレータ(
map
・filter
・debounceTime
) - 複数のObservableの合成関数(
merge
・combineLatest
)
- 一部のオペレータ(
(全部読むのめんどくさい人は、Step1か2くらいまで読んでくれたら嬉しいです)
前置き
リアクティブプログラミングの説明にはよく表計算ソフトが用いられます。
例えば、
- セルAに
1
- セルBに
= A + 3
- セルCに
= B * 2
と書くと、C の値は (1 + 3) * 2 で 8 となります。
ここで、セル A の値を「2」に変えると、即座にセル B の値は 5、セル C の値は 10 になります。
このように、セル B、C は、それぞれが依存しているセルの値の変更に応じて自身の値を自動的に更新するしくみを持っているので、リアクティブであるといえます。
Webフロントエンド開発では、複雑なアプリケーションになればなるほど多数のイベント(サーバーからデータが降ってくる、ユーザーがボタンをクリックする、など)を受け取って様々な値を更新する必要があるため、イベントが起こるたびに依存している値が自動的に更新されるような仕組み=リアクティブプログラミングが役に立つと思っています。(もちろんケースバイケースですが)
必要なインターフェイス
Observableとは、自身の値が変化したときに、登録されているlistenerや自身の値に依存している別のObservableに変更を通知するような仕組みを持っているオブジェクトです。具体的には、
- 自身の値を更新するメソッド
update
- 値が変化したときに行う処理を登録する
subscribe
メソッド(例:Obs.subscribe(callbackFn)
) -
map
・filter
などの各種オペレータ(例Obs.map(x => x * 2)
)- 注:RxJS ver.6 では可変長引数メソッド
pipe
を用いてObs.pipe(map(mapFn))
のようにオペレータを入れるようになっているが、今回は手抜きするために直接各オペレータをメソッドとして実装する
- 注:RxJS ver.6 では可変長引数メソッド
というメソッドが生えている必要があります。
ここまでのメソッドで先ほどの表計算の例を書いてみると、
- セルAに
1
- セルBに
= A + 3
- セルCに
= B * 2
- セルAの値を
2
に更新
という処理は、
const A = new Observable();
A.update(1);
const B = A.map(x => x + 3);
const C = B.map(x => x * 2);
A.update(2);
B.subscribe(x => console.log("Bの値:", x)); // 4 -> 5
C.subscribe(x => console.log("Cの値:", x)); // 8 -> 10
のようになります。
Step 1. Observable classを実装する
まずは、update
メソッドとsubscribe
メソッドのみを持つObservableを実装します。
Observableは値がupdate
されたときにsubscribe
で登録された処理(関数)を実行する必要があるので、自身の値value
とsubscriberの配列subscribers
を持っていれば良さそうです。ということで以下のようになります。(手動でupdate
できてしまうので、ObservableというよりはRxJSのSubject
に対応するようなものになってしまっていますが)
// Observable.ts
export type Subscriber<T> = (value: T) => void;
export class Observable<T> {
protected value: T | undefined;
protected subscribers: Subscriber<T>[];
constructor() {
this.value = undefined;
this.subscribers = [];
}
update(value: T): void {
this.value = value; // 値の更新
this.subscribers.forEach(s => s(value)); // subscribersへの通知
}
subscribe(fn: (value: T) => void): void {
this.subscribers.push(fn);
}
}
private
ではなくprotected
にしているのは、後々Observable
クラスを継承するためです。
使用例
// main.ts
import { Observable } from './Observable';
const A = new Observable<number>();
A.subscribe(x => console.log('Aの値:', x));
A.update(1);
A.update(3);
A.update(6);
// Aの値: 1
// Aの値: 3
// Aの値: 6
Step 2. intervalを実装する
次に、指定した間隔(ミリ秒)ごとに値を発火するObservableを生成する関数 interval
を実装してみます。
内部にカウンターを持つ必要があるので、Observable
を継承したIntervalObservable
クラスを作って、interval
関数では単にこれを生成することにします。
内部では組み込みのsetInterval
関数を呼びます。本当はclearInterval
で停止できるようにすべきですが、見づらくなるので今回は省きます。
// Observable.ts
import { Observable } from './Observable';
export const interval = (milliSec: number) => new IntervalObservable(milliSec);
class IntervalObservable extends Observable<number> {
private readonly milliSec: number;
private counter: number;
constructor(milliSec: number) {
super();
this.milliSec = milliSec;
this.counter = 0;
this.start();
}
private start() {
setInterval(() => {
this.counter += 1;
this.update(this.counter);
}, this.milliSec);
}
}
使用例
// main.ts
import { interval } from './interval';
interval(1000).subscribe(console.log);
// 実行結果(1秒ごとに表示される)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// ...
注意として、ここで作ったintervalは「hotな」Observable(生成と同時に発火を開始するObservable)を作るものですが、RxJSのintervalはcold(subscribeされてから開始する)という違いがあります。(cold/hotの違いについては「RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable」という記事の図が分かりやすいです。)
Step 3. オペレータを実装する
まずは、オペレータを施して生成した「子Observable」を参照できるように、Observable
にchildren
メンバを追加します。update
実行時に子Observableも更新するようにします。
// Observable.ts
export type Subscriber<T> = (value: T) => void;
// 追加
interface ChildObservable<T, S> extends Observable<S> {
updateWithParentValue(value: T): void;
}
export class Observable<T> {
protected value: T | undefined;
protected subscribers: Subscriber<T>[];
protected children: ChildObservable<T, any>[]; // 追加
...
update(value: T): void {
this.value = value; // 値の更新
this.subscribers.forEach(s => s(value)); // subscribersへの通知
this.children.forEach(c => c.updateWithParentValue(value)); // 追加(子Observableを発火させる)
}
...
}
Step 3-1. mapオペレータを実装する
map
はmapFn
により発火する値を変換したObservableを作るオペレータです。
map
の実装は以下のようにします。
// Observable.ts
class Observable<T> {
...
// 追加
map<S>(mapFn: (value: T) => S): Observable<S> {
const mapped = new MapObservable<T, S>(mapFn);
this.children.push(mapped);
return mapped;
}
}
class MapObservable<T, S> extends Observable<S> implements ChildObservable<T, S> {
private readonly mapFn: (value: T) => S;
constructor(mapFn: (value: T) => S) {
super();
this.mapFn = mapFn;
}
updateWithParentValue(value: T): void {
const mappedValue = this.mapFn(value);
this.update(mappedValue);
}
}
本当はファイルを分けたいのですが、循環importでエラーが出て面倒なので今回はObservableクラスを定義したファイルに追記するようにしました。複数のtsファイルを合成するスクリプトを書いておく手もあると思います。
// main.ts
import { interval } from './interval';
interval(1000)
.map(x => x * 2)
.map(x => x + 1)
.subscribe(console.log);
// 実行結果(1秒ごとに表示される)
// 3
// 5
// 7
// 9
// 11
// 13
// 15
// 17
// ...
Step 3-2. filterオペレータを実装する
filter
は値が条件を満たすかどうかfilterFn
関数で判定し、true
になるときだけ発火するObservableを作るオペレータです。
以下のようにして実装することができます。
// Observable.ts
class Observable<T> {
...
// 追加
filter(filterFn: (value: T) => boolean): Observable<T> {
const filtered = new FilterObservable<T>(filterFn);
this.children.push(filtered);
return filtered;
}
}
class FilterObservable<T> extends Observable<T> implements ChildObservable<T, T> {
private readonly filterFn: (value: T) => boolean;
constructor(filterFn: (value: T) => boolean) {
super();
this.filterFn = filterFn;
}
updateWithParentValue(value: T): void {
if (this.filterFn(value)) { // 条件を満たすときだけupdateを実行
this.update(value);
}
}
}
使用例
// main.ts
import { interval } from './interval';
interval(1000)
.filter(x => x % 2 === 0)
.subscribe(console.log);
// 2
// 4
// 6
// 8
// 10
// 12
Step 3-3. debounceTimeオペレータを実装する
debounceTime
は、あるObservableの発火頻度を抑制するオペレータです。
たとえば、inputフォーム要素の入力値に応じてリストをフィルタリングしたいとき、短時間に連続してタイプしてもすぐ重たいフィルタリングを実行せずに100ms間隔が空くタイミングを待つようにしたい、などの場面で役に立ちます。
実装は以下のようになります。updateWithParentValue
がmilliSec
未満の間隔で連続して実行されると、clearTimeout
で前回登録した処理がキャンセルされるという仕組みです。
// Observable.ts
class Observable<T> {
...
// 追加
debounceTime(milliSec: number): Observable<T> {
const result = new DebounceTimeObservable<T>(milliSec);
this.children.push(result);
return result;
}
}
class DebounceTimeObservable<T> extends Observable<T>
implements ChildObservable<T, T> {
private readonly milliSec: number;
private timerId: any;
constructor(milliSec: number) {
super();
this.milliSec = milliSec;
}
updateWithParentValue(value: T): void {
clearTimeout(this.timerId); // 前回登録したタイマー処理をキャンセルする
this.timerId = setTimeout(() => {
this.update(value);
}, this.milliSec);
}
}
使用例
// main.ts
interval(100)
.filter(x => x % 10 <= 5)
.debounceTime(200)
.subscribe(console.log);
// 実行結果(1秒ごとに発火)
// 5
// 15
// 25
// 35
// 45
// 55
// 65
// 75
// 85
// ...
/* 説明
filterまでで、
111111 222222
012345----012345----012345---- ...
というObservableになっており、200msの間隔が空いたタイミングで最後の値を発火するので、`5, 15, 25, ...` が発火される。
Step 4. 合成系の関数を実装する
複数のObservableを合成する関数を実装することでようやくリアクティブプログラミングっぽさが出てきます。
Step 4-1. merge関数を実装する
merge
は複数のObservableを合流させる(それらの発火する値をそのまま発火する)Observableです。
いつ使うのかイメージしにくいかもしれませんが、例えばinputフォームの入力とリセットボタンの入力を以降の処理でまとめて扱いたい、などの場面で役に立ちます。
以下のように実装することができます。
// merge.ts
import { Observable } from './Observable';
import { ArrayElement, Unwrap } from './util-types';
export const merge = <T extends Observable<any>[]>(
...srcs: T
): Observable<ArrayElement<Unwrap<T>>> => new MergeObservable(...srcs);
class MergeObservable<T extends Observable<any>[]> extends Observable<
ArrayElement<Unwrap<T>>
> {
constructor(...srcs: T) {
super();
srcs.forEach(src => {
src.subscribe(v => this.update(v));
});
}
}
使っている型定義が若干トリッキーなのですが、本題ではないので説明は省略します。
Observable<A>
・Observable<B>
・Observable<C>
をmerge
したときに作られるObservableの型をObservable<A | B | C>
とするために用いています。
// util-types.ts
import { Observable } from './Observable';
export type ObservableValueOf<S> = S extends Observable<infer T> ? T : never;
export type ArrayElement<S> = S extends Array<infer T> ? T : never;
export type Unwrap<S> = { [P in keyof S]: ObservableValueOf<S[P]> };
// main.ts
import { interval } from './interval';
import { merge } from './merge';
merge(
interval(1000).map(x => x * 10),
interval(1200)
).subscribe(console.log);
// 実行結果
// 10
// 1
// 20
// 2
// 30
// 3
// 40
// 4
// 50
// 5
// 60
// ...
Step 4-1. combineLatest関数を実装する
n個のObservableを受け取って、それらの最新値を合わせたタプルを発火するObservableを作る関数です。
import { Observable } from './Observable';
import { Unwrap } from './util-types';
export const combineLatest = <T extends Observable<any>[]>(
...srcs: T
): Observable<Unwrap<T>> => new MergeObservable(...srcs);
const INITIAL_VALUE = Symbol();
class MergeObservable<T extends Observable<any>[]> extends Observable<
Unwrap<T>
> {
private latestValues: any[];
constructor(...srcs: T) {
super();
this.latestValues = new Array<any>(srcs.length).fill(INITIAL_VALUE);
srcs.forEach((src, idx) => {
src.subscribe(v => {
this.latestValues[idx] = v;
if (!this.latestValues.includes(INITIAL_VALUE)) {
this.update((this.latestValues as unknown) as Unwrap<T>);
}
});
});
}
}
使用例
// main.ts
import { interval } from './interval';
import { combineLatest } from './combineLatest';
combineLatest(
interval(1000).filter(x => x % 2 === 0),
interval(1000).filter(x => x % 2 === 1)
).subscribe(console.log);
// 実行結果
// [ 2, 1 ]
// [ 2, 3 ]
// [ 4, 3 ]
// [ 4, 5 ]
// [ 6, 5 ]
// [ 6, 7 ]
// [ 8, 7 ]
// [ 8, 9 ]
// [ 10, 9 ]
// ...
まとめ
以上、かなり簡易ですが、リアクティブプログラミングライブラリを実装してみました。特によく使うものは実装したのでこれだけでもまあまあ使えるかもしれません(あと少なくともscan
やflatMap
くらいは必要そうですが)。RxJSには他にもたくさんのオペレーターが実装されているので、パズル感覚でぜひ実装してみてください。
リンク集
-
RxJSによるリアクティブプログラミング入門
- リアクティブプログラミングがなぜ嬉しいのかちゃんと考えるために、同じアプリケーションをRxJS使用/非使用でそれぞれ実装し比較したもの。1年前くらいに書いた記事。
- RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable
- ソースコード