1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Dapr の Actor 機能を試してみる

Last updated at Posted at 2023-02-03

Dapr 1の Actor 機能を試してみました。

個人的に、Dapr を使った実際のシステム開発では Actor の活用が重要そうな気がしています。

Actor の実装方法

本来は、プログラミング言語毎に用意された Dapr SDK を使って Actor を実装する事になりそうですが、まずは仕組みを理解するために SDK 無しで実装してみました。

Actor の実装に必要な HTTP エンドポイントは actors_api ドキュメント の "Dapr calling to user service code" 欄に記載されており、とりあえずはこのようになっていました。

HTTP エンドポイント 概要
(a) /dapr/config actorType と Actor の設定情報を取得
(b) /actors/<actorType>/<actorId>/method/<methodName> Actor インスタンスのメソッド呼び出し
(c) /actors/<actorType>/<actorId> Actor インスタンスの非活性化
(d) /healthz ヘルスチェック

メソッド呼び出しには method/timer/<timerName>method/remind/<reminderName> というバリエーションもあるようですが、ここでは無視しておきます。

次に、Actor のライフサイクルとしては、このような処理内容を想定しているようです。

  1. Dapr Runtime の起動時に (a) へのアクセスが発生するので、そのアプリケーションが扱う Actor の型名(actorType)と各種設定を返す
  2. Actor のメソッド呼び出し時に actorType が (a) のものと合致している場合に (b) が呼び出される
    • 指定された actorId の Actor インスタンスが存在しなければインスタンス化して処理を実施
    • 作成したインスタンスは非活性化されるまでキャッシュ(確保)しておく
  3. Actor インスタンスへ actorIdleTimeout の時間が経過してもアクセスされなかった場合に (c) が呼び出される
    • 該当する actorId の Actor インスタンスを解放

(d) は定期的(デフォルトは 5秒毎)に呼び出されます。

このように、Dapr では Actor の扱いを実装側に委ねている点 2 が特徴的で、主に SDK 側で処理する事を想定しているのだと思います。

また、アクターモデルの機能としては当たり前の事だと思いますが、Actor インスタンス毎(つまり actorId 単位)のメソッド呼び出しは、同時に複数の呼び出しが発生しないよう Dapr Runtime で排他制御されているようです。3

各エンドポイントへのアクセスをログ出力するだけの Actor(というよりはそのベース処理)を実装すると、次のようになります。

sample/sample.go
package main

import (
	"encoding/json"
	"log"
	"net/http"
	"os"
	"strings"
)

type daprConfig struct {
	Entities         []string
	ActorIdleTimeout string
}

func main() {
	port := os.Getenv("APP_HTTP_PORT")

	if port == "" {
		port = "3000"
	}

    // (a)
	http.HandleFunc("/dapr/config", func(w http.ResponseWriter, r *http.Request) {
		log.Print("called dapr config")

		config := daprConfig{[]string{"sampleActor"}, "30s"}

		json.NewEncoder(w).Encode(config)
	})

    // (b), (c)
	http.HandleFunc("/actors/", func(w http.ResponseWriter, r *http.Request) {
		path := strings.TrimPrefix(r.URL.Path, "/actors/")

		log.Printf("called actors: %s", path)

		w.WriteHeader(http.StatusOK)
	})

    // (d)
	http.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
		log.Print("called healthz")

		w.WriteHeader(http.StatusOK)
	})

	log.Fatal(http.ListenAndServe(":" + port, nil))
}

動作確認

前回、構築した Dapr 環境で実行して挙動を確認してみます。

ビルド例
[macOS] % cd sample
[macOS] % GOOS=linux go build -o sample_linux sample.go
実行例
[macOS] % limactl shell lima_dapr
[lima]  $ dapr run --app-id sample --app-port 3000 --dapr-http-port 4000 ./sample_linux 
ℹ️  Starting Dapr with id sample. HTTP Port: 4000. gRPC Port: 40563
...省略
INFO[0000] application discovered on port 3000           app_id=sample instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
== APP == 2023/02/03 02:07:21 called dapr config
...省略
== APP == 2023/02/03 02:07:26 called healthz
== APP == 2023/02/03 02:07:31 called healthz

起動処理の中で (a) /dapr/config へのアクセスが発生し、5秒毎に (d) のヘルスチェックが実施されています。

actorId を t1t2 として、sampleActor のメソッド呼び出しをそれぞれ行ってみます。

