Reduxの概念をRxJSとTypeScriptで理解する(初心者向け)

  • 112
    いいね
  • 0
    コメント

(2016/12/15追記)
この記事はDeprecatedです。
Reduxの概念をRxJSとTypeScriptで理解する ver.2 が最新版になります。


GitHubリポジトリはこちら → ovrmrw/understanding-redux-with-rxjs
git cloneしたらnpm installしてnum run tsですぐに動かせます。

(注: Reactの話は一切出てきません)

Reduxとは

Redux公式
アプリケーション全体で状態(state)を一つのJSONツリーの構造で持ち、それをActionがキックされる度に全体更新して配信するという概念。
僕は最初FluxとかReduxとかよくわからなかったけど、色々参考にしながら自分で書いてみたらようやく理解できました。

Middlewareという概念はよくわからなかったし今でもよくわかりません。
ロガーとか便利なものもあるらしいのですがそもそも本家Reduxを動かしてみたことがない。致命的な問題としてReduxはReducerの中で非同期処理を扱えないので、それをどう扱うかでMiddleware戦争が起きているぐらいです。

Reduxを理解する

「しかしAngular派である俺達には最初からRxJSがある。だからRxJSをフルに使って非同期も扱えるReduxっぽいの書いちゃおーよ!」という元ネタがこちら。→"Tackling State" by Victor Savkin
(Victor SavkinはAngularチームの中心メンバーです。彼のブログは購読する価値があるでしょう。)

で、当時RxJSの理解が浅かった(JSの理解も浅かった)僕はこれを理解するのにすごく時間がかかりました。Savkin流Reduxを自分なりに何回も書き直して書き直して、そしてようやく辿り着いた…
実は一枚の簡単な短いコードで全てを説明できるということがわかったのです。

687474703a2f2f692e696d6775722e636f6d2f4149696d5138432e6a7067.jpeg

以下、GitHubリポジトリにあるコードそのまま。初心者でも簡単にReduxのコンセプトを理解できるはずです。
(RxJSがよくわからないという方は公式ドキュメントを読むとわかりやすいかと思います)

redux-rxjs.ts
import 'babel-polyfill';
import 'zone.js/dist/zone-node';
import * as lodash from 'lodash';
import { Observable, Subject, BehaviorSubject } from 'rxjs/Rx';
declare const Zone: any;


class IncrementAction {
  constructor(public num: number) { }
}

class OtherAction {
  constructor() { }
}

type Action = IncrementAction | OtherAction;


interface IncrementState {
  counter: number;
}

interface OtherState {
  foo: string;
  bar: number;
}

interface AppState {
  increment: Promise<IncrementState>;
  other?: OtherState;
}


Zone.current.fork({ name: 'myZone' }).runGuarded(() => {

  console.log('zone name:', Zone.current.name); /* OUTPUT> zone name: myZone */

  const initialState: AppState = {
    increment: Promise.resolve<IncrementState>({
      counter: 0
    })
  };

  let counter: number;


  const dispatcher$ = new Subject<Action>(); // Dispatcher
  const provider$ = new BehaviorSubject<AppState>(initialState); // Provider


  Observable // ReducerContainer
    .zip<AppState>(...[
      dispatcher$.scan<Promise<IncrementState>>((state, action) => { // Reducer
        if (action instanceof IncrementAction) {
          return new Promise<IncrementState>(resolve => {
            setTimeout(() => {
              state.then(s => resolve({ counter: s.counter + action.num }));
            }, 10);
          });
        } else {
          return state;
        }
      }, initialState.increment),
      (increment): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { increment });
      }
    ])
    .subscribe(appState => {
      provider$.next(appState);
    });


  provider$
    .map<Promise<IncrementState>>(appState => appState.increment)
    .mergeMap<IncrementState>(state => {
      return Observable.fromPromise(state); // resolve async states.
    })
    .map<IncrementState>(state => lodash.cloneDeep(state)) // make states immutable.
    .distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))
    .subscribe(state => {
      counter = state.counter;
      console.log('counter:', counter);  /* (First time) OUTPUT> counter: 0 */
    });


  dispatcher$.next(new IncrementAction(1));  /* OUTPUT> counter: 1 */
  dispatcher$.next(new IncrementAction(1));  /* OUTPUT> counter: 2 */
  dispatcher$.next(new IncrementAction(0));  /* OUTPUT> (restricted) */
  dispatcher$.next(new IncrementAction(1));  /* OUTPUT> counter: 3 */
  dispatcher$.next(new IncrementAction(-1)); /* OUTPUT> counter: 2 */

});

どうでしょうか。超簡単じゃないですか?

ちなみに今回のコードでzone.jsを使う必要はありませんでしたが、後々Angular上で動かす予定のあるコードはなるべくNode.js環境でもZoneを使って書いた方がいいです。
そうしないとNode.js環境で動いたコードがAngularで動かないとか、そういう現象に遭遇することがあるからです。(僕は実際にあった)

要点1 - Subject

  dispatcher$.next(new IncrementAction(1));

これがActionの起点です。ちなみにdispatcher$Subjectのインスタンスです。
この次にストリームがどこに流れているかわかるでしょうか。

dispatcher$.scan<Promise<IncrementState>>((state, action) => { // Reducer
  /* 省略 */
})

ここです。そしてその先のsubscribeの中でProvider(BehaviorSubject)をnextして、という具合にストリームがどんどん次に流れていきます。
ところでActionが{type: 'ADD', payload: 1}の形ではなくnew IncrementAction(1)であることにお気付きでしょうか。この点に関してVictor SavkinはTypeScriptのUnion Typesを使ってtype-safeなコードが書けることを強調しています。

