redux-observable
redux-observable は Redux の middleware の一つで、 redux-thunk や redux-saga と同じく、主に非同期処理のために用いられるライブラリです。
redux-saga のコンセプトを generator でなく Rx により実現するライブラリであると認識しています。
引用: http://www.timqian.com/star-history/
参考:
逆引き redux-observable
以下では実際のアプリ開発でよくあるケースの redux-observable における実装の一例を、 RxJS の基礎的な点も含めてご紹介します。
大好きなライブラリなので、随時更新して利用者を増やしたいと思っています。
前提
バージョン
バージョンは以下の通りです。
rxjs@6.2.0
redux-observable@1.0.0
typescript@2.9.1
typescript-fsa@2.5.0
TypeScript
内容に大きな違いはありませんが、本記事では TypeScript でのコードを記載します。
Action については typescript-fsa により以下のように ActionCreatorを作成することを想定しています。
import actionCreatorFactory from 'typescript-fsa';
const actionCreator = actionCreatorFactory('Foo/Bar');
export const actions = {
submit: actionCreator<{ something: string }>('SUBMIT'),
success: actionCreator<{}>('SUCCESS')
}
// actions.submit({ something: 'foo' }) === { type: 'Foo/Bar/SUBMIT', payload: { something: 'foo' } }
// actions.success({}) === { type: 'Foo/Bar/SUCCESS', payload: {} }
Action に対して Action を発行する
極めてシンプルな例です。
mapTo を使います。
以下は actions.foo
に対し、 actions.bar
を発行する epic です。
export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.foo.match),
mapTo(actions.bar({})),
);
epic のストリームは最終的に actionCreator でなく Action が流れなければならないことに注意してください。
そのため、 actions.bar({})
として actionCreator から Action を生成しています。
特定の状況でのみ、Action に対して Action を発行する
filter を使います。
以下は actions.foo
に対し、その payload.bar
が true
の時のみ actions.baz
を発行する epic です。
export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.foo.match),
filter(action => action.payload.bar),
mapTo(actions.baz({})),
);
以下は actions.foo
に対し、State の bar
が true
の時のみ actions.baz
を発行する epic です。
export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.foo.match),
filter(_ => state$.value.bar),
mapTo(actions.baz({})),
);
Action に応じて、状況に応じた Action を発行する
map の中で条件分岐を行えば十分です。
以下は actions.foo
に対し、その payload.bar
が true
であれば actions.baz
を、そうでなければ actions.qux
を発行する epic です。
export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.foo.match),
map(action => {
if (action.payload.bar) {
return actions.baz({});
} else {
return actions.qux({});
}
}),
);
Action に対して、複数の Action を発行する
以下は actions.foo
に対し、 actions.bar
と actions.baz
を発行する epic です。
export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.foo.match),
mergeMapTo(of(
actions.bar({}),
actions.baz({}),
)),
);
ストリームを流れる Action をただ別の Action に変換するだけであれば map
で良いのですが、個数を変えたい場合や非同期的に流したい場合は Action をストリームに変換する必要があります。
Action に対して、Action は発行しないが何か処理をする
tap, ignoreElements を使います。 map
等の中で行うこともできてしまいますが、避けるべきでしょう。
export const successEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.success.match),
tap(action => {
Alert.alert('成功しました');
}),
ignoreElements(),
);
なお、 ignoreElements
を呼ばないと、 actions.success
が流れ続け、 successEpic
が反応し続けることとなります。
HTTP リクエストの成否をさばく
まず、以下のような Observable<Response>
を返す request
関数があると都合が良いと思います。ここでは fetch
を使うこととします。
import QueryString from 'query-string';
type HttpMethod = 'GET' | 'POST' | 'PATCH' | 'PUT' | 'DELETE';
interface RequestOption {
httpMethod: HttpMethod;
url: string;
params: { [key: string]: string | number };
headers: { [key: string]: string };
}
interface ResponseError {
message: string;
status: number;
}
export const request = <Response>({ httpMethod, url, params, headers }: RequestOption): Observable<Response> => {
return defer(async () => {
const _url = httpMethod === 'GET' ? `${url}?${QueryString.stringify(params)}` : url;
const options = {
headers,
method: httpMethod,
...(httpMethod === 'GET' ? {} : { body: JSON.stringify(_params) }),
};
const res = await fetch(_url, options);
const body = await res.json();
return {
status: res.status,
body,
};
}).pipe(
catchError(_err => {
return _throw(networkError);
}),
map(({ status, body }: { status: number, body: Response | ResponseError }) => {
if (status >= 400) {
throw { message: body.message, status } as ResponseError;
}
return body as Response;
}),
);
};
また、アプリ上考えられるリクエスト毎に関数を作ることとします。ここではシンプルなユーザの新規登録リクエストを考えます。
interface LoginResponse {
user_id: string;
}
export const register = ({ email, password }: { email: string, password: string }): Observable<{ userId: string }> => {
const url = `${urls.API}/users`;
const params = {
user: {
email,
password,
},
};
return request<LoginResponse>({ httpMethod: 'POST', url, params }).pipe(
map(res => ({
userId: res.user_id
})),
);
};
Action は以下のようにしておきます。
export const register = actionCreator<{ email: string, password: string }>('User/REGISTER');
export const registerSuccess = actionCreator<{ userId: string }>('User/REGISTER_SUCCESS');
export const registerFailure = actionCreator<{ errorMessage: string }>('User/REGISTER_FAILURE');
そうすると、 epic は以下のように書けます。
export const registerEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.register.match),
exhaustMap(action => {
const { email, password } = action.payload;
return api.register({ email, password }).pipe(
map(({ userId }) => actions.registerSuccess({ userId })),
catchError(error => {
const errorMessage = error.message;
return of(actions.registerFailure({ errorMessage }));
}),
);
}),
);
exhaustMap は、 mergeMap
と似た振る舞いをするのですが、前の Action が完了していなければ新しい Action を無視するという性質があります。
これにより、仮に actions.register
が連続で発行されても、 registerSuccess
か registerFailure
が流れるまではそれを破棄するようになります。そもそも Action が発行されないようにすべきですが、お手軽に保険がかけられるのは嬉しいですね。
また、 catch
を exhaustMap
内で行っているのがポイントです。 ストリームは一度 catch
されると、以降何も流さなくなってしまいます。 そのため、エラーハンドリングは必ず mergeMap
や exhaustMap
の中で完結させ、 epic のストリームを止めてはいけません。
AuthToken を用いた HTTP リクエストをする
参照透明の観点から言えば Reducer に AuthToken を持たせ、それを都度リクエスト関数に渡すようにするのが良いと思います。
export const submitEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.submit.match),
exhaustMap(action => {
const { title, message } = action.payload;
const { authToken } = state$.value.auth;
return api.submit({ title, message, authToken }).pipe(
map(({ something }) => actions.submitSuccess({ something })),
catchError(error => {
const errorMessage = error.message;
return of(actions.submitFailure({ errorMessage }));
}),
);
}),
);
AuthToken が expired だった場合、予め用意していた Refresh Token により AuthToken を再取得し、それにより再リクエストする
まず以下のようなオペレータを用意しました。
export const catchAuthFailureAndReAuthenticate = (action$: ActionsObservable<Action>) => (
catchError((error: any, source$) => {
if (error.SOME_PROPERTY !== RE_AUTHENTICATE_NEEDED) {
return _throw(error);
}
return merge(
action$.pipe(
filter(authActions.reAuthenticateSuccess.match),
takeUntil(action$.pipe(filter(authActions.reAuthenticateFailure.match))),
take(1),
mergeMapTo(source$),
),
of(authActions.reAuthenticate({})),
);
})
);
そして再認証の epic を作っておきます。
export const reAuthenticateEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.reAuthenticate.match),
exhaustMap(action => {
const { refreshToken } = state$.value.auth;
return api.reAuthenticate({ refreshToken }).pipe(
map(({ authToken }) => actions.reAuthenticateSuccess({ authToken })),
catchError((error: ResponseError | any) => {
let errorMessage = error.message;
if (!errorMessage) {
errorMessage = '再ログインが必要です。';
}
return of(actions.reAuthenticateFailure({ errorMessage }));
}),
);
}),
);
このように使います。
const decodePost = (json: PostJSON): Post => {
// mapping
};
// api
export const fetch = ({ postId, authToken }: { postId: string, authToken: string }): Observable<Post> => {
const url = `${urls.API}/posts/${postId}`;
return requestWithAuth<PostJSON>({
httpMethod: 'GET',
url,
params: null,
authToken,
}).pipe(map(res => res.map(decodePost)));
};
// epic
export const fetchEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.fetchPost.match),
mergeMap(action => {
const { postId } = action.payload;
const { authToken } = state$.value.auth;
return defer(() => api.fetchPost({ postId, authToken })).pipe(
map(post => actions.fetchSuccess({ post })),
catchAuthFailureAndReAuthenticate(action$),
catchError((error: ResponseError | any) => {
const errorMessage = error.message;
return of(actions.fetchFailure({ errorMessage }));
}),
);
}),
);
reAuthenticateIfNeeded
がしていることは以下の通りです。
- error が渡され、再認証すべきでなければそのまま throw する
- 再認証すべき時は
- 「
reAuthenticateSuccess
が来たら元のストリーム(fetch
)を再度流し、reAuthenticateFailure
が来たら何も流さずに終える」ようなストリームのもとで -
reAuthenticate
を呼び出す
- 「
なお、 reAuthenticateFailureEpic を用意し、そこで再認証失敗時のハンドリングを行います(ログイン画面に戻らせる等)。
defer
を使うことで、 AuthToken がリフレッシュされた後のリクエスト時、リフレッシュされた AuthToken を使うようにしています。
このようにしないと、リフレッシュ後も初回の呼び出し時の AuthToken によりリクエストされることとなってしまいます。
画像アップロード時、進行度合いに応じて Action を発行する
以下のようなインターフェースの requestMultipart 関数を用意してみました。 formData の作成箇所等は省略しています。
interface MultipartRequestWithAuthOption {
httpMethod: HttpMethod;
url: string;
something: any;
progressPercentageObserver?: Observer<number>;
}
export const requestMultipartWithAuth = <Response>({ httpMethod, url, something, progressPercentageObserver }: MultipartRequestWithAuthOption): Observable<Response> => {
// FormData は引数等からよしなに用意する
// const formData = somethingToFormData(something)
return Observable.create((observer: Observer<Response>) => {
const xhr = new XMLHttpRequest();
xhr.open(httpMethod, url, true);
if (xhr.upload && progressPercentageObserver) {
xhr.upload.onprogress = (event: ProgressEvent) => {
const percentage = event.loaded / event.total;
progressPercentageObserver.next(percentage);
};
}
xhr.onload = () => {
const response = JSON.parse(xhr.response);
if (xhr.status >= 400) {
const err: ResponseError = {
status: xhr.status,
message: response.message,
};
observer.error(err);
return;
}
observer.next(response as Response);
observer.complete();
if (progressPercentageObserver) {
progressPercentageObserver.complete();
}
};
xhr.onerror = (err) => {
observer.error(err);
};
xhr.send(formData);
});
};
進行状況を流すための Observer を受け取り、 XMLHttpRequest のコールバックで流しています。
また、エンドポイントに対応するリクエスト関数は以下のように書きました。
export const submit = ({ something, progressPercentageObserver }: { something: any, progressPercentageObserver: Observer<number> }): Observable<SomeType> => {
const url = `${urls.API}/foo`;
return requestMultipartWithAuth<SomeType>({ httpMethod: 'PATCH', url, something, progressPercentageObserver });
};
epic では以下のように使います。
export const submitEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.submit.match),
exhaustMap(action => {
const { something } = action.payload;
const progressPercentageSubject = new Subject<number>();
return merge(
api.submit({ something, progressPercentageObserver: progressPercentageSubject }).pipe(
map(({ someResult }) => actions.submitSuccess({ someResult })),
catchError(error => {
const errorMessage = error.message;
return of(actions.submitFailure({ errorMessage }));
}),
),
progressPercentageSubject.map(percentage => actions.submitProgress({ percentage }))
);
}),
);
Observer としても Observable としても使える Subject をここで用意し、リクエスト関数に渡して進行状況を流してもらいます。
また、それを進行状況用の Action に map したストリームを戻り値のストリームに merge することで、進行状況と結果の両方が流れるストリームを作り上げています。
間隔が短い Action をまとめる
あまり無さそうですが、ある Action が連続で起きたときに、それらの結果をまとめて別の Action なり処理に繋げたいケースです。
例えば Bluetooth デバイス等からデータ転送を受けるとき、転送量の問題で複数回に分けてデータを送る必要があることがあります。一つ一つの転送が Action で表現されているとすると、全ての転送を終えたタイミングでデータを連結した payload を持ち完了を表す Action を発行したいこともあるでしょう。
export const dataTransferredEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.dataTransferred.match),
buffer(
action$.pipe(
filter(actions.dataTransferred.match),
debounceTime(500),
),
),
map(bufferedActions => {
const data = _.flatten(bufferedActions.map(a => a.payload.value));
return actions.dataTransferCompleted({ data })
}),
);
buffer
は、流れてきた値をすぐには流さず保持します。引数に Observable を取り、その Observable に値が流れたタイミングで保持していた値を全て配列形式で流します。
上記の例では、 debouceTime
を使い、最後に dataTransferred
Action が流れてから 500ms が経過したタイミングで全ての転送が終わったとみなし、受け取ったデータを全て流しています。
後続の map では、バッファされていた Action の payload を連結し、それらをもとに完了を表す Action に変換しています。
転送が終わったことがわかるのであれば、時間で区切るのでなく、そのタイミングで値が流れる Observable を buffer
の引数に取るのがより良さそうです。
期待していた Action が起きなかった場合に Action を発行する
例えば一定時間ユーザの入力を受け付けることを考えます。ユーザの入力相当の Action に対してなにか Action を発行するのは問題ありませんが、時間内に入力相当の Action がなかったことに対して Action を発行するのは少し工夫が必要だと思います。
export const userNonActionEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.begin.match),
mergeMap(action => {
return action$.pipe(
filter(actions.finish.match),
take(1),
takeUntil(action$.pipe(filter(actions.someUserAction.match))),
);
}),
map(_finishAction => {
return actions.showSomeAlert({ message: 'foo' });
})
)
begin
が流れてから、 someUserAction
が流れるよりも早く finish
が流れた場合に showSomeAlert
が流れます。 「finish
が流れた」ではなく、「一定時間経過した」であれば、 takeUntil
の引数を of(null).pipe(delay(1000))
等にすれば良いですね。
mergeMap
の中身は Action のストリームに merge
されますが、 action$
をソースにしているので、そのままでは Action が二重に流れてしまうストリームを作ってしまいます。そのため、 take(1)
を付け加え、 finish
が一つ流れたら以降何も流さないストリームとして merge
することが必要となります。
元のストリームに元のストリームを元にしたストリーム(ややこしい)を merge
する際はこの点に注意しないといけません。