9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

promise-queue: 100件から常に3件取り出して並列実行

Last updated at Posted at 2020-04-22

Node.jsにもasync / await構文が実装されたが、Queueみたいな仕組みがない。

for文で1件ずつawaitで回していると何時まで経っても終わらない。
かといって、一気にPromiseに変換して、Promise.allで見張る仕組みにすると
同時実行数制限で止まってしまうといった問題も出てくるだろう。

探したらpromise-queueという神ライブラリを見つけたので、
紹介しつつ活用していきたい。

Installation

promise-queueはnpmでインストールできる。

$ npm install promise-queue

Usage

Exampleの各セクションを元に加工

const Queue = require('promise-queue');
const maxConcurrent = 3; // 並列実行するタスク数
const maxQueue = Infinity; // キューの最大数
const queue = new Queue(maxConcurrent, maxQueue);

これで3件までしか並列実行されないQueueが作成される。
実際のソースコードを読む限り引数はどちらも省略可能。
初期値はどちらもInfinifyとなっている。

後はPromiseを返す関数を1個ずつaddメソッドに渡してやれば良い。

// キューを宣言
const Queue = require('promise-queue');
const queue = new Queue(3);

// 100個のやりたいことを定義する
const something = Array(100).fill(0).map((_, i) => i + 1);
console.log(something);
// [1, 2, 3, ... 98, 99, 100]

const main = async () => {
  const resolves = [];
  for (const item of something) {
    // 戻り値がPromiseの関数を渡す
    queue.add(() => db.insert(it));
  }
}

main();

Test

ではNode.jsのREPLで挙動を確認してみよう。

$ node
> const Queue = require('promise-queue');
> const queue = new Queue(3);
> const something = Array(100).fill(0).map((_, i) => i + 1);
> const resolves = [];

ここでresolvesという空配列を定義する。
更にnew Promise時に払い出されるresolve関数をresolves配列に格納していく。
REPL上の任意のタイミングでresolveを発火させながら状況を確認出来るようにしていく作戦だ。

> something.forEach(it =>
... queue.add(() =>
..... new Promise(resolve => {
....... resolves.push(resolve);
....... })
..... )
... );

こんな感じ

queueaddで突っ込まれた100個の関数の内、
まだ3個しか実行していないはずなので、
resolvesに格納されたresolve関数も3個だけになるはずである。

> resolves.length
3
> queue.getQueueLength()
97
> queue.getPendingLength()
3

完璧。
そしてresolves配列に入っている関数を実行すれば、
更に次の関数が実行されてresolves配列内の関数がどんどん増えるはずである。

> resolves[0]()
> resolves.length
4
> queue.getQueueLength()
96
> queue.getPendingLength()
3
> resolves.forEach(it => it())
> resolves.length
7
> queue.getQueueLength()
93
> queue.getPendingLength()
3

npmで配布されているだけあって完璧な挙動だ。

もしこのライブラリが無かった場合、
Queueを実現する為にあちこちに無駄なthenを仕込む事になる。
可読性は最悪になっていただろう。

Async関数を突っ込むだけでキューになるpromise-queueは、
もうバッチ処理には欠かせない存在と言っても過言ではなさそうだ。

Example

全部終わったら処理を継続してほしかったので、
更に関数でラッピングしてAsync関数の配列を突っ込むとPromiseを返す関数を作ってみた。
(元のコードはLiveScript、JSに手動コンパイルしただけでテストしてないので、エラーが出たらごめんなさい)

src/functions/queue.js
const Queue = require('promise-queue');
const isFinished = queue =>
  queue.getQueueLength() === 0 && queue.getPendingLength() === 0;

module.exports = (count, asyncFns) => {
  queue = new Queue(count);
  return new Promise(resolve =>
    asyncFns.forEach(async fn => {
      await queue.add(fn);
      if (isFinished(queue)) resolve();
    })
  );
}

これを使って100件のユーザをfor文回して登録するということをやってみたい

なお、この例は実務では先輩から「ばかやろう!バルクインサートでやれ!」と叱られてしまうので、
同時入力数が制限されているゴニョゴニョな処理は各自考えて欲しい。

const queue = require('./functions/queue.js');

const mysql = require('mysql2/promise');
const conn = mysql.createConnection(opt);
const users = [
  {name: "taro", age: 18},
  {name: "jiro", age: 17},
  // こんな感じのデータが大量に存在する
];

const main = async function () {
  // この二重関数定義がポイント
  const asyncFns = users.map(user => () =>
    conn.query("INSERT INTO users SET ?", user);
  );
  await queue(3, asyncFns);
  console.log("全件インサート完了!");
}

main();
9
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?