概要
ngrxでeffectsを使ってコードを書いています。
エラー処理がうまくいかずに、一度例外が発生すると、次からEffectが動かなくなり苦戦したのでその原因と対策を整理しました。
ありがちなコード(問題あり)
effectsでありがちなコードは以下となると思います。
(参考ページより引用。)
このコードでは、fetchTodoList関数で何か問題が出た時には、最後のcatchError関数が呼び出されてエラーが適切に処理されるという想定です。
しかし、このコードでは 初回エラーが発生するイベントまでは動くのですが、それ以後のEffectが呼び出されなくなります。
つまり、初回イベント処理までは正しく動くのですが、それ以降、ストリームが止まってしまうのです。
@Injectable()
export class TodoEffects {
@Effect()
public fetchListOfTodos$: Observable<void> = this.action$.pipe(
ofType(ActionTypes.FETCH_TODO_LIST),
switchMap(() => this.todoListService.fetchTodoList()),
map((todos: Todo[]) => {
return {
type: ActionTypes.TODO_LIST_FETCHED,
payload: { todos }
};
}),
catchError((error) => {
return Observable.of({
type: ActionTypes.FETCH_TODO_LIST_FAILED,
payload: { error }
});
})
);
}
何が起きているのか
catchError関数を使ってちゃんとエラー処理しているように思えるのに何が起きているのか。
実はswitchMap関数の中で発生したエラーを適切に処理していないため、switchMap関数以降のstreamがエラー状態になっているのです。
良くわからなかったので、switchMap関数のソースを引用すると以下となります。
switchMap関数の中で実際の処理はproject関数を呼び出すことで実現されます。(★1のところ)
project関数から例外が出てきた場合、★2で補足されます。
コードから見てわかるようにerrorとしてディスパッチされています。
つまり、ここで例外が発生すると以降のstreamはエラー状態となってしまいます。
結果として、以下のことが発生します。
[発生すること]
- 以降の処理でcatchError関数を使って例外を補足する事は出来る。
- streamがエラー状態となったため以後アクション発生してEffectが呼び出されてもstreamが閉じている
- Effectが動かなくなってしまう。
/**
* We need this JSDoc comment for affecting ESDoc.
* @ignore
* @extends {Ignored}
*/
class SwitchMapSubscriber<T, R> extends OuterSubscriber<T, R> {
private index: number = 0;
private innerSubscription: Subscription;
constructor(destination: Subscriber<R>,
private project: (value: T, index: number) => ObservableInput<R>) {
super(destination);
}
protected _next(value: T) {
let result: ObservableInput<R>;
const index = this.index++;
try {
//★1 : 処理実行
result = this.project(value, index);
} catch (error) {
// ★2 : ここでエラーとなる
this.destination.error(error);
return;
}
this._innerSub(result, value, index);
}
対策
対策ですが、switchMap内部で例外を漏らさない様にします。
例外が発生しそうなfetchTodoList()関数にcatchError関数を接続して、
何か例外が発生したらcatchError関数内部で例外ではないオブジェクトに変換して次に流すようにします。
これにより、switchMap関数内部で未処理例外が発生してストリームが流れなくなってしまう問題を防ぐことができます。
@Injectable()
export class TodoEffects {
@Effect()
public fetchListOfTodos$: Observable<void> = this.action$.pipe(
ofType(ActionTypes.FETCH_TODO_LIST),
switchMap(() =>
this.todoListService.fetchTodoList().pipe(
map((todos: Todo[]) => {
return {
type: ActionTypes.TODO_LIST_FETCHED,
payload: { todos }
};
}),
// handle failure in todoListService.fetchTodoList()
catchError((error) => {
return Observable.of({
type: ActionTypes.FETCH_TODO_LIST_FAILED,
payload: { error }
});
})
)
)
);
}
まとめ
ngrxでeffectsでの例外処理を整理しました。
catchErrorと書いておけばうまく動くだろうと浅く考えていましたが、
catchErrorする場所によっていはstreamが意図しない状態となってしまいはまりました。
ngrxでeffectsの場合、常にアクションが流れ続ける状態でないと、うまく動かないため、
内部でエラーが発生してもストリームを壊さずに適切にエラー処理してアクションに変換してアクションを流せる状態にし続ける事が必要なようです。
実験コード
動作原理を確認するために、動きを確認するシンプル化した実験コードを書きました。
例外なし
例外が出ないパータンです。
通常はこのパターンで、testと出続けます。
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { switchMap } from 'rxjs/operators';
import { timer, interval } from 'rxjs';
{
//emit immediately, then every 1s
const source = timer(0, 1000);
//switch to new inner observable when source emits, emit items that are emitted
const example = source.pipe(switchMap(() => {
return of('test');
}),
catchError(val => of(`I caught on pipe a: ${val}`))
);
const subscribe = example.subscribe(
(val) => {console.log('on sucess error ' + val);console.log(subscribe);},
(c) => console.log('on error ' + c),
);
}
ストリーム停止
今回問題となったストリームが停止するパターンです。
動きととしては「例外発生→catchError関数で補足→subscribe関数では成功扱い」となります。
ただし、stream自体はこれでエラー状態となり閉じてしまうので、1回しかイベントが流れません。
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { switchMap } from 'rxjs/operators';
import { timer, interval } from 'rxjs';
{
//emit immediately, then every 1s
const source = timer(0, 1000);
//switch to new inner observable when source emits, emit items that are emitted
const example = source.pipe(
switchMap(() => {
return throwError('This is an error!');
}),
catchError(val => of(`I caught on pipe a: ${val}`))
);
const subscribe = example.subscribe(
(val) => { console.log('on sucess ' + val); console.log(subscribe); },
(c) => console.log('on error ' + c),
);
}
ストリーム停止を防ぐ(今回対策)
例外が発生した直後にcatchError関数で例外を補足します。
これにより、switchMap関数でstreamがエラー状態となりません。
streamは停止状態とならずtimerで指定した周期で例外が流れてきます。
//成功(ストリームが流れる)
import { throwError, of } from 'rxjs';
import { catchError } from 'rxjs/operators';
import { switchMap } from 'rxjs/operators';
import { timer, interval } from 'rxjs';
{
//emit immediately, then every 1s
const source = timer(0, 1000);
//switch to new inner observable when source emits, emit items that are emitted
const example = source.pipe(
switchMap(() => {
return throwError('This is an error!')
.pipe(
catchError(val => of(`I caught on after throwError: ${val}`))
)
}),
catchError(val => of(`I caught on pipe a: ${val}`))
);
const subscribe = example.subscribe(
(val) => { console.log('on sucess ' + val); console.log(subscribe); },
(c) => console.log('on error ' + c),
);
}