Edited at
ReduxDay 8

逆引き redux-observable | よくあるケースの実装例を RxJS の基礎から解説


redux-observable

redux-observable は Redux の middleware の一つで、 redux-thunkredux-saga と同じく、主に非同期処理のために用いられるライブラリです。

redux-saga のコンセプトを generator でなく Rx により実現するライブラリであると認識しています。

スクリーンショット 2017-12-06 20.54.56.png

引用: http://www.timqian.com/star-history/

参考:


逆引き redux-observable

以下では実際のアプリ開発でよくあるケースの redux-observable における実装の一例を、 RxJS の基礎的な点も含めてご紹介します。

大好きなライブラリなので、随時更新して利用者を増やしたいと思っています。


前提


バージョン

バージョンは以下の通りです。

rxjs@6.2.0

redux-observable@1.0.0
typescript@2.9.1
typescript-fsa@2.5.0


TypeScript

内容に大きな違いはありませんが、本記事では TypeScript でのコードを記載します。

Action については typescript-fsa により以下のように ActionCreatorを作成することを想定しています。


actions.ts

import actionCreatorFactory from 'typescript-fsa';

const actionCreator = actionCreatorFactory('Foo/Bar');

export const actions = {
submit: actionCreator<{ something: string }>('SUBMIT'),
success: actionCreator<{}>('SUCCESS')
}

// actions.submit({ something: 'foo' }) === { type: 'Foo/Bar/SUBMIT', payload: { something: 'foo' } }
// actions.success({}) === { type: 'Foo/Bar/SUCCESS', payload: {} }



Action に対して Action を発行する

極めてシンプルな例です。

mapTo を使います。

以下は actions.foo に対し、 actions.bar を発行する epic です。

export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.foo.match),
mapTo(actions.bar({})),
);

epic のストリームは最終的に actionCreator でなく Action が流れなければならないことに注意してください。

そのため、 actions.bar({}) として actionCreator から Action を生成しています。


特定の状況でのみ、Action に対して Action を発行する

filter を使います。

以下は actions.foo に対し、その payload.bartrue の時のみ actions.baz を発行する epic です。

export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.foo.match),
filter(action => action.payload.bar),
mapTo(actions.baz({})),
);

以下は actions.foo に対し、State の bartrue の時のみ actions.baz を発行する epic です。

export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.foo.match),
filter(_ => state$.value.bar),
mapTo(actions.baz({})),
);


Action に応じて、状況に応じた Action を発行する

map の中で条件分岐を行えば十分です。

以下は actions.foo に対し、その payload.bartrue であれば actions.baz を、そうでなければ actions.qux を発行する epic です。

export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.foo.match),
map(action => {
if (action.payload.bar) {
return actions.baz({});
} else {
return actions.qux({});
}
}),
);


Action に対して、複数の Action を発行する

mergeMap, of を使います。

以下は actions.foo に対し、 actions.baractions.baz を発行する epic です。

export const fooEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.foo.match),
mergeMapTo(of(
actions.bar({}),
actions.baz({}),
)),
);

ストリームを流れる Action をただ別の Action に変換するだけであれば map で良いのですが、個数を変えたい場合や非同期的に流したい場合は Action をストリームに変換する必要があります。


Action に対して、Action は発行しないが何か処理をする

tap, ignoreElements を使います。 map 等の中で行うこともできてしまいますが、避けるべきでしょう。

export const successEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.success.match),
tap(action => {
Alert.alert('成功しました');
}),
ignoreElements(),
);

なお、 ignoreElements を呼ばないと、 actions.success が流れ続け、 successEpic が反応し続けることとなります。


HTTP リクエストの成否をさばく

まず、以下のような Observable<Response> を返す request 関数があると都合が良いと思います。ここでは fetch を使うこととします。

import QueryString from 'query-string';

type HttpMethod = 'GET' | 'POST' | 'PATCH' | 'PUT' | 'DELETE';

