Help us understand the problem. What is going on with this article?

Reduxの概念をRxJSとTypeScriptで理解する ver.2

More than 3 years have passed since last update.

Redux Advent Calendar 2016 の15日目です。ちきさんです。

前置きと反省

この記事は僕がQiitaに投稿した中で最もストックされてしまった Reduxの概念をRxJSとTypeScriptで理解する(初心者向け) をリライトしたものです。

今振り返ると前回の記事にはいくつか問題がありました。

  1. ステートとReducerにPromiseを持ち込むのはやはり筋が良くなかった。
  2. Actionのdispatch順を無視して非同期が解決した順に処理が流れる構造になっていた。

1に関しては当初は問題ないと思っていたのですが僕も年を取り、その考えは若気の至りだったのではないかと考えるようになった次第です。
2は要件によってはそれで良いと思うのですが、それでも基本的にはdispatch順に処理する方がより期待される挙動であろうと思った次第です。

「RxJSでReduxを書く」というテーマは今回のリライトでようやく本質に近づけたかなと思います。


ここから本編

GitHubリポジトリはこちら → ovrmrw/understanding-redux-with-rxjs-2
git cloneしたらnpm installしてnpm startですぐに動かせます。

(注: Reactの話は一切出てきません)

Reduxとは

Redux公式
アプリケーション全体で状態(state)を一つのJSONツリーの構造で持ち、それをActionがキックされる度に全体更新して配信するという概念。
僕は最初FluxとかReduxとかよくわからなかったけど、色々参考にしながら自分で書いてみたらようやく理解できました。

Middlewareという概念はよくわからなかったし今でもよくわかりません。
ロガーとか便利なものもあるらしいのですがそもそも本家Reduxを動かしてみたことがない。致命的な問題としてReduxはReducerの中で非同期処理を扱えないので、それをどう扱うかでMiddleware戦争が起きているぐらいです。

Reduxを理解する

「しかしAngular派である俺達には最初からRxJSがある。だからRxJSをフルに使ってReduxっぽいの書いちゃおーよ!」という元ネタがこちら。→"Tackling State" by Victor Savkin
(Victor SavkinはAngularチームの中心メンバーです。彼のブログは購読する価値があるでしょう。)

で、当時RxJSの理解が浅かった(JSの理解も浅かった)僕はこれを理解するのにすごく時間がかかりました。Savkin流Reduxを自分なりに何回も書き直して書き直して、そしてようやく辿り着いた…
実は一枚の簡単な短いコードで全てを説明できるということがわかったのです。

687474703a2f2f692e696d6775722e636f6d2f4149696d5138432e6a7067.jpeg

以下、GitHubリポジトリにあるコードそのまま。初心者でも簡単にReduxのコンセプトを理解できるはずです。
(RxJSがよくわからないという方は公式ドキュメントを読むとわかりやすいかと思います)

rxjs-redux.ts
import 'core-js';
import 'zone.js/dist/zone-node';
import * as lodash from 'lodash';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
declare const Zone: any;


///////////////////////////////// Action
class IncrementAction {
  constructor(public num: number) { }
}

class OtherAction {
  constructor() { }
}

type Action = IncrementAction | OtherAction;


///////////////////////////////// State
interface IncrementState {
  counter: number;
}

interface OtherState {
  foo: string;
  bar: number;
}

interface AppState {
  increment: IncrementState;
  other?: OtherState;
}


const initialState: AppState = {
  increment: {
    counter: 0
  }
};


///////////////////////////////// Redux
Zone.current.fork({ name: 'myZone' }).runGuarded(() => {

  console.log('zone name:', Zone.current.name); /* OUTPUT> zone name: myZone */

  const dispatcher$ = new Subject<Action | Promise<Action> | Observable<Action>>(); // Dispatcher
  const provider$ = new BehaviorSubject<AppState>(initialState); // Provider


  const dispatcherQueue$ = // Queue
    dispatcher$
      .concatMap(action => { // async actions are resolved here.
        if (action instanceof Promise || action instanceof Observable) {
          return Observable.from(action);
        } else {
          return Observable.of(action);
        }
      })
      .share();


  Observable // ReducerContainer
    .zip(...[
      dispatcherQueue$.scan((state, action) => { // Reducer
        if (action instanceof IncrementAction) {
          return { counter: state.counter + action.num };
        } else {
          return state;
        }
      }, initialState.increment),

      (increment): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { increment }); // always create new state object!
      }
    ])
    .subscribe(newState => {
      provider$.next(newState);
    });


  provider$
    .map(appState => appState.increment)
    .distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue)) // restrict same values to pass through.
    .subscribe(state => {
      console.log('counter:', state.counter); /* (First time) OUTPUT> counter: 0 */
    });


  /* 
    OUTPUT: 0 -> 1 -> 2 -> 4 -> 3 
    outputs are not determined by async resolution order but by action dispatched order.
  */
  dispatcher$.next(promiseAction(new IncrementAction(1), 100));  /* OUTPUT> counter: 1 */
  dispatcher$.next(promiseAction(new IncrementAction(1), 50));  /* OUTPUT> counter: 2 */
  dispatcher$.next(observableAction(new IncrementAction(0), 100));  /* OUTPUT> (restricted) */
  dispatcher$.next(observableAction(new IncrementAction(2), 50));  /* OUTPUT> counter: 4 */
  dispatcher$.next(new IncrementAction(-1)); /* OUTPUT> counter: 3 */
});



