0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

DaprのWorkflow Componentを触ってみる

Posted at

TL; DR

  • Dapr v1.10でWorkflow Componentが追加された
  • Workflowを使うと、複数の処理工程を手軽にまとめられる

はじめに

先週2/17にDapr v1.10がリリースされました :tada:

そして、今回のリリースで、「Workflow Component」が追加されました。

このコンポーネントは名前の通り、状態管理を伴う複数の処理を束ねる「ワークフロー」1を扱うためのものです(例:注文処理)。

そこで本記事では、workflowを用いたサンプルアプリを作りながら機能を紹介したいと思います。

環境

  • Dapr v1.10.0

workflowはアルファ版機能のため、今後のバージョンで仕様変更される可能性があります

Workflow Componentとは?

Dapr Workflowによって、開発者がマイクロサービスを横断したメッセージング、状態管理、例外処理を必要とするロジックを統合するのが容易になります。Dapr Workflowは実行時間が長く、耐障害性のある、状態を持つアプリケーションの開発を可能にします。

Dapr Workflow makes orchestrating the logic required for messaging, state management, and failure handling across various microservices easier for developers. Dapr Workflow enables you to create long running, fault-tolerant, stateful applications.

少し抽象的な説明ですが、マイクロサービス内で複雑な工程のタスクを進めるために利用できます。
以下のようにいろいろなパターンへの応用が紹介されています。

構成要素

アクティビティ

アクティビティは、ワークフローの処理単位です。内部に実際に行いたい処理を実装します。

ワークフロー

ワークフローは複数のアクティビティを決められた順序で(または並列に)実行するためのものです。
アクティビティを管理する役割を持ち、自身は直接タスクの処理はしません。
また、イベントソーシングを使うことで、障害が起きても別のホストでワークフローを再開できるという特徴を持っています。

制約

イベントソーシング実現のため、ワークフローやアクティビティは冪等である必要があります。
(復旧時に障害直前の内部状態を再現するには、各工程で同じ入力に対し常に同じ結果が得られる必要があるため)

  • ワークフローは非決定論的なAPIを呼んではならない
  • ワークフローは直接外部の状態とやり取りしてはならない
  • ワークフローは同一スレッド内で実行されなければならない
    • 並行処理が入ると冪等でなくなり予想外の状況に陥る可能性があるため
    • 並行処理したい場合はアクティビティへ委譲する

サンプルアプリ

続いて、実際に動かしてみます。最低限の機能だけを使ったシンプルなworkflowを作ってみました2

v1.10時点では、.NET SDKだけがworkflowに対応しています。
C#は全く書けないので、ひどいコードですがお許しください

旅行の予約を行うワークフロー内で、飛行機の予約とホテルの予約のアクティビティを順に実行します。

worker/worker/Workflows/BookingTripWorkflow.cs
public class BookingTripWorkflow : Workflow<BookPayload, BookResult>
{
    public override async Task<BookResult> RunAsync(WorkflowContext context, BookPayload payload)
    {
        string id = context.InstanceId;
        Console.WriteLine("workflow: InstanceId={0}", context.InstanceId);

        // STEP1: 飛行機を予約
        BookFlightRequest bookFlightRequest = new BookFlightRequest(payload.City, payload.Day, payload.Person);
        BookFlightResult bookFlightResult = await context.CallActivityAsync<BookFlightResult>(
            nameof(BookFlightActivity), bookFlightRequest);
        if (!bookFlightResult.Success) {
            return new BookResult(/* processed: */ false, 0);
        }

        // STEP2: ホテルを予約
        BookHotelRequest bookHotelRequest = new BookHotelRequest(payload.City, payload.Day, payload.Person);
        BookHotelResult bookHotelResult = await context.CallActivityAsync<BookHotelResult>(
            nameof(BookHotelActivity), bookHotelRequest);
        if (!bookFlightResult.Success) {
            // 予約できなかった場合の補正トランザクション
            CancelFlightRequest cancelFlightRequest = new CancelFlightRequest(bookFlightResult);
            await context.CallActivityAsync(nameof(CancelFlightActivity), cancelFlightRequest);
            return new BookResult(/* processed: */ false, 0);
        }

        Console.WriteLine("workflow completed");
        return new BookResult(/* processed: */ true, bookFlightResult.Price + bookHotelResult.Price);
    }
}

ワークフローを使ってみる

workerを起動

まずはworkflowを実行するためのアプリケーションを起動します。なぜかk8s上ではうまく動かなかったので、Dapr CLIを使っています。

