LoginSignup
1
1

More than 1 year has passed since last update.

Dapr go-sdkのactor clientはどのようにリクエストを行っているのか

Posted at

TL; DR

  • go-sdkのactor client作成時にgRPCリクエスト実装は不要
  • ユーザーが指定するのは関数フィールドの定義のみ
  • client.ImplActorClientStubreflect を使って動的に関数生成

はじめに

Daprのactorを使ってみようとgo-sdkのサンプル実装を見ていたところ、クライアント側にgRPCリクエスト処理が見当たりませんでした。

examples/actor/client/main.go
// actorのメソッドGetUserを呼び出し。この実装はどこにある?
user, err := myActor.GetUser(ctx, &api.User{
	Name: "abc",
	Age:  123,
})

メソッドを追いかけようとするも、実態はフィールドで定義がどこにも見当たりません。
そもそも、自作したactorを呼び出しているのに、SDKのように都合よくDaprへの通信が隠蔽されているのも不思議です。

examples/actor/api/actor.go
// メソッドではなくフィールド?実装はどこにあるの??
type ClientStub struct {
	GetUser         func(context.Context, *User) (*User, error)
	Invoke          func(context.Context, string) (string, error)
	Get             func(context.Context) (string, error)
	Post            func(context.Context, string) error
	StartTimer      func(context.Context, *TimerRequest) error
	StopTimer       func(context.Context, *TimerRequest) error
	StartReminder   func(context.Context, *ReminderRequest) error
	StopReminder    func(context.Context, *ReminderRequest) error
	IncrementAndGet func(ctx context.Context, stateKey string) (*User, error)
}

そこで本記事では、actorのクライアントがなぜ関数フィールドの定義だけでメソッド呼び出し可能になっているのかを調べてみました。

仕組み

actor呼び出しのgRPC

実装を探す前に、actor呼び出しで使われているgRPCメソッドを確認します。
grpcreplayをコネクションに仕込んでリクエスト、レスポンスをログに吐かせるのが便利です。

変更後のクライアント実装(クリックで展開)
client/main.go
import (
	"context"
	"fmt"
	"os"
	"time"

	dapr "github.com/dapr/go-sdk/client"
	"github.com/dapr/go-sdk/examples/actor/api"
	"github.com/google/go-replayers/grpcreplay"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	ctx := context.Background()

	// コネクションをgrpcreplay用のものに差し替え
	rec, err := grpcreplay.NewRecorder("service.replay", nil)
	defer func() {
		err = rec.Close()
		if err != nil {
			fmt.Println(err)
		}
		err = writeDump("service.replay")
		if err != nil {
			fmt.Println(err)
		}
	}()
	if err != nil {
		panic(err)
	}

	port := os.Getenv("DAPR_GRPC_PORT")
	if port == "" {
		port = "50001"
	}

	conn, err := grpc.Dial(
		"localhost:"+port,
		append(rec.DialOptions(), grpc.WithTransportCredentials(insecure.NewCredentials()))...,
	)
	if err != nil {
		panic(err)
	}

	client := dapr.NewClientWithConnection(conn)
	defer client.Close()
	//...
}

func writeDump(fileName string) error {
	f, err := os.Create(fileName + ".log")
	if err != nil {
		return err
	}
	defer f.Close()

	// カセットはgRPCのバイナリのため、人が読める形式のログファイルを別途生成
	return grpcreplay.Fprint(f, fileName)
}
service.replay.log
initial state: ""
#1: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"GetUser" data:"{\"name\":\"abc\",\"age\":123}"#2: kind: RESPONSE, method: , ref index: 1, message:
data:"{\"name\":\"abc\",\"age\":123}"#3: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"Invoke" data:"\"laurence\""#4: kind: RESPONSE, method: , ref index: 3, message:
data:"\"laurence\""#5: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"Post" data:"\"laurence\""#6: kind: RESPONSE, method: , ref index: 5, message:
#7: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"Get"#8: kind: RESPONSE, method: , ref index: 7, message:
data:"\"get result\""#9: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"StartTimer" data:"{\"timer_name\":\"testTimerName\",\"call_back\":\"Invoke\",\"duration\":\"5s\",\"period\":\"5s\",\"data\":\"\\\"hello\\\"\"}"#10: kind: RESPONSE, method: , ref index: 9, message:
#11: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"StopTimer" data:"{\"timer_name\":\"testTimerName\",\"call_back\":\"Invoke\",\"duration\":\"\",\"period\":\"\",\"data\":\"\"}"#12: kind: RESPONSE, method: , ref index: 11, message:
#13: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"StartReminder" data:"{\"reminder_name\":\"testReminderName\",\"duration\":\"5s\",\"period\":\"5s\",\"data\":\"\\\"hello\\\"\"}"#14: kind: RESPONSE, method: , ref index: 13, message:
#15: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"StopReminder" data:"{\"reminder_name\":\"testReminderName\",\"duration\":\"\",\"period\":\"\",\"data\":\"\"}"#16: kind: RESPONSE, method: , ref index: 15, message:
#17: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"IncrementAndGet" data:"\"testStateKey\""#18: kind: RESPONSE, method: , ref index: 17, message:
data:"{\"name\":\"\",\"age\":5}"#19: kind: REQUEST, method: /dapr.proto.runtime.v1.Dapr/InvokeActor, ref index: 0, message:
actor_type:"testActorType" actor_id:"ActorImplID123456" method:"IncrementAndGet" data:"\"testStateKey\""#20: kind: RESPONSE, method: , ref index: 19, message:
data:"{\"name\":\"\",\"age\":6}"

