JavaScript
Node.js
promise
Stream

Promise.all での大量の並行処理を安定させる

※この問題をNode Streamで解決するモジュールを作ったので見てみてください。 https://github.com/piglovesyou/node-hole

※本実装はQモージュールで却下されました。代替策を別途調査してください。https://github.com/kriskowal/q/issues/621

Promise の実装は q モジュールを使用します。

var Q = require('q');

/**
* @param {Function} task 何度も実行したい処理を関数で渡します
* @param {Array} pool task に渡す引数を配列で渡します
* @param {number} concurrent 同時にタスクを実行する数を指定します
* @return {Object} pool が全て消化されたことを示す promise を返します
*/

function doConcurrent(task, pool, concurrent) {
var workers = [];
var results = [];
var needle = 0;

while (workers.length < concurrent) {
workers.push(Q.invoke(workIfAny));
}

return Q.all(workers).then(function() {
return results;
});

function workIfAny() {
if (needle < pool.length) {
return Q.fcall(task, pool[needle++])
.then(results.push.bind(results))
.then(workIfAny);
}
}
}


問題

Promise でたまに困るのは、Promise.all に渡す配列の数が多すぎると FATAL ERROR: CALL_AND_RETRY_2 Allocation failed - process out of memory が出たり、一度に飛ばすHTTPリクエストが多すぎたりすると相手のサーバに負荷をかけすぎて以下のようなエラーを出してしまうことです。

Error: socket hang up

at createHangUpError (http.js:1476:15)
at CleartextStream.socketCloseListener (http.js:1526:23)
at CleartextStream.emit (events.js:117:20)
at tls.js:693:10
at process._tickCallback (node.js:419:13)


解決方法

Producer-Consumer パターンを応用し、一度に平行して仕事をする数を限定して、高速かつ安定してタスクを消化します。

以下冒頭の doConcurrent 関数を使って100万枚の画像を8平行でダウンロードする例を示します。

var Q = require('q');

var files = [];
for (var i = 0; i < 1000 * 1000; i++)
files.push('photo' + i + '.jpg');

doConcurrent(fetchImage, files, 8)
.then(console.log.bind(null, 'done'))

function fetchImage(file) { }