はじめに
マイクロサービス間のデータベースの一貫性を保持するために、メッセージブローカというものがよく採用されます。その例として、RabbitMQ, Apache Kafka, Redis(Cache DBだけどメッセージブローカとしての機能もあります)などがあります。これらと同じような機能を備えた、Azure Service BusというサービスがAzureにはあります。今回は、Azure Service BusをNode.jsのSDKを使って、メッセージをパブリッシュして、そのパブリッシュされたメッセージをNode.jsで受信してみたいと思います。
メッセージの種類
Azure Service Busでサービス間のメッセージの送受信の方法は大きく分けて、二種類あります。Queueを使ったやり方。TopicとSubscriptionを使用したやり方。今回はQueueを使って、メッセージの送受信を行って行きたいと思います。
Queue
キューはFirst in First out方式(先入れ先出し法)で、各メッセージが 1 つのコンシューマーによって処理されます。
Topic と Subscription
Topic と Subscription方式では、一つのメッセージが トピックを介して、複数のコンシューマーによって処理されます。
Azure Service Bus の Namespace と Queueを作成する
今回の記事では、Azure CLIを使って、namespaceとqueueを作成していきます。Azure portalからの作成は、以下の記事が参考になると思います。
以下のshell scriptはリソースグループ、Azure Service Busのnamespace及び、queueの作成を一気にしてくれます。便利です。
#!/bin/bash
let randomIdentifier=$RANDOM*$RANDOM
readonly AZ_RESOURCE_GROUP="myTestingRG${randomIdentifier}"
readonly AZ_RESOURCE_GROUP_LOCATION="japaneast"
# service bus namespace
readonly AZ_SERVICE_BUS_NAMESPACE_NAME="nicenamespace${randomIdentifier}"
readonly AZ_SERVICE_BUS_NAMESPACE_LOCATION="japaneast"
readonly AZ_SERVICE_BUS_NAMESPACE_SKU="Basic"
# service bus queue
readonly AZ_SERVICE_BUS_QUEUE_NAME="nicequeue${randomIdentifier}"
echo "Checking the specifed resource group..."
RG_FOUND=$(az group show -g ${AZ_RESOURCE_GROUP} -o tsv --query "properties.provisioningState")
if [ "${RG_FOUND}" = "Succeeded" ]; then
echo "The resource group: ${AZ_RESOURCE_GROUP} already exists."
exit
else
echo "Creating a new resource group..."
az group create \
--name ${AZ_RESOURCE_GROUP} \
--location ${AZ_RESOURCE_GROUP_LOCATION}
echo "Done! ${AZ_RESOURCE_GROUP} has been created."
fi
echo "Creating a new service bus namespace..."
az servicebus namespace create \
--name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
--resource-group ${AZ_RESOURCE_GROUP} \
--location ${AZ_SERVICE_BUS_NAMESPACE_LOCATION} \
--sku ${AZ_SERVICE_BUS_NAMESPACE_SKU}
echo "Done! ${AZ_SERVICE_BUS_NAMESPACE_NAME} has been created."
echo "Creating a new service bus queue..."
az servicebus queue create \
--name ${AZ_SERVICE_BUS_QUEUE_NAME} \
--namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
--resource-group ${AZ_RESOURCE_GROUP}
echo "Done! ${AZ_SERVICE_BUS_QUEUE_NAME} has been created."
echo "\n----- Queue Name -----"
echo ${AZ_SERVICE_BUS_QUEUE_NAME}
echo "\n----- Connection String -----"
az servicebus namespace authorization-rule keys list \
--name RootManageSharedAccessKey \
--namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
--resource-group ${AZ_RESOURCE_GROUP} -o tsv --query "primaryConnectionString"
それぞれのパラメータの値は以下のセクションで自由に変更することができます。
let randomIdentifier=$RANDOM*$RANDOM
# Resource group
readonly AZ_RESOURCE_GROUP="myTestingRG${randomIdentifier}"
readonly AZ_RESOURCE_GROUP_LOCATION="japaneast"
# Service bus namespace
readonly AZ_SERVICE_BUS_NAMESPACE_NAME="nicenamespace${randomIdentifier}"
readonly AZ_SERVICE_BUS_NAMESPACE_LOCATION="japaneast"
readonly AZ_SERVICE_BUS_NAMESPACE_SKU="Basic"
# Service bus queue
readonly AZ_SERVICE_BUS_QUEUE_NAME="nicequeue${randomIdentifier}"
# 以下省略
すべてのコマンドが実行され終わると、以下のコマンドの実行結果として、ログに新しく作られたqueueの名前とprimaryConnectionString
の値が表示されます。Node.jsのSDKからService Busに接続するために必要なので、メモしておいてください。
echo "\n----- Queue Name -----"
echo ${AZ_SERVICE_BUS_QUEUE_NAME}
echo "\n----- Connection String -----"
az servicebus namespace authorization-rule keys list \
--name RootManageSharedAccessKey \
--namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
--resource-group ${AZ_RESOURCE_GROUP} -o tsv --query "primaryConnectionString"
# 出力結果
----- Queue Name -----
nicequeue2493630
----- Connection String -----
Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=
Node.js プロジェクトの作成
Node.jsのプロジェクトを作成していくにあたって、npmのworkspaceという機能を利用します。workspaceは一つのプロジェクト内で複数のサブプロジェクトの管理を容易にしてくれる機能です。今回は、メッセージを送信するsenderプロジェクトとメッセージを受信するreceiverプロジェクトの2つを作成していきます。
ルートプロジェクトの作成
mkdir try-service-bus && cd try-service-bus
npm init -y
concurrently
をインストールします。複数のコマンドを同時に実行するために必要なツールです。ルートプロジェクトから複数のプロジェクトのpackage.json
のscripts
を同時に実行するために使用します。例えば、senderプロジェクトとreceiverプロジェクトのbuild
コマンドを同時に実行したりします。
npm install -DE concurrently
{
"name": "try-service-bus",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
+ "devDependencies": {
+ "concurrently": "7.6.0"
+ }
}
サブプロジェクトの追加
npm init
の -w
を使用すると簡単にルートプロジェクトにサブプロジェクトを追加できます。以下のコマンドを実行するとpackages
ファルダーにsender
とreveiver
のサブプロジェクトを追加してくれます。
npm init -y -w packages/sender && npm init -y -w packages/receiver && npm i
上記のコマンドの実行後に、ルートプロジェクトのpackage.json
ファイルに、workspaces
セクションが追加されます。このセクションで指定されているフォルダーがサブプロジェクトとして、認識されることになります。
{
"name": "try-service-bus",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
"devDependencies": {
"concurrently": "7.6.0"
},
+ "workspaces": [
+ "packages/sender",
+ "packages/receiver"
+ ]
}
.
├── package-lock.json
├── package.json
└── packages # <= 追加
├── receiver # <= 追加
│ └── package.json # <= 追加
└── sender # <= 追加
└── package.json # <= 追加
それぞれのサブプロジェクト内には新しく、独立したpackage.json
ファイルが追加されます。
{
"name": "sender",
"version": "1.0.0",
"description": "",
"main": "index.js",
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
{
"name": "receiver",
"version": "1.0.0",
"description": "",
"main": "index.js",
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Sender プロジェクトにライブラリーをインストール
-w
のオプションを使用すれば、ルートプロジェクトからサブプロジェクトに対して、npmのコマンドを実行することができます。以下のコマンドを実行することによって、、ルートプロジェクトからsender
プロジェクトに対して、ライブラリーをインストールすることができます。
npm install <ライブラリーの名前> -w sender
@azure/service-bus
をインストールします。このライブラリーを使用することによって簡単にazure service busに接続して、メッセージを送受信することができます。
npm install -E @azure/service-bus -w sender
typescriptの開発環境のための、ライブラリーをインストールします。
npm install -DE typescript @types/node nodemon -w sender
{
"name": "sender",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
+ "dependencies": {
+ "@azure/service-bus": "7.7.3"
+ },
+ "devDependencies": {
+ "@types/node": "18.11.9",
+ "nodemon": "2.0.20",
+ "typescript": "4.9.3"
+ }
}
Receiver プロジェクトにライブラリーをインストール
receiver
プロジェクトにもsender
プロジェクトと全く同じ、ライブラリーをインストールしていきます。
npm install -E @azure/service-bus -w receiver
npm install -DE typescript @types/node nodemon -w receiver
{
"name": "receiver",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"keywords": [],
"author": "",
"license": "ISC",
+ "dependencies": {
+ "@azure/service-bus": "7.7.3"
+ },
+ "devDependencies": {
+ "@types/node": "18.11.9",
+ "nodemon": "2.0.20",
+ "typescript": "4.9.3"
+ }
}
TypeScript のセットアップ
typescriptの開発環境のセットアップをしていきます。
mkdir packages/sender/src packages/receiver/src
touch packages/sender/src/index.ts packages/receiver/src/index.ts
const greet: string = 'hello';
console.log(`${greet} from sender`)
const greet: string = 'hello';
console.log(`${greet} from receiver`)
sender
プロジェクトとreceiver
プロジェクトそれぞれにtsconfig.json
を追加します。
touch packages/sender/tsconfig.json packages/receiver/tsconfig.json
tsconfig.json
の内容は、必要最低限のオプションだけを追加しています。outDir
の項目は絶対に./dist
にしてください。
{
"compilerOptions": {
"target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
"module": "commonjs", /* Specify what module code is generated. */
"outDir": "./dist", /* Specify an output folder for all emitted files. */
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
"strict": true, /* Enable all strict type-checking options. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
}
}
{
"compilerOptions": {
"target": "es2016", /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
"module": "commonjs", /* Specify what module code is generated. */
"outDir": "./dist", /* Specify an output folder for all emitted files. */
"esModuleInterop": true, /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
"forceConsistentCasingInFileNames": true, /* Ensure that casing is correct in imports. */
"strict": true, /* Enable all strict type-checking options. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
}
}
package.json
にscripts
を追加します。
{
"name": "sender",
"version": "1.0.0",
"description": "",
"main": "index.js",
+ "scripts": {
+ "start": "node dist/index.js",
+ "dev": "nodemon dist/index.js",
+ "watch": "tsc --watch",
+ "build": "tsc"
+ },
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@azure/service-bus": "7.7.3"
},
"devDependencies": {
"@types/node": "18.11.9",
"nodemon": "2.0.20",
"typescript": "4.9.3"
}
}
{
"name": "receiver",
"version": "1.0.0",
"description": "",
"main": "index.js",
+ "scripts": {
+ "start": "node dist/index.js",
+ "dev": "nodemon dist/index.js",
+ "watch": "tsc --watch",
+ "build": "tsc"
+ },
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"@azure/service-bus": "7.7.3"
},
"devDependencies": {
"@types/node": "18.11.9",
"nodemon": "2.0.20",
"typescript": "4.9.3"
}
}
上記でサブプロジェクトのpackage.json
に追加したscripts
をルートプロジェクトから実行するための、scripts
をルートプロジェクトのpackage.json
に追加していきます。
{
"name": "try-service-bus",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
+ "start": "concurrently \"npm run start:sender\" \"npm run start:receiver\"",
+ "dev": "concurrently \"npm run dev:sender\" \"npm run dev:receiver\"",
+ "build": "concurrently \"npm run build:sender\" \"npm run build:receiver\"",
+ "watch": "concurrently \"npm run watch:sender\" \"npm run watch:receiver\"",
+ "start:sender": "npm run start -w sender",
+ "dev:sender": "npm run dev -w sender",
+ "watch:sender": "npm run watch -w sender",
+ "build:sender": "npm run build -w sender",
+ "start:receiver": "npm run start -w receiver",
+ "dev:receiver": "npm run dev -w receiver",
+ "watch:receiver": "npm run watch -w receiver",
+ "build:receiver": "npm run build -w receiver"
},
"keywords": [],
"author": "",
"license": "ISC",
"devDependencies": {
"concurrently": "7.6.0"
},
"workspaces": [
"packages/sender",
"packages/receiver"
]
}
以下のコマンドを実行すると、両方のサブプロジェクトをビルドして、それぞれ実行してくれます。
npm run build && npm run start
❯ npm run build && npm run start
> try-service-bus@1.0.0 build
> concurrently "npm run build:sender" "npm run build:receiver"
[0]
[0] > try-service-bus@1.0.0 build:sender
[0] > npm run build -w sender
[0]
[1]
[1] > try-service-bus@1.0.0 build:receiver
[1] > npm run build -w receiver
[1]
[0]
[0] > sender@1.0.0 build
[0] > tsc
[0]
[1]
[1] > receiver@1.0.0 build
[1] > tsc
[1]
[0] npm run build:sender exited with code 0
[1] npm run build:receiver exited with code 0
> try-service-bus@1.0.0 start
> concurrently "npm run start:sender" "npm run start:receiver"
[1]
[1] > try-service-bus@1.0.0 start:receiver
[1] > npm run start -w receiver
[1]
[0]
[0] > try-service-bus@1.0.0 start:sender
[0] > npm run start -w sender
[0]
[0]
[0] > sender@1.0.0 start
[0] > node dist/index.js
[0]
[1]
[1] > receiver@1.0.0 start
[1] > node dist/index.js
[1]
[0] hello from sender
[1] hello from receiver
[0] npm run start:sender exited with code 0
[1] npm run start:receiver exited with code
Senderの実装
それぞれのコードの説明は、ソースコードにコメントとして、書き込みましたので、そちらを参照してください。
import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus";
// az CLIでazure service busのリソースを作成したときに、
// メモしておいた、`queue`の名前と、`connectionString`をそのまま代入してください。
const CONNECTION_STRING =
"Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=";
const QUEUE_NAME = "nicequeue2493630";
const messages: ServiceBusMessage[] = [
{ body: "Hey1" },
{ body: "Hey2" },
{ body: "Hey3" },
{ body: "Hey4" },
{ body: "Hey5" },
{ body: "Hey6" },
{ body: "Hey7" },
{ body: "Hey8" },
{ body: "Hey9" },
{ body: "Hey10" },
];
async function main() {
// Azure Service Bus クライアントの作成
const sbClient = new ServiceBusClient(CONNECTION_STRING);
// 特定のキュー又はトピックに紐付いたセンダーの生成
const sender = sbClient.createSender(QUEUE_NAME);
try {
// 複数のメッセージを格納するバッチオブジェクトを生成
const batch = await sender.createMessageBatch();
// 作成したバッチにメッセージを格納する
messages.forEach((msgItem) => {
const isSuccess = batch.tryAddMessage(msgItem);
if (!isSuccess) {
throw new Error("Too many messages to send");
}
});
// サービスバスにメッセージが格納されたバッチを送信する
await sender.sendMessages(batch);
console.log(`Messages have been successfully sent to the queue: ${QUEUE_NAME}`);
// センダーを閉じる
await sender.close();
} finally {
// サービスバスとの接続を切る
await sbClient.close();
}
}
main().catch((err) => {
console.log(err);
process.exit(1);
});
Receiver の実装
import {
delay,
ServiceBusClient,
ServiceBusMessage,
ProcessErrorArgs,
} from "@azure/service-bus";
// az CLIでazure service busのリソースを作成したときに、
// メモしておいた、`queue`の名前と、`connectionString`をそのまま代入してください。
const CONNECTION_STRING =
"Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=";
const QUEUE_NAME = "nicequeue2493630";
async function main() {
// Azure Service Bus クライアントの作成
const sbClient = new ServiceBusClient(CONNECTION_STRING);
// 特定のキュー又はサブスクリプションに紐付いたレシーバーの生成
const receiver = sbClient.createReceiver(QUEUE_NAME);
receiver.subscribe({
// センダーからのメッセージを無事受信した場合に呼ばれるコールバック関数
processMessage: async (messageReceived: ServiceBusMessage) => {
console.log(`Received message: ${messageReceived.body}`);
},
// センダーからのメッセージの受信を失敗した場合に呼ばれるコールバック関数
processError: async (error: ProcessErrorArgs) => {
console.log(error);
},
});
// センダーがメッセージを送信するまで待つ
await delay(99999);
// レシーバーを閉じる
await receiver.close();
// サービスバスとの接続を切る
await sbClient.close();
}
main().catch((err) => {
console.log(err);
process.exit(1);
});
動かしてみる
実際に動かしてみます。
typescriptのプロジェクトなので、以下のコマンドでビルドします。
npm run build
npm run start
を実行すると、同時にsender
とreceiver
を動かすことはができますが、実行結果をわかりやすく確認するために、以下のコマンドを使用して、分けて実行します。
メッセージをazure service busにメッセージを送信します。
❯ npm run start:sender
> try-service-bus@1.0.0 start:sender
> npm run start -w sender
> sender@1.0.0 start
> node dist/index.js
Messages have been successfully sent to the queue: nicequeue2493630
receiver
を起動すると無事にメッセージを受け取り処理できていることが確認できました。
❯ npm run start:receiver
> try-service-bus@1.0.0 start:receiver
> npm run start -w receiver
> receiver@1.0.0 start
> node dist/index.js
Received message: Hey1
Received message: Hey2
Received message: Hey3
Received message: Hey4
Received message: Hey5
Received message: Hey6
Received message: Hey7
Received message: Hey8
Received message: Hey9
Received message: Hey10
おわり
Azure Service BusにNode.jsを使って、接続して、メッセージの送受信をしてみた。@azure/service-bus
というライブラリーを使用することによって、とても簡単にメッセージの送受信ができた。
今回試したメッセージの送受信の方法としてキュー(queue)を使いました。この他にトピック(topic)とサブスクリプション(subscription)を用いた方法もAzure Service Busは対応しています。そちらの方法もまた試していきたいと思います。