使用されているのは /dapr.proto.runtime.v1.Dapr/InvokeActor でした。

また、

  • actorのメソッドの種類によらず同じgRPCメソッドが使用される
  • リクエストのデータはmarshalされて data に格納される

ということも確認できます。

秘密は client.ImplActorClientStub にあり

本題のクライアント実装を探します。
gRPC処理は、 client.ImplActorClientStub によって関数フィールドに代入されていました。

examples/actor/client/main.go
	// implement actor client stub
	myActor := new(api.ClientStub)
	client.ImplActorClientStub(myActor)

この関数は内部的に reflect を使い、メソッド名や関数のシグネチャに合わせたgRPC実装を動的に生成しています(→実装)。

client/actor.go
// 主要な部分だけ抜粋
func (c *GRPCClient) implActor(actor actor.Client, serializer codec.Codec) {
	actorValue := reflect.ValueOf(actor)
	valueOfActor := actorValue.Elem()
	typeOfActor := valueOfActor.Type()

	numField := valueOfActor.NumField()
	for i := 0; i < numField; i++ {
		t := typeOfActor.Field(i)
		methodName := t.Name
		f := valueOfActor.Field(i)

		if f.Kind() == reflect.Func && f.IsValid() && f.CanSet() {
			outNum := t.Type.NumOut()
			funcOuts := make([]reflect.Type, outNum)
			for i := 0; i < outNum; i++ {
				funcOuts[i] = t.Type.Out(i)
			}

			// 動的に生成した関数をフィールドに代入
			f.Set(reflect.MakeFunc(f.Type(), c.makeCallProxyFunction(actor, methodName, funcOuts, serializer)))
		}
	}
}

reflectによる関数動的生成

reflect により生成される関数は以下で定義されています。

本体となるリクエスト処理はこの部分です。確かに先ほどのgRPCメソッドをリクエストしています。

client/actor.go
rsp, err := c.InvokeActor(invCtx, &InvokeActorRequest{
	ActorType: actor.Type(),
	ActorID:   actor.ID(),
	Method:    methodName,
	Data:      data,
})

ActorTypeActorID はclientが自前で持っているメソッド、 Methodreflect で取得したフィールド名から取得しています。

また、Data は関数フィールドの context.Context の次の引数をjson.Marshalしたものを使用しています。

client/actor.go
var data []byte
if len(inIArr) > 0 {
	data, err = json.Marshal(inIArr[0])
}

最終的に、以下の関数をフィールドへ代入しているのと同等になります。

c.GetUser = func(ctx context.Context, user *User) (*User, error) {
	data, err = json.Marshal(user)
	if err != nil {
		panic(err)
	}

	rsp, err := client.InvokeActor(ctx, &InvokeActorRequest{
		ActorType: c.Type(),
		ActorID:   c.ID(),
		Method:    "GetUser",
		Data:      data,
	})
	// ... (rspのシリアライズ)
	return reply, err
}

actor呼び出しのgRPCメソッドが共通化されているとはいえ、フィールド名と引数、戻り値型さえあれば実装が完成してしまうのは圧巻です。こんなコードが書けるようになりたい...

おわりに

以上、Dapr go-sdkでactorのクライアントをユーザーが実装しなくてよい仕組みの紹介でした。

Goでメタプログラミングといえばコード生成のイメージが強かったので、reflect で自然に実装を入れ込めるのが印象的でした。ユーザーに事前に go generate を要求しない分、より手軽に使えると感じました。
この設計は色々な所で応用できそうです。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1