LoginSignup
5
0

More than 1 year has passed since last update.

Node.js(TypeScript) で Azure Service Bus を使って、Queue(キュー)を用いたメッセージの送受信をしてみた。

Last updated at Posted at 2022-12-01

はじめに

マイクロサービス間のデータベースの一貫性を保持するために、メッセージブローカというものがよく採用されます。その例として、RabbitMQ, Apache Kafka, Redis(Cache DBだけどメッセージブローカとしての機能もあります)などがあります。これらと同じような機能を備えた、Azure Service BusというサービスがAzureにはあります。今回は、Azure Service BusをNode.jsのSDKを使って、メッセージをパブリッシュして、そのパブリッシュされたメッセージをNode.jsで受信してみたいと思います。

メッセージの種類

Azure Service Busでサービス間のメッセージの送受信の方法は大きく分けて、二種類あります。Queueを使ったやり方。TopicとSubscriptionを使用したやり方。今回はQueueを使って、メッセージの送受信を行って行きたいと思います。

Queue

キューはFirst in First out方式(先入れ先出し法)で、各メッセージが 1 つのコンシューマーによって処理されます。

Topic と Subscription

Topic と Subscription方式では、一つのメッセージが トピックを介して、複数のコンシューマーによって処理されます。

Azure Service Bus の Namespace と Queueを作成する

今回の記事では、Azure CLIを使って、namespaceとqueueを作成していきます。Azure portalからの作成は、以下の記事が参考になると思います。

以下のshell scriptはリソースグループ、Azure Service Busのnamespace及び、queueの作成を一気にしてくれます。便利です。

#!/bin/bash

let randomIdentifier=$RANDOM*$RANDOM

readonly AZ_RESOURCE_GROUP="myTestingRG${randomIdentifier}"
readonly AZ_RESOURCE_GROUP_LOCATION="japaneast"
# service bus namespace
readonly AZ_SERVICE_BUS_NAMESPACE_NAME="nicenamespace${randomIdentifier}"
readonly AZ_SERVICE_BUS_NAMESPACE_LOCATION="japaneast"
readonly AZ_SERVICE_BUS_NAMESPACE_SKU="Basic"
# service bus queue
readonly AZ_SERVICE_BUS_QUEUE_NAME="nicequeue${randomIdentifier}"

echo "Checking the specifed resource group..."
RG_FOUND=$(az group show -g ${AZ_RESOURCE_GROUP} -o tsv --query "properties.provisioningState")
if [ "${RG_FOUND}" = "Succeeded" ]; then
    echo "The resource group: ${AZ_RESOURCE_GROUP} already exists."
    exit
else
    echo "Creating a new resource group..."
    az group create \
        --name ${AZ_RESOURCE_GROUP} \
        --location ${AZ_RESOURCE_GROUP_LOCATION}
    echo "Done! ${AZ_RESOURCE_GROUP} has been created."
fi

echo "Creating a new service bus namespace..."
az servicebus namespace create \
    --name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
    --resource-group ${AZ_RESOURCE_GROUP} \
    --location ${AZ_SERVICE_BUS_NAMESPACE_LOCATION} \
    --sku ${AZ_SERVICE_BUS_NAMESPACE_SKU}
echo "Done! ${AZ_SERVICE_BUS_NAMESPACE_NAME} has been created."

echo "Creating a new service bus queue..."
az servicebus queue create \
    --name ${AZ_SERVICE_BUS_QUEUE_NAME} \
    --namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
    --resource-group ${AZ_RESOURCE_GROUP}
echo "Done! ${AZ_SERVICE_BUS_QUEUE_NAME} has been created."

echo "\n----- Queue Name -----"
echo ${AZ_SERVICE_BUS_QUEUE_NAME}

echo "\n----- Connection String -----"
az servicebus namespace authorization-rule keys list \
    --name RootManageSharedAccessKey \
    --namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
    --resource-group ${AZ_RESOURCE_GROUP} -o tsv --query "primaryConnectionString"