sampleActor のメソッド呼び出し例
[macOS] % curl http://localhost:4000/v1.0/actors/sampleActor/t1/method/test1
[macOS] % curl http://localhost:4000/v1.0/actors/sampleActor/t2/method/test2

ログの内容は次のようになりました。

メソッド呼び出しで (b) へのアクセスが発生し、actorId 毎に actorIdleTimeout で設定した 30秒が経過した後で (c) へのアクセスが発生しています。

dapr run のログ内容
ℹ️  Starting Dapr with id sample. HTTP Port: 4000. gRPC Port: 40563
...省略
== APP == 2023/02/03 02:08:11 called healthz
== APP == 2023/02/03 02:08:12 called actors: sampleActor/t1/method/test1
== APP == 2023/02/03 02:08:16 called healthz
...省略
== APP == 2023/02/03 02:08:36 called healthz
== APP == 2023/02/03 02:08:40 called actors: sampleActor/t2/method/test2
== APP == 2023/02/03 02:08:41 called healthz
== APP == 2023/02/03 02:08:46 called healthz
== APP == 2023/02/03 02:08:51 called healthz
== APP == 2023/02/03 02:08:51 called actors: sampleActor/t1
== APP == 2023/02/03 02:08:56 called healthz
...省略
== APP == 2023/02/03 02:09:21 called healthz
== APP == 2023/02/03 02:09:21 called actors: sampleActor/t2
== APP == 2023/02/03 02:09:26 called healthz

SDK 利用による Actor の実装

Dapr SDK for Go を使って、数値をカウントアップするだけの Actor を実装してみます。

この SDK では、次のようにして Actor を実装すれば良さそうです。

  • Actor は actor.Server インターフェースを実装して定義
    • 基本的に actor.ServerImplBase を struct へ埋め込んで、Type メソッドを実装
  • RegisterActorImplFactory で Actor のファクトリ関数を登録

Dapr アプリケーションを実装する方法として、基本的に HTTP API と gRPC API の 2通りがありますが、Actor を実装する場合は HTTP API(github.com/dapr/go-sdk/service/http を使用)にするしかなさそうでした。4

Actor は ActorStateManager を使って状態管理するようになっており、Set した時点では永続化されず、内部的に Actor のメソッド呼び出し後にまとめて永続化 5するようになっています。

また、状態を Get する際にキーが存在しない場合はエラーとなるようです。

現在の値を取得する Counter とカウントアップを実施する CountUp の 2つのメソッドを持つ Actor を実装してみました。

counter/counter.go
package main

import (
	"context"
	"log"
	"os"

	"github.com/dapr/go-sdk/actor"
	dapr "github.com/dapr/go-sdk/client"
	daprd "github.com/dapr/go-sdk/service/http"
)

// Actor の定義
type CounterActor struct {
	actor.ServerImplBase
	daprClient dapr.Client
}
// Actor の型名を取得
func (a *CounterActor) Type() string {
	log.Print("called Type")
	return "counterActor"
}
// カウンターの値を取得
func (a *CounterActor) Counter(ctx context.Context) (int, error) {
	log.Print("called Counter")

	var counter int

	err := a.GetStateManager().Get("counter", &counter)

	return counter, err
}
// カウントアップ
func (a *CounterActor) CountUp(ctx context.Context) (int, error) {
	log.Print("called CountUp")

	counter, _ := a.Counter(ctx)

	counter += 1

	err := a.GetStateManager().Set("counter", counter)

	return counter, err
}

// Actor のファクトリ関数
func counterActorFactory() actor.Server {
	log.Print("called counterActorFactory")

	client, err := dapr.NewClient()

	if err != nil {
		log.Fatalf("failed to create client: %v", err)
	}

	return &CounterActor{daprClient: client}
}

func main() {
	port := os.Getenv("APP_HTTP_PORT")

	if port == "" {
		port = "3000"
	}

	s := daprd.NewService(":" + port)
	s.RegisterActorImplFactory(counterActorFactory)

	err := s.Start()

	if err != nil {
		log.Fatalf("failed to start: %v", err)
	}
}

動作確認

実行して counterActor のメソッドを呼び出してみます。

ビルド例
[macOS] % cd counter
[macOS] % GOOS=linux go build -o counter_linux
実行例
[macOS] % limactl shell lima_dapr
[lima]  $ APP_HTTP_PORT=3001 dapr run --app-id counter --app-port 3001 --dapr-http-port 4001 ./counter_linux 
ℹ️  Starting Dapr with id counter. HTTP Port: 4001. gRPC Port: 38263
...省略
INFO[0000] application protocol: http. waiting on port 3001.  This will block until the app is listening on that port.  app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
== APP == 2023/02/03 04:09:12 called counterActorFactory
== APP == dapr client initializing for: 127.0.0.1:38263
== APP == 2023/02/03 04:09:12 called Type
INFO[0000] application discovered on port 3001           app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
INFO[0000] application configuration loaded              app_id=counter instance=lima-limadapr scope=dapr.runtime type=log ver=1.9.5
...省略