$ dapr run --app-id myworker --dapr-http-port 3501 dotnet run

workerとDaprサイドカーが起動しました。

workflowを開始

続いて、workflowを開始します。旅行予約ワークフロー開始をリクエストします。
パスには <コンポーネント名("dapr"固定?)>/<ワークフロー名>/<作成したいワークフローのインスタンスID> を、リクエストボディにはワークフローの入力値をJSONで指定します3

$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/12345678/start -d '{"input": {"City": "Tokyo", "Day": "Sunday", "Person": 2}}' -H "Content-Type: application/json"                                                                                                                   {"instance_id":"12345678"}

workflowのインスタンスIDが返ってきました。workflowはここから非同期で実行されます。

今回はcurlで直接叩きましたが、実際のシステムでは前段にAPIサーバーを立てて、予約APIを叩かれたらワークフロー開始リクエストをする構成になると思います。

(公式のワークフローパターン「非同期HTTP API」参照)

workflowの状態を確認

先ほどのインスタンスIDを指定して、現在のworkflowの状態を取得します。
インスタンスIDは全ワークフロー名で一意にする必要がある(ワークフロー名が異なっていても衝突してしまう)ので注意が必要です。UUIDかULID推奨。

# 実行中
$ curl http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/12345678 | jq
{
  "WFInfo": {
    "instance_id": "12345678"
  },
  "start_time": "2023-02-22T00:17:50Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Tokyo\",\"Day\":\"Sunday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T00:17:50Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.runtime_status": "RUNNING"
  }
}
# ...
# 完了
$ curl http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/12345678 | jq
{
  "WFInfo": {
    "instance_id": "12345678"
  },
  "start_time": "2023-02-22T00:17:50Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Tokyo\",\"Day\":\"Sunday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T00:17:55Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.output": "{\"Processed\":true,\"Price\":30000}",
    "dapr.workflow.runtime_status": "COMPLETED"
  }
}

コーナーケース

失敗

続いて、アクティビティを失敗させてみます。

# 金曜日はホテルが満室で予約できなかった
$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/10000/start -d '{"input": {"City": "Seoul", "Day": "Friday", "Person": 2}}' -H "Content-Type: application/json"
{"instance_id":"10000"}
worker-log
== APP == workflow: InstanceId=10000
== APP == BookflightActivity started: city=Seoul day=Friday person=2
== APP == BookflightActivity completed: price=100000
== APP == workflow: InstanceId=10000
== APP == BookHotelActivity started: city=Seoul day=Friday person=2
== APP == BookHotelActivity failed: System.Exception: No Vacancy!
== APP ==    at worker.Workflows.Activities.BookHotelActivity.Price(String day) in /home/syuparn/dapr-workflow-sample/worker/worker/Workflows/BookHotelActivity.cs:line 30
== APP ==    at worker.Workflows.Activities.BookHotelActivity.RunAsync(WorkflowActivityContext context, BookHotelRequest req) in /home/syuparn/dapr-workflow-sample/worker/worker/Workflows/BookHotelActivity.cs:line 15
== APP == workflow: InstanceId=10000
== APP == CancelflightActivity started
== APP == CancelflightActivity completed
== APP == workflow: InstanceId=10000

途中でアクティビティが失敗した場合でもワークフローは完了状態になります。

$ curl http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/10000 | jq
{
  "WFInfo": {
    "instance_id": "10000"
  },
  "start_time": "2023-02-22T03:45:04Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Seoul\",\"Day\":\"Friday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T03:45:15Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.output": "{\"Processed\":false,\"Price\":0}",
    "dapr.workflow.runtime_status": "COMPLETED"
  }
}

分散システムであるためロールバック機能は今のところ見当たりませんでした。失敗した際にこれまでのアクティビティをキャンセルしたい場合は、補正トランザクションを使うのがよさそうです。

        // STEP2: ホテルを予約
        BookHotelRequest bookHotelRequest = new BookHotelRequest(payload.City, payload.Day, payload.Person);
        BookHotelResult bookHotelResult = await context.CallActivityAsync<BookHotelResult>(
            nameof(BookHotelActivity), bookHotelRequest);
        if (!bookFlightResult.Success) {
            // ロールバックはできないため、STEP1の飛行機の予約をキャンセルするアクティビティを実行
            CancelFlightRequest cancelFlightRequest = new CancelFlightRequest(bookFlightResult);
            await context.CallActivityAsync(nameof(CancelFlightActivity), cancelFlightRequest);
            return new BookResult(/* processed: */ false, 0);
        }

