はじめに
redux-sagaで非同期処理と戦う - API呼び出しのスロットリングでは同時実行数制御をput
とget
、二重ループで実装されていたが、redux-sagaのchannel
を利用することでも同様のことができたので紹介する。
channelによる制御方法
実はズバリやりたいことが redux-sagaのドキュメント - 3.11. Using Channels > Using channels to communicate between Sagas に書いてある。
やることは、
-
channel
を生成する。生成したchannel
オブジェクトがキューになる。 - 同時実行数の数だけタスクを
fork
しておく。この際、生成したchannel
オブジェクトをそのタスクに渡しておく。このタスクがワーカースレッドのように動作する。 - タスクの処理では、引数で受け取った
channel
オブジェクトからtake
で待つ。take
で取れたらやりたい処理(同時実行数制御をかけたい重い処理など)を実行し、終了後またtake
で待つ。 - ワーカースレッドとは別に、処理の受付を
takeEvery
で行い、put
で第一引数に先ほどのchannel
、第二引数に渡したい任意データを指定して実行する。そうすると、3のtake
の戻り値として任意データが渡されつつ、処理がワーカースレッドに引き継がれるイメージ。
コード例は下記のとおり。なお、redux-sagaのドキュメントにあるコード例だとtakeEvery
の箇所はwhile
で単純にtake
、put
を繰り返しているだけだが、私の環境だとtake
の取りこぼしが発生するケースがあったためtakeEvery
に変えている(takeEvery
だと今のところ取りこぼしはない)。
コード例
// 重い非同期処理のダミー実装
function heavyProcess() {
return new Promise((resolve, reject) => {
setTimeout(() => {
resolve(true);
}, 2000);
});
}
function* watchRequests() {
// channelの生成
const chan = yield call(channel);
// 同時実行数分forkする (ワーカースレッドを立ち上げるイメージ)
for (let i = 0; i < 3; i++) {
yield fork(handleRequest, chan);
}
// forkしたスレッドに処理を委譲させるだけの処理をtakeEveryで実行させる
// 'request'アクションが来たらワーカースレッドにディスパッチするだけの受付スレッドを作成するイメージ
yield * takeEvery('request', function*(action) {
yield put(chan, action);
});
}
// ワーカースレッドの処理
function* handleRequest(chan) {
while (true) {
const action = yield take(chan);
console.log('start: ', action.payload.id);
yield call(heavyProcess);
console.log('end: ', action.payload.id);
}
}
function* saga() {
yield fork(watchRequests);
}
const sagaMiddleware = createSagaMiddleware();
const store = Redux.createStore(
() => {},
Redux.applyMiddleware(sagaMiddleware)
)
sagaMiddleware.run(saga);
// 'request'アクションを大量に実行
for (let i = 0; i < 20; i++) {
store.dispatch({
type: 'request',
payload: {
id: i
}
});
}
実行例
例では同時実行数は3にしているので、3つずつ動作していることが分かる。
start: 0
start: 1
start: 2
end: 0
start: 3
end: 1
start: 4
end: 2
start: 5
end: 3
start: 6
end: 4
start: 7
end: 5
start: 8
...
なお、サンプルコードは JSFiddle でChromeなら動かせます。