interface RequestOption {
httpMethod: HttpMethod;
url: string;
params: { [key: string]: string | number };
headers: { [key: string]: string };
}

interface ResponseError {
message: string;
status: number;
}

export const request = <Response>({ httpMethod, url, params, headers }: RequestOption): Observable<Response> => {
return defer(async () => {
const _url = httpMethod === 'GET' ? `${url}?${QueryString.stringify(params)}` : url;
const options = {
headers,
method: httpMethod,
...(httpMethod === 'GET' ? {} : { body: JSON.stringify(_params) }),
};

const res = await fetch(_url, options);
const body = await res.json();
return {
status: res.status,
body,
};
}).pipe(
catchError(_err => {
return _throw(networkError);
}),
map(({ status, body }: { status: number, body: Response | ResponseError }) => {
if (status >= 400) {
throw { message: body.message, status } as ResponseError;
}

return body as Response;
}),
);
};

また、アプリ上考えられるリクエスト毎に関数を作ることとします。ここではシンプルなユーザの新規登録リクエストを考えます。

interface LoginResponse {

user_id: string;
}

export const register = ({ email, password }: { email: string, password: string }): Observable<{ userId: string }> => {
const url = `${urls.API}/users`;

const params = {
user: {
email,
password,
},
};

return request<LoginResponse>({ httpMethod: 'POST', url, params }).pipe(
map(res => ({
userId: res.user_id
})),
);
};

Action は以下のようにしておきます。

export const register = actionCreator<{ email: string, password: string }>('User/REGISTER');

export const registerSuccess = actionCreator<{ userId: string }>('User/REGISTER_SUCCESS');
export const registerFailure = actionCreator<{ errorMessage: string }>('User/REGISTER_FAILURE');

そうすると、 epic は以下のように書けます。

export const registerEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.register.match),
exhaustMap(action => {
const { email, password } = action.payload;

return api.register({ email, password }).pipe(
map(({ userId }) => actions.registerSuccess({ userId })),
catchError(error => {
const errorMessage = error.message;
return of(actions.registerFailure({ errorMessage }));
}),
);
}),
);

exhaustMap は、 mergeMap と似た振る舞いをするのですが、前の Action が完了していなければ新しい Action を無視するという性質があります。

これにより、仮に actions.register が連続で発行されても、 registerSuccessregisterFailure が流れるまではそれを破棄するようになります。そもそも Action が発行されないようにすべきですが、お手軽に保険がかけられるのは嬉しいですね。

また、 catchexhaustMap 内で行っているのがポイントです。 ストリームは一度 catch されると、以降何も流さなくなってしまいます。 そのため、エラーハンドリングは必ず mergeMapexhaustMap の中で完結させ、 epic のストリームを止めてはいけません。


AuthToken を用いた HTTP リクエストをする

参照透明の観点から言えば Reducer に AuthToken を持たせ、それを都度リクエスト関数に渡すようにするのが良いと思います。

export const submitEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.submit.match),
exhaustMap(action => {
const { title, message } = action.payload;
const { authToken } = state$.value.auth;

return api.submit({ title, message, authToken }).pipe(
map(({ something }) => actions.submitSuccess({ something })),
catchError(error => {
const errorMessage = error.message;
return of(actions.submitFailure({ errorMessage }));
}),
);
}),
);


AuthToken が expired だった場合、予め用意していた Refresh Token により AuthToken を再取得し、それにより再リクエストする

まず以下のようなオペレータを用意しました。

export const catchAuthFailureAndReAuthenticate = (action$: ActionsObservable<Action>) => (

catchError((error: any, source$) => {
if (error.SOME_PROPERTY !== RE_AUTHENTICATE_NEEDED) {
return _throw(error);
}

return merge(
action$.pipe(
filter(authActions.reAuthenticateSuccess.match),
takeUntil(action$.pipe(filter(authActions.reAuthenticateFailure.match))),
take(1),
mergeMapTo(source$),
),
of(authActions.reAuthenticate({})),
);
})
);

