DaprでPubSub(パブリッシュとサブスクライブ)
前回は、Daprに備わっているBinding (バインディング)を用いて、SMTPのOutput Bindingを設定し、メールを送信する方法について触れました。
今までの前提の上でバインディングの機能を追加しますので、このページから読まれている方は、前提として以下を読んできてください。
DaprとRabbitMQを通じてPub/Sub
様々なアプリケーションを構成するにあたって非同期メッセージングは、結合度を下げ、且つ並列処理とスケーラービリティを確保する上で、とても大事な技術です。
今まで行ってきた、RabbitMQでのBindingでトリガする事と何が違うのか?と思われるかもしれませんが、BindingはQueueだけで無く、主にDaprの外で稼働する様々なサービスとの相互運用性に重きを置いています。
よって、RabbitMQだけでなく、例えばスケジューラもそうですし、Twitterで指定されたハッシュタグでのTweetでトリガするBindingなどがあるのは、そういった背景からです。
今回のPub / Subによるメッセージング処理は、むしろサービス間の非同期メッセージングに重きを置いています。
今まで行ってきたService Invocation(サービス起動)が同期での呼び出しであれば、非同期はこちらのPub/Subを使ってくださいねという意図があります。
例えば、以下の図のようにECサイトをイメージしてもらうとわかりやすいですね。CartでOrderされたら、Shipping(配送手続きAPI)とEmail(メールでの注文確認API)に同時に送るみたいなケースです。
この場合、PublisherはCart で、サブスクライバーはShippingとEmailの二つになり、どちらにDapr経由で呼び出されます。
プロジェクトを作成、追加
今回は、新たに二つの.NET Web API プロジェクトを追加しました。PubServiceとSubServiceです。プロジェクトのルートフォルダから以下を実行してください。今回は意図的にPubServiceをコンソールプログラムとしてプロジェクトを作成しています。
dotnet new console -o PubService
dotnet new webapi -o SubService
dotnet new webapi -o SubService2
dotnet sln add PubService SubService SubService2
もちろん、忘れずにtye.yamlにもプロジェクトを追加しておき、Daprサイドカーを起動するようにしましょう。
# tye application configuration file
# read all about it at https://github.com/dotnet/tye
#
# when you've given us a try, we'd love to know what you think:
# https://aka.ms/AA7q20u
#
name: dapr
extensions:
- name: dapr
# log-level configures the log level of the dapr sidecar
log-level: debug
# config allows you to pass additional configuration into the dapr sidecar
# config will be interpreted as a named k8s resource when deployed, and will be interpreted as
# a file on disk when running locally at `./components/myconfig.yaml`
#
# config: myconfig
# components-path configures the components path of the dapr sidecar
components-path: "./components/"
# If not using the default Dapr placement service or otherwise using a placement service on a nonstandard port,
# you can configure the Dapr sidecar to use an explicit port.
# placement-port: 6050
services:
- name: service-a
project: ServiceA/ServiceA.csproj
- name: service-b
project: ServiceB/ServiceB.csproj
- name: app
project: App/App.csproj
- name: stateservice
project: StateService/StateService.csproj
- name: worker-service
project: WorkerService/WorkerService.csproj
- name: pub-service
project: PubService/PubService.csproj
- name: sub-service
project: SubService/SubService.csproj
- name: sub-service2
project: SubService2/SubService2.csproj
# This may conflict with the redis instance that dapr manages.
#
# Doing a `docker ps` can show if its already running. If that's the case
# then comment out out when running locally.
# - name: redis
# image: redis
# bindings:
# - port: 6379
componentsにファイルを追加
こちらも、慣れてきたと思います。componentsフォルダに設定を追加します。
今回は、Daprを用いたPub/Subを行います。
今回はRedisかRabbitMQのどちらか好きな方を使って試してみましょう。
いずれの場合においても、同じpubsub.yamlとして定義を行います。今回はRabbitMQ環境があったので、RabbitMQで動作させています。
Redisを使ってPubSubを行う場合
Redisを使った場合のPub/Sub設定例です。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: daprpubsub
spec:
type: pubsub.redis
version: v1
metadata:
- name: redisHost
value: localhost:6379
- name: redisPassword
value: ""
scopes:
- pub-service
- sub-service
- sub-service2
RabbitMQを使ってPubSubを行う場合
もう一つはRabbitMQを用いた場合の設定例です。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: daprpubsub
spec:
type: pubsub.rabbitmq
version: v1
metadata:
- name: host
value: "amqp://localhost:5672"
- name: durable
value: "false"
- name: deletedWhenUnused
value: "false"
- name: autoAck
value: "false"
- name: reconnectWait
value: "0"
- name: concurrency
value: parallel
scopes:
- sub-service
- sub-service2
Subscriptionの定義
subscription.yamlとしてもう一つYAMLファイルを追加します。
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
name: apppubsub
spec:
topic: AppStatus
route: /appstatus
pubsubname: daprpubsub
scopes:
- sub-service
- sub-service2
この設定ファイルでは、Subscriptionの定義を行うことができます。
pubsubname: daprpubsubは、pubsub.yamlのmetadataのnameです。そして、route /appstatusでPOSTされるパス、scopesでsub-serviceとsub-service2がサブスクライブしています。
起動チェック
起動確認も兼ねて、いつものtye runで起動してみましょう。
tye run
起動したら、Tye Dashboardを見てみましょう。
http://127.0.0.1:8000/ にアクセスすると、いつも通り起動中のサービスとDaprサイドカーの状況が表示されると思います。
ここでは、sub-service-daprのログをみてみます。(サービスの方じゃなくて、Daprのサイドカー側です。)
ログを追いかけると、以下のような感じでdaprpubsubの起動ログが残っていると思います。ブラウザからページ内、検索でrabbitmqなどを探してみると把握しやすいと思います。
以下はログを抜き出しだ結果です。
[sub-service-dapr]: msg="found component. name: daprpubsub, type: pubsub.rabbitmq/v1"
[sub-service-dapr]: msg="loading component. name: daprpubsub, type: pubsub.rabbitmq/v1"
[sub-service-dapr]: msg="rabbitmq pub/sub: connectionCount: current=0 reference=0"
[sub-service-dapr]: msg="rabbitmq pub/sub: connected with connectionCount=1"
[sub-service-dapr]: msg="component loaded. name: daprpubsub, type: pubsub.rabbitmq/v1"
[sub-service-dapr]: msg="rabbitmq pub/sub: subscribe to topic/queue 'AppStatus/sub-service-AppStatus'"
[sub-service-dapr]: msg="rabbitmq pub/sub: declaring exchange 'AppStatus' of kind 'fanout'"
[sub-service-dapr]: msg="rabbitmq pub/sub: declaring queue 'sub-service-AppStatus'"
[sub-service-dapr]: msg="rabbitmq pub/sub: binding queue 'sub-service-AppStatus' to exchange 'AppStatus'"
RabbitMQでも確認
ログを見た事には理由があります、DaprのPubSubは、Subscriptionで定義したように、定義CRDに基づいて、設定を自動的に行ってくれるのです。
RabbitMQで確認してみましょう。
http://localhost:15672/#/exchanges にアクセスします。
特に設定していないにも関わらずAppStatusという名称のExchangeがfanoutで構成されています。
Exchangeの詳細も確認すると、sub-service-AppStatusとsub-service2-AppStatusの二つのQueueが自動で作成され、AppStatus ExchangeにBindingされています。
(ここでの、BindingはRabbitMQのExchangeにQueueをバインドするバインディングです。DaprのBindingでは無いので、ご注意を)
今回は、RabbitMQで行っていますが、Redisでも同様で自動設定されると思われます。
このように、定義してPubSubを各サービス間で設定できるのは、非常に便利に思えます。これは、後にKubernatesにデプロイメントした際に便利な機能です。
SubServiceとSubService2プロジェクトの編集
双方とも同じ内容 になります。
まず、WeatherForecastController.csとWeatherForecastr.csは使用しませんので削除してください。
Dapr.AspNetCoreパッケージを追加
今までは、dotnet add package や Visual Studio からNuGetパッケージマネージャーで追加したりなどしてきました。今回は、パッケージコンソールから追加してみましょう。やり方はいずれの方法でもよく、つまりは以下のライブラリがプロジェクトに追加できればOKです。
Install-Package Dapr.AspNetCore
Install-Package Dapr.Client
プロジェクトが多いので、既定のプロジェクトをSubServiceとSubService2に切り替えるのを忘れないでください。
Program.csの修正
Program.csからも、今までと同じように//app.UseHttpsRedirection();としてコメントしてください。
Dapr自体はPOSTできればOKなのですが、SDKで結構よしなにしてくれる仕組みがありますので、それらを有効にします。
var builder = WebApplication.CreateBuilder(args);
// Add services to the container.
builder.Services.AddControllers().AddDapr(); // 拡張メソッド追加
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
var app = builder.Build();
app.UseCloudEvents(); // クラウドイベントフォーマット対応
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
app.UseSwagger();
app.UseSwaggerUI();
}
//app.UseHttpsRedirection(); // Dapr内通信の為、HTTPのみに変更
app.UseAuthorization();
app.MapControllers();
app.MapSubscribeHandler(); // サブスクライブハンドラー有効化
app.Run();
コントローラー追加
次に、それぞれのプロジェクトにコントローラを追加しましょう。
コントローラー側では、拡張メソッドによってアノテーションでサブスクライブ設定が可能になっています。
プロジェクトSubService側
using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
namespace SubService.Controllers
{
[Route("[controller]")]
[ApiController]
public class AppStatusController : ControllerBase
{
private readonly ILogger<AppStatusController> _logger;
public AppStatusController(ILogger<AppStatusController> logger)
{
_logger = logger;
}
// サブスクライブ設定
[Topic("apppubsub", "AppStatus")]
[HttpPost(Name = "PostAppStatus")]
public void PostAsync([FromBody] string message)
{
Console.WriteLine("Subscriber received : " + message);
}
}
}
プロジェクトSubService2側
using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
namespace SubService2.Controllers
{
[Route("[controller]")]
[ApiController]
public class AppStatusController : ControllerBase
{
private readonly ILogger<AppStatusController> _logger;
public AppStatusController(ILogger<AppStatusController> logger)
{
_logger = logger;
}
// サブスクライブ設定
[Topic("apppubsub", "AppStatus")]
[HttpPost(Name = "PostAppStatus")]
public void PostAsync([FromBody] string message)
{
Console.WriteLine("Subscriber received : " + message);
}
}
}
以下のような状態になっていると思います。
PubServiceプロジェクト側の編集
今回は、良い機会なのでConsoleプロジェクトとしてホストして、同じようにDaprをサイドカーとしてローディングしています。今までと特に違いはありませんが、このようにコンソールプログラムを常駐させる事もできますし、任意のコンテナを常駐させる事もできます。
PubServiceにクライアントSDK追加
こちらでも、PubServiceプロジェクトにクライアントライブラリを追加しておきましょう。
$ dotnet add package Dapr.Client
$ dotnet add package Dapr.AspNetCore
Program.csの編集
2秒毎に、ランダムで生成した数値をPublishEventAsyncで送るシンプルなものです。
using Dapr.Client;
namespace PubService
{
class Program
{
static async Task Main(string[] args)
{
string PUBSUB_NAME = "daprpubsub";
string TOPIC_NAME = "AppStatus";
while (true)
{
System.Threading.Thread.Sleep(2000);
Random random = new Random();
int stateId = random.Next(1, 1000);
CancellationTokenSource source = new CancellationTokenSource();
CancellationToken cancellationToken = source.Token;
using var client = new DaprClientBuilder().Build();
await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, stateId.ToString(), cancellationToken);
Console.WriteLine("Published data: " + stateId);
}
}
}
}
検証
今回はtye runで起動しますが、編集箇所も多かったですし、ちょっとオプションを追加します。 --watchオプションを追加する事で、tyeを起動した状態でプロジェクトを編集すると、自動でリビルドした上でプロセスをリロードしてくれます。コーディングやデバッグしながらの作業で、とても便利です。
tye run --watch
.....
起動のログ
.....
.....
[15:47:37 INF] watch: pub-service_e203bdc7-c Exited
[15:47:37 INF] watch: File changed: C:\Users\kahiro\DaprQiita\PubService\Program.cs
[15:47:37 INF] Build Watcher: Builds requested; waiting 250ms for more...
[15:47:37 INF] Build Watcher: Getting requests...
[15:47:37 INF] Build Watcher: Processing 1 requests...
[15:47:37 INF] Build Watcher: Building project C:\Users\kahiro\DaprQiita\PubService\PubService.csproj...
[15:47:37 INF] Build Watcher: Waiting for builds to complete...
[15:47:39 INF] Built C:\Users\kahiro\DaprQiita\PubService\PubService.csproj with exit code 0
[15:47:39 INF] Build Watcher: Done with requests; waiting for more...
[15:47:40 INF] pub-service_e203bdc7-c running on process id 20756 bound to http://localhost:59051, https://localhost:59052
[15:47:40 INF] Replica pub-service_e203bdc7-c is moving to a ready state
[15:47:40 INF] watch: pub-service_e203bdc7-c Started
[15:47:41 INF] Selected process 20756.
[15:47:41 INF] Listening for event pipe events for pub-service_e203bdc7-c on process id 20756
PubService側ログ
PubService側のログを見てみましょう。慣れてきたと思いますが、以下のViewから各プロジェクトのログを参照できます。
PubServiceのDapr側ログ
Dapr側サイドカーでも、"rabbitmq pub/sub: publishing message to AppStatus" app_id=pub-service という感じでログが残っています。
RabbiqMQ側管理コンソール
RabbiqMQ側の管理コンソールからExchangeの状態を確認します。以下のようにメッセージがExchangeに入り、バインドされたQueueへFanoutで渡されています。
Queueにそれぞれ入ったメッセージもConsumeされている事がわかります。
Zipkin側 Dapr分散トレースの様子
ZipkinでもDaprによるメッセージ送信先Topicなど確認できます。
SubServiceとSubService2側 Daprのログ
以下のように、DaprがAppStatus Topicから、メッセージをACKしている事がわかります。
SubServiceとSubService2のログ
SubService側では、Daprから渡された結果を表示していますが、双方とも同じデータを受け取っている事がわかります。