Edited at

GAE/GoのRPCにフックするライブラリaespyを作った

More than 3 years have passed since last update.

@laco0416です。GAE/GoのRPCに介入して通信内容を使ってゴニョゴニョするためのライブラリを作ったのでその仕組みとかGAE/GoのRPC覗く上で気をつけなきゃいけないところなどを書きます。

laco0416/aespy


背景

GAE/Goに限らず、AppEngineとその他のGCPのサービス(Datastore, Memcache, UrlFetchなどなど)はRPCによって通信している。RPCはProtocol Buffer(以後pbと略記)という形式でシリアライズされたオブジェクトを用いており、これは簡単にデシリアライズ可能である。しかしGAE/JにはApiProxyとDelegateという仕組みが最初から用意されており、比較的容易にRPCに介入できるが、GAE/GoにはまだAPIが用意されていない。

ところでGAE/GoのDatastoreにはContextを取得できるイベントハンドラが存在しない。PropertyLoadSaverを実装してSaveLoadに介入出来るが、このフックではAppEngineのContextが取得できないため出来る処理が大きく限られるのが問題である。例えばDatastoreにエンティティを保存するときに自動でMemcacheにも流し込むのはコールバック内では行えないため、通常はアプリケーション側で手続き的に行う他ない。

Datastoreの操作を自動でMemcacheの操作に置き換えるgoonというライブラリはこの手続き的な処理を内部でやってくれる便利なやつで、今ではgoon無しで全部自力で書くのは苦痛でしかない。しかしやってくれるのは当然Memcacheへの自動キャッシュだけで、それ以外のことをやるには自力で書くしかない。

今回aespyを作ったモチベーションは、「DatastoreにPutされるエンティティを自動でBigQueryにStream Insertする」ことである。goonのようにライブラリを通じてDatastoreを操作するのではなく、RPCにフックするようにしたのはgoonと併用したいというのもあるし、極力既存のアプリケーションのコードを変えずに導入したいからだ。


aespyの仕組み

かつてGAE/JにはMemvacheというライブラリがあった。これはGAE/JのApiProxyとDelegateを使ってDatastoreへのRPCを乗っ取り、自動でMemcacheの操作に置き換えるライブラリである。今回BigQueryに流し込むライブラリを作るにあたって、まずはGAE/GoにおけるApiProxyとDelegateを作る必要があった。


GAE/GoのRPC

GAE/Goではappengine.Contextが全てである。Contextが取得できなければほとんど何もできないし、逆にいえばContextさえあれば何でも出来る。なぜならRPCを実際に行うためにContext.Call()が呼ばれているからである。

appengine.Contextはinterfaceであるため、実装を満たしてしまえばappengine.Contextをラップした独自のaespy.Contextを作ることが出来る。何もしないただのラッパーを作るには次のようなコードを書けば良い。参考になったのは同じくContextをラップしてRPCを乗っ取るaetestの実装である。appengine_internalをライブラリで使って大丈夫なのかと不安だったが、実際デプロイして使っても全く問題なかったので大丈夫だろう。

package aespy

import (
"net/http"

"appengine"
"appengine_internal"
)

// Context はappengine.Contextを継承したinterfaceである
type Context interface {
appengine.Context
}

// NewContext はappengine.ContextをラップしたContextを返す
func NewContext(req *http.Request) Context {
c := &context{Context: appengine.NewContext(req)}
return c
}

// FromContext はすでにあるappengine.Contextをラップする
func FromContext(ctx appengine.Context) Context {
c := &context{Context: ctx}
return c
}

// aespy.Contextの実装をする構造体
type context struct {
appengine.Context
}

func (c *context) Request() interface{} { return c.Context.Request() }
func (c *context) FullyQualifiedAppID() string { return c.Context.FullyQualifiedAppID() }
func (c *context) Debugf(format string, args ...interface{}) { c.Context.Debugf(format, args...) }
func (c *context) Infof(format string, args ...interface{}) { c.Context.Infof(format, args...) }
func (c *context) Warningf(format string, args ...interface{}) { c.Context.Warningf(format, args...) }
func (c *context) Errorf(format string, args ...interface{}) { c.Context.Errorf(format, args...) }
func (c *context) Criticalf(format string, args ...interface{}) { c.Context.Criticalf(format, args...) }

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
return c.Context.Call(service, method, in, out, opts)
}

もうおわかりだと思うが、

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {

return c.Context.Call(service, method, in, out, opts)
}

がRPCの呼び出しである。ここにフック用の処理を追加すればaespyが完成する。