そして再認証の epic を作っておきます。

export const reAuthenticateEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.reAuthenticate.match),
exhaustMap(action => {
const { refreshToken } = state$.value.auth;

return api.reAuthenticate({ refreshToken }).pipe(
map(({ authToken }) => actions.reAuthenticateSuccess({ authToken })),
catchError((error: ResponseError | any) => {
let errorMessage = error.message;

if (!errorMessage) {
errorMessage = '再ログインが必要です。';
}

return of(actions.reAuthenticateFailure({ errorMessage }));
}),
);
}),
);

このように使います。


const decodePost = (json: PostJSON): Post => {
// mapping
};

// api
export const fetch = ({ postId, authToken }: { postId: string, authToken: string }): Observable<Post> => {
const url = `${urls.API}/posts/${postId}`;
return requestWithAuth<PostJSON>({
httpMethod: 'GET',
url,
params: null,
authToken,
}).pipe(map(res => res.map(decodePost)));
};

// epic
export const fetchEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(
filter(actions.fetchPost.match),
mergeMap(action => {
const { postId } = action.payload;
const { authToken } = state$.value.auth;

return defer(() => api.fetchPost({ postId, authToken })).pipe(
map(post => actions.fetchSuccess({ post })),
catchAuthFailureAndReAuthenticate(action$),
catchError((error: ResponseError | any) => {
const errorMessage = error.message;
return of(actions.fetchFailure({ errorMessage }));
}),
);
}),
);

reAuthenticateIfNeeded がしていることは以下の通りです。


  • error が渡され、再認証すべきでなければそのまま throw する

  • 再認証すべき時は


    • reAuthenticateSuccess が来たら元のストリーム( fetch )を再度流し、 reAuthenticateFailure が来たら何も流さずに終える」ようなストリームのもとで


    • reAuthenticate を呼び出す



なお、 reAuthenticateFailureEpic を用意し、そこで再認証失敗時のハンドリングを行います(ログイン画面に戻らせる等)。

defer を使うことで、 AuthToken がリフレッシュされた後のリクエスト時、リフレッシュされた AuthToken を使うようにしています。

このようにしないと、リフレッシュ後も初回の呼び出し時の AuthToken によりリクエストされることとなってしまいます。


画像アップロード時、進行度合いに応じて Action を発行する

以下のようなインターフェースの requestMultipart 関数を用意してみました。 formData の作成箇所等は省略しています。

interface MultipartRequestWithAuthOption {

httpMethod: HttpMethod;
url: string;
something: any;
progressPercentageObserver?: Observer<number>;
}

export const requestMultipartWithAuth = <Response>({ httpMethod, url, something, progressPercentageObserver }: MultipartRequestWithAuthOption): Observable<Response> => {
// FormData は引数等からよしなに用意する
// const formData = somethingToFormData(something)

return Observable.create((observer: Observer<Response>) => {
const xhr = new XMLHttpRequest();
xhr.open(httpMethod, url, true);

if (xhr.upload && progressPercentageObserver) {
xhr.upload.onprogress = (event: ProgressEvent) => {
const percentage = event.loaded / event.total;
progressPercentageObserver.next(percentage);
};
}

xhr.onload = () => {
const response = JSON.parse(xhr.response);

if (xhr.status >= 400) {
const err: ResponseError = {
status: xhr.status,
message: response.message,
};

observer.error(err);
return;
}

observer.next(response as Response);
observer.complete();

if (progressPercentageObserver) {
progressPercentageObserver.complete();
}
};

xhr.onerror = (err) => {
observer.error(err);
};

xhr.send(formData);
});
};

進行状況を流すための Observer を受け取り、 XMLHttpRequest のコールバックで流しています。

また、エンドポイントに対応するリクエスト関数は以下のように書きました。

