(2016/12/15追記)
この記事はDeprecatedです。
Reduxの概念をRxJSとTypeScriptで理解する ver.2 が最新版になります。
GitHubリポジトリはこちら → ovrmrw/understanding-redux-with-rxjs
git clone
したらnpm install
してnum run ts
ですぐに動かせます。
(注: Reactの話は一切出てきません)
Reduxとは
Redux公式
アプリケーション全体で状態(state)を一つのJSONツリーの構造で持ち、それをActionがキックされる度に全体更新して配信するという概念。
僕は最初FluxとかReduxとかよくわからなかったけど、色々参考にしながら自分で書いてみたらようやく理解できました。
Middlewareという概念はよくわからなかったし今でもよくわかりません。
ロガーとか便利なものもあるらしいのですがそもそも本家Reduxを動かしてみたことがない。致命的な問題としてReduxはReducerの中で非同期処理を扱えないので、それをどう扱うかでMiddleware戦争が起きているぐらいです。
Reduxを理解する
「しかしAngular派である俺達には最初からRxJSがある。だからRxJSをフルに使って非同期も扱えるReduxっぽいの書いちゃおーよ!」という元ネタがこちら。→"Tackling State" by Victor Savkin
(Victor SavkinはAngularチームの中心メンバーです。彼のブログは購読する価値があるでしょう。)
で、当時RxJSの理解が浅かった(JSの理解も浅かった)僕はこれを理解するのにすごく時間がかかりました。Savkin流Reduxを自分なりに何回も書き直して書き直して、そしてようやく辿り着いた…
実は一枚の簡単な短いコードで全てを説明できるということがわかったのです。
以下、GitHubリポジトリにあるコードそのまま。初心者でも簡単にReduxのコンセプトを理解できるはずです。
(RxJSがよくわからないという方は公式ドキュメントを読むとわかりやすいかと思います)
import 'babel-polyfill';
import 'zone.js/dist/zone-node';
import * as lodash from 'lodash';
import { Observable, Subject, BehaviorSubject } from 'rxjs/Rx';
declare const Zone: any;
class IncrementAction {
constructor(public num: number) { }
}
class OtherAction {
constructor() { }
}
type Action = IncrementAction | OtherAction;
interface IncrementState {
counter: number;
}
interface OtherState {
foo: string;
bar: number;
}
interface AppState {
increment: Promise<IncrementState>;
other?: OtherState;
}
Zone.current.fork({ name: 'myZone' }).runGuarded(() => {
console.log('zone name:', Zone.current.name); /* OUTPUT> zone name: myZone */
const initialState: AppState = {
increment: Promise.resolve<IncrementState>({
counter: 0
})
};
let counter: number;
const dispatcher$ = new Subject<Action>(); // Dispatcher
const provider$ = new BehaviorSubject<AppState>(initialState); // Provider
Observable // ReducerContainer
.zip<AppState>(...[
dispatcher$.scan<Promise<IncrementState>>((state, action) => { // Reducer
if (action instanceof IncrementAction) {
return new Promise<IncrementState>(resolve => {
setTimeout(() => {
state.then(s => resolve({ counter: s.counter + action.num }));
}, 10);
});
} else {
return state;
}
}, initialState.increment),
(increment): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { increment });
}
])
.subscribe(appState => {
provider$.next(appState);
});
provider$
.map<Promise<IncrementState>>(appState => appState.increment)
.mergeMap<IncrementState>(state => {
return Observable.fromPromise(state); // resolve async states.
})
.map<IncrementState>(state => lodash.cloneDeep(state)) // make states immutable.
.distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))
.subscribe(state => {
counter = state.counter;
console.log('counter:', counter); /* (First time) OUTPUT> counter: 0 */
});
dispatcher$.next(new IncrementAction(1)); /* OUTPUT> counter: 1 */
dispatcher$.next(new IncrementAction(1)); /* OUTPUT> counter: 2 */
dispatcher$.next(new IncrementAction(0)); /* OUTPUT> (restricted) */
dispatcher$.next(new IncrementAction(1)); /* OUTPUT> counter: 3 */
dispatcher$.next(new IncrementAction(-1)); /* OUTPUT> counter: 2 */
});
どうでしょうか。超簡単じゃないですか?
ちなみに今回のコードでzone.jsを使う必要はありませんでしたが、後々Angular上で動かす予定のあるコードはなるべくNode.js環境でもZone
を使って書いた方がいいです。
そうしないとNode.js環境で動いたコードがAngularで動かないとか、そういう現象に遭遇することがあるからです。(僕は実際にあった)
要点1 - Subject
dispatcher$.next(new IncrementAction(1));
これがActionの起点です。ちなみにdispatcher$
はSubject
のインスタンスです。
この次にストリームがどこに流れているかわかるでしょうか。
dispatcher$.scan<Promise<IncrementState>>((state, action) => { // Reducer
/* 省略 */
})
ここです。そしてその先のsubscribe
の中でProvider(BehaviorSubject)をnext
して、という具合にストリームがどんどん次に流れていきます。
ところでActionが{type: 'ADD', payload: 1}
の形ではなくnew IncrementAction(1)
であることにお気付きでしょうか。この点に関してVictor SavkinはTypeScriptのUnion Typesを使ってtype-safeなコードが書けることを強調しています。
要点2 - BehaviorSubject
const provider$ = new BehaviorSubject<AppState>(initialState);
ここはSubject
ではなくBehaviorSubject
であることに意味があります。もしこれをSubject
に変えると最初の"counter: 0"
が出力されなくなります。
初期値がすぐに配信される様子はマーブルダイアグラムで見るとわかりやすいかも知れません。
Subjectのドキュメント
要点3 - scan
dispatcher$.scan<Promise<IncrementState>>((state, action) => {
if (action instanceof IncrementAction) {
return new Promise<IncrementState>(resolve => {
setTimeout(() => {
state.then(s => resolve({ counter: s.counter + action.num }));
}, 10);
});
} else {
return state;
}
}, initialState.increment)
いわゆるReducer。
scan
オペレーターはStore(Reducer)を構築する要です。これは時間をまたぐreduceであると理解できれば、それで大体OKかなと思います。このscan
と事項のzip
をしっかり理解できるかどうかが本記事理解の分かれ目となります。
scanのマーブルダイアグラム
Reducerの中で非同期(Promise)を扱っていることに違和感を感じますか? RxJSなら何の問題もありません。
要点4 - zip, projection
(increment): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { increment });
}
zip
オペレーターの最後の引数にprojectionと呼ばれる関数を入れて戻り値を整えます。
ちなみにzip
の中がいくつもあるときはこのように書きます。
.zip<AppState>(...[
dispatcher$.scan(/* 省略 */), // state1
dispatcher$.scan(/* 省略 */), // state2
dispatcher$.scan(/* 省略 */), // state3
(state1, state2, state3): AppState => { // projection
return Object.assign<{}, AppState, {}>({}, initialState, { state1, state2, state3 });
}
])
zipのマーブルダイアグラム
zip
と似たものにcombineLatest
というオペレーターがありますが、zip
は内包する全てのObservableのnextを待つのに対し、combineLatest
はどれかがnextする度に各々のObservableの最新の値を次に流します。
このためReduxの概念にはzip
が適しているということになります。
combineLatestのマーブルダイアグラム
要点5 - mergeMap, fromPromise
.mergeMap<IncrementState>(state => {
return Observable.fromPromise(state);
})
mergeMap
オペレーターは「流れてきたストリームを別のストリームに置き換えて次に流す」というミラクルな機能を持っています。(switchMap
という似たものもありますがこちらは直前のストリームをキャンセルできます)
Observable.fromPromise
で、流れてきたPromiseをthenして中身を次に流しているんですね。つまり、【Observable<Promise<IncrementState>>
→ Observable<IncrementState>
】のように置き換えています。
Reducerで非同期を扱えるといっても結局はどこかの段階でPromiseの解決を待たなければいけません。今回のコードはmergeMap
の中のObservable.fromPromise
がそれを担当しているというわけですね。
mergeMapのドキュメント
switchMapで書いたver.も用意しました。下の方にコメントで出力が書いてあります。 → redux-rxjs.switchMap.ts
実際に動かしてみるとストリームがキャンセルされる様子がわかるかと思います。このCancellationと呼ばれる機構はRxJSがウリにしているものの一つです。
要点6 - immutability
.map<IncrementState>(state => lodash.cloneDeep(state))
Promise解決後のオブジェクトをそのまま流すと、.subscribe()
内でオブジェクトの中身を変更できてしまいます。これはオブジェクトの値が渡されているのではなく、メモリ参照が渡されているからです。
Reduxの原則によると**"Actionの発行以外で状態を変更できてはならない"**ので、lodash.cloneDeep()
により副作用を生じさせない値を次に流すようにします。
lodash.cloneDeepのドキュメント
要点7 - distinctUntilChanged
.distinctUntilChanged((oldValue, newValue) => lodash.isEqual(oldValue, newValue))
distinctUntilChanged
オペレーターは通過するストリームが同じ値の場合に抑制する機能です。
ただし今回のコードは前述のprojectionのところで
return Object.assign<{}, AppState, {}>({}, initialState, { increment });
こうしているため、単純に.distinctUntilChanged()
と書いてしまうと流れてきたデータが{counter: 2}
→{counter: 2}
のように同じ値が続いたとしても両方通過します。これはオブジェクトの中身を見ていないからです。
オブジェクトの中身でチェックするためにはいわゆるdeepEqualの比較をしなければいけないため、comparerと呼ばれる関数を下記のように書く必要があります。
(oldValue, newValue) => lodash.isEqual(oldValue, newValue)
distinctUntilChangedのドキュメント
lodash.isEqualのドキュメント
Angularで使うには
DispatcherをDIコンテナに入れてComponentにインジェクトすれば、ComponentからActionをキックできるようになります。
またProviderのストリームの最後の方をなんとかしてComponentにインジェクトすれば、Componentで更新された状態(state)を取得できます。ただしObservable<Promise<any>>
のデータをAngular標準のAsyncPipeで受けようとするとブラウザが無限ループに入ってフリーズします。(ver 2.0.0現在)
AsyncPipeはObservable, Promise, EventEmitterのいずれも受けられるように設計されていて、Observableの中にPromiseのような遅延処理が入っていることは想定されていないのだと思います。
もしPipeで受けたいのなら自作した方が安心安全です。僕の場合はこう。 → async-state.pipe.ts
まとめ
ReduxのReducersに相当するのはscan
オペレーターのところですが、オリジナルReduxと違ってRxJSなら非同期の状態(Promise)を扱うことができます。
Reducerを増やしたければzip
オペレーターの中にどんどんdispatcher$.scan
を追加していきましょう。やることはそれだけです。簡単ですね。
(追記)
よく見たらReduxのドキュメントにこんなことが書いてありました。
The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx .scan() method. It may very well be!
(訳: RxJSを使えば簡単に実装できるよ。)