中断してみる

続いて、terminate APIを使ってworkflowを強制的に中断します。

$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/20000/start -d '{"input": {"City": "Tokyo", "Day": "Sunday", "Person": 2}}' -H "Content-Type: application/json"
{"instance_id":"20000"}

# 強制停止(パス注意!ワークフロー名は指定しない)
$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/20000/terminate

実行中のアクティビティが中断され、ワークフローの状態が TERMINATED になります。

$ curl http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/20000 | jq
{
  "WFInfo": {
    "instance_id": "20000"
  },
  "start_time": "2023-02-22T03:50:34Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Tokyo\",\"Day\":\"Sunday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T03:50:36Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.output": "",
    "dapr.workflow.runtime_status": "TERMINATED"
  }
}

ワークフローの途中でworkerを殺し、再度workerを起動

アクティビティの途中でworkerを殺します。直前の状態はRUNNINGです。

$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/30000/start -d '{"input": {"City": "Tokyo", "Day": "Sunday
", "Person": 2}}' -H "Content-Type: application/json"
{"instance_id":"30000"}
== APP == BookflightActivity completed: price=0
== APP == workflow: InstanceId=30000
== APP == BookHotelActivity started: city=Tokyo day=Sunday person=2
^Cℹ️  
terminated signal received: shutting down
INFO[0026] Dapr shutting down                            app_id=myworker instance=DESKTOP-G3C02SC scope=dapr.runtime type=log ver=1.10.0
INFO[0026] Stopping PubSub subscribers and input bindings  app_id=myworker instance=DESKTOP-G3C02SC scope=dapr.runtime type=log ver=1.10.0
INFO[0026] Shutting down workflow engine                 app_id=myworker instance=DESKTOP-G3C02SC scope=dapr.runtime type=log ver=1.10.0
✅  Exited Dapr successfully
✅  Exited App successfully
$ curl -s http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/30000 | jq
{
  "WFInfo": {
    "instance_id": "30000"
  },
  "start_time": "2023-02-22T12:09:47Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Tokyo\",\"Day\":\"Sunday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T12:09:48Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.runtime_status": "RUNNING"
  }
}

workerを再び起動します。イベントソーシングを使っているため、クラッシュ直前までの状態変化(=アクティビティ呼び出しによる更新差分)を順番に適用することでワークフローを復元し、処理を再開することができます

== APP == BookHotelActivity started: city=Tokyo day=Sunday person=2
== APP == BookHotelActivity completed: price=30000
== APP == workflow: InstanceId=30000
== APP == workflow completed
$ curl -s http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/30000 | jq
{
  "WFInfo": {
    "instance_id": "30000"
  },
  "start_time": "2023-02-22T12:09:47Z",
  "metadata": {
    "dapr.workflow.custom_status": "",
    "dapr.workflow.input": "{\"City\":\"Tokyo\",\"Day\":\"Sunday\",\"Person\":2}",
    "dapr.workflow.last_updated": "2023-02-22T12:11:17Z",
    "dapr.workflow.name": "BookingTripWorkflow",
    "dapr.workflow.output": "{\"Processed\":true,\"Price\":30000}",
    "dapr.workflow.runtime_status": "COMPLETED"
  }
}

障害に強いので、長い処理でも途中で止まってしまう心配はいりません。これは本当にありがたいです。

完了したworkflowを再度start

インスタンスIDが返ってくるだけでした。エラーにならないのは冪等だからだと思います。

$ curl -XPOST http://localhost:3501/v1.0-alpha1/workflows/dapr/BookingTripWorkflow/12345678/start -d '{"input": {"City": "Tokyo", "Day": "Sunday", "Person": 2}}' -H "Content-Type: application/json"
{"instance_id":"12345678"}

内部実装について

Workflowの実現方法はまさに「componentの総力戦」です。内部的には以下のcomponentを使用しています。

  • state
  • actor
  • binding

ワークフローの進め方

新しいworkflowが開始されると、内部的に2つのactorが生成されます。

  • dapr.internal.wfengine.workflow: 各workflowインスタンスの進捗を管理
    • アプリケーション内のworkflowを起動し、結果が返ってきたらstateに保存
  • dapr.internal.wfengine.activity: 各activityの進捗を管理
    • workflowがactivityをスケジューリングした際に起動
    • アプリケーション内のactivityを起動

