Input Binding (入力バインディング)を使って随時処理を行う
前回は、Daprに備わっているInput Binding (入力バインディング)の中で、RabbitMQからキューをエントリしたらトリガされる方法に触れてきました。
今までの前提の上で入力バインディングの機能を追加しますので、このページから読まれている方は、前提として以下を読んできてください。
バインディングをトリガする
前回は、RabbitMQの管理コンソールからメッセージを投稿したら、Daprによってイベントがトリガされる様子まで進めてきました。
今までは、スケジューラーから自動でトリガされる、RabbitMQのQueueにenqueueしたらトリガされるなどを見てきましたが、今回はトリガする方に着目して進めていきます。
また、Service Invocation(サービス呼び出し)と異なるのは、非同期処理であるという事です。例えば、PDFの生成、動画のエンコード、画像処理など時間のかかるであろう処理は、その場で応答する前にタイムアウトする事が考えられますので、こちらの実装が適しています。
以下も併せて見てもらうとわかりますが、これはOutput Bindingのコールと同じです。
RabbitMQがInput / Output双方のバインディングに対応しているので、今回はこれを使ってバインディングのトリガを行ってみる事にします。
Appプロジェクトの変更
今まで作成してきたプロジェクトの中でフロントのAPIを担っているAppプロジェクトに新しくコントローラーを追加し、ここでトリガします。
Dapr SDKのDapr clientを通じて、payloadになるデータを作成し、InvokeBindingAsync("バインドの名前", "create", payloadData);という感じで送っています。
using Dapr.Client;
using Microsoft.AspNetCore.Mvc;
namespace App.Controllers;
[ApiController]
[Route("[controller]")]
public class InvokeTriggerController : ControllerBase
{
private readonly ILogger<InvokeTriggerController> _logger;
private readonly DaprClient _daprClient;
public InvokeTriggerController(ILogger<InvokeTriggerController> logger, DaprClient daprClient)
{
_logger = logger;
_daprClient = daprClient;
}
[HttpGet(Name = "GetInvokeTrigger")]
public async void GetAsync()
{
var payloadData = new
{
message = "from App to Worker via dapr component with RabbitMQ",
};
await _daprClient.InvokeBindingAsync("Rmq", "create", payloadData);
Console.WriteLine("Binding sent.");
}
}
componentの修正
各コンポーネントに対して、scopes:を設定してバインディングを扱えるアプリを制限していたのを覚えていますでしょうか?前回はworker-serviceのみのアクセス許可になっていましたから、appからもバインディングを扱えるように設定追加をします。
apiVersion: dapr.io/v1alpha1
kind: Component
metadata:
name: Rmq #バインディングの名前はRmq
spec:
type: bindings.rabbitmq
version: v1
metadata:
- name: queueName
value: daprexchange
- name: host
value: amqp://guest:guest@localhost:5672
- name: durable
value: true
- name: deleteWhenUnused
value: false
- name: ttlInSeconds
value: 60
- name: prefetchCount
value: 0
- name: exclusive
value: false
- name: maxPriority
value: 5
- name: contentType
value: "application/json"
scopes:
- worker-service
- app # 新たにappからもアクセスを可能に
WorkerService側は、前回の状態のままにしておきます。
起動と確認
編集を終えたら、tye runで起動しましょう。
$ tye run
Loading Application Details...
Launching Tye Host...
[14:44:48 INF] Executing application from tye.yaml
[14:44:48 INF] Dashboard running on http://127.0.0.1:8000
[14:44:48 INF] Build Watcher: Watching for builds...
...
今までと同じ要領で、Tye ダッシュボード http://127.0.0.1:8000 をブラウザで開きましょう。tye ダッシュボードから、それぞれAppのログ、Worker-serviceのログも開いておきましょう。
また、RabbitMQ側もQueueが届いているのがわかるように管理コンソールを開いておきます。
http://localhost:15672/#/queues/%2F/daprexchange
今回の確認ポイントは、App側でInvokeTriggerをGETしたら、RabbitMQにenqueueして、その後WorkerService側が受け取っている事となります。
appは https://localhost:62629/ で起動していますので、https://localhost:62629/swagger からAPIをテストします。
問題なく、200が返却されました。この時点でバインディングを呼び出しているはずです。
RabbitMQにもenqueueされています。
App側のログにも、Binding sentが記録されています。
Worker側のログにも、届いたメッセージが表示されました。