///////////////////////////////// Helper
function promiseAction(action: Action, timeout: number): Promise<Action> {
  return new Promise<Action>(resolve => {
    setTimeout(() => resolve(action), timeout);
  });
}

function observableAction(action: Action, timeout: number): Observable<Action> {
  return Observable.of(action).delay(timeout);
}

どうでしょうか。超簡単じゃないですか?

ちなみに今回のコードでzone.jsを使う必要はありませんでしたが、後々Angular上で動かす予定のあるコードはなるべくNode.js環境でもZoneを使って書いた方がいいです。
そうしないとNode.js環境で動いたコードがAngularで動かないとか、そういう現象に遭遇することがあるからです。(僕はただのAngular好きです)

要点1 - Subject

  dispatcher$.next(promiseAction(new IncrementAction(1), 100));

これがActionの起点です。ちなみにdispatcher$Subjectのインスタンスです。
この次にストリームがどこに流れているかわかるでしょうか。

正解はdispatcherQueue$です。

要点2 - concatMap

  const dispatcherQueue$ = // Queue
    dispatcher$
      .concatMap(action => { // async actions are resolved here.
        if (action instanceof Promise || action instanceof Observable) {
          return Observable.from(action);
        } else {
          return Observable.of(action);
        }
      })
      .share();

dispatcher$から流れてきたActionをconcatMapオペレーターで受けています。何をしているかというと、

  • PromiseかObservable、つまり非同期だったらObervable.from()で非同期を解決させて返す。
  • それ以外、つまり同期だったらObservable.of()でただ単にObservableに変換して返す。

これによりObservable<Action | Promise<Action> | Observable<Action>>だったActionの型はObservable<Action>に統一されます。

しかもconcatMapの効能によりActionのdispatch順が遵守されます。素敵です。
このReducerの手前で非同期を解決させようというアプローチはredux-observableにも通じるものがあります。

要点3 - BehaviorSubject

const provider$ = new BehaviorSubject<AppState>(initialState);

ここはSubjectではなくBehaviorSubjectであることに意味があります。もしこれをSubjectに変えると最初の"counter: 0"が出力されなくなります。
初期値がすぐに配信される様子はマーブルダイアグラムで見るとわかりやすいかも知れません。
Subjectのドキュメント
S.BehaviorSubject.png

要点4 - scan

      dispatcherQueue$.scan((state, action) => { // Reducer
        if (action instanceof IncrementAction) {
          return { counter: state.counter + action.num };
        } else {
          return state;
        }
      }, initialState.increment)

いわゆるReducer。
scanオペレーターはStore(Reducer)を構築する要です。これは時間をまたぐreduceであると理解できれば、それで大体OKかなと思います。このscanと事項のzipをしっかり理解できるかどうかが本記事理解の分かれ目となります。
scanのマーブルダイアグラム

dispatcher$.scan()ではなくdispatcherQueue$.scan()であることが重要です。

要点5 - zip, projection

      (increment): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { increment });
      }

zipオペレーターの最後の引数にprojectionと呼ばれる関数を入れて戻り値を整えます。
ちなみにzipの中がいくつもあるときはこのように書きます。

    .zip<AppState>(...[
      dispatcher$.scan(/* 省略 */), // state1
      dispatcher$.scan(/* 省略 */), // state2
      dispatcher$.scan(/* 省略 */), // state3

      (state1, state2, state3): AppState => { // projection
        return Object.assign<{}, AppState, {}>({}, initialState, { state1, state2, state3 });
      }
    ])

zipのマーブルダイアグラム
zipと似たものにcombineLatestというオペレーターがありますが、zipは内包する全てのObservableのnextを待つのに対し、combineLatestはどれかがnextする度に各々のObservableの最新の値を次に流します。
このためReduxの概念にはzipが適しているということになります。
combineLatestのマーブルダイアグラム

要点6 - distinctUntilChanged

    .distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))

distinctUntilChangedオペレーターは通過するストリームが同じ値の場合に抑制する機能です。
ただし今回のコードは前述のprojectionのところで

return Object.assign<{}, AppState, {}>({}, initialState, { increment });

こうしているため、単純に.distinctUntilChanged()と書いてしまうと流れてきたデータが{counter: 2}{counter: 2}のように同じ値が続いたとしても両方通過します。これはオブジェクトの中身を見ていないからです。
オブジェクトの中身でチェックするためにはいわゆるdeepEqualの比較をしなければいけないため、comparerと呼ばれる関数を下記のように書く必要があります。

(oldValue, newValue) => lodash.isEqual(oldValue, newValue)

distinctUntilChangedのドキュメント
lodash.isEqualのドキュメント

Angularで使うには

DispatcherをDIコンテナに入れてComponentにインジェクトすれば、ComponentからActionをキックできるようになります。
またProviderのストリームの最後の方をなんとかしてComponentにインジェクトすれば、Componentで更新されたステートを取得できます。

ちなみに Firebase-as-a-Store ~RxJSで作るFirebaseバックエンドのRedux~ で紹介しているAngularアプリはこの概念をそっくりそのまま使って構築しています。

まとめ

要点はいくつかありますが、最も重要なのはReducerを構成するscan,zipオペレーターの部分です。
Reducerを増やしたければzipオペレーターの中にどんどんdispatcherQueue$.scanを追加していきましょう。やることはそれだけです。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした