LoginSignup
7
6

More than 1 year has passed since last update.

.NET 6 と Daprを使った分散サービス開発 その10 PubSub

Last updated at Posted at 2022-03-17

DaprでPubSub(パブリッシュとサブスクライブ)

前回は、Daprに備わっているBinding (バインディング)を用いて、SMTPのOutput Bindingを設定し、メールを送信する方法について触れました。

今までの前提の上でバインディングの機能を追加しますので、このページから読まれている方は、前提として以下を読んできてください。


DaprとRabbitMQを通じてPub/Sub

様々なアプリケーションを構成するにあたって非同期メッセージングは、結合度を下げ、且つ並列処理とスケーラービリティを確保する上で、とても大事な技術です。

今まで行ってきた、RabbitMQでのBindingでトリガする事と何が違うのか?と思われるかもしれませんが、BindingはQueueだけで無く、主にDaprの外で稼働する様々なサービスとの相互運用性に重きを置いています。

よって、RabbitMQだけでなく、例えばスケジューラもそうですし、Twitterで指定されたハッシュタグでのTweetでトリガするBindingなどがあるのは、そういった背景からです。

今回のPub / Subによるメッセージング処理は、むしろサービス間の非同期メッセージングに重きを置いています。
今まで行ってきたService Invocation(サービス起動)が同期での呼び出しであれば、非同期はこちらのPub/Subを使ってくださいねという意図があります。

例えば、以下の図のようにECサイトをイメージしてもらうとわかりやすいですね。CartでOrderされたら、Shipping(配送手続きAPI)とEmail(メールでの注文確認API)に同時に送るみたいなケースです。

この場合、PublisherはCart で、サブスクライバーはShippingとEmailの二つになり、どちらにDapr経由で呼び出されます。

image.png

プロジェクトを作成、追加

今回は、新たに二つの.NET Web API プロジェクトを追加しました。PubServiceとSubServiceです。プロジェクトのルートフォルダから以下を実行してください。今回は意図的にPubServiceをコンソールプログラムとしてプロジェクトを作成しています。

dotnet new console -o PubService
dotnet new webapi -o SubService
dotnet new webapi -o SubService2
dotnet sln add PubService SubService SubService2

もちろん、忘れずにtye.yamlにもプロジェクトを追加しておき、Daprサイドカーを起動するようにしましょう。

tye.yaml
# tye application configuration file
# read all about it at https://github.com/dotnet/tye
#
# when you've given us a try, we'd love to know what you think:
#    https://aka.ms/AA7q20u
#
name: dapr
extensions:
- name: dapr

  # log-level configures the log level of the dapr sidecar
  log-level: debug

  # config allows you to pass additional configuration into the dapr sidecar
  # config will be interpreted as a named k8s resource when deployed, and will be interpreted as
  # a file on disk when running locally at `./components/myconfig.yaml`
  #
  # config: myconfig

  # components-path configures the components path of the dapr sidecar
  components-path: "./components/"

  # If not using the default Dapr placement service or otherwise using a placement service on a nonstandard port,
  # you can configure the Dapr sidecar to use an explicit port.
  # placement-port: 6050
services:
- name: service-a
  project: ServiceA/ServiceA.csproj
- name: service-b
  project: ServiceB/ServiceB.csproj
- name: app
  project: App/App.csproj
- name: stateservice
  project: StateService/StateService.csproj
- name: worker-service
  project: WorkerService/WorkerService.csproj
- name: pub-service
  project: PubService/PubService.csproj
- name: sub-service
  project: SubService/SubService.csproj
- name: sub-service2
  project: SubService2/SubService2.csproj

# This may conflict with the redis instance that dapr manages.
#
# Doing a `docker ps` can show if its already running. If that's the case
# then comment out out when running locally. 
# - name: redis
#   image: redis
#   bindings: 
#   - port: 6379

componentsにファイルを追加

こちらも、慣れてきたと思います。componentsフォルダに設定を追加します。
今回は、Daprを用いたPub/Subを行います。

今回はRedisかRabbitMQのどちらか好きな方を使って試してみましょう。
いずれの場合においても、同じpubsub.yamlとして定義を行います。今回はRabbitMQ環境があったので、RabbitMQで動作させています。

Redisを使ってPubSubを行う場合

Redisを使った場合のPub/Sub設定例です。

pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: daprpubsub
spec:
  type: pubsub.redis
  version: v1
  metadata:
  - name: redisHost
    value: localhost:6379
  - name: redisPassword
    value: ""
scopes:
  - pub-service
  - sub-service
  - sub-service2

RabbitMQを使ってPubSubを行う場合

もう一つはRabbitMQを用いた場合の設定例です。

pubsub.yaml
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
  name: daprpubsub
spec:
  type: pubsub.rabbitmq
  version: v1
  metadata:
  - name: host
    value: "amqp://localhost:5672"
  - name: durable
    value: "false"
  - name: deletedWhenUnused
    value: "false"
  - name: autoAck
    value: "false"
  - name: reconnectWait
    value: "0"
  - name: concurrency
    value: parallel
scopes:
  - sub-service
  - sub-service2

Subscriptionの定義

subscription.yamlとしてもう一つYAMLファイルを追加します。

subscription.yaml
apiVersion: dapr.io/v1alpha1
kind: Subscription
metadata:
  name: apppubsub
spec:
  topic: AppStatus
  route: /appstatus
  pubsubname: daprpubsub
scopes:
  - sub-service
  - sub-service2

この設定ファイルでは、Subscriptionの定義を行うことができます。
pubsubname: daprpubsubは、pubsub.yamlのmetadataのnameです。そして、route /appstatusでPOSTされるパス、scopesでsub-serviceとsub-service2がサブスクライブしています。

起動チェック

起動確認も兼ねて、いつものtye runで起動してみましょう。

tye run

起動したら、Tye Dashboardを見てみましょう。
http://127.0.0.1:8000/ にアクセスすると、いつも通り起動中のサービスとDaprサイドカーの状況が表示されると思います。

image.png

ここでは、sub-service-daprのログをみてみます。(サービスの方じゃなくて、Daprのサイドカー側です。)

ログを追いかけると、以下のような感じでdaprpubsubの起動ログが残っていると思います。ブラウザからページ内、検索でrabbitmqなどを探してみると把握しやすいと思います。

以下はログを抜き出しだ結果です。

[sub-service-dapr]: msg="found component. name: daprpubsub, type: pubsub.rabbitmq/v1" 
[sub-service-dapr]: msg="loading component. name: daprpubsub, type: pubsub.rabbitmq/v1" 
[sub-service-dapr]: msg="rabbitmq pub/sub: connectionCount: current=0 reference=0"
[sub-service-dapr]: msg="rabbitmq pub/sub: connected with connectionCount=1"
[sub-service-dapr]: msg="component loaded. name: daprpubsub, type: pubsub.rabbitmq/v1" 
[sub-service-dapr]: msg="rabbitmq pub/sub: subscribe to topic/queue 'AppStatus/sub-service-AppStatus'"
[sub-service-dapr]: msg="rabbitmq pub/sub: declaring exchange 'AppStatus' of kind 'fanout'"
[sub-service-dapr]: msg="rabbitmq pub/sub: declaring queue 'sub-service-AppStatus'"
[sub-service-dapr]: msg="rabbitmq pub/sub: binding queue 'sub-service-AppStatus' to exchange 'AppStatus'"

RabbitMQでも確認

ログを見た事には理由があります、DaprのPubSubは、Subscriptionで定義したように、定義CRDに基づいて、設定を自動的に行ってくれるのです。

RabbitMQで確認してみましょう。
http://localhost:15672/#/exchanges にアクセスします。

特に設定していないにも関わらずAppStatusという名称のExchangeがfanoutで構成されています。

image.png

Exchangeの詳細も確認すると、sub-service-AppStatusとsub-service2-AppStatusの二つのQueueが自動で作成され、AppStatus ExchangeにBindingされています。

(ここでの、BindingはRabbitMQのExchangeにQueueをバインドするバインディングです。DaprのBindingでは無いので、ご注意を)

今回は、RabbitMQで行っていますが、Redisでも同様で自動設定されると思われます。
このように、定義してPubSubを各サービス間で設定できるのは、非常に便利に思えます。これは、後にKubernatesにデプロイメントした際に便利な機能です。

SubServiceとSubService2プロジェクトの編集

双方とも同じ内容 になります。
まず、WeatherForecastController.csとWeatherForecastr.csは使用しませんので削除してください。

Dapr.AspNetCoreパッケージを追加

今までは、dotnet add package や Visual Studio からNuGetパッケージマネージャーで追加したりなどしてきました。今回は、パッケージコンソールから追加してみましょう。やり方はいずれの方法でもよく、つまりは以下のライブラリがプロジェクトに追加できればOKです。

image.png

Install-Package Dapr.AspNetCore
Install-Package Dapr.Client

プロジェクトが多いので、既定のプロジェクトをSubServiceとSubService2に切り替えるのを忘れないでください。

image.png

Program.csの修正

Program.csからも、今までと同じように//app.UseHttpsRedirection();としてコメントしてください。
Dapr自体はPOSTできればOKなのですが、SDKで結構よしなにしてくれる仕組みがありますので、それらを有効にします。

Program.cs
var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers().AddDapr(); // 拡張メソッド追加

// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();
app.UseCloudEvents(); // クラウドイベントフォーマット対応

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

//app.UseHttpsRedirection(); // Dapr内通信の為、HTTPのみに変更

app.UseAuthorization();

app.MapControllers();
app.MapSubscribeHandler(); // サブスクライブハンドラー有効化

app.Run();


コントローラー追加

次に、それぞれのプロジェクトにコントローラを追加しましょう。
コントローラー側では、拡張メソッドによってアノテーションでサブスクライブ設定が可能になっています。

プロジェクトSubService側

AppStatusController.cs
using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;

namespace SubService.Controllers
{
    [Route("[controller]")]
    [ApiController]
    public class AppStatusController : ControllerBase
    {

        private readonly ILogger<AppStatusController> _logger;