実際にRPCをフックしている仕組みはaespyのソースを読んで欲しい。ドキュメントはないがそんなに大きなライブラリじゃないので15分あれば全部理解できると思う


Context.Callの解説

Context.Callの仕組みについて解説をしておく。RPCと聞くと私は無意識に非同期処理なのかと思っていた(特に根拠はない)が、このCallメソッドは内部的にはどうあれ、挙動としては同期処理である。

それぞれの仮引数について簡単に説明すると、


service: string

RPCの宛先。DatastoreへのRPCであればdatastore_v3等が入る。


method: string

RPCで呼び出す関数。DatastoreのPutならPutと入っている。


in: appengine_internal.ProtoMessage

RPCで送るリクエスト内容。pbの一番基底のインタフェースとして渡されるので、serviceとmethodに応じてtype assertionする。

重要なのは、inはCallの前後で変化しないということだ。


out: appengine_internal.ProtoMessage

RPCで返ってきたレスポンス内容。pbの一番基底のインタフェースとして渡されるので、serviceとmethodに応じてtype assertionする。

重要なのは、outは参照であり、Callするまでは空だが、Call後に値が入るということだ。


opts: *appengine_internal.CallOptions

RPCの設定。型でわかるように構造体である。タイムアウトの設定などを持っているが、フックして使うことは殆どない。

RPCをフックしてゴニョゴニョする上で肝なのがinoutなのは明白である。RPCの前後のフックをそれぞれpreCallpostCallとするなら、実装は最低限次のようにしなくてはならない。

func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {

c.preCall(service, method, in) //この段階ではinにしか意味がない
ret := c.Context.Call(service, method, in, out, opts) //本来のRPC
c.postCall(service, method, in, out) //ここでinとout両方に意味がある
return ret //本来のRPCの結果を返す
}


DatastoreへのRPC

ここまでであらゆるRPCを覗き見ることができるようになったので、本来の目的であるDatastoreへのRPCに対するアプローチを行う。


Datastore.Put

Datastore.Putの際にContext.Callに渡される引数の中身は次の通りである


  • service: "datastore_v3"

  • method: "Put"

  • in: *datastore.PutRequest

  • out: *datastore.PutResponse

datastore.PutRequestdatastore"appengine/datastore"パッケージではなく、"appengine_internal/datastore"パッケージであるのに注意。

PutRequestは「Putされるエンティティ」を内包するpbである。PutされたエンティティはPurRequest.GetEntityで取得できる。注意する点は、GetEntityはEntityProto型の スライス を返すということである。DatastoreはPutでもGetでもDeleteでも内部的には**Multiを呼んでおり、常に複数で扱っているのが理由である。

func prePut(c appengine.Context, in appengine_internal.ProtoMessage) error {

req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
for _, proto := range req.GetEntity() {
entity, err := protoToEntity(c, proto)
if err != nil {
return err
}
fmt.Printf("%+v", entity)
}
return nil
}

EntityProto型はDatastoreとやり取りするエンティティ1件を表す構造体である。エンティティは自身のキーとプロパティの情報を持っている。EntityProto型を扱いやすいよう独自のEntity型に変換するprotoToEntityを次のように実装した。

import (

"appengine"
"appengine/datastore"
pb "appengine_internal/datastore"
)

type Entity struct {
Key *datastore.Key
Kind string
Properties map[string]interface{}
}

func unwrapPropertyValue(pv *pb.PropertyValue) interface{} {
var value interface{}
switch {
case pv.BooleanValue != nil:
value = pv.GetBooleanValue()
case pv.Int64Value != nil:
value = pv.GetInt64Value()
case pv.DoubleValue != nil:
value = pv.GetDoubleValue()
case pv.StringValue != nil:
value = pv.GetStringValue()
case pv.Pointvalue != nil:
value = pv.GetPointvalue()
case pv.Referencevalue != nil:
value = pv.GetReferencevalue()
case pv.Uservalue != nil:
value = pv.GetUservalue()
default:
value = pv.String()
}
return value
}

func protoToEntity(c appengine.Context, proto *pb.EntityProto) (*Entity, error) {
entity := new(Entity)
key, err := protoToKey(c, proto.GetKey())
if err != nil {
return nil, err
}
entity.Key = key
entity.Kind = key.Kind()
entity.Properties = make(map[string]interface{})
for _, p := range proto.GetProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
return entity, nil
}

func protoToKey(c appengine.Context, proto *pb.Reference) (*datastore.Key, error) {
var key *datastore.Key
for _, e := range proto.GetPath().GetElement() {
key = datastore.NewKey(c, e.GetType(), e.GetName(), e.GetId(), key)
if _, err := key.GobEncode(); err != nil {
return nil, err
}
}
return key, nil
}

