TypeScriptを使った開発だとマイクロサービス間のイベントの管理や並列処理に悩まされると思います
今回はBullMQを用いてワーカーサービスをデプロイして分散処理できる構成を紹介します
お急ぎの方はこちら
BullMQ
redisを用いたメッセージキューイング用のライブラリです
スケジューリングやFIFO、メトリクスなど様々なオプションがあります
リトライやタスクの割り当てなどをライブラリ側がやってくれるので(カスタマイズも可能)ヘッドレスにワーカーサービスを利用することができます
実装
今回はメッセージをまとめて投稿する処理をメッセージキューイングを使って実装します
Processor
async function messageProcessor(job: Job) {
const { message } = job.data;
const result = await new Promise<string>((resolve) => {
setTimeout(() => {
const now = new Date().toISOString();
const post = `[${now}] ${message}`;
resolve(post);
}, 1000);
});
return result;
}
お気づきの方いるかもですが、1秒待ってメッセージを返してるだけです
親切設計。
こいつをワーカーを使って処理していきます
Queue
ここではBullMQのqueueをセットアップしてイベントを管理するためのクラスを定義します
import {
BaseJobOptions,
Job,
Processor,
Queue,
QueueEvents,
Worker,
WorkerOptions,
} from "bullmq";
import { connection } from "../config";
...
export class Producer<T extends QueueName> {
private queue: Queue<DataType<T>, ReturnType<T>>;
private queueEvent: QueueEvents;
constructor(queueName: T, defaultJobOptions?: BaseJobOptions) {
this.queue = new Queue<DataType<T>, ReturnType<T>>(queueName, {
connection,
defaultJobOptions,
});
this.queueEvent = new QueueEvents(queueName, { connection });
}
start() {
this.queueEvent = new QueueEvents(this.queue.name, { connection });
this.queueEvent.on("active", (job) => {
console.log(`[${this.queue.name}] active job: ${job.jobId}`);
});
this.queueEvent.on("waiting", () => {
const count = this.queue.getWaitingCount();
console.log(`[${this.queue.name}] waiting job: ${count}`);
});
this.queueEvent.on("completed", async (result) => {
console.log(`[${this.queue.name}] completed job: ${result.returnvalue}`);
});
this.queueEvent.on("error", (error) => {
console.error(`[${this.queue.name}] error: ${error}`);
});
}
async add(name: string, data: DataType<T>) {
await this.queue.add(name, data);
}
}
コンストラクタではqueueとqueueEventの初期化を行なっています
queueにつけた名前を使ってworkerはタスクを処理します
connectionは下記のように定義しておいてimportします
import dotenv from "dotenv";
dotenv.config();
export const connection = {
host: process.env.REDIS_HOST,
port: parseInt(process.env.REDIS_PORT!),
};
start
メソッドではイベントリスナーの登録、add
メソッドで初期化したqueueにデータを追加していきます
下記のように使用します
export async function enqueueMessages(messages: { message: string }[]) {
const messageProducer = new Producer(QueueName.MESSAGE_QUEUE);
messageProducer.start();
await Promise.all(
messages.map(async ({ message }) => {
messageProducer.add("message", { message });
})
);
}
これでqueueにイベントをためます
Worker
queueと同様にBullMQのworkerをセットアップ、イベント管理をするクラスを作成します
export class Consumer<T extends QueueName> {
private worker: Worker<DataType<T>, ReturnType<T>>;
constructor(
queueName: T,
processor: Processor<DataType<T>, ReturnType<T>>,
workerOptions?: Omit<WorkerOptions, "connection">
) {
this.worker = new Worker<DataType<T>, ReturnType<T>>(queueName, processor, {
...workerOptions,
connection,
});
}
start() {
this.worker.on("completed", (job) => {
console.log(
`[${this.worker.name}][pid:${process.pid}] completed job: ${job.returnvalue}`
);
});
this.worker.on("failed", (_, error) => {
console.error(
`[${this.worker.name}][pid:${process.pid}] failed job: ${error.message}`
);
});
process.on("SIGINT", async () => await this.gracefulShutdown());
process.on("SIGTERM", async () => await this.gracefulShutdown());
}
private async gracefulShutdown() {
await this.worker.close();
process.exit(0);
}
}
やってることはqueueとほとんど同じです
processがkillされたときにworkerをcloseしています
これしないとゾンビプロセスが発生することがあります
コンストラクタに渡したqueueNameをもつqueueにイベントが追加されるとworkerがprocessorを用いてタスクを処理し始めます
イベントの監視にはポーリングではなくredisのpub/sub機能が使用されます
下記のように使用します
export async function initiateMessageConsumer() {
const messageConsumer = new Consumer(
QueueName.MESSAGE_QUEUE,
messageProcessor,
{ concurrency: 16 }
);
messageConsumer.start();
}
16個のタスクまで並列で処理します
concurrencyは非同期処理のみ並列で処理します(詳細後述)
Server
producerとconsumerは別のプロセスで起動しましょう
producerはexpressで実装し、Httpリクエストで受け取ったmessagesをキューイングします
import express from "express";
import { enqueueMessages } from "./queue/producers/messageProducer";
const app = express();
app.use(express.json());
app.post("/messages", async (req, res) => {
try {
const { messages }: { messages: { message: string }[] } = req.body;
await enqueueMessages(messages);
res.status(200).send("Messages are added to the queue");
} catch (error) {
console.error("Error adding jobs to the queue:", error);
res.status(500).send("Failed to add jobs to the queue");
}
});
app.listen(3000, () => {
console.log("Server is listening on port 3000");
});
consumerはthrongを用いてCPUコア数のプロセスを起動して並列処理をさせます
デプロイしたらこのプロセス自体もproducerとは別のサーバーで動かすことになると思います
import throng from "throng";
import os from "os";
import { initiateMessageConsumers } from "./queue/consumers/messageConsumer";
const count = os.cpus().length;
void throng({
worker: initiateMessageConsumer,
count,
});
concurrencyで同時実行数は指定していますが、concurrencyは非同期処理だけが並列で処理されます
これはJavaScriptがシングルスレッドモデルであるためです
例えばCPU集約タスクの場合はconcurrencyは指定しても意味がありません
むしろオーバーヘッドが増えるだけです
この場合はマルチプロセスで複数のworker(concurrency: 1)で処理しないと並列処理ができません
そのためあらかじめ複数のプロセスを起動しています
BullMQにはworker_threadのように別ファイルにプロセッサーを置いてマルチスレッドで処理することもできます(参考)
起動
下記のdocker-composeを使用します
version: "3"
services:
server:
container_name: server
build:
context: .
dockerfile: Dockerfile
target: dev-server
ports:
- "3000:3000"
depends_on:
- redis
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
consumer:
container_name: consumer
build:
context: .
dockerfile: Dockerfile
target: dev-consumer
ports:
- "3001:3001"
depends_on:
- redis
environment:
- REDIS_HOST=redis
- REDIS_PORT=6379
redis:
container_name: redis
image: redis:7.0.15-alpine3.20
ports:
- "6379:6379"
user: "redis:redis"
Dockerfile
#### base ####
FROM node:18.3.0-slim as node
FROM ubuntu:focal-20220531 as base
ENV NODE_ENV=production
RUN apt-get update \
&& apt-get -qq install -y --no-install-recommends \
tini \
&& rm -rf /var/lib/apt/lists/*
COPY --from=node /usr/local/include/ /usr/local/include/
COPY --from=node /usr/local/lib/ /usr/local/lib/
COPY --from=node /usr/local/bin/ /usr/local/bin/
RUN corepack disable && corepack enable
RUN groupadd --gid 1000 node \
&& useradd --uid 1000 --gid node --shell /bin/bash --create-home node \
&& mkdir /app \
&& chown -R node:node /app
WORKDIR /app
USER node
COPY --chown=node:node package*.json ./
RUN npm install --only=production && npm cache clean --force
#### dev-server ####
FROM base as dev-server
ENV NODE_ENV=development
ENV PATH=/app/node_modules/.bin:$PATH
COPY --chown=node:node . .
RUN npm install && npm cache clean --force
RUN npm run build
CMD ["node", "dist/main.js"]
#### dev-consumer ####
FROM base as dev-consumer
ENV NODE_ENV=development
ENV PATH=/app/node_modules/.bin:$PATH
COPY --chown=node:node . .
RUN npm install && npm cache clean --force
RUN npm run build
CMD ["node", "dist/consumer.js"]
#### prod-server ####
FROM base as prod-server
COPY --chown=node:node . .
RUN npm run build
ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["node", "dist/main.js"]
#### prod-consumer ####
FROM base as prod-consumer
COPY --chown=node:node . .
RUN npm run build
ENTRYPOINT ["/usr/bin/tini", "--"]
CMD ["node", "dist/consumer.js"]
docker-compose up
で起動したら下記のリクエストを送ればワーカーサービスがメッセージを処理してくれます(vscode extentionsのRestClientみたいなの使ってます
POST http://localhost:3000/messages
Content-Type: application/json
{
"messages": [
{ "message": "hello" },
{ "message": "good morning!" },
{ "message": "good afternoon!" },
{ "message": "good evening!" },
{ "message": "good night!" }
]
}
他にも親子プロセスの管理やワークフローなど様々なことが実装可能なので気になる方はぜひドキュメントを参照して試してみてください
以上BullMQを用いた簡単な実装例でした👋
おまけ
実際にterraformを用いてデプロイします
ecrにイメージをpush
// ecr.tf
resource "aws_ecr_repository" "default" {
name = local.repository_name
image_tag_mutability = "MUTABLE"
image_scanning_configuration {
scan_on_push = true
}
}
resource "null_resource" "default" {
provisioner "local-exec" {
command = "sh ${path.module}/dockerbuild.sh"
environment = {
AWS_REGION = var.region
AWS_ACCOUNT_ID = var.account_id
REPO_URL = aws_ecr_repository.default.repository_url
CONTAINER_NAME = "${local.server_container_name}"
DOCKER_DIR = "${local.docker_dir}"
TARGET_STAGE = "prod-server"
}
}
}
elastcache
resource "aws_elasticache_cluster" "redis" {
cluster_id = "redis-cluster"
engine = "redis"
node_type = "cache.t2.micro"
num_cache_nodes = 1
parameter_group_name = "default.redis3.2"
port = 6379
subnet_group_name = aws_elasticache_subnet_group.default.name
security_group_ids = ["${var.sg_id}"]
}
resource "aws_elasticache_subnet_group" "default" {
name = "default"
subnet_ids = ["${var.public_a_id}", "${var.public_c_id}"]
}
ecs
// service.tf
resource "aws_ecs_service" "server" {
name = local.server_service_name
cluster = aws_ecs_cluster.default.id
task_definition = aws_ecs_task_definition.default.arn
desired_count = local.service_count
launch_type = local.task_requires_compatibilities
load_balancer {
target_group_arn = var.tg_arn
container_name = local.server_service_name
container_port = var.webapp_port
}
network_configuration {
subnets = [
"${var.public_a_id}",
"${var.public_c_id}",
]
security_groups = [
"${var.sg_id}"
]
assign_public_ip = true
}
}
// cluster.tf
resource "aws_ecs_cluster" "default" {
name = local.ecs_cluster_name
}
// task.tf
data "template_file" "server" {
template = file("${local.task_definitions_filepath}")
vars = {
SERVICE_NAME = "${local.server_service_name}"
ECR_IMAGE = "${var.ecr_repository_uri}server:latest"
LOGS_GROUP_NAME = "${var.logs_group_name}"
LOG_DRIVER = "${local.task_log_driver}"
REGION = "${var.region}"
}
}
data "template_file" "consumer" {
template = file("${local.task_definitions_filepath}")
vars = {
SERVICE_NAME = "${local.consumer_service_name}"
ECR_IMAGE = "${var.ecr_repository_uri}consumer:latest"
LOGS_GROUP_NAME = "${var.logs_group_name}"
LOG_DRIVER = "${local.task_log_driver}"
REGION = "${var.region}"
}
}
resource "aws_ecs_server_task_definition" "server" {
container_definitions = data.template_file.server.rendered
family = local.server_task_definitions_name
cpu = local.task_cpu
memory = local.task_memory
network_mode = local.task_network_mode
requires_compatibilities = ["${local.task_requires_compatibilities}"]
execution_role_arn = var.execution_role_arn
}
resource "aws_ecs_consumer_task_definition" "consumer" {
container_definitions = data.template_file.consumer.rendered
family = local.consumer_task_definitions_name
cpu = local.task_cpu
memory = local.task_memory
network_mode = local.task_network_mode
requires_compatibilities = ["${local.task_requires_compatibilities}"]
execution_role_arn = var.execution_role_arn
}
task_definition.json
[
{
"name": "${SERVICE_NAME}",
"image": "${ECR_IMAGE}",
"essential": true,
"portMappings": [
{
"containerPort": 3000,
"hostPort": 3000
}
],
"logConfiguration": {
"logDriver": "${LOG_DRIVER}",
"options": {
"awslogs-region": "${REGION}",
"awslogs-group": "${LOGS_GROUP_NAME}",
"awslogs-stream-prefix": "${SERVICE_NAME}"
}
}
}
]
あとalbとvpcとsgとiamの設定が必要ですが、になる方はリポジトリを見てください(長くなりそうだからやめた)
おしまい