actorId を c1 にして CountUpCounter を呼び出してみました。
CountUp 前に Counter を呼び出すとエラーとなり、CountUp を呼び出す事でカウントアップが実施されました。

メソッド名の大文字・小文字は Go 側と合わせる必要があるようなので、他のプログラミング言語と併用する際には注意が必要かもしれません。6

counterActor のメソッド呼び出し例1
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/Counter
{"errorCode":"ERR_ACTOR_INVOKE_METHOD","message":"error invoke actor method: error from actor service: "}

[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/CountUp
1

[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/CountUp
2

[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c1/method/Counter
2

当然ながら、actorId が違うと別のカウントとなります。

counterActor のメソッド呼び出し例2
[macOS] % curl http://localhost:4001/v1.0/actors/counterActor/c2/method/CountUp
1

最後に、zipkin からトレースログを取得して、特定の項目だけを抽出してみたところ次のようになりました。

zipkin トレースログ取得例
[macOS] % curl -s 'http://localhost:9411/zipkin/api/v2/traces?limit=10' | jq '.[][].tags | {"dapr.actor", "dapr.api", "dapr.protocol", "dapr.status_code"}'
{
  "dapr.actor": null,
  "dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
  "dapr.protocol": "grpc",
  "dapr.status_code": null
}
{
  "dapr.actor": null,
  "dapr.api": "/dapr.proto.runtime.v1.Dapr/GetActorState",
  "dapr.protocol": "grpc",
  "dapr.status_code": null
}
{
  "dapr.actor": "counterActor.c2",
  "dapr.api": "GET /v1.0/actors/counterActor/c2/method/CountUp",
  "dapr.protocol": "http",
  "dapr.status_code": "200"
}
{
  "dapr.actor": "counterActor.c1",
  "dapr.api": "GET /v1.0/actors/counterActor/c1/method/Counter",
  "dapr.protocol": "http",
  "dapr.status_code": "200"
}
{
  "dapr.actor": null,
  "dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
  "dapr.protocol": "grpc",
  "dapr.status_code": null
}
{
  "dapr.actor": "counterActor.c1",
  "dapr.api": "GET /v1.0/actors/counterActor/c1/method/CountUp",
  "dapr.protocol": "http",
  "dapr.status_code": "200"
}
{
  "dapr.actor": null,
  "dapr.api": "/dapr.proto.runtime.v1.Dapr/ExecuteActorStateTransaction",
  "dapr.protocol": "grpc",
  "dapr.status_code": null
}
{
  "dapr.actor": "counterActor.c1",
  "dapr.api": "GET /v1.0/actors/counterActor/c1/method/CountUp",
  "dapr.protocol": "http",
  "dapr.status_code": "200"
}
{
  "dapr.actor": null,
  "dapr.api": "/dapr.proto.runtime.v1.Dapr/GetActorState",
  "dapr.protocol": "grpc",
  "dapr.status_code": null
}
{
  "dapr.actor": "counterActor.c1",
  "dapr.api": "GET /v1.0/actors/counterActor/c1/method/Counter",
  "dapr.protocol": "http",
  "dapr.status_code": "500"
}
  1. Dapr の概要や環境構築に関しては 前回の記事 が参考になるかもしれません

  2. Actor をインスタンス化するもしないも基本的には実装側次第。Akka の ActorSystem に相当する部分を自前で実装するようなものかも。

  3. つまり、Dapr Runtime 経由で呼び出す限りは actorType + actorId 毎にシングルスレッドで順番に処理するのと同じ事になる。Dapr のドキュメントでは Turn-based access と表現している。

  4. gRPC API で実装してみたところ、実行時に panic: Actor is not supported by gRPC API エラーとなり、実行できませんでした

  5. manager.go の InvokeMethod 参照。Actor の SaveState から ActorStateManager.Save が呼び出され、DaprStateAsyncProvider.Apply を呼び出して、最終的に Dapr Runtime の ExecuteActorStateTransaction という gRPC API を呼び出すようになっている

  6. Go の場合、メソッドの先頭を小文字にすると private となってしまい呼び出せなくなる

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?