TypeScript
angular
RxJS
Angular2
redux
ReduxDay 15

Reduxの概念をRxJSとTypeScriptで理解する ver.2

More than 1 year has passed since last update.

Redux Advent Calendar 2016 の15日目です。ちきさんです。


前置きと反省

この記事は僕がQiitaに投稿した中で最もストックされてしまった Reduxの概念をRxJSとTypeScriptで理解する(初心者向け) をリライトしたものです。

今振り返ると前回の記事にはいくつか問題がありました。


  1. ステートとReducerにPromiseを持ち込むのはやはり筋が良くなかった。

  2. Actionのdispatch順を無視して非同期が解決した順に処理が流れる構造になっていた。

1に関しては当初は問題ないと思っていたのですが僕も年を取り、その考えは若気の至りだったのではないかと考えるようになった次第です。

2は要件によってはそれで良いと思うのですが、それでも基本的にはdispatch順に処理する方がより期待される挙動であろうと思った次第です。

「RxJSでReduxを書く」というテーマは今回のリライトでようやく本質に近づけたかなと思います。



ここから本編

GitHubリポジトリはこちら → ovrmrw/understanding-redux-with-rxjs-2

git cloneしたらnpm installしてnpm startですぐに動かせます。

(注: Reactの話は一切出てきません)


Reduxとは

Redux公式

アプリケーション全体で状態(state)を一つのJSONツリーの構造で持ち、それをActionがキックされる度に全体更新して配信するという概念。

僕は最初FluxとかReduxとかよくわからなかったけど、色々参考にしながら自分で書いてみたらようやく理解できました。

Middlewareという概念はよくわからなかったし今でもよくわかりません。

ロガーとか便利なものもあるらしいのですがそもそも本家Reduxを動かしてみたことがない。致命的な問題としてReduxはReducerの中で非同期処理を扱えないので、それをどう扱うかでMiddleware戦争が起きているぐらいです。


Reduxを理解する

「しかしAngular派である俺達には最初からRxJSがある。だからRxJSをフルに使ってReduxっぽいの書いちゃおーよ!」という元ネタがこちら。→"Tackling State" by Victor Savkin

(Victor SavkinはAngularチームの中心メンバーです。彼のブログは購読する価値があるでしょう。)

で、当時RxJSの理解が浅かった(JSの理解も浅かった)僕はこれを理解するのにすごく時間がかかりました。Savkin流Reduxを自分なりに何回も書き直して書き直して、そしてようやく辿り着いた…

実は一枚の簡単な短いコードで全てを説明できるということがわかったのです。

687474703a2f2f692e696d6775722e636f6d2f4149696d5138432e6a7067.jpeg

以下、GitHubリポジトリにあるコードそのまま。初心者でも簡単にReduxのコンセプトを理解できるはずです。

(RxJSがよくわからないという方は公式ドキュメントを読むとわかりやすいかと思います)


rxjs-redux.ts

import 'core-js';

import 'zone.js/dist/zone-node';
import * as lodash from 'lodash';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
declare const Zone: any;

///////////////////////////////// Action
class IncrementAction {
constructor(public num: number) { }
}

class OtherAction {
constructor() { }
}

type Action = IncrementAction | OtherAction;

///////////////////////////////// State
interface IncrementState {
counter: number;
}

interface OtherState {
foo: string;
bar: number;
}

interface AppState {
increment: IncrementState;
other?: OtherState;
}

const initialState: AppState = {
increment: {
counter: 0
}
};

///////////////////////////////// Redux
Zone.current.fork({ name: 'myZone' }).runGuarded(() => {

console.log('zone name:', Zone.current.name); /* OUTPUT> zone name: myZone */

const dispatcher$ = new Subject<Action | Promise<Action> | Observable<Action>>(); // Dispatcher
const provider$ = new BehaviorSubject<AppState>(initialState); // Provider

const dispatcherQueue$ = // Queue
dispatcher$
.concatMap(action => { // async actions are resolved here.
if (action instanceof Promise || action instanceof Observable) {
return Observable.from(action);
} else {
return Observable.of(action);
}
})
.share();

Observable // ReducerContainer
.zip(...[
dispatcherQueue$.scan((state, action) => { // Reducer
if (action instanceof IncrementAction) {
return { counter: state.counter + action.num };
} else {
return state;
}
}, initialState.increment),

(increment): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { increment }); // always create new state object!
}
])
.subscribe(newState => {
provider$.next(newState);
});

provider$
.map(appState => appState.increment)
.distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue)) // restrict same values to pass through.
.subscribe(state => {
console.log('counter:', state.counter); /* (First time) OUTPUT> counter: 0 */
});

/*
OUTPUT: 0 -> 1 -> 2 -> 4 -> 3
outputs are not determined by async resolution order but by action dispatched order.
*/

dispatcher$.next(promiseAction(new IncrementAction(1), 100)); /* OUTPUT> counter: 1 */
dispatcher$.next(promiseAction(new IncrementAction(1), 50)); /* OUTPUT> counter: 2 */
dispatcher$.next(observableAction(new IncrementAction(0), 100)); /* OUTPUT> (restricted) */
dispatcher$.next(observableAction(new IncrementAction(2), 50)); /* OUTPUT> counter: 4 */
dispatcher$.next(new IncrementAction(-1)); /* OUTPUT> counter: 3 */
});