それぞれのパラメータの値は以下のセクションで自由に変更することができます。

let randomIdentifier=$RANDOM*$RANDOM
# Resource group
readonly AZ_RESOURCE_GROUP="myTestingRG${randomIdentifier}"
readonly AZ_RESOURCE_GROUP_LOCATION="japaneast"
# Service bus namespace
readonly AZ_SERVICE_BUS_NAMESPACE_NAME="nicenamespace${randomIdentifier}"
readonly AZ_SERVICE_BUS_NAMESPACE_LOCATION="japaneast"
readonly AZ_SERVICE_BUS_NAMESPACE_SKU="Basic"
# Service bus queue
readonly AZ_SERVICE_BUS_QUEUE_NAME="nicequeue${randomIdentifier}"

   # 以下省略

すべてのコマンドが実行され終わると、以下のコマンドの実行結果として、ログに新しく作られたqueueの名前とprimaryConnectionStringの値が表示されます。Node.jsのSDKからService Busに接続するために必要なので、メモしておいてください。

echo "\n----- Queue Name -----"
echo ${AZ_SERVICE_BUS_QUEUE_NAME}

echo "\n----- Connection String -----"
az servicebus namespace authorization-rule keys list \
    --name RootManageSharedAccessKey \
    --namespace-name ${AZ_SERVICE_BUS_NAMESPACE_NAME} \
    --resource-group ${AZ_RESOURCE_GROUP} -o tsv --query "primaryConnectionString"

# 出力結果
----- Queue Name -----
nicequeue2493630

----- Connection String -----
Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=

Node.js プロジェクトの作成

Node.jsのプロジェクトを作成していくにあたって、npmのworkspaceという機能を利用します。workspaceは一つのプロジェクト内で複数のサブプロジェクトの管理を容易にしてくれる機能です。今回は、メッセージを送信するsenderプロジェクトとメッセージを受信するreceiverプロジェクトの2つを作成していきます。

ルートプロジェクトの作成

mkdir try-service-bus && cd try-service-bus
npm init -y

concurrentlyをインストールします。複数のコマンドを同時に実行するために必要なツールです。ルートプロジェクトから複数のプロジェクトのpackage.jsonscriptsを同時に実行するために使用します。例えば、senderプロジェクトとreceiverプロジェクトのbuild コマンドを同時に実行したりします。

npm install -DE concurrently
package.json
{
  "name": "try-service-bus",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
+ "devDependencies": {
+ "concurrently": "7.6.0"
+ }
}

サブプロジェクトの追加

npm init-wを使用すると簡単にルートプロジェクトにサブプロジェクトを追加できます。以下のコマンドを実行するとpackagesファルダーにsenderreveiverのサブプロジェクトを追加してくれます。

npm init -y -w packages/sender && npm init -y -w packages/receiver && npm i

上記のコマンドの実行後に、ルートプロジェクトのpackage.jsonファイルに、workspacesセクションが追加されます。このセクションで指定されているフォルダーがサブプロジェクトとして、認識されることになります。

package.json
{
  "name": "try-service-bus",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "devDependencies": {
    "concurrently": "7.6.0"
  },
+ "workspaces": [
+   "packages/sender",
+   "packages/receiver"
+ ]
}
.
├── package-lock.json
├── package.json
└── packages # <= 追加
    ├── receiver # <= 追加
    │   └── package.json # <= 追加
    └── sender # <= 追加
        └── package.json # <= 追加

それぞれのサブプロジェクト内には新しく、独立したpackage.jsonファイルが追加されます。

packages/sender/package.json
{
  "name": "sender",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}
