LoginSignup
11
5

More than 3 years have passed since last update.

bullで管理するNodeJSの分散job

Last updated at Posted at 2020-02-09

bullとは

NodeJSで分散ジョブとメッセージを処理するためのキューパッケージです。redisをベースに動作します。
kueの後継的なライブラリです。

確認環境

express-generator にてプロジェクト作成

ここでは、play_node_bullというプロジェクトで作成します。

npx express-generator play_node_bull
cd play_code_bull
npm install

bullのインストール

npm install bull

app.js の編集

app.jsに以下の行を追記します。

App.js
var createError = require('http-errors');
var express = require('express');
var path = require('path');
var cookieParser = require('cookie-parser');
var logger = require('morgan');

var indexRouter = require('./routes/index');
var usersRouter = require('./routes/users');
+ var jobsRouter = require('./routes/jobs');  

var app = express();

// view engine setup
app.set('views', path.join(__dirname, 'views'));
app.set('view engine', 'jade');

app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));

app.use('/', indexRouter);
app.use('/users', usersRouter);
+ app.use('/jobs', jobsRouter); 

// catch 404 and forward to error handler
app.use(function(req, res, next) {
  next(createError(404));
});

// error handler
app.use(function(err, req, res, next) {
  // set locals, only providing error in development
  res.locals.message = err.message;
  res.locals.error = req.app.get('env') === 'development' ? err : {};

  // render the error page
  res.status(err.status || 500);
  res.render('error');
});

module.exports = app;

jobs.js の追加

1ブロック目

routesフォルダに、jobs.jsを追加します。
routerとBullを準備します。

routes/jobs.js
const express = require('express');
const router = express.Router();
const Bull = require("bull")

2ブロック目

重い処理のスタブを準備します。

routes/jobs.js
// 重い処理のスタブ
const heavyJob = async () => {
  return new Promise((resolve)=>{
    setInterval(() => {
      resolve();
     }, 10000);
  })  
}

3ブロック目

bullオブジェクトと処理名を定義します。
処理名は、キューの送信と受信を紐づけるキーとなります。

routes/jobs.js
const queue = new Bull('bulltest', { redis: { port: 6379, host: '127.0.0.1' } });

const processor = 'processor_name'; // 処理名

4ブロック目

エンキューするエンドポイントを定義します。
エンキューするqueue.add()には、データオブジェクトと処理名を指定します。

routes/jobs.js
router.post('/', (req, res, next) => {
  const data = { greetingTime: (new Date()).toString() }     // jobのデータ(object)
  queue.add(processor, data);
  res.send('ok');
});

module.exports = router;

5ブロック目

デキューするハンドラを記述します。
queue.process()に処理名と並行処理数、ハンドラ関数を指定します。

routes/jobs.js
const concurrency = 2;  // 並行処理数

queue.process(processor, concurrency, async (job, done) => {
  try {
    await heavyJob();
    done();
  } catch (e) {
    done(e);
  }
});

実行

アプリケーションを実行します。デフォルトでは3000番ポートで待機します。

npm start

APIの呼び出し

curlコマンドで、作成したjobsAPIを呼び出します。

curl -X POST 'http://127.0.0.1:3000/jobs'

jobのオプション

interface JobOpts {
  priority: number, // オプションの優先度。1(最高の優先度)~MAX_INT(最低の優先度)を指定する。パフォーマンスにわずかな影響を与えるため、必要でない限り使用しない。
  delay: number, // このジョブを処理できるようになるまで待機するミリ秒数。正確に遅延するためには、サーバーとクライアントの時刻を同期する必要がある。
  attempts: number, // ジョブが完了するまでのリトライ回数
  repeat: RepeatOpts, // cron仕様に従ってジョブを繰り返す。
  backoff: number | BackoffOpts, // ジョブが失敗した場合の自動再試行設定。遅延時間を設定するか、{type: 'fixed' or 'exponential')を指定。
  lifo: boolean, // 後入れ先出しにする (default false)
  timeout: number, // タイムアウトエラーでジョブが失敗するまでのミリ秒数
  jobId: number | string, // ジョブIDを上書きする。既に存在するIDを持つジョブを追加しようとしても、追加されない。
  removeOnComplete: boolean | number, // trueの場合、正常に完了したときにジョブを削除します。falseの場合は`completed`セットに保持される。
  removeOnFail: boolean | number, // trueの場合、処理に失敗したときにジョブを削除します。falseの場合は`faild`セットに保持される。
  stackTraceLimit: number, // スタックトレースに記録されるスタックトレース行の量を制限。
}
11
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
5