Redux Advent Calendar 2016 の15日目です。ちきさんです。
前置きと反省
この記事は僕がQiitaに投稿した中で最もストックされてしまった Reduxの概念をRxJSとTypeScriptで理解する(初心者向け) をリライトしたものです。
今振り返ると前回の記事にはいくつか問題がありました。
- ステートとReducerにPromiseを持ち込むのはやはり筋が良くなかった。
- Actionのdispatch順を無視して非同期が解決した順に処理が流れる構造になっていた。
1に関しては当初は問題ないと思っていたのですが僕も年を取り、その考えは若気の至りだったのではないかと考えるようになった次第です。
2は要件によってはそれで良いと思うのですが、それでも基本的にはdispatch順に処理する方がより期待される挙動であろうと思った次第です。
「RxJSでReduxを書く」というテーマは今回のリライトでようやく本質に近づけたかなと思います。
ここから本編
GitHubリポジトリはこちら → ovrmrw/understanding-redux-with-rxjs-2
git clone
したらnpm install
してnpm start
ですぐに動かせます。
(注: Reactの話は一切出てきません)
Reduxとは
Redux公式
アプリケーション全体で状態(state)を一つのJSONツリーの構造で持ち、それをActionがキックされる度に全体更新して配信するという概念。
僕は最初FluxとかReduxとかよくわからなかったけど、色々参考にしながら自分で書いてみたらようやく理解できました。
Middlewareという概念はよくわからなかったし今でもよくわかりません。
ロガーとか便利なものもあるらしいのですがそもそも本家Reduxを動かしてみたことがない。致命的な問題としてReduxはReducerの中で非同期処理を扱えないので、それをどう扱うかでMiddleware戦争が起きているぐらいです。
Reduxを理解する
「しかしAngular派である俺達には最初からRxJSがある。だからRxJSをフルに使ってReduxっぽいの書いちゃおーよ!」という元ネタがこちら。→"Tackling State" by Victor Savkin
(Victor SavkinはAngularチームの中心メンバーです。彼のブログは購読する価値があるでしょう。)
で、当時RxJSの理解が浅かった(JSの理解も浅かった)僕はこれを理解するのにすごく時間がかかりました。Savkin流Reduxを自分なりに何回も書き直して書き直して、そしてようやく辿り着いた…
実は一枚の簡単な短いコードで全てを説明できるということがわかったのです。
以下、GitHubリポジトリにあるコードそのまま。初心者でも簡単にReduxのコンセプトを理解できるはずです。
(RxJSがよくわからないという方は公式ドキュメントを読むとわかりやすいかと思います)
import 'core-js';
import 'zone.js/dist/zone-node';
import * as lodash from 'lodash';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
declare const Zone: any;
///////////////////////////////// Action
class IncrementAction {
constructor(public num: number) { }
}
class OtherAction {
constructor() { }
}
type Action = IncrementAction | OtherAction;
///////////////////////////////// State
interface IncrementState {
counter: number;
}
interface OtherState {
foo: string;
bar: number;
}
interface AppState {
increment: IncrementState;
other?: OtherState;
}
const initialState: AppState = {
increment: {
counter: 0
}
};
///////////////////////////////// Redux
Zone.current.fork({ name: 'myZone' }).runGuarded(() => {
console.log('zone name:', Zone.current.name); /* OUTPUT> zone name: myZone */
const dispatcher$ = new Subject<Action | Promise<Action> | Observable<Action>>(); // Dispatcher
const provider$ = new BehaviorSubject<AppState>(initialState); // Provider
const dispatcherQueue$ = // Queue
dispatcher$
.concatMap(action => { // async actions are resolved here.
if (action instanceof Promise || action instanceof Observable) {
return Observable.from(action);
} else {
return Observable.of(action);
}
})
.share();
Observable // ReducerContainer
.zip(...[
dispatcherQueue$.scan((state, action) => { // Reducer
if (action instanceof IncrementAction) {
return { counter: state.counter + action.num };
} else {
return state;
}
}, initialState.increment),
(increment): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { increment }); // always create new state object!
}
])
.subscribe(newState => {
provider$.next(newState);
});
provider$
.map(appState => appState.increment)
.distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue)) // restrict same values to pass through.
.subscribe(state => {
console.log('counter:', state.counter); /* (First time) OUTPUT> counter: 0 */
});
/*
OUTPUT: 0 -> 1 -> 2 -> 4 -> 3
outputs are not determined by async resolution order but by action dispatched order.
*/
dispatcher$.next(promiseAction(new IncrementAction(1), 100)); /* OUTPUT> counter: 1 */
dispatcher$.next(promiseAction(new IncrementAction(1), 50)); /* OUTPUT> counter: 2 */
dispatcher$.next(observableAction(new IncrementAction(0), 100)); /* OUTPUT> (restricted) */
dispatcher$.next(observableAction(new IncrementAction(2), 50)); /* OUTPUT> counter: 4 */
dispatcher$.next(new IncrementAction(-1)); /* OUTPUT> counter: 3 */
});
///////////////////////////////// Helper
function promiseAction(action: Action, timeout: number): Promise<Action> {
return new Promise<Action>(resolve => {
setTimeout(() => resolve(action), timeout);
});
}
function observableAction(action: Action, timeout: number): Observable<Action> {
return Observable.of(action).delay(timeout);
}
どうでしょうか。超簡単じゃないですか?
ちなみに今回のコードでzone.jsを使う必要はありませんでしたが、後々Angular上で動かす予定のあるコードはなるべくNode.js環境でもZone
を使って書いた方がいいです。
そうしないとNode.js環境で動いたコードがAngularで動かないとか、そういう現象に遭遇することがあるからです。(僕はただのAngular好きです)
要点1 - Subject
dispatcher$.next(promiseAction(new IncrementAction(1), 100));
これがActionの起点です。ちなみにdispatcher$
はSubject
のインスタンスです。
この次にストリームがどこに流れているかわかるでしょうか。
正解はdispatcherQueue$
です。
要点2 - concatMap
const dispatcherQueue$ = // Queue
dispatcher$
.concatMap(action => { // async actions are resolved here.
if (action instanceof Promise || action instanceof Observable) {
return Observable.from(action);
} else {
return Observable.of(action);
}
})
.share();
dispatcher$
から流れてきたActionをconcatMap
オペレーターで受けています。何をしているかというと、
- PromiseかObservable、つまり非同期だったら
Obervable.from()
で非同期を解決させて返す。 - それ以外、つまり同期だったら
Observable.of()
でただ単にObservableに変換して返す。
これによりObservable<Action | Promise<Action> | Observable<Action>>
だったActionの型はObservable<Action>
に統一されます。
しかもconcatMap
の効能によりActionのdispatch順が遵守されます。素敵です。
このReducerの手前で非同期を解決させようというアプローチはredux-observableにも通じるものがあります。
要点3 - BehaviorSubject
const provider$ = new BehaviorSubject<AppState>(initialState);
ここはSubject
ではなくBehaviorSubject
であることに意味があります。もしこれをSubject
に変えると最初の"counter: 0"
が出力されなくなります。
初期値がすぐに配信される様子はマーブルダイアグラムで見るとわかりやすいかも知れません。
Subjectのドキュメント
要点4 - scan
dispatcherQueue$.scan((state, action) => { // Reducer
if (action instanceof IncrementAction) {
return { counter: state.counter + action.num };
} else {
return state;
}
}, initialState.increment)
いわゆるReducer。
scan
オペレーターはStore(Reducer)を構築する要です。これは時間をまたぐreduceであると理解できれば、それで大体OKかなと思います。このscan
と事項のzip
をしっかり理解できるかどうかが本記事理解の分かれ目となります。
scanのマーブルダイアグラム
dispatcher$.scan()
ではなくdispatcherQueue$.scan()
であることが重要です。
要点5 - zip, projection
(increment): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { increment });
}
zip
オペレーターの最後の引数にprojectionと呼ばれる関数を入れて戻り値を整えます。
ちなみにzip
の中がいくつもあるときはこのように書きます。
.zip<AppState>(...[
dispatcher$.scan(/* 省略 */), // state1
dispatcher$.scan(/* 省略 */), // state2
dispatcher$.scan(/* 省略 */), // state3
(state1, state2, state3): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { state1, state2, state3 });
}
])
zipのマーブルダイアグラム
zip
と似たものにcombineLatest
というオペレーターがありますが、zip
は内包する全てのObservableのnextを待つのに対し、combineLatest
はどれかがnextする度に各々のObservableの最新の値を次に流します。
このためReduxの概念にはzip
が適しているということになります。
combineLatestのマーブルダイアグラム
要点6 - distinctUntilChanged
.distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))
distinctUntilChanged
オペレーターは通過するストリームが同じ値の場合に抑制する機能です。
ただし今回のコードは前述のprojectionのところで
return Object.assign<{}, AppState, {}>({}, initialState, { increment });
こうしているため、単純に.distinctUntilChanged()
と書いてしまうと流れてきたデータが{counter: 2}
→{counter: 2}
のように同じ値が続いたとしても両方通過します。これはオブジェクトの中身を見ていないからです。
オブジェクトの中身でチェックするためにはいわゆるdeepEqualの比較をしなければいけないため、comparerと呼ばれる関数を下記のように書く必要があります。
(oldValue, newValue) => lodash.isEqual(oldValue, newValue)
distinctUntilChangedのドキュメント
lodash.isEqualのドキュメント
Angularで使うには
DispatcherをDIコンテナに入れてComponentにインジェクトすれば、ComponentからActionをキックできるようになります。
またProviderのストリームの最後の方をなんとかしてComponentにインジェクトすれば、Componentで更新されたステートを取得できます。
ちなみに Firebase-as-a-Store ~RxJSで作るFirebaseバックエンドのRedux~ で紹介しているAngularアプリはこの概念をそっくりそのまま使って構築しています。
まとめ
要点はいくつかありますが、最も重要なのはReducerを構成するscan
,zip
オペレーターの部分です。
Reducerを増やしたければzip
オペレーターの中にどんどんdispatcherQueue$.scan
を追加していきましょう。やることはそれだけです。