1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

BullMQを用いたワーカーサービスの実装 with TypeScript

Posted at

TypeScriptを使った開発だとマイクロサービス間のイベントの管理や並列処理に悩まされると思います
今回はBullMQを用いてワーカーサービスをデプロイして分散処理できる構成を紹介します

お急ぎの方はこちら

BullMQ

redisを用いたメッセージキューイング用のライブラリです

スケジューリングやFIFO、メトリクスなど様々なオプションがあります
リトライやタスクの割り当てなどをライブラリ側がやってくれるので(カスタマイズも可能)ヘッドレスにワーカーサービスを利用することができます

実装

今回はメッセージをまとめて投稿する処理をメッセージキューイングを使って実装します

Processor

async function messageProcessor(job: Job) {
  const { message } = job.data;
  const result = await new Promise<string>((resolve) => {
    setTimeout(() => {
      const now = new Date().toISOString();
      const post = `[${now}] ${message}`;
      resolve(post);
    }, 1000);
  });
  return result;
}

お気づきの方いるかもですが、1秒待ってメッセージを返してるだけです
親切設計。

こいつをワーカーを使って処理していきます

Queue

ここではBullMQのqueueをセットアップしてイベントを管理するためのクラスを定義します

import {
  BaseJobOptions,
  Job,
  Processor,
  Queue,
  QueueEvents,
  Worker,
  WorkerOptions,
} from "bullmq";
import { connection } from "../config";

...

export class Producer<T extends QueueName> {
  private queue: Queue<DataType<T>, ReturnType<T>>;
  private queueEvent: QueueEvents;

  constructor(queueName: T, defaultJobOptions?: BaseJobOptions) {
    this.queue = new Queue<DataType<T>, ReturnType<T>>(queueName, {
      connection,
      defaultJobOptions,
    });
    this.queueEvent = new QueueEvents(queueName, { connection });
  }

  start() {
    this.queueEvent = new QueueEvents(this.queue.name, { connection });
    this.queueEvent.on("active", (job) => {
      console.log(`[${this.queue.name}] active job: ${job.jobId}`);
    });

    this.queueEvent.on("waiting", () => {
      const count = this.queue.getWaitingCount();
      console.log(`[${this.queue.name}] waiting job: ${count}`);
    });

    this.queueEvent.on("completed", async (result) => {
      console.log(`[${this.queue.name}] completed job: ${result.returnvalue}`);
    });

    this.queueEvent.on("error", (error) => {
      console.error(`[${this.queue.name}] error: ${error}`);
    });
  }

  async add(name: string, data: DataType<T>) {
    await this.queue.add(name, data);
  }
}

コンストラクタではqueueとqueueEventの初期化を行なっています
queueにつけた名前を使ってworkerはタスクを処理します

connectionは下記のように定義しておいてimportします

import dotenv from "dotenv";
dotenv.config();

export const connection = {
  host: process.env.REDIS_HOST,
  port: parseInt(process.env.REDIS_PORT!),
};

startメソッドではイベントリスナーの登録、addメソッドで初期化したqueueにデータを追加していきます
下記のように使用します


export async function enqueueMessages(messages: { message: string }[]) {
  const messageProducer = new Producer(QueueName.MESSAGE_QUEUE);
  messageProducer.start();

  await Promise.all(
    messages.map(async ({ message }) => {
      messageProducer.add("message", { message });
    })
  );
}

これでqueueにイベントをためます

Worker

queueと同様にBullMQのworkerをセットアップ、イベント管理をするクラスを作成します

export class Consumer<T extends QueueName> {
  private worker: Worker<DataType<T>, ReturnType<T>>;

  constructor(
    queueName: T,
    processor: Processor<DataType<T>, ReturnType<T>>,
    workerOptions?: Omit<WorkerOptions, "connection">
  ) {
    this.worker = new Worker<DataType<T>, ReturnType<T>>(queueName, processor, {
      ...workerOptions,
      connection,
    });
  }

  start() {
    this.worker.on("completed", (job) => {
      console.log(
        `[${this.worker.name}][pid:${process.pid}] completed job: ${job.returnvalue}`
      );
    });

    this.worker.on("failed", (_, error) => {
      console.error(
        `[${this.worker.name}][pid:${process.pid}] failed job: ${error.message}`
      );
    });

    process.on("SIGINT", async () => await this.gracefulShutdown());
    process.on("SIGTERM", async () => await this.gracefulShutdown());
  }

  private async gracefulShutdown() {
    await this.worker.close();
    process.exit(0);
  }
}

やってることはqueueとほとんど同じです
processがkillされたときにworkerをcloseしています
これしないとゾンビプロセスが発生することがあります

コンストラクタに渡したqueueNameをもつqueueにイベントが追加されるとworkerがprocessorを用いてタスクを処理し始めます