packages/receiver/package.json
{
  "name": "receiver",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "devDependencies": {},
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Sender プロジェクトにライブラリーをインストール

-w のオプションを使用すれば、ルートプロジェクトからサブプロジェクトに対して、npmのコマンドを実行することができます。以下のコマンドを実行することによって、、ルートプロジェクトからsender プロジェクトに対して、ライブラリーをインストールすることができます。

npm install <ライブラリーの名前> -w sender

@azure/service-busをインストールします。このライブラリーを使用することによって簡単にazure service busに接続して、メッセージを送受信することができます。

npm install -E @azure/service-bus -w sender

typescriptの開発環境のための、ライブラリーをインストールします。

npm install -DE typescript @types/node nodemon -w sender
packages/sender/package.json
{
  "name": "sender",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
+ "dependencies": {
+   "@azure/service-bus": "7.7.3"
+ },
+ "devDependencies": {
+   "@types/node": "18.11.9",
+   "nodemon": "2.0.20",
+   "typescript": "4.9.3"
+ }
}

Receiver プロジェクトにライブラリーをインストール

receiverプロジェクトにもsenderプロジェクトと全く同じ、ライブラリーをインストールしていきます。

npm install -E @azure/service-bus -w receiver
npm install -DE typescript @types/node nodemon -w receiver
packages/receiver/package.json
{
  "name": "receiver",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
+ "dependencies": {
+   "@azure/service-bus": "7.7.3"
+ },
+ "devDependencies": {
+   "@types/node": "18.11.9",
+   "nodemon": "2.0.20",
+   "typescript": "4.9.3"
+ }
}

TypeScript のセットアップ

typescriptの開発環境のセットアップをしていきます。

mkdir packages/sender/src packages/receiver/src
touch packages/sender/src/index.ts packages/receiver/src/index.ts
packages/sender/src/index.ts
const greet: string = 'hello';
console.log(`${greet} from sender`)
packages/receiver/src/index.ts
const greet: string = 'hello';
console.log(`${greet} from receiver`)

senderプロジェクトとreceiverプロジェクトそれぞれにtsconfig.jsonを追加します。

touch packages/sender/tsconfig.json packages/receiver/tsconfig.json

tsconfig.jsonの内容は、必要最低限のオプションだけを追加しています。outDirの項目は絶対に./distにしてください。

packages/sender/tsconfig.json
{
  "compilerOptions": {
    "target": "es2016",                                  /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
    "module": "commonjs",                                /* Specify what module code is generated. */
    "outDir": "./dist",                                   /* Specify an output folder for all emitted files. */
    "esModuleInterop": true,                             /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
    "forceConsistentCasingInFileNames": true,            /* Ensure that casing is correct in imports. */
    "strict": true,                                      /* Enable all strict type-checking options. */
    "skipLibCheck": true                                 /* Skip type checking all .d.ts files. */
  }
}

packages/receiver/tsconfig.json
{
  "compilerOptions": {
    "target": "es2016",                                  /* Set the JavaScript language version for emitted JavaScript and include compatible library declarations. */
    "module": "commonjs",                                /* Specify what module code is generated. */
    "outDir": "./dist",                                   /* Specify an output folder for all emitted files. */
    "esModuleInterop": true,                             /* Emit additional JavaScript to ease support for importing CommonJS modules. This enables 'allowSyntheticDefaultImports' for type compatibility. */
    "forceConsistentCasingInFileNames": true,            /* Ensure that casing is correct in imports. */
    "strict": true,                                      /* Enable all strict type-checking options. */
    "skipLibCheck": true                                 /* Skip type checking all .d.ts files. */
  }
}

package.jsonscriptsを追加します。

packages/sender/package.json
{
  "name": "sender",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
+ "scripts": {
+   "start": "node dist/index.js",
+   "dev": "nodemon dist/index.js",
+   "watch": "tsc --watch",
+   "build": "tsc"
+ },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@azure/service-bus": "7.7.3"
  },
  "devDependencies": {
    "@types/node": "18.11.9",
    "nodemon": "2.0.20",
    "typescript": "4.9.3"
  }
}
packages/receiver/package.json
{
  "name": "receiver",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
+ "scripts": {
+   "start": "node dist/index.js",
+   "dev": "nodemon dist/index.js",
+   "watch": "tsc --watch",
+   "build": "tsc"
+ },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "@azure/service-bus": "7.7.3"
  },
  "devDependencies": {
    "@types/node": "18.11.9",
    "nodemon": "2.0.20",
    "typescript": "4.9.3"
  }
}

