1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

簡易リアクティブプログラミングライブラリの作り方(TypeScript)

Last updated at Posted at 2019-12-12

この記事は、LeapMind Advent Calendar 2019 13 日目の記事です。

概要

今回は、RxJSのコード読めない問題と太刀打ちすべく、「Rxなんて怖くないよ、ほら、それっぽいものならこんな簡単に実装できるよ」っていうのが伝わって欲しくてこの記事を書いてみました。伝わるかな…(コード多いし)。

本記事ではRxJSのようなリアクティブプログラミングを可能にするライブラリの超簡易版を作ってみようと思います。
とはいえ、今回はRxJSの仕様を厳密には追わずに、一部の機能について似たような挙動をするものを気楽に作ることにします。実装方法やメソッド名などは違う部分も多々ありますが、(私が思う)分かりやすさ重視で、パフォーマンスは一旦気にせずに実装してみます。ソースコードは最後のリンク集に貼っています。

リアクティブプログラミングがどんなものか・Webフロントエンド開発などにおいてどのように役に立つのかは、以前こちらの記事に書いたので、ご興味があればご覧ください。


今回作るもの一覧

  • RxJSのObservableのようなクラスの簡易版
  • 新しくObservableを作る方法
    • interval(指定したミリ秒単位の間隔で発火するObservableを作る関数。ストップウォッチっぽいもの)
  • 既存のObservableを加工して新たにObservableを作る方法
    • 一部のオペレータ(mapfilterdebounceTime
    • 複数のObservableの合成関数(mergecombineLatest

(全部読むのめんどくさい人は、Step1か2くらいまで読んでくれたら嬉しいです)

前置き

リアクティブプログラミングの説明にはよく表計算ソフトが用いられます。

例えば、

  • セルAに1
  • セルBに= A + 3
  • セルCに= B * 2

と書くと、C の値は (1 + 3) * 2 で 8 となります。
ここで、セル A の値を「2」に変えると、即座にセル B の値は 5、セル C の値は 10 になります。
このように、セル B、C は、それぞれが依存しているセルの値の変更に応じて自身の値を自動的に更新するしくみを持っているので、リアクティブであるといえます。

Webフロントエンド開発では、複雑なアプリケーションになればなるほど多数のイベント(サーバーからデータが降ってくる、ユーザーがボタンをクリックする、など)を受け取って様々な値を更新する必要があるため、イベントが起こるたびに依存している値が自動的に更新されるような仕組み=リアクティブプログラミングが役に立つと思っています。(もちろんケースバイケースですが)

必要なインターフェイス

Observableとは、自身の値が変化したときに、登録されているlistenerや自身の値に依存している別のObservableに変更を通知するような仕組みを持っているオブジェクトです。具体的には、

  • 自身の値を更新するメソッド update
  • 値が変化したときに行う処理を登録する subscribe メソッド(例:Obs.subscribe(callbackFn)
  • mapfilterなどの各種オペレータ(例Obs.map(x => x * 2)
    • 注:RxJS ver.6 では可変長引数メソッド pipe を用いてObs.pipe(map(mapFn))のようにオペレータを入れるようになっているが、今回は手抜きするために直接各オペレータをメソッドとして実装する

というメソッドが生えている必要があります。

ここまでのメソッドで先ほどの表計算の例を書いてみると、

  • セルAに1
  • セルBに= A + 3
  • セルCに= B * 2
  • セルAの値を2に更新

という処理は、

const A = new Observable();
A.update(1);
const B = A.map(x => x + 3);
const C = B.map(x => x * 2);
A.update(2);

B.subscribe(x => console.log("Bの値:", x));  // 4 -> 5
C.subscribe(x => console.log("Cの値:", x));  // 8 -> 10

のようになります。

Step 1. Observable classを実装する

まずは、updateメソッドとsubscribeメソッドのみを持つObservableを実装します。

Observableは値がupdateされたときにsubscribeで登録された処理(関数)を実行する必要があるので、自身の値valueとsubscriberの配列subscribersを持っていれば良さそうです。ということで以下のようになります。(手動でupdateできてしまうので、ObservableというよりはRxJSのSubjectに対応するようなものになってしまっていますが)

// Observable.ts

export type Subscriber<T> = (value: T) => void;

export class Observable<T> {
  protected value: T | undefined;
  protected subscribers: Subscriber<T>[];

  constructor() {
    this.value = undefined;
    this.subscribers = [];
  }

  update(value: T): void {
    this.value = value; // 値の更新
    this.subscribers.forEach(s => s(value)); // subscribersへの通知
  }

  subscribe(fn: (value: T) => void): void {
    this.subscribers.push(fn);
  }
}

privateではなくprotectedにしているのは、後々Observableクラスを継承するためです。

使用例

// main.ts

import { Observable } from './Observable';

const A = new Observable<number>();
A.subscribe(x => console.log('Aの値:', x));

A.update(1);
A.update(3);
A.update(6);

// Aの値: 1
// Aの値: 3
// Aの値: 6

Step 2. intervalを実装する

次に、指定した間隔(ミリ秒)ごとに値を発火するObservableを生成する関数 interval を実装してみます。
内部にカウンターを持つ必要があるので、Observableを継承したIntervalObservableクラスを作って、interval関数では単にこれを生成することにします。
内部では組み込みのsetInterval関数を呼びます。本当はclearIntervalで停止できるようにすべきですが、見づらくなるので今回は省きます。

// Observable.ts

import { Observable } from './Observable';

export const interval = (milliSec: number) => new IntervalObservable(milliSec);

class IntervalObservable extends Observable<number> {
  private readonly milliSec: number;
  private counter: number;

  constructor(milliSec: number) {
    super();
    this.milliSec = milliSec;
    this.counter = 0;
    this.start();
  }

  private start() {
    setInterval(() => {
      this.counter += 1;
      this.update(this.counter);
    }, this.milliSec);
  }
}

使用例

// main.ts

import { interval } from './interval';

interval(1000).subscribe(console.log);

// 実行結果(1秒ごとに表示される)
// 1
// 2
// 3
// 4
// 5
// 6
// 7
// ...

注意として、ここで作ったintervalは「hotな」Observable(生成と同時に発火を開始するObservable)を作るものですが、RxJSのintervalはcold(subscribeされてから開始する)という違いがあります。(cold/hotの違いについては「RxJS を学ぼう #4 - COLD と HOT について学ぶ / ConnectableObservable」という記事の図が分かりやすいです。)

Step 3. オペレータを実装する

まずは、オペレータを施して生成した「子Observable」を参照できるように、Observablechildrenメンバを追加します。update実行時に子Observableも更新するようにします。

// Observable.ts

export type Subscriber<T> = (value: T) => void;

// 追加
interface ChildObservable<T, S> extends Observable<S> {
  updateWithParentValue(value: T): void;
}

export class Observable<T> {
  protected value: T | undefined;
  protected subscribers: Subscriber<T>[];
  protected children: ChildObservable<T, any>[];  // 追加

  ...

  update(value: T): void {
    this.value = value; // 値の更新
    this.subscribers.forEach(s => s(value)); // subscribersへの通知
    this.children.forEach(c => c.updateWithParentValue(value)); // 追加(子Observableを発火させる)
  }

  ...
}

Step 3-1. mapオペレータを実装する

mapmapFnにより発火する値を変換したObservableを作るオペレータです。

mapの実装は以下のようにします。

// Observable.ts

class Observable<T> {
  ...

  // 追加
  map<S>(mapFn: (value: T) => S): Observable<S> {
    const mapped = new MapObservable<T, S>(mapFn);
    this.children.push(mapped);
    return mapped;
  }
}

class MapObservable<T, S> extends Observable<S> implements ChildObservable<T, S> {
  private readonly mapFn: (value: T) => S;

  constructor(mapFn: (value: T) => S) {
    super();
    this.mapFn = mapFn;
  }

  updateWithParentValue(value: T): void {
    const mappedValue = this.mapFn(value);
    this.update(mappedValue);
  }
}

本当はファイルを分けたいのですが、循環importでエラーが出て面倒なので今回はObservableクラスを定義したファイルに追記するようにしました。複数のtsファイルを合成するスクリプトを書いておく手もあると思います。

// main.ts

import { interval } from './interval';

interval(1000)
  .map(x => x * 2)
  .map(x => x + 1)
  .subscribe(console.log);

// 実行結果(1秒ごとに表示される)
// 3
// 5
// 7
// 9
// 11
// 13
// 15
// 17
// ...

Step 3-2. filterオペレータを実装する

filterは値が条件を満たすかどうかfilterFn関数で判定し、trueになるときだけ発火するObservableを作るオペレータです。
以下のようにして実装することができます。

// Observable.ts

class Observable<T> {
  ...

  // 追加
  filter(filterFn: (value: T) => boolean): Observable<T> {
    const filtered = new FilterObservable<T>(filterFn);
    this.children.push(filtered);
    return filtered;
  }
}

class FilterObservable<T> extends Observable<T> implements ChildObservable<T, T> {
  private readonly filterFn: (value: T) => boolean;

  constructor(filterFn: (value: T) => boolean) {
    super();
    this.filterFn = filterFn;
  }

  updateWithParentValue(value: T): void {
    if (this.filterFn(value)) {  // 条件を満たすときだけupdateを実行
      this.update(value);
    }
  }
}

使用例

// main.ts

import { interval } from './interval';

interval(1000)
  .filter(x => x % 2 === 0)
  .subscribe(console.log);

// 2
// 4
// 6
// 8
// 10
// 12

Step 3-3. debounceTimeオペレータを実装する

debounceTimeは、あるObservableの発火頻度を抑制するオペレータです。
たとえば、inputフォーム要素の入力値に応じてリストをフィルタリングしたいとき、短時間に連続してタイプしてもすぐ重たいフィルタリングを実行せずに100ms間隔が空くタイミングを待つようにしたい、などの場面で役に立ちます。

実装は以下のようになります。updateWithParentValuemilliSec未満の間隔で連続して実行されると、clearTimeoutで前回登録した処理がキャンセルされるという仕組みです。

// Observable.ts

class Observable<T> {
  ...

  // 追加
  debounceTime(milliSec: number): Observable<T> {
    const result = new DebounceTimeObservable<T>(milliSec);
    this.children.push(result);
    return result;
  }
}

class DebounceTimeObservable<T> extends Observable<T>
  implements ChildObservable<T, T> {
  private readonly milliSec: number;
  private timerId: any;

  constructor(milliSec: number) {
    super();
    this.milliSec = milliSec;
  }

  updateWithParentValue(value: T): void {
    clearTimeout(this.timerId); // 前回登録したタイマー処理をキャンセルする
    this.timerId = setTimeout(() => {
      this.update(value);
    }, this.milliSec);
  }
}

使用例

// main.ts

interval(100)
  .filter(x => x % 10 <= 5)
  .debounceTime(200)
  .subscribe(console.log);

// 実行結果(1秒ごとに発火)
// 5
// 15
// 25
// 35
// 45
// 55
// 65
// 75
// 85
// ...

/* 説明
 filterまでで、
           111111    222222
 012345----012345----012345---- ...

 というObservableになっており、200msの間隔が空いたタイミングで最後の値を発火するので、`5, 15, 25, ...` が発火される。

Step 4. 合成系の関数を実装する

複数のObservableを合成する関数を実装することでようやくリアクティブプログラミングっぽさが出てきます。

Step 4-1. merge関数を実装する

mergeは複数のObservableを合流させる(それらの発火する値をそのまま発火する)Observableです。
いつ使うのかイメージしにくいかもしれませんが、例えばinputフォームの入力とリセットボタンの入力を以降の処理でまとめて扱いたい、などの場面で役に立ちます。

以下のように実装することができます。

// merge.ts

import { Observable } from './Observable';
import { ArrayElement, Unwrap } from './util-types';

export const merge = <T extends Observable<any>[]>(
  ...srcs: T
): Observable<ArrayElement<Unwrap<T>>> => new MergeObservable(...srcs);

class MergeObservable<T extends Observable<any>[]> extends Observable<
  ArrayElement<Unwrap<T>>
> {
  constructor(...srcs: T) {
    super();

    srcs.forEach(src => {
      src.subscribe(v => this.update(v));
    });
  }
}

使っている型定義が若干トリッキーなのですが、本題ではないので説明は省略します。
Observable<A>Observable<B>Observable<C>mergeしたときに作られるObservableの型をObservable<A | B | C>とするために用いています。

// util-types.ts

import { Observable } from './Observable';

export type ObservableValueOf<S> = S extends Observable<infer T> ? T : never;

export type ArrayElement<S> = S extends Array<infer T> ? T : never;

export type Unwrap<S> = { [P in keyof S]: ObservableValueOf<S[P]> };
// main.ts

import { interval } from './interval';
import { merge } from './merge';

merge(
  interval(1000).map(x => x * 10),
  interval(1200)
).subscribe(console.log);

// 実行結果
// 10
// 1
// 20
// 2
// 30
// 3
// 40
// 4
// 50
// 5
// 60
// ...

Step 4-1. combineLatest関数を実装する

n個のObservableを受け取って、それらの最新値を合わせたタプルを発火するObservableを作る関数です。

import { Observable } from './Observable';
import { Unwrap } from './util-types';

export const combineLatest = <T extends Observable<any>[]>(
  ...srcs: T
): Observable<Unwrap<T>> => new MergeObservable(...srcs);

const INITIAL_VALUE = Symbol();

class MergeObservable<T extends Observable<any>[]> extends Observable<
  Unwrap<T>
> {
  private latestValues: any[];

  constructor(...srcs: T) {
    super();

    this.latestValues = new Array<any>(srcs.length).fill(INITIAL_VALUE);

    srcs.forEach((src, idx) => {
      src.subscribe(v => {
        this.latestValues[idx] = v;

        if (!this.latestValues.includes(INITIAL_VALUE)) {
          this.update((this.latestValues as unknown) as Unwrap<T>);
        }
      });
    });
  }
}

使用例

// main.ts

import { interval } from './interval';
import { combineLatest } from './combineLatest';

combineLatest(
  interval(1000).filter(x => x % 2 === 0),
  interval(1000).filter(x => x % 2 === 1)
).subscribe(console.log);

// 実行結果
// [ 2, 1 ]
// [ 2, 3 ]
// [ 4, 3 ]
// [ 4, 5 ]
// [ 6, 5 ]
// [ 6, 7 ]
// [ 8, 7 ]
// [ 8, 9 ]
// [ 10, 9 ]
// ...

まとめ

以上、かなり簡易ですが、リアクティブプログラミングライブラリを実装してみました。特によく使うものは実装したのでこれだけでもまあまあ使えるかもしれません(あと少なくともscanflatMapくらいは必要そうですが)。RxJSには他にもたくさんのオペレーターが実装されているので、パズル感覚でぜひ実装してみてください。

リンク集

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?