workflow本体とactivityが別々のactorになっているおかげで、workflow内でactivityを非同期に呼び出すことができるという仕組みのようです。

stateの状態更新

上でも少し触れましたが、workflowの状態はactorが持っているstateで管理されます4

イベントソーシングなので、workflowの状態は上書きされず「状態変化の履歴」が都度追記されます。

# Dapr CLI のstate componentは `dapr_redis` コンテナを使用
$ docker exec -it dapr_redis redis-cli

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000000"
1) "data"
2) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0b\b\x9e\xe3\xda\x9f\x06\x10\x95\xac\xda)r\x00"
3) "version"
4) "5"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000001"
1) "version"
2) "5"
3) "data"
4) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0b\b\x9e\xe3\xda\x9f\x06\x10\xff\xc4\xdd&\x1a\xc2\x01\n\x13BookingTripWorkflow\x1a/\n-{\"City\":\"Hawaii\",\"Day\":\"Saturday\",\"Person\":2}\"0\n\x06123456\x12&\n$44b681b5-5e54-4f6a-9e59-870b910638b4:4\n dd30cfa1bf1186f3bccc354552a0c675\x12\x101ba69c34d47bf990B\x12\n\x10e7b1cbac96023ac0"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000002"
1) "version"
2) "5"
3) "data"
4) "\x12\x0b\b\x9e\xe3\xda\x9f\x06\x10\xab\xec\xb1T2\x7f\n\x12BookFlightActivity\x12\x00\x1a1\n/[{\"City\":\"Hawaii\",\"Day\":\"Saturday\",\"Person\":2}]\"4\n dd30cfa1bf1186f3bccc354552a0c675\x12\x10e7b1cbac96023ac0"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000003"
1) "data"
2) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0c\b\x9e\xe3\xda\x9f\x06\x10\xf1\x9f\xe4\xcc\x02r\x00"
3) "version"
4) "3"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000004"
1) "data"
2) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0c\b\x9e\xe3\xda\x9f\x06\x10\x90\xd2\xcf\xcb\x02:#\x12!\n\x1f{\"Success\":true,\"Price\":200000}"
3) "version"
4) "3"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000005"
1) "version"
2) "3"
3) "data"
4) "\b\x01\x12\x0c\b\x9e\xe3\xda\x9f\x06\x10\x82\x9a\xda\xd1\x022~\n\x11BookHotelActivity\x12\x00\x1a1\n/[{\"City\":\"Hawaii\",\"Day\":\"Saturday\",\"Person\":2}]\"4\n dd30cfa1bf1186f3bccc354552a0c675\x12\x10e7b1cbac96023ac0"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000006"
1) "data"
2) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0c\b\xa3\xe3\xda\x9f\x06\x10\xc7\x8c\xef\xd5\x02r\x00"
3) "version"
4) "1"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000007"
1) "data"
2) "\b\xff\xff\xff\xff\xff\xff\xff\xff\xff\x01\x12\x0c\b\xa3\xe3\xda\x9f\x06\x10\xcd\xa4\xf7\xd4\x02:$\b\x01\x12 \n\x1e{\"Success\":true,\"Price\":60000}"
3) "version"
4) "1"

127.0.0.1:6379> hgetall "myworker||dapr.internal.wfengine.workflow||123456||history-000008"
1) "data"
2) "\b\x02\x12\x0c\b\xa3\xe3\xda\x9f\x06\x10\xf1\xae\xa4\xd8\x02\"'\b\x01\x12#\n!{\"Processed\":true,\"Price\":260000}"
3) "version"
4) "1"

障害が起きた際は保存されているこれらのイベントを再度読み込んで、中断した箇所まで復元してから再開できるという仕掛けです。

おわりに

以上、workflow componentの紹介でした。
今回触れられませんでしたが、workflowはリマインダーによる定期実行や外部入力との連携機能もあるようです。また、重い処理を複数のアクティビティで並列処理して高速化するパターンも紹介されていました(ドキュメント)。
まだまだ奥が深いので、今後も注目です。

  1. 以下、概念としてのワークフローは「ワークフロー」、workflow componentは「workflow」と表記します。

  2. さらに多くの機能を触ってみたい場合は、公式のサンプルがおすすめです。

  3. 特に変換処理をいれずとも、workflowの RunAsync メソッドの引数のクラスにデシリアライズされて渡されます。

  4. actorは、state componentのうち actorStateStoretrue にしたものを自動で利用します。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?