最も注意すべきはdatastore:",noindex"なプロパティの扱いである。EntityProto.GetPropertyで取得できるのはインデックス有りのプロパティだけであり、noindexなプロパティはEntityProto.GetRawPropertyで取得する必要がある。

    for _, p := range proto.GetProperty() {

entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}

Callの前のinに含まれるエンティティのキーはアプリケーション側でNewKeyもしくはNewIncompleteKeyしたままのキーである。NewIncompleteKeyの場合はキーは空であるため、実際にPutされ作成されたKeyを取得するにはCallの後のoutを見る必要がある。*datastore.PutResponseはPutされたエンティティのキーだけを返すが、inと併用することで、自身のキーを持つ完全なエンティティを復元することが可能になる。

func postPut(c appengine.Context, in, out appengine_internal.ProtoMessage) error {

req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
resp, ok := out.(*pb.PutResponse)
if !ok {
return fmt.Errorf("Not PutResponse: %#v", out)
}
for i, entityProto := range req.GetEntity() {
entity, err := protoToEntity(c, entityProto)
if err != nil {
return err
}
key, err := protoToKey(c, resp.GetKey()[i])
if err != nil {
return err
}
entity.Key = key
fmt.Printf("%+v", entity)
}
return nil
}


Datastore.Get

GetはPutと逆であり、inにはキーだけがあり、outにはGetされたエンティティが含まれている。


  • in: *pb.GetRequest

  • out: *pb.GetResponse

詳しくはコードを参考にしてほしい


Datastore.Delete

Deleteではinにキーが入るが、outはキーもエンティティも持たない。


  • in: *pb.DeleteRequest

  • out: *pb.DeleteResponse

https://github.com/laco0416/aespy/blob/master/datastore_handler.go#L189-L225


BBQ

本来の目的であるDatastore->BigQueryのリアルタイムインサートのためのライブラリを作った

bbq

名前は特に意味はなくてbqを含めて3,4文字で済ませたかっただけである。bbqはaespyに依存し、aespyを隠蔽するライブラリで、利用者がappengine_internalを意識しなくて良いようにしてある。

bbqの実装はソースを読んでもらえばいいとして、アプリケーション側で使う例が次のコードである。

func handleAddNewPerson(w http.ResponseWriter, r *http.Request) {

b := bbq.NewBBQ(&bbq.Option{Log: true})
b.AddKind("Person", "aespy", "person")
c, ch := b.Hook(r)
p := &Person{Name: r.URL.Query().Get("name")}
if err := p.Save(c); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := json.Marshal(p)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := <-ch; err != nil {
c.Debugf(err.Error())
}
w.Write(resp)
}

bbqに関わるのは最初の3行だけである

    b := bbq.NewBBQ(&bbq.Option{Log: true})

b.AddKind("Person", "bbqsample", "person")
c, ch := b.Hook(r)

b.AddKindではBigQueryへのインサート対象とするKindと、追加先のデータセットIDとテーブルIDを渡す。Kindを追加し終わったら、c, ch := b.Hook(r)でフックを掛ける。戻り値の1つ目はaespyでラップされたappengine.Context、2つ目はRPCへのハンドラの返り値を受け取るためのchannelになっている。このchannelを無視すればインサートが終了する前にクライアントとの通信を終わらせ、<-chすればインサート処理の終了を待つことが出来る。

実際の運用としてはAddKindまではアプリケーション中にstaticな場所で用意しておき、各エンドポイントで*http.requestが手に入り次第必要に応じてHookするというのがスマートな気がする。できたばかりのライブラリなので実際に使って便利なAPIになっているかどうかはまだ検討の余地がある。


問題

RPCでやり取りされるpbは完全にgolangから切り離されているので、structの情報は完全に失われている。そのためフック中で使いたいメタデータ的な情報をstructのフィールドタグに持たせておくことはできない。逆に言えば実際にDatastoreに渡されるエンティティそのものを扱えるのでBigQueryへのリアルタイムバックアップという要件でいえば必要十分であるので今のところ特に解決方法は考えていない。


謝辞

aespyとbbqはシルバーウィークもくもく会の時間内で作った。月曜の13時から火曜日の17時まで泊まり込み28時間ぶっ通しのもくもく会(実体は達成目標のないハッカソンに近い)は楽しかったが体力的に厳しかったので次は寝袋を持ち込もうと思う。また参加したい。