イベントの監視にはポーリングではなくredisのpub/sub機能が使用されます

下記のように使用します

export async function initiateMessageConsumer() {
  const messageConsumer = new Consumer(
    QueueName.MESSAGE_QUEUE,
    messageProcessor,
    { concurrency: 16 }
  );

  messageConsumer.start();
}

16個のタスクまで並列で処理します

concurrencyは非同期処理のみ並列で処理します(詳細後述)

Server

producerとconsumerは別のプロセスで起動しましょう
producerはexpressで実装し、Httpリクエストで受け取ったmessagesをキューイングします

import express from "express";
import { enqueueMessages } from "./queue/producers/messageProducer";

const app = express();
app.use(express.json());

app.post("/messages", async (req, res) => {
  try {
    const { messages }: { messages: { message: string }[] } = req.body;
    await enqueueMessages(messages);
    res.status(200).send("Messages are added to the queue");
  } catch (error) {
    console.error("Error adding jobs to the queue:", error);
    res.status(500).send("Failed to add jobs to the queue");
  }
});

app.listen(3000, () => {
  console.log("Server is listening on port 3000");
});

consumerはthrongを用いてCPUコア数のプロセスを起動して並列処理をさせます
デプロイしたらこのプロセス自体もproducerとは別のサーバーで動かすことになると思います

import throng from "throng";
import os from "os";
import { initiateMessageConsumers } from "./queue/consumers/messageConsumer";

const count = os.cpus().length;
void throng({
  worker: initiateMessageConsumer,
  count,
});

concurrencyで同時実行数は指定していますが、concurrencyは非同期処理だけが並列で処理されます
これはJavaScriptがシングルスレッドモデルであるためです

例えばCPU集約タスクの場合はconcurrencyは指定しても意味がありません
むしろオーバーヘッドが増えるだけです
この場合はマルチプロセスで複数のworker(concurrency: 1)で処理しないと並列処理ができません
そのためあらかじめ複数のプロセスを起動しています

BullMQにはworker_threadのように別ファイルにプロセッサーを置いてマルチスレッドで処理することもできます(参考)

起動

下記のdocker-composeを使用します

version: "3"
services:
  server:
    container_name: server
    build:
      context: .
      dockerfile: Dockerfile
      target: dev-server
    ports:
      - "3000:3000"
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379

  consumer:
    container_name: consumer
    build:
      context: .
      dockerfile: Dockerfile
      target: dev-consumer
    ports:
      - "3001:3001"
    depends_on:
      - redis
    environment:
      - REDIS_HOST=redis
      - REDIS_PORT=6379

  redis:
    container_name: redis
    image: redis:7.0.15-alpine3.20
    ports:
      - "6379:6379"
    user: "redis:redis"

Dockerfile
#### base ####
FROM node:18.3.0-slim as node
FROM ubuntu:focal-20220531 as base
ENV NODE_ENV=production

