個人的に、Dapr を使った実際のシステム開発では Actor の活用が重要そうな気がしています。
Actor の実装方法
本来は、プログラミング言語毎に用意された Dapr SDK を使って Actor を実装する事になりそうですが、まずは仕組みを理解するために SDK 無しで実装してみました。
Actor の実装に必要な HTTP エンドポイントは actors_api ドキュメント の "Dapr calling to user service code" 欄に記載されており、とりあえずはこのようになっていました。
HTTP エンドポイント | 概要 | |
---|---|---|
(a) | /dapr/config |
actorType と Actor の設定情報を取得 |
(b) | /actors/<actorType>/<actorId>/method/<methodName> |
Actor インスタンスのメソッド呼び出し |
(c) | /actors/<actorType>/<actorId> |
Actor インスタンスの非活性化 |
(d) | /healthz |
ヘルスチェック |
メソッド呼び出しには method/timer/<timerName>
と method/remind/<reminderName>
というバリエーションもあるようですが、ここでは無視しておきます。
次に、Actor のライフサイクルとしては、このような処理内容を想定しているようです。
- Dapr Runtime の起動時に (a) へのアクセスが発生するので、そのアプリケーションが扱う Actor の型名(actorType)と各種設定を返す
- Actor のメソッド呼び出し時に actorType が (a) のものと合致している場合に (b) が呼び出される
- 指定された actorId の Actor インスタンスが存在しなければインスタンス化して処理を実施
- 作成したインスタンスは非活性化されるまでキャッシュ(確保)しておく
- Actor インスタンスへ
actorIdleTimeout
の時間が経過してもアクセスされなかった場合に (c) が呼び出される- 該当する actorId の Actor インスタンスを解放
(d) は定期的(デフォルトは 5秒毎)に呼び出されます。
このように、Dapr では Actor の扱いを実装側に委ねている点 2 が特徴的で、主に SDK 側で処理する事を想定しているのだと思います。
また、アクターモデルの機能としては当たり前の事だと思いますが、Actor インスタンス毎(つまり actorId 単位)のメソッド呼び出しは、同時に複数の呼び出しが発生しないよう Dapr Runtime で排他制御されているようです。3
各エンドポイントへのアクセスをログ出力するだけの Actor(というよりはそのベース処理)を実装すると、次のようになります。
sample/sample.go
package main
import (
"encoding/json"
"log"
"net/http"
"os"
"strings"
)
type daprConfig struct {
Entities []string
ActorIdleTimeout string
}
func main() {
port := os.Getenv("APP_HTTP_PORT")
if port == "" {
port = "3000"
}
// (a)
http.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) {
log.Print("called dapr config")
config := daprConfig{[]string{"sampleActor"}, "30s"}
json.NewEncoder(w).Encode(config)
})
// (b), (c)
http.HandleFunc("/actors/", func(w http.ResponseWriter, r *http.Request) {
path := strings.TrimPrefix(r.URL.Path, "/actors/")
log.Printf("called actors: %s", path)
w.WriteHeader(http.StatusOK)
})
// (d)
http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
log.Print("called healthz")
w.WriteHeader(http.StatusOK)
})
log.Fatal(http.ListenAndServe(":" + port, nil))
}
動作確認
前回、構築した Dapr 環境で実行して挙動を確認してみます。
ビルド例
[macOS] % cd sample
[macOS] % GOOS=linux go build -o sample_linux sample.go
実行例
[macOS] % limactl shell lima_dapr
[lima] $ dapr run --app-id sample --app-port 3000 --dapr-http-port 4000 ./sample_linux
ℹ️ Starting Dapr with id sample. HTTP Port: 4000. gRPC Port: 40563
...省略
INFO[0000] application discovered on port 3000 app_id=sample instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
== APP == 2023/02/03 02:07:21 called dapr config
...省略
== APP == 2023/02/03 02:07:26 called healthz
== APP == 2023/02/03 02:07:31 called healthz
起動処理の中で (a) /dapr/config へのアクセスが発生し、5秒毎に (d) のヘルスチェックが実施されています。
actorId を t1
と t2
として、sampleActor
のメソッド呼び出しをそれぞれ行ってみます。
sampleActor のメソッド呼び出し例
[macOS] % curl http://localhost:4000/v1.0/actors/sampleActor/t1/method/test1
[macOS] % curl http://localhost:4000/v1.0/actors/sampleActor/t2/method/test2
ログの内容は次のようになりました。
メソッド呼び出しで (b) へのアクセスが発生し、actorId 毎に actorIdleTimeout
で設定した 30秒が経過した後で (c) へのアクセスが発生しています。
dapr run のログ内容
ℹ️ Starting Dapr with id sample. HTTP Port: 4000. gRPC Port: 40563
...省略
== APP == 2023/02/03 02:08:11 called healthz
== APP == 2023/02/03 02:08:12 called actors: sampleActor/t1/method/test1
== APP == 2023/02/03 02:08:16 called healthz
...省略
== APP == 2023/02/03 02:08:36 called healthz
== APP == 2023/02/03 02:08:40 called actors: sampleActor/t2/method/test2
== APP == 2023/02/03 02:08:41 called healthz
== APP == 2023/02/03 02:08:46 called healthz
== APP == 2023/02/03 02:08:51 called healthz
== APP == 2023/02/03 02:08:51 called actors: sampleActor/t1
== APP == 2023/02/03 02:08:56 called healthz
...省略
== APP == 2023/02/03 02:09:21 called healthz
== APP == 2023/02/03 02:09:21 called actors: sampleActor/t2
== APP == 2023/02/03 02:09:26 called healthz
SDK 利用による Actor の実装
Dapr SDK for Go を使って、数値をカウントアップするだけの Actor を実装してみます。
この SDK では、次のようにして Actor を実装すれば良さそうです。
- Actor は
actor.Server
インターフェースを実装して定義- 基本的に
actor.ServerImplBase
を struct へ埋め込んで、Type
メソッドを実装
- 基本的に
-
RegisterActorImplFactory
で Actor のファクトリ関数を登録
Dapr アプリケーションを実装する方法として、基本的に HTTP API と gRPC API の 2通りがありますが、Actor を実装する場合は HTTP API(github.com/dapr/go-sdk/service/http を使用)にするしかなさそうでした。4
Actor は ActorStateManager
を使って状態管理するようになっており、Set
した時点では永続化されず、内部的に Actor のメソッド呼び出し後にまとめて永続化 5するようになっています。
また、状態を Get
する際にキーが存在しない場合はエラーとなるようです。
現在の値を取得する Counter
とカウントアップを実施する CountUp
の 2つのメソッドを持つ Actor を実装してみました。
counter/counter.go
package main
import (
"context"
"log"
"os"
"github.com/dapr/go-sdk/actor"
dapr "github.com/dapr/go-sdk/client"
daprd "github.com/dapr/go-sdk/service/http"
)
// Actor の定義
type CounterActor struct {
actor.ServerImplBase
daprClient dapr.Client
}
// Actor の型名を取得
func (a *CounterActor) Type() string {
log.Print("called Type")
return "counterActor"
}
// カウンターの値を取得
func (a *CounterActor) Counter(ctx context.Context) (int, error) {
log.Print("called Counter")
var counter int
err := a.GetStateManager().Get("counter", &counter)
return counter, err
}
// カウントアップ
func (a *CounterActor) CountUp(ctx context.Context) (int, error) {
log.Print("called CountUp")
counter, _ := a.Counter(ctx)
counter += 1
err := a.GetStateManager().Set("counter", counter)
return counter, err
}
// Actor のファクトリ関数
func counterActorFactory() actor.Server {
log.Print("called counterActorFactory")
client, err := dapr.NewClient()
if err != nil {
log.Fatalf("failed to create client: %v", err)
}
return &CounterActor{daprClient: client}
}
func main() {
port := os.Getenv("APP_HTTP_PORT")
if port == "" {
port = "3000"
}
s := daprd.NewService(":" + port)
s.RegisterActorImplFactory(counterActorFactory)
err := s.Start()
if err != nil {
log.Fatalf("failed to start: %v", err)
}
}
動作確認
実行して counterActor
のメソッドを呼び出してみます。
ビルド例
[macOS] % cd counter
[macOS] % GOOS=linux go build -o counter_linux
実行例
[macOS] % limactl shell lima_dapr
[lima] $ APP_HTTP_PORT=3001 dapr run --app-id counter --app-port 3001 --dapr-http-port 4001 ./counter_linux
ℹ️ Starting Dapr with id counter. HTTP Port: 4001. gRPC Port: 38263
...省略
INFO[0000] application protocol: http. waiting on port 3001. This will block until the app is listening on that port. app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
== APP == 2023/02/03 04:09:12 called counterActorFactory
== APP == dapr client initializing for: 127.0.0.1:38263
== APP == 2023/02/03 04:09:12 called Type
INFO[0000] application discovered on port 3001 app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
INFO[0000] application configuration loaded app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
...省略
actorId を c1
にして CountUp
と Counter
を呼び出してみました。
CountUp
前に Counter
を呼び出すとエラーとなり、CountUp
を呼び出す事でカウントアップが実施されました。
メソッド名の大文字・小文字は Go 側と合わせる必要があるようなので、他のプログラミング言語と併用する際には注意が必要かもしれません。6
counterActor のメソッド呼び出し例1
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/Counter
{"errorCode":"ERR_ACTOR_INVOKE_METHOD","message":"error invoke actor method: error from actor service: "}
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/CountUp
1
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/CountUp
2
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/Counter
2
当然ながら、actorId が違うと別のカウントとなります。
counterActor のメソッド呼び出し例2
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c2/method/CountUp
1
最後に、zipkin からトレースログを取得して、特定の項目だけを抽出してみたところ次のようになりました。
zipkin トレースログ取得例
[macOS] % curl -s 'http://localhost:9411/zipkin/api/v2/traces?limit=10' | jq '.[][].tags | {"dapr.actor", "dapr.api", "dapr.protocol", "dapr.status_code"}'
{
"dapr.actor": null,
"dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
"dapr.protocol": "grpc",
"dapr.status_code": null
}
{
"dapr.actor": null,
"dapr.api": "/dapr.proto.runtime.v1.Dapr/GetActorState",
"dapr.protocol": "grpc",
"dapr.status_code": null
}
{
"dapr.actor": "counterActor.c2",
"dapr.api": "GET /v1.0/actors/counterActor/c2/method/CountUp",
"dapr.protocol": "http",
"dapr.status_code": "200"
}
{
"dapr.actor": "counterActor.c1",
"dapr.api": "GET /v1.0/actors/counterActor/c1/method/Counter",
"dapr.protocol": "http",
"dapr.status_code": "200"
}
{
"dapr.actor": null,
"dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
"dapr.protocol": "grpc",
"dapr.status_code": null
}
{
"dapr.actor": "counterActor.c1",
"dapr.api": "GET /v1.0/actors/counterActor/c1/method/CountUp",
"dapr.protocol": "http",
"dapr.status_code": "200"
}
{
"dapr.actor": null,
"dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
"dapr.protocol": "grpc",
"dapr.status_code": null
}
{
"dapr.actor": "counterActor.c1",
"dapr.api": "GET /v1.0/actors/counterActor/c1/method/CountUp",
"dapr.protocol": "http",
"dapr.status_code": "200"
}
{
"dapr.actor": null,
"dapr.api": "/dapr.proto.runtime.v1.Dapr/GetActorState",
"dapr.protocol": "grpc",
"dapr.status_code": null
}
{
"dapr.actor": "counterActor.c1",
"dapr.api": "GET /v1.0/actors/counterActor/c1/method/Counter",
"dapr.protocol": "http",
"dapr.status_code": "500"
}
-
Actor をインスタンス化するもしないも基本的には実装側次第。Akka の ActorSystem に相当する部分を自前で実装するようなものかも。 ↩
-
つまり、Dapr Runtime 経由で呼び出す限りは actorType + actorId 毎にシングルスレッドで順番に処理するのと同じ事になる。Dapr のドキュメントでは
Turn-based access
と表現している。 ↩ -
gRPC API で実装してみたところ、実行時に
panic: Actor is not supported by gRPC API
エラーとなり、実行できませんでした ↩ -
manager.go の InvokeMethod 参照。Actor の SaveState から ActorStateManager.Save が呼び出され、DaprStateAsyncProvider.Apply を呼び出して、最終的に Dapr Runtime の
ExecuteActorStateTransaction
という gRPC API を呼び出すようになっている ↩ -
Go の場合、メソッドの先頭を小文字にすると private となってしまい呼び出せなくなる ↩