TL; DR
Actor実行時に一意なキーを発行しロックすることで、同じIDのアクターが同時に2つ以上起動しないようにしている
はじめに
DaprのActor Componentは、ターンベースのアクセスモデルを採用しています。各アクターはメッセージを逐次的に処理するため、あるタイプ、IDのアクターは常に1つの呼び出ししか処理しません。
言い換えると、アクターのメソッドで状態更新の競合を考慮する必要が無くなります。
public async Task<int> IncrementAsync()
{
// メッセージを並行処理することが無いため、getからsetの間に状態が更新される心配がない!
var counterValue = await StateManager.TryGetStateAsync<int>("counter");
var currentValue = counterValue.HasValue ? counterValue.Value : 0;
var newValue = currentValue + 1;
await StateManager.SetStateAsync("counter", newValue);
return newValue;
}
そこで本記事では、アクターの逐次実行をどのように保障しているのかを調べてみました。
アクターが逐次実行されることを確認
実装を見る前に、実際にアクターがメッセージを逐次実行することを確認します。Go SDKのサンプルプログラムを改造して試してみましょう。
まず、サーバー(アクター)側に5秒のsleepを仕込みます。
func (t *TestActor) GetUser(ctx context.Context, user *api.User) (*api.User, error) {
fmt.Println("call get user req = ", user)
time.Sleep(5 * time.Second) // 追加(この行以外はそのまま)
return user, nil
}
続いて、クライアントでメッセージを同時に投げるようにします。
// こちらは全面書き直し
package main
import (
"context"
"fmt"
"log"
"sync"
dapr "github.com/dapr/go-sdk/client"
"github.com/dapr/go-sdk/examples/actor/api"
)
func main() {
ctx := context.Background()
client, err := dapr.NewClient()
if err != nil {
panic(err)
}
defer client.Close()
myActor := new(api.ClientStub)
client.ImplActorClientStub(myActor)
wg := &sync.WaitGroup{}
// 同じ種類、IDのアクターに対し、レスポンスを待たずに同時にメッセージ送信
for i := 0; i < 3; i++ {
i := i // pin
wg.Add(1)
go func() {
defer wg.Done()
log.Printf("%d: start\n", i)
user, err := myActor.GetUser(ctx, &api.User{
Name: fmt.Sprintf("user%d", i),
Age: 20,
})
if err != nil {
panic(err)
}
log.Printf("%d: %+v\n", i, user)
}()
}
wg.Wait()
}
実行してみると、確かに前のメッセージ処理が完了してから次の処理が始まっていることが確認できます。
$ dapr run --app-id actor-serving \
--app-protocol http \
--app-port 8080 \
--dapr-http-port 3500 \
--log-level debug \
--components-path ./config \
go run ./serving/main.go
== APP == call get user req = &{user0 20}
== APP == call get user req = &{user1 20}
== APP == call get user req = &{user2 20}
# レスポンス取得が5秒ずつ遅れていることから、逐次実行されているのが確認できる
$ dapr run --app-id actor-client \
--log-level debug \
--components-path ./config \
go run ./client/main.go
== APP == 2023/01/15 10:48:02 2: start
== APP == 2023/01/15 10:48:02 1: start
== APP == 2023/01/15 10:48:02 0: start
== APP == 2023/01/15 10:48:07 0: &{Name:user0 Age:20}
== APP == 2023/01/15 10:48:12 1: &{Name:user1 Age:20}
== APP == 2023/01/15 10:48:17 2: &{Name:user2 Age:20}
サイドカーのアクターロック機構
Daprのアクターは「仮想アクターパターン」によって実装されています。アクターの実体となるプロセスを常駐させる代わりに、呼び出されたタイミングで有効化されてメソッドが実行されます。
また、メッセージはgRPCでやりとりされ、Daprサイドカー経由でアプリケーションコンテナ(実態はgRPCサーバ)でメソッドが実行されます。
これだけでは単なるgRPCサービスで並列実行を防げないため、サイドカーがメッセージを中継する際にロックをかけています。
アクターへのメッセージがいずれかのDaprサイドカーに到達すると、適切なアプリケーションコンテナのDaprサイドカーへ転送されます。
サイドカーが自身のアプリケーションで処理可能と判断すると、アクターの種類、IDをキーにロックを取得してからメッセージをアプリケーションコンテナへ転送します。
// 関連処理のみ抜粋
func (a *actorsRuntime) callLocalActor(ctx context.Context, req *invokev1.InvokeMethodRequest) (*invokev1.InvokeMethodResponse, error) {
actorTypeID := req.Actor()
// アクターを取得or新規作成(※メッセージの処理はせず、ロックやライフサイクルのみ管理)
act := a.getOrCreateActor(actorTypeID.GetActorType(), actorTypeID.GetActorId())
// アクターを一意なキーでロック
uuidObj, err := uuid.NewRandom()
if err != nil {
return nil, fmt.Errorf("failed to generate UUID: %w", err)
}
uuid := uuidObj.String()
reentrancyID := &uuid
err := act.lock(reentrancyID)
if err != nil {
return nil, status.Error(codes.ResourceExhausted, err.Error())
}
defer act.unlock()
// 以下メッセージをアプリケーションコンテナへ送信する処理
}
act.lock
によって、内部的に *sync.Mutex
(act.actorLock.methodLock
)のロックを取得します。ロックはメッセージ処理完了(unlockの defer
文)まで解放されないため、この間に到達した他のメッセージは act.lock
の行でブロックされます。
結果、ある種類、IDのアクターに対してメッセージは常に逐次実行されます。
(言い換えると、クライアント側でIDさえ変えてしまえばどんどん新しいアクターを「生成」することができます1。これが「仮想アクターパターン」のゆえんです。)
おわりに
以上、アクターの逐次実行の仕組みの紹介でした。
アクターはComponentの中で唯一1から独自実装されているので、興味深い機構がいくつも組み込まれています。機会があれば、Go力、Dapr力を上げて再度コードリーディングに挑みたいと思います。
-
前述の通り、これはアプリケーションサーバの実装から見れば、単に
ID
パラメータの異なるgRPCリクエストを送ったにすぎません。(参考: Dapr go-sdkのactor clientはどのようにリクエストを行っているのか) ↩