export const submit = ({ something, progressPercentageObserver }: { something: any, progressPercentageObserver: Observer<number> }): Observable<SomeType> => {

const url = `${urls.API}/foo`;
return requestMultipartWithAuth<SomeType>({ httpMethod: 'PATCH', url, something, progressPercentageObserver });
};

epic では以下のように使います。

export const submitEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.submit.match),
exhaustMap(action => {
const { something } = action.payload;
const progressPercentageSubject = new Subject<number>();

return merge(
api.submit({ something, progressPercentageObserver: progressPercentageSubject }).pipe(
map(({ someResult }) => actions.submitSuccess({ someResult })),
catchError(error => {
const errorMessage = error.message;
return of(actions.submitFailure({ errorMessage }));
}),
),
progressPercentageSubject.map(percentage => actions.submitProgress({ percentage }))
);
}),
);

Observer としても Observable としても使える Subject をここで用意し、リクエスト関数に渡して進行状況を流してもらいます。

また、それを進行状況用の Action に map したストリームを戻り値のストリームに merge することで、進行状況と結果の両方が流れるストリームを作り上げています。


間隔が短い Action をまとめる

あまり無さそうですが、ある Action が連続で起きたときに、それらの結果をまとめて別の Action なり処理に繋げたいケースです。

例えば Bluetooth デバイス等からデータ転送を受けるとき、転送量の問題で複数回に分けてデータを送る必要があることがあります。一つ一つの転送が Action で表現されているとすると、全ての転送を終えたタイミングでデータを連結した payload を持ち完了を表す Action を発行したいこともあるでしょう。

export const dataTransferredEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.dataTransferred.match),
buffer(
action$.pipe(
filter(actions.dataTransferred.match),
debounceTime(500),
),
),
map(bufferedActions => {
const data = _.flatten(bufferedActions.map(a => a.payload.value));
return actions.dataTransferCompleted({ data })
}),
);

buffer は、流れてきた値をすぐには流さず保持します。引数に Observable を取り、その Observable に値が流れたタイミングで保持していた値を全て配列形式で流します。

上記の例では、 debouceTime を使い、最後に dataTransferred Action が流れてから 500ms が経過したタイミングで全ての転送が終わったとみなし、受け取ったデータを全て流しています。

後続の map では、バッファされていた Action の payload を連結し、それらをもとに完了を表す Action に変換しています。

転送が終わったことがわかるのであれば、時間で区切るのでなく、そのタイミングで値が流れる Observable を buffer の引数に取るのがより良さそうです。


期待していた Action が起きなかった場合に Action を発行する

例えば一定時間ユーザの入力を受け付けることを考えます。ユーザの入力相当の Action に対してなにか Action を発行するのは問題ありませんが、時間内に入力相当の Action がなかったことに対して Action を発行するのは少し工夫が必要だと思います。

export const userNonActionEpic: Epic<Action, RootState> = (action$, state$) => action$.pipe(

filter(actions.begin.match),
mergeMap(action => {
return action$.pipe(
filter(actions.finish.match),
take(1),
takeUntil(action$.pipe(filter(actions.someUserAction.match))),
);
}),
map(_finishAction => {
return actions.showSomeAlert({ message: 'foo' });
})
)

begin が流れてから、 someUserAction が流れるよりも早く finish が流れた場合に showSomeAlert が流れます。 「finish が流れた」ではなく、「一定時間経過した」であれば、 takeUntil の引数を of(null).pipe(delay(1000)) 等にすれば良いですね。

mergeMap の中身は Action のストリームに merge されますが、 action$ をソースにしているので、そのままでは Action が二重に流れてしまうストリームを作ってしまいます。そのため、 take(1) を付け加え、 finish が一つ流れたら以降何も流さないストリームとして merge することが必要となります。

元のストリームに元のストリームを元にしたストリーム(ややこしい)を merge する際はこの点に注意しないといけません。