※この問題をNode Streamの平行処理で解決する pipeline-pipe というモジュールを作ってみました。良ければ試してみてください。
※本実装はQモージュールで却下されました。代替策を別途調査してください。https://github.com/kriskowal/q/issues/621
Promise の実装は q モジュールを使用します。
var Q = require('q');
/**
* @param {Function} task 何度も実行したい処理を関数で渡します
* @param {Array} pool task に渡す引数を配列で渡します
* @param {number} concurrent 同時にタスクを実行する数を指定します
* @return {Object} pool が全て消化されたことを示す promise を返します
*/
function doConcurrent(task, pool, concurrent) {
var workers = [];
var results = [];
var needle = 0;
while (workers.length < concurrent) {
workers.push(Q.invoke(workIfAny));
}
return Q.all(workers).then(function() {
return results;
});
function workIfAny() {
if (needle < pool.length) {
return Q.fcall(task, pool[needle++])
.then(results.push.bind(results))
.then(workIfAny);
}
}
}
問題
Promise でたまに困るのは、Promise.all に渡す配列の数が多すぎると FATAL ERROR: CALL_AND_RETRY_2 Allocation failed - process out of memory
が出たり、一度に飛ばすHTTPリクエストが多すぎたりすると相手のサーバに負荷をかけすぎて以下のようなエラーを出してしまうことです。
Error: socket hang up
at createHangUpError (http.js:1476:15)
at CleartextStream.socketCloseListener (http.js:1526:23)
at CleartextStream.emit (events.js:117:20)
at tls.js:693:10
at process._tickCallback (node.js:419:13)
解決方法
Producer-Consumer パターンを応用し、一度に平行して仕事をする数を限定して、高速かつ安定してタスクを消化します。
以下冒頭の doConcurrent 関数を使って100万枚の画像を8平行でダウンロードする例を示します。
var Q = require('q');
var files = [];
for (var i = 0; i < 1000 * 1000; i++)
files.push('photo' + i + '.jpg');
doConcurrent(fetchImage, files, 8)
.then(console.log.bind(null, 'done'))
function fetchImage(file) { … }