上記でサブプロジェクトのpackage.jsonに追加したscriptsをルートプロジェクトから実行するための、scriptsをルートプロジェクトのpackage.jsonに追加していきます。

package.json
{
  "name": "try-service-bus",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
+   "start": "concurrently \"npm run start:sender\" \"npm run start:receiver\"",
+   "dev": "concurrently \"npm run dev:sender\" \"npm run dev:receiver\"",
+   "build": "concurrently \"npm run build:sender\" \"npm run build:receiver\"",
+   "watch": "concurrently \"npm run watch:sender\" \"npm run watch:receiver\"",
+   "start:sender": "npm run start -w sender",
+   "dev:sender": "npm run dev -w sender",
+   "watch:sender": "npm run watch -w sender",
+   "build:sender": "npm run build -w sender",
+   "start:receiver": "npm run start -w receiver",
+   "dev:receiver": "npm run dev -w receiver",
+   "watch:receiver": "npm run watch -w receiver",
+   "build:receiver": "npm run build -w receiver"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "devDependencies": {
    "concurrently": "7.6.0"
  },
  "workspaces": [
   "packages/sender",
   "packages/receiver"
  ]
}

以下のコマンドを実行すると、両方のサブプロジェクトをビルドして、それぞれ実行してくれます。

npm run build && npm run start
❯ npm run build && npm run start

> try-service-bus@1.0.0 build
> concurrently "npm run build:sender" "npm run build:receiver"

[0] 
[0] > try-service-bus@1.0.0 build:sender
[0] > npm run build -w sender
[0] 
[1] 
[1] > try-service-bus@1.0.0 build:receiver
[1] > npm run build -w receiver
[1] 
[0] 
[0] > sender@1.0.0 build
[0] > tsc
[0] 
[1] 
[1] > receiver@1.0.0 build
[1] > tsc
[1] 
[0] npm run build:sender exited with code 0
[1] npm run build:receiver exited with code 0

> try-service-bus@1.0.0 start
> concurrently "npm run start:sender" "npm run start:receiver"

[1] 
[1] > try-service-bus@1.0.0 start:receiver
[1] > npm run start -w receiver
[1] 
[0] 
[0] > try-service-bus@1.0.0 start:sender
[0] > npm run start -w sender
[0] 
[0] 
[0] > sender@1.0.0 start
[0] > node dist/index.js
[0] 
[1] 
[1] > receiver@1.0.0 start
[1] > node dist/index.js
[1] 
[0] hello from sender
[1] hello from receiver
[0] npm run start:sender exited with code 0
[1] npm run start:receiver exited with code 

Senderの実装

それぞれのコードの説明は、ソースコードにコメントとして、書き込みましたので、そちらを参照してください。

packages/sender/src/index.ts
import { ServiceBusClient, ServiceBusMessage } from "@azure/service-bus";

// az CLIでazure service busのリソースを作成したときに、
// メモしておいた、`queue`の名前と、`connectionString`をそのまま代入してください。 
const CONNECTION_STRING =
  "Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=";
const QUEUE_NAME = "nicequeue2493630";

const messages: ServiceBusMessage[] = [
  { body: "Hey1" },
  { body: "Hey2" },
  { body: "Hey3" },
  { body: "Hey4" },
  { body: "Hey5" },
  { body: "Hey6" },
  { body: "Hey7" },
  { body: "Hey8" },
  { body: "Hey9" },
  { body: "Hey10" },
];

