RxJSのソースコードを読むにはミニマムなオレオレRxを書くのが一番早い

  • 20
    いいね
  • 0
    コメント

ちきさんです。
今回は RxJS from スクラッチ です。

bokuweb さんのブログ記事 実装して学ぶRxJS がとても良かったのでネタをパクってみることにしました。

Rxは時空を操るとか言われると最初は難しく感じるかもしれませんが、自分で書いてみると意外とあっさりしていて、おっこれイケるじゃんってなるんですよ。

そしてRxとはライブラリというよりも概念です。概念を実装しているだけなので RxJS, RxJava, RxSwift, Rx.NET, ... のようにメジャー言語にはライブラリが存在している可能性が高いです。
つまり一度マスターすれば色々な言語でノウハウを活かせるということですね。コスパ高くないですか。

今回は僕が好きなTypeScriptでRxJS風の最高にミニマムなコードを書いて実行してみます。
大丈夫です。超簡単です。これを理解できるとRxJSのソースコードも結構読めます。


ファイルは1つだけ。

my-little-rx.ts
/////////////////////////////////////// Interfaces
interface IObserver<T> {
  next: (data: T) => void,
  error: (err) => void,
  complete: () => void,
}

interface IObservable<T> {
  subscribe: (observer: IObserver<T>) => void,
}


/////////////////////////////////////// Observable
class Observable<T> implements IObservable<T> {
  constructor(private parentSubscribe?: (observer: IObserver<T>) => void) { }

  subscribe(observer: IObserver<T>): void {
    if (this.parentSubscribe) {
      this.parentSubscribe(observer)
    }
  }

  of: typeof of
  map: typeof map
  filter: typeof filter
  do: typeof _do
}


/////////////////////////////////////// Operators
function of<T, R>(this: Observable<T>, ...values: R[]): Observable<R> {
  return new Observable<R>(observer => {
    console.log('of is called.')
    values.forEach(value => observer.next(value))
    observer.complete()
  })
}

function map<T, R>(this: Observable<T>, projection: (value: T) => R): Observable<R> {
  return new Observable<R>(observer => {
    console.log('map is called.')
    this.subscribe({
      next: (value) => observer.next(projection(value)),
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  })
}

function filter<T>(this: Observable<T>, prediction: (value: T) => boolean): Observable<T> {
  return new Observable<T>(observer => {
    console.log('filter is called.')
    this.subscribe({
      next: (value) => prediction(value) ? observer.next(value) : () => { },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  })
}

function _do<T>(this: Observable<T>, func: (value: T) => any): Observable<T> {
  return new Observable<T>(observer => {
    console.log('do is called.')
    this.subscribe({
      next: (value) => {
        func(value)
        observer.next(value)
      },
      error: (err) => observer.error(err),
      complete: () => observer.complete(),
    })
  })
}


/////////////////////////////////////// Bind operators to Observable
Observable.prototype.of = of
Observable.prototype.map = map
Observable.prototype.filter = filter
Observable.prototype.do = _do


/////////////////////////////////////// Main
new Observable()
  .of(1, 2, 3)
  .map(value => value * 2)
  .filter(value => value > 3)
  .map(value => '!' + value)
  .do(value => console.log('do:', value))
  .subscribe({
    next: (value) => console.log('next:', value),
    error: (err) => console.error(err),
    complete: () => console.log('complete!'),
  })

以上です。100行もありません。


まず簡単にするために、最後のMainのところを下記のように変更すると、

/////////////////////////////////////// Main
new Observable()
  .of(1, 2, 3)
  // .map(value => value * 2)
  // .filter(value => value > 3)
  // .map(value => '!' + value)
  // .do(value => console.log('do:', value))
  .subscribe({
    next: (value) => console.log('next:', value),
    error: (err) => console.error(err),
    complete: () => console.log('complete!'),
  })

出力はこのようになります。

OUTPUT
of is called.
next: 1
next: 2
next: 3
complete!

シンプルな出力結果ですね。これは簡単に追えそうです。

では元のコードのまま実行すると出力はどうなるかというと、このようになります。

OUTPUT
do is called.
map is called.
filter is called.
map is called.
of is called.
do: !4
next: !4
do: !6
next: !6
complete!

下から上に遡るようにコールバックが実行されていく様子がわかるでしょうか。


コールバックの連鎖を作っておいて、Subscribeしたときに逆方向に呼び出していく、それがRxの流儀です。

これは本当にミニマムなコードで実際には色々と考慮・制御しないといけないことが山ほどあります。でもRxが得体の知れない魔法ではない、ということはお分かりいただけたんじゃないでしょうか。