RUN apt-get update \
    && apt-get -qq install -y --no-install-recommends \
    tini \
    && rm -rf /var/lib/apt/lists/*

COPY --from=node /usr/local/include/ /usr/local/include/
COPY --from=node /usr/local/lib/ /usr/local/lib/
COPY --from=node /usr/local/bin/ /usr/local/bin/
RUN corepack disable && corepack enable

RUN groupadd --gid 1000 node \
    && useradd --uid 1000 --gid node --shell /bin/bash --create-home node \
    && mkdir /app \
    && chown -R node:node /app

WORKDIR /app
USER node
COPY --chown=node:node package*.json ./
RUN npm install --only=production && npm cache clean --force

#### dev-server ####
FROM base as dev-server
ENV NODE_ENV=development
ENV PATH=/app/node_modules/.bin:$PATH
COPY --chown=node:node . .
RUN npm install && npm cache clean --force
RUN npm run build
CMD ["node", "dist/main.js"]

#### dev-consumer ####
FROM base as dev-consumer
ENV NODE_ENV=development
ENV PATH=/app/node_modules/.bin:$PATH
COPY --chown=node:node . .
RUN npm install && npm cache clean --force
RUN npm run build
CMD ["node", "dist/consumer.js"]

#### prod-server ####
FROM base as prod-server
COPY --chown=node:node . .
RUN npm run build
ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["node", "dist/main.js"]

#### prod-consumer ####
FROM base as prod-consumer
COPY --chown=node:node . .
RUN npm run build
ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["node", "dist/consumer.js"]

docker-compose up で起動したら下記のリクエストを送ればワーカーサービスがメッセージを処理してくれます(vscode extentionsのRestClientみたいなの使ってます

POST http://localhost:3000/messages
Content-Type: application/json

{
  "messages": [
    { "message": "hello" },
    { "message": "good morning!" },
    { "message": "good afternoon!" },
    { "message": "good evening!" },
    { "message": "good night!" }
  ]
}

他にも親子プロセスの管理やワークフローなど様々なことが実装可能なので気になる方はぜひドキュメントを参照して試してみてください

以上BullMQを用いた簡単な実装例でした👋

おまけ

実際にterraformを用いてデプロイします

ecrにイメージをpush

// ecr.tf

resource "aws_ecr_repository" "default" {
  name                 = local.repository_name
  image_tag_mutability = "MUTABLE"

  image_scanning_configuration {
    scan_on_push = true
  }
}

resource "null_resource" "default" {
  provisioner "local-exec" {
    command = "sh ${path.module}/dockerbuild.sh"

    environment = {
      AWS_REGION     = var.region
      AWS_ACCOUNT_ID = var.account_id
      REPO_URL       = aws_ecr_repository.default.repository_url
      CONTAINER_NAME = "${local.server_container_name}"
      DOCKER_DIR     = "${local.docker_dir}"
      TARGET_STAGE   = "prod-server"
    }
  }
}

elastcache

resource "aws_elasticache_cluster" "redis" {
  cluster_id           = "redis-cluster"
  engine               = "redis"
  node_type            = "cache.t2.micro"
  num_cache_nodes      = 1
  parameter_group_name = "default.redis3.2"
  port                 = 6379
  subnet_group_name    = aws_elasticache_subnet_group.default.name
  security_group_ids   = ["${var.sg_id}"]
}

resource "aws_elasticache_subnet_group" "default" {
  name       = "default"
  subnet_ids = ["${var.public_a_id}", "${var.public_c_id}"]
}

ecs

// service.tf
resource "aws_ecs_service" "server" {
  name            = local.server_service_name
  cluster         = aws_ecs_cluster.default.id
  task_definition = aws_ecs_task_definition.default.arn
  desired_count   = local.service_count
  launch_type     = local.task_requires_compatibilities

  load_balancer {
    target_group_arn = var.tg_arn
    container_name   = local.server_service_name
    container_port   = var.webapp_port
  }

  network_configuration {
    subnets = [
      "${var.public_a_id}",
      "${var.public_c_id}",
    ]
    security_groups = [
      "${var.sg_id}"
    ]
    assign_public_ip = true
  }
}

// cluster.tf
resource "aws_ecs_cluster" "default" {
  name = local.ecs_cluster_name
}

// task.tf

data "template_file" "server" {
  template = file("${local.task_definitions_filepath}")
  vars = {
    SERVICE_NAME    = "${local.server_service_name}"
    ECR_IMAGE       = "${var.ecr_repository_uri}server:latest"
    LOGS_GROUP_NAME = "${var.logs_group_name}"
    LOG_DRIVER      = "${local.task_log_driver}"
    REGION          = "${var.region}"
  }
}

data "template_file" "consumer" {
  template = file("${local.task_definitions_filepath}")
  vars = {
    SERVICE_NAME    = "${local.consumer_service_name}"
    ECR_IMAGE       = "${var.ecr_repository_uri}consumer:latest"
    LOGS_GROUP_NAME = "${var.logs_group_name}"
    LOG_DRIVER      = "${local.task_log_driver}"
    REGION          = "${var.region}"
  }
}

resource "aws_ecs_server_task_definition" "server" {
  container_definitions    = data.template_file.server.rendered
  family                   = local.server_task_definitions_name
  cpu                      = local.task_cpu
  memory                   = local.task_memory
  network_mode             = local.task_network_mode
  requires_compatibilities = ["${local.task_requires_compatibilities}"]
  execution_role_arn       = var.execution_role_arn
}

resource "aws_ecs_consumer_task_definition" "consumer" {
  container_definitions    = data.template_file.consumer.rendered
  family                   = local.consumer_task_definitions_name
  cpu                      = local.task_cpu
  memory                   = local.task_memory
  network_mode             = local.task_network_mode
  requires_compatibilities = ["${local.task_requires_compatibilities}"]
  execution_role_arn       = var.execution_role_arn
}

task_definition.json

[
  {
    "name": "${SERVICE_NAME}",
    "image": "${ECR_IMAGE}",
    "essential": true,
    "portMappings": [
      {
        "containerPort": 3000,
        "hostPort": 3000
      }
    ],
    "logConfiguration": {
      "logDriver": "${LOG_DRIVER}",
      "options": {
        "awslogs-region": "${REGION}",
        "awslogs-group": "${LOGS_GROUP_NAME}",
        "awslogs-stream-prefix": "${SERVICE_NAME}"
      }
    }
  }
]

あとalbとvpcとsgとiamの設定が必要ですが、になる方はリポジトリを見てください(長くなりそうだからやめた)

おしまい

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?