要点2 - BehaviorSubject

const provider$ = new BehaviorSubject<AppState>(initialState);

ここはSubjectではなくBehaviorSubjectであることに意味があります。もしこれをSubjectに変えると最初の"counter: 0"が出力されなくなります。
初期値がすぐに配信される様子はマーブルダイアグラムで見るとわかりやすいかも知れません。
Subjectのドキュメント
S.BehaviorSubject.png

要点3 - scan

      dispatcher$.scan<Promise<IncrementState>>((state, action) => {
        if (action instanceof IncrementAction) {
          return new Promise<IncrementState>(resolve => {
            setTimeout(() => {
              state.then(s => resolve({ counter: s.counter + action.num }));
            }, 10);
          });
        } else {
          return state;
        }
      }, initialState.increment)

いわゆるReducer。
scanオペレーターはStore(Reducer)を構築する要です。これは時間をまたぐreduceであると理解できれば、それで大体OKかなと思います。このscanと事項のzipをしっかり理解できるかどうかが本記事理解の分かれ目となります。
scanのマーブルダイアグラム

Reducerの中で非同期(Promise)を扱っていることに違和感を感じますか? RxJSなら何の問題もありません。

要点4 - zip, projection

      (increment): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { increment });
      }

zipオペレーターの最後の引数にprojectionと呼ばれる関数を入れて戻り値を整えます。
ちなみにzipの中がいくつもあるときはこのように書きます。

    .zip<AppState>(...[
      dispatcher$.scan(/* 省略 */), // state1
      dispatcher$.scan(/* 省略 */), // state2
      dispatcher$.scan(/* 省略 */), // state3

      (state1, state2, state3): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { state1, state2, state3 });
      }
    ])

zipのマーブルダイアグラム
zipと似たものにcombineLatestというオペレーターがありますが、zipは内包する全てのObservableのnextを待つのに対し、combineLatestはどれかがnextする度に各々のObservableの最新の値を次に流します。
このためReduxの概念にはzipが適しているということになります。
combineLatestのマーブルダイアグラム

要点5 - mergeMap, fromPromise

    .mergeMap<IncrementState>(state => {
      return Observable.fromPromise(state);
    })

mergeMapオペレーターは「流れてきたストリームを別のストリームに置き換えて次に流す」というミラクルな機能を持っています。(switchMapという似たものもありますがこちらは直前のストリームをキャンセルできます)
Observable.fromPromiseで、流れてきたPromiseをthenして中身を次に流しているんですね。つまり、【Observable<Promise<IncrementState>>Observable<IncrementState>】のように置き換えています。
Reducerで非同期を扱えるといっても結局はどこかの段階でPromiseの解決を待たなければいけません。今回のコードはmergeMapの中のObservable.fromPromiseがそれを担当しているというわけですね。
mergeMapのドキュメント

switchMapで書いたver.も用意しました。下の方にコメントで出力が書いてあります。 → redux-rxjs.switchMap.ts
実際に動かしてみるとストリームがキャンセルされる様子がわかるかと思います。このCancellationと呼ばれる機構はRxJSがウリにしているものの一つです。

要点6 - immutability

    .map<IncrementState>(state => lodash.cloneDeep(state))

Promise解決後のオブジェクトをそのまま流すと、.subscribe()内でオブジェクトの中身を変更できてしまいます。これはオブジェクトの値が渡されているのではなく、メモリ参照が渡されているからです。
Reduxの原則によると"Actionの発行以外で状態を変更できてはならない"ので、lodash.cloneDeep()により副作用を生じさせない値を次に流すようにします。
lodash.cloneDeepのドキュメント

要点7 - distinctUntilChanged

    .distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))

distinctUntilChangedオペレーターは通過するストリームが同じ値の場合に抑制する機能です。
ただし今回のコードは前述のprojectionのところで

return Object.assign<{}, AppState, {}>({}, initialState, { increment });

こうしているため、単純に.distinctUntilChanged()と書いてしまうと流れてきたデータが{counter: 2}{counter: 2}のように同じ値が続いたとしても両方通過します。これはオブジェクトの中身を見ていないからです。
オブジェクトの中身でチェックするためにはいわゆるdeepEqualの比較をしなければいけないため、comparerと呼ばれる関数を下記のように書く必要があります。

(oldValue, newValue) => lodash.isEqual(oldValue, newValue)

distinctUntilChangedのドキュメント
lodash.isEqualのドキュメント

Angularで使うには

DispatcherをDIコンテナに入れてComponentにインジェクトすれば、ComponentからActionをキックできるようになります。
またProviderのストリームの最後の方をなんとかしてComponentにインジェクトすれば、Componentで更新された状態(state)を取得できます。ただしObservable<Promise<any>>のデータをAngular標準のAsyncPipeで受けようとするとブラウザが無限ループに入ってフリーズします。(ver 2.0.0現在)
AsyncPipeはObservable, Promise, EventEmitterのいずれも受けられるように設計されていて、Observableの中にPromiseのような遅延処理が入っていることは想定されていないのだと思います。
もしPipeで受けたいのなら自作した方が安心安全です。僕の場合はこう。 → async-state.pipe.ts

まとめ

ReduxのReducersに相当するのはscanオペレーターのところですが、オリジナルReduxと違ってRxJSなら非同期の状態(Promise)を扱うことができます。
Reducerを増やしたければzipオペレーターの中にどんどんdispatcher$.scanを追加していきましょう。やることはそれだけです。簡単ですね。


(追記)
よく見たらReduxのドキュメントにこんなことが書いてありました。

The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx .scan() method. It may very well be!
(訳: RxJSを使えば簡単に実装できるよ。)