はじめに
2020 年 12 月 22 日に、Azure Functions の新たなトリガーである RabbitMQ の Extension が GA (Generally Available: 一般公開) されました。この拡張機能を使用することにより、Azure Functions は、RabbitMQ の Queue へ入っているキューを取り出しての処理や、メッセージを作成して、別の Queue へ格納することができます。
RabbitMQ Extension for Windows and Linux is now generally available
現時点で使用できる App Service Plan は、Premium, Dedicated (App Service) Plan となっており、Consumption Plan は対応しておりませんので注意が必要です。 詳しい内容は、GitHub の Rabbit MQ Extensions リポジトリがございますのでこちらにて最新情報を確認することができます。
Azure/azure-functions-rabbitmq-extension
準備
RabbitMQ と Azure Functions の事前準備として以下のソフトウェアをインストールしておきます。
- Docker
- Visual Studio
RabbitMQ の環境構築
今回ローカル環境で開発するために、RabbitMQ のソフトウェアを直接ダウンロードする方法がありますが、Docker コンテナとして実行しそのコンテナを Azure プラットフォーム上へデプロイすることを想定しておりますため、Docker 上で構築を行います。
- Docker Hub から、対象の RabbitMQ の Docker image を pull します。UI 画面にて操作を行いたいため、管理画面が付いております
3-management
を選択しました。
https://hub.docker.com/_/rabbitmq
$ docker pull rabbitmq:3-management
- RabbitMQ のコンテナを立ち上げます。UI 画面を使用するために、15672 ポートと メッセージのやり取りにて AMQP を使用するために、5672 ポートをフォワードするようにします。
$ docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
- http://localhost:8080/ へアクセスを行い、RabbitMQ の UI 画面にてログインを行います。デフォルトの設定では、以下のようになっております。
- ユーザ名:guest
- パスワード:guest
- メッセージの Queue を転送するための Exchange を作成します。上部メニューの Exchange から、
Add a new exchange
から行います。Name にtest
と追加し、Add exchange
ボタンを押します。
- Queue の作成を行います。上部メニューの Queues から、Azure Functions からメッセージを取得する Queue である
test
を作成し、取得したメッセージを別の Queue へ転送するtest2
を作成します。
- 作成した
test
queue に 4) で作成した Exchange と紐づけるため、test
のExchange を Bind します。
Azure Functions プロジェクトの作成
-
Azure Functions のプロジェクトを作成し、最初はトリガー設定は空とします。
-
RabbitMQ の Extension をインストールするために、作成したプロジェクトから右クリックをし、
NuGetパッケージの管理
を選択します。
- 検索画面から、プレリリースのチェックボックスを選択し、
Microsoft.Azure.WebJobs.Extensions.RabbitMQ
を検索後にインストールを行います。
- local.settings.json にて、Azure Functions が RabbitMQ へ接続するための設定を記載します。今回はローカル環境での設定となっておりますが、Azure プラットフォームや他の環境へ RabbitMQ がデプロイされている場合には、適宜設定値を変更してください。
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"RabbitMQConnectionName": "amqp://guest:guest@localhost:5672",
"HostNameAppSetting": "localhost",
"UserNameAppSetting": "guest",
"PasswordAppSetting": "guest"
}
}
- あらかじめ作成した関数コード(Function1.cs)に以下の処理を行うコードを記述します。
Timer Trigger を使用して 1 分毎に Queue へメッセージを送信する関数
QueueName = "test" となっており、test Queue へ現在時刻のメッセージを送信します。
[FunctionName("OutputFunction")]
public static void Run2([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer,
[RabbitMQ(
ConnectionStringSetting = "RabbitMQConnectionName",
QueueName = "test")] out string outputMessage,
ILogger log)
{
String now_datetime = DateTime.Now.ToString();
log.LogInformation($"C# Timer trigger function executed at: {now_datetime}");
outputMessage = now_datetime;
}
Queue から取得したメッセージを別の Queue へ転送する関数
Azure Functions が test
Queue へ入ったことを検知して、メッセージを取り出して、test2
Queue へ受け流しをします。
[FunctionName("InputFunction")]
public static void Run(
[RabbitMQTrigger("test", ConnectionStringSetting = "RabbitMQConnectionName")] string inputMessage,
[RabbitMQ(
ConnectionStringSetting = "RabbitMQConnectionName",
QueueName = "test2")] out string outputMessage,
ILogger logger)
{
outputMessage = inputMessage;
logger.LogInformation($"RabittMQ output binding function sent message: {outputMessage}");
}
検証結果
Azure Functions からの実行画面
赤枠で囲まれている部分は、Timer Trigger によって発行された DateTime のメッセージを test
Queue へ転送しています。
緑枠で囲まれている部分は、RabbitMQ Trigger にて test
Queue へメッセージが格納されたので、取得を行い test2
Queue へ転送されました。
RabbitMQ からの実行画面
test Queue
Azure Functions の実行を一時止めたのですが、test Queue の画面を見るとメッセージは全てはけていることが分かります。
test2 Queue
test2 の画面では、Ready が 2 になっており、メッセージが 2 件格納されていることが分かります。
下画面にあります メッセージを取得する機能を利用すると、Payload に Datetime の内容が格納されています。
まとめ
いかがでしたでしょうか。GA されたばかりの Azure Functions RabbitMQ Extension をローカル環境にて実行する内容になりましたが、Azure Functions 側では、すでにあるキューイングサービスの Kafka Extension と同様にパッケージをインストールしコードを書くことですぐに構築することができました。