///////////////////////////////// Helper
function promiseAction(action: Action, timeout: number): Promise<Action> {
return new Promise<Action>(resolve => {
setTimeout(() => resolve(action), timeout);
});
}

function observableAction(action: Action, timeout: number): Observable<Action> {
return Observable.of(action).delay(timeout);
}


どうでしょうか。超簡単じゃないですか?

ちなみに今回のコードでzone.jsを使う必要はありませんでしたが、後々Angular上で動かす予定のあるコードはなるべくNode.js環境でもZoneを使って書いた方がいいです。

そうしないとNode.js環境で動いたコードがAngularで動かないとか、そういう現象に遭遇することがあるからです。(僕はただのAngular好きです)


要点1 - Subject

  dispatcher$.next(promiseAction(new IncrementAction(1), 100));

これがActionの起点です。ちなみにdispatcher$Subjectのインスタンスです。

この次にストリームがどこに流れているかわかるでしょうか。

正解はdispatcherQueue$です。


要点2 - concatMap

  const dispatcherQueue$ = // Queue

dispatcher$
.concatMap(action => { // async actions are resolved here.
if (action instanceof Promise || action instanceof Observable) {
return Observable.from(action);
} else {
return Observable.of(action);
}
})
.share();

dispatcher$から流れてきたActionをconcatMapオペレーターで受けています。何をしているかというと、


  • PromiseかObservable、つまり非同期だったらObervable.from()で非同期を解決させて返す。

  • それ以外、つまり同期だったらObservable.of()でただ単にObservableに変換して返す。

これによりObservable<Action | Promise<Action> | Observable<Action>>だったActionの型はObservable<Action>に統一されます。

しかもconcatMapの効能によりActionのdispatch順が遵守されます。素敵です。

このReducerの手前で非同期を解決させようというアプローチはredux-observableにも通じるものがあります。


要点3 - BehaviorSubject

const provider$ = new BehaviorSubject<AppState>(initialState);

ここはSubjectではなくBehaviorSubjectであることに意味があります。もしこれをSubjectに変えると最初の"counter: 0"が出力されなくなります。

初期値がすぐに配信される様子はマーブルダイアグラムで見るとわかりやすいかも知れません。

Subjectのドキュメント

S.BehaviorSubject.png


要点4 - scan

      dispatcherQueue$.scan((state, action) => { // Reducer

if (action instanceof IncrementAction) {
return { counter: state.counter + action.num };
} else {
return state;
}
}, initialState.increment)

いわゆるReducer。

scanオペレーターはStore(Reducer)を構築する要です。これは時間をまたぐreduceであると理解できれば、それで大体OKかなと思います。このscanと事項のzipをしっかり理解できるかどうかが本記事理解の分かれ目となります。

scanのマーブルダイアグラム

dispatcher$.scan()ではなくdispatcherQueue$.scan()であることが重要です。


要点5 - zip, projection

      (increment): AppState => { // projection

return Object.assign<{}, AppState, {}>({}, initialState, { increment });
}

zipオペレーターの最後の引数にprojectionと呼ばれる関数を入れて戻り値を整えます。

ちなみにzipの中がいくつもあるときはこのように書きます。

    .zip<AppState>(...[

dispatcher$.scan(/* 省略 */), // state1
dispatcher$.scan(/* 省略 */), // state2
dispatcher$.scan(/* 省略 */), // state3

(state1, state2, state3): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { state1, state2, state3 });
}
])

zipのマーブルダイアグラム

zipと似たものにcombineLatestというオペレーターがありますが、zipは内包する全てのObservableのnextを待つのに対し、combineLatestはどれかがnextする度に各々のObservableの最新の値を次に流します。

このためReduxの概念にはzipが適しているということになります。

combineLatestのマーブルダイアグラム


要点6 - distinctUntilChanged

    .distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))

distinctUntilChangedオペレーターは通過するストリームが同じ値の場合に抑制する機能です。

ただし今回のコードは前述のprojectionのところで

return Object.assign<{}, AppState, {}>({}, initialState, { increment });

こうしているため、単純に.distinctUntilChanged()と書いてしまうと流れてきたデータが{counter: 2}{counter: 2}のように同じ値が続いたとしても両方通過します。これはオブジェクトの中身を見ていないからです。

オブジェクトの中身でチェックするためにはいわゆるdeepEqualの比較をしなければいけないため、comparerと呼ばれる関数を下記のように書く必要があります。

(oldValue, newValue) => lodash.isEqual(oldValue, newValue)

distinctUntilChangedのドキュメント

lodash.isEqualのドキュメント


Angularで使うには

DispatcherをDIコンテナに入れてComponentにインジェクトすれば、ComponentからActionをキックできるようになります。

またProviderのストリームの最後の方をなんとかしてComponentにインジェクトすれば、Componentで更新されたステートを取得できます。

ちなみに Firebase-as-a-Store ~RxJSで作るFirebaseバックエンドのRedux~ で紹介しているAngularアプリはこの概念をそっくりそのまま使って構築しています。


まとめ

要点はいくつかありますが、最も重要なのはReducerを構成するscan,zipオペレーターの部分です。

Reducerを増やしたければzipオペレーターの中にどんどんdispatcherQueue$.scanを追加していきましょう。やることはそれだけです。