        public AppStatusController(ILogger<AppStatusController> logger)
        {
            _logger = logger;
        }

        // サブスクライブ設定
        [Topic("apppubsub", "AppStatus")]
        [HttpPost(Name = "PostAppStatus")]
        public void PostAsync([FromBody] string message)
        {
            Console.WriteLine("Subscriber received : " + message);
        }

    }
}

プロジェクトSubService2側

AppStatusController.cs
using Dapr;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;

namespace SubService2.Controllers
{
    [Route("[controller]")]
    [ApiController]
    public class AppStatusController : ControllerBase
    {

        private readonly ILogger<AppStatusController> _logger;

        public AppStatusController(ILogger<AppStatusController> logger)
        {
            _logger = logger;
        }

        // サブスクライブ設定
        [Topic("apppubsub", "AppStatus")]
        [HttpPost(Name = "PostAppStatus")]
        public void PostAsync([FromBody] string message)
        {
            Console.WriteLine("Subscriber received : " + message);
        }

    }
}

以下のような状態になっていると思います。

image.png

PubServiceプロジェクト側の編集

今回は、良い機会なのでConsoleプロジェクトとしてホストして、同じようにDaprをサイドカーとしてローディングしています。今までと特に違いはありませんが、このようにコンソールプログラムを常駐させる事もできますし、任意のコンテナを常駐させる事もできます。

PubServiceにクライアントSDK追加

こちらでも、PubServiceプロジェクトにクライアントライブラリを追加しておきましょう。

$ dotnet add package Dapr.Client
$ dotnet add package Dapr.AspNetCore

Program.csの編集

2秒毎に、ランダムで生成した数値をPublishEventAsyncで送るシンプルなものです。

Program.cs
using Dapr.Client;

namespace PubService
{
    class Program
    {
        static async Task Main(string[] args)
        {
            string PUBSUB_NAME = "daprpubsub";
            string TOPIC_NAME = "AppStatus";

            while (true)
            {
                System.Threading.Thread.Sleep(2000);
                Random random = new Random();
                int stateId = random.Next(1, 1000);
                CancellationTokenSource source = new CancellationTokenSource();
                CancellationToken cancellationToken = source.Token;
                using var client = new DaprClientBuilder().Build();

                await client.PublishEventAsync(PUBSUB_NAME, TOPIC_NAME, stateId.ToString(), cancellationToken);
                Console.WriteLine("Published data: " + stateId);
            }
        }
    }
}

検証

今回はtye runで起動しますが、編集箇所も多かったですし、ちょっとオプションを追加します。 --watchオプションを追加する事で、tyeを起動した状態でプロジェクトを編集すると、自動でリビルドした上でプロセスをリロードしてくれます。コーディングやデバッグしながらの作業で、とても便利です。

tye run --watch
.....
起動のログ
.....
.....
[15:47:37 INF] watch: pub-service_e203bdc7-c Exited
[15:47:37 INF] watch: File changed: C:\Users\kahiro\DaprQiita\PubService\Program.cs
[15:47:37 INF] Build Watcher: Builds requested; waiting 250ms for more...
[15:47:37 INF] Build Watcher: Getting requests...
[15:47:37 INF] Build Watcher: Processing 1 requests...
[15:47:37 INF] Build Watcher: Building project C:\Users\kahiro\DaprQiita\PubService\PubService.csproj...
[15:47:37 INF] Build Watcher: Waiting for builds to complete...
[15:47:39 INF] Built C:\Users\kahiro\DaprQiita\PubService\PubService.csproj with exit code 0
[15:47:39 INF] Build Watcher: Done with requests; waiting for more...
[15:47:40 INF] pub-service_e203bdc7-c running on process id 20756 bound to http://localhost:59051, https://localhost:59052
[15:47:40 INF] Replica pub-service_e203bdc7-c is moving to a ready state
[15:47:40 INF] watch: pub-service_e203bdc7-c Started
[15:47:41 INF] Selected process 20756.
[15:47:41 INF] Listening for event pipe events for pub-service_e203bdc7-c on process id 20756

PubService側ログ

PubService側のログを見てみましょう。慣れてきたと思いますが、以下のViewから各プロジェクトのログを参照できます。

image.png

image.png

PubServiceのDapr側ログ

Dapr側サイドカーでも、"rabbitmq pub/sub: publishing message to AppStatus" app_id=pub-service という感じでログが残っています。

image.png

RabbiqMQ側管理コンソール

RabbiqMQ側の管理コンソールからExchangeの状態を確認します。以下のようにメッセージがExchangeに入り、バインドされたQueueへFanoutで渡されています。

image.png

Queueにそれぞれ入ったメッセージもConsumeされている事がわかります。

image.png

image.png

Zipkin側 Dapr分散トレースの様子

ZipkinでもDaprによるメッセージ送信先Topicなど確認できます。

image.png

SubServiceとSubService2側 Daprのログ

以下のように、DaprがAppStatus Topicから、メッセージをACKしている事がわかります。

image.png

SubServiceとSubService2のログ

SubService側では、Daprから渡された結果を表示していますが、双方とも同じデータを受け取っている事がわかります。

image.png

image.png

7
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6