async function main() {
  // Azure Service Bus クライアントの作成
  const sbClient = new ServiceBusClient(CONNECTION_STRING);

  // 特定のキュー又はトピックに紐付いたセンダーの生成
  const sender = sbClient.createSender(QUEUE_NAME);

  try {
    // 複数のメッセージを格納するバッチオブジェクトを生成
    const batch = await sender.createMessageBatch();

    // 作成したバッチにメッセージを格納する
    messages.forEach((msgItem) => {
      const isSuccess = batch.tryAddMessage(msgItem);
      if (!isSuccess) {
        throw new Error("Too many messages to send");
      }
    });

    // サービスバスにメッセージが格納されたバッチを送信する
    await sender.sendMessages(batch);

    console.log(`Messages have been successfully sent to the queue: ${QUEUE_NAME}`);

    // センダーを閉じる
    await sender.close();
  } finally {
    // サービスバスとの接続を切る
    await sbClient.close();
  }
}

main().catch((err) => {
  console.log(err);
  process.exit(1);
});

Receiver の実装

packages/receiver/src/index.ts
import {
  delay,
  ServiceBusClient,
  ServiceBusMessage,
  ProcessErrorArgs,
} from "@azure/service-bus";

// az CLIでazure service busのリソースを作成したときに、
// メモしておいた、`queue`の名前と、`connectionString`をそのまま代入してください。 
const CONNECTION_STRING =
  "Endpoint=sb://nicenamespace2493630.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=0ikVxqbOLAQOCmuPTKSegF5cAKifSJiXdOwsUCbuKLE=";
const QUEUE_NAME = "nicequeue2493630";

async function main() {
  // Azure Service Bus クライアントの作成
  const sbClient = new ServiceBusClient(CONNECTION_STRING);

  // 特定のキュー又はサブスクリプションに紐付いたレシーバーの生成
  const receiver = sbClient.createReceiver(QUEUE_NAME);

  receiver.subscribe({
    // センダーからのメッセージを無事受信した場合に呼ばれるコールバック関数
    processMessage: async (messageReceived: ServiceBusMessage) => {
      console.log(`Received message: ${messageReceived.body}`);
    },
    // センダーからのメッセージの受信を失敗した場合に呼ばれるコールバック関数
    processError: async (error: ProcessErrorArgs) => {
      console.log(error);
    },
  });

  // センダーがメッセージを送信するまで待つ
  await delay(99999);

  // レシーバーを閉じる
  await receiver.close();

  // サービスバスとの接続を切る
  await sbClient.close();
}

main().catch((err) => {
  console.log(err);
  process.exit(1);
});

動かしてみる

実際に動かしてみます。
typescriptのプロジェクトなので、以下のコマンドでビルドします。

npm run build

npm run startを実行すると、同時にsenderreceiverを動かすことはができますが、実行結果をわかりやすく確認するために、以下のコマンドを使用して、分けて実行します。

メッセージをazure service busにメッセージを送信します。

❯ npm run start:sender

> try-service-bus@1.0.0 start:sender
> npm run start -w sender


> sender@1.0.0 start
> node dist/index.js

Messages have been successfully sent to the queue: nicequeue2493630

receiverを起動すると無事にメッセージを受け取り処理できていることが確認できました。

❯ npm run start:receiver

> try-service-bus@1.0.0 start:receiver
> npm run start -w receiver


> receiver@1.0.0 start
> node dist/index.js

Received message: Hey1
Received message: Hey2
Received message: Hey3
Received message: Hey4
Received message: Hey5
Received message: Hey6
Received message: Hey7
Received message: Hey8
Received message: Hey9
Received message: Hey10

おわり

Azure Service BusにNode.jsを使って、接続して、メッセージの送受信をしてみた。@azure/service-busというライブラリーを使用することによって、とても簡単にメッセージの送受信ができた。

今回試したメッセージの送受信の方法としてキュー(queue)を使いました。この他にトピック(topic)とサブスクリプション(subscription)を用いた方法もAzure Service Busは対応しています。そちらの方法もまた試していきたいと思います。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0