@laco0416です。GAE/GoのRPCに介入して通信内容を使ってゴニョゴニョするためのライブラリを作ったのでその仕組みとかGAE/GoのRPC覗く上で気をつけなきゃいけないところなどを書きます。
背景
GAE/Goに限らず、AppEngineとその他のGCPのサービス(Datastore, Memcache, UrlFetchなどなど)はRPCによって通信している。RPCはProtocol Buffer(以後pbと略記)という形式でシリアライズされたオブジェクトを用いており、これは簡単にデシリアライズ可能である。しかしGAE/JにはApiProxyとDelegateという仕組みが最初から用意されており、比較的容易にRPCに介入できるが、GAE/GoにはまだAPIが用意されていない。
ところでGAE/GoのDatastoreにはContextを取得できるイベントハンドラが存在しない。PropertyLoadSaverを実装してSave
とLoad
に介入出来るが、このフックではAppEngineのContextが取得できないため出来る処理が大きく限られるのが問題である。例えばDatastoreにエンティティを保存するときに自動でMemcacheにも流し込むのはコールバック内では行えないため、通常はアプリケーション側で手続き的に行う他ない。
Datastoreの操作を自動でMemcacheの操作に置き換えるgoonというライブラリはこの手続き的な処理を内部でやってくれる便利なやつで、今ではgoon無しで全部自力で書くのは苦痛でしかない。しかしやってくれるのは当然Memcacheへの自動キャッシュだけで、それ以外のことをやるには自力で書くしかない。
今回aespyを作ったモチベーションは、「DatastoreにPutされるエンティティを自動でBigQueryにStream Insertする」ことである。goonのようにライブラリを通じてDatastoreを操作するのではなく、RPCにフックするようにしたのはgoonと併用したいというのもあるし、極力既存のアプリケーションのコードを変えずに導入したいからだ。
aespyの仕組み
かつてGAE/JにはMemvacheというライブラリがあった。これはGAE/JのApiProxyとDelegateを使ってDatastoreへのRPCを乗っ取り、自動でMemcacheの操作に置き換えるライブラリである。今回BigQueryに流し込むライブラリを作るにあたって、まずはGAE/GoにおけるApiProxyとDelegateを作る必要があった。
GAE/GoのRPC
GAE/Goではappengine.Context
が全てである。Context
が取得できなければほとんど何もできないし、逆にいえばContext
さえあれば何でも出来る。なぜならRPCを実際に行うためにContext.Call()
が呼ばれているからである。
appengine.Context
はinterfaceであるため、実装を満たしてしまえばappengine.Context
をラップした独自のaespy.Context
を作ることが出来る。何もしないただのラッパーを作るには次のようなコードを書けば良い。参考になったのは同じくContextをラップしてRPCを乗っ取るaetestの実装である。appengine_internal
をライブラリで使って大丈夫なのかと不安だったが、実際デプロイして使っても全く問題なかったので大丈夫だろう。
package aespy
import (
"net/http"
"appengine"
"appengine_internal"
)
// Context はappengine.Contextを継承したinterfaceである
type Context interface {
appengine.Context
}
// NewContext はappengine.ContextをラップしたContextを返す
func NewContext(req *http.Request) Context {
c := &context{Context: appengine.NewContext(req)}
return c
}
// FromContext はすでにあるappengine.Contextをラップする
func FromContext(ctx appengine.Context) Context {
c := &context{Context: ctx}
return c
}
// aespy.Contextの実装をする構造体
type context struct {
appengine.Context
}
func (c *context) Request() interface{} { return c.Context.Request() }
func (c *context) FullyQualifiedAppID() string { return c.Context.FullyQualifiedAppID() }
func (c *context) Debugf(format string, args ...interface{}) { c.Context.Debugf(format, args...) }
func (c *context) Infof(format string, args ...interface{}) { c.Context.Infof(format, args...) }
func (c *context) Warningf(format string, args ...interface{}) { c.Context.Warningf(format, args...) }
func (c *context) Errorf(format string, args ...interface{}) { c.Context.Errorf(format, args...) }
func (c *context) Criticalf(format string, args ...interface{}) { c.Context.Criticalf(format, args...) }
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
return c.Context.Call(service, method, in, out, opts)
}
もうおわかりだと思うが、
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
return c.Context.Call(service, method, in, out, opts)
}
がRPCの呼び出しである。ここにフック用の処理を追加すればaespyが完成する。
実際にRPCをフックしている仕組みはaespyのソースを読んで欲しい。ドキュメントはないがそんなに大きなライブラリじゃないので15分あれば全部理解できると思う
Context.Callの解説
Context.Call
の仕組みについて解説をしておく。RPCと聞くと私は無意識に非同期処理なのかと思っていた(特に根拠はない)が、このCall
メソッドは内部的にはどうあれ、挙動としては同期処理である。
それぞれの仮引数について簡単に説明すると、
service: string
RPCの宛先。DatastoreへのRPCであればdatastore_v3
等が入る。
method: string
RPCで呼び出す関数。DatastoreのPutならPut
と入っている。
in: appengine_internal.ProtoMessage
RPCで送るリクエスト内容。pbの一番基底のインタフェースとして渡されるので、serviceとmethodに応じてtype assertionする。
重要なのは、in
はCallの前後で変化しないということだ。
out: appengine_internal.ProtoMessage
RPCで返ってきたレスポンス内容。pbの一番基底のインタフェースとして渡されるので、serviceとmethodに応じてtype assertionする。
重要なのは、out
は参照であり、Callするまでは空だが、Call後に値が入るということだ。
opts: *appengine_internal.CallOptions
RPCの設定。型でわかるように構造体である。タイムアウトの設定などを持っているが、フックして使うことは殆どない。
RPCをフックしてゴニョゴニョする上で肝なのがin
とout
なのは明白である。RPCの前後のフックをそれぞれpreCall
とpostCall
とするなら、実装は最低限次のようにしなくてはならない。
func (c *context) Call(service, method string, in, out appengine_internal.ProtoMessage, opts *appengine_internal.CallOptions) error {
c.preCall(service, method, in) //この段階ではinにしか意味がない
ret := c.Context.Call(service, method, in, out, opts) //本来のRPC
c.postCall(service, method, in, out) //ここでinとout両方に意味がある
return ret //本来のRPCの結果を返す
}
DatastoreへのRPC
ここまでであらゆるRPCを覗き見ることができるようになったので、本来の目的であるDatastoreへのRPCに対するアプローチを行う。
Datastore.Put
Datastore.Putの際にContext.Call
に渡される引数の中身は次の通りである
- service:
"datastore_v3"
- method:
"Put"
- in:
*datastore.PutRequest
- out:
*datastore.PutResponse
datastore.PutRequest
のdatastore
は"appengine/datastore"
パッケージではなく、"appengine_internal/datastore"
パッケージであるのに注意。
PutRequest
は「Putされるエンティティ」を内包するpbである。PutされたエンティティはPurRequest.GetEntity
で取得できる。注意する点は、GetEntity
はEntityProto型の スライス を返すということである。DatastoreはPutでもGetでもDeleteでも内部的には**Multiを呼んでおり、常に複数で扱っているのが理由である。
func prePut(c appengine.Context, in appengine_internal.ProtoMessage) error {
req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
for _, proto := range req.GetEntity() {
entity, err := protoToEntity(c, proto)
if err != nil {
return err
}
fmt.Printf("%+v", entity)
}
return nil
}
EntityProto型はDatastoreとやり取りするエンティティ1件を表す構造体である。エンティティは自身のキーとプロパティの情報を持っている。EntityProto型を扱いやすいよう独自のEntity型に変換するprotoToEntity
を次のように実装した。
import (
"appengine"
"appengine/datastore"
pb "appengine_internal/datastore"
)
type Entity struct {
Key *datastore.Key
Kind string
Properties map[string]interface{}
}
func unwrapPropertyValue(pv *pb.PropertyValue) interface{} {
var value interface{}
switch {
case pv.BooleanValue != nil:
value = pv.GetBooleanValue()
case pv.Int64Value != nil:
value = pv.GetInt64Value()
case pv.DoubleValue != nil:
value = pv.GetDoubleValue()
case pv.StringValue != nil:
value = pv.GetStringValue()
case pv.Pointvalue != nil:
value = pv.GetPointvalue()
case pv.Referencevalue != nil:
value = pv.GetReferencevalue()
case pv.Uservalue != nil:
value = pv.GetUservalue()
default:
value = pv.String()
}
return value
}
func protoToEntity(c appengine.Context, proto *pb.EntityProto) (*Entity, error) {
entity := new(Entity)
key, err := protoToKey(c, proto.GetKey())
if err != nil {
return nil, err
}
entity.Key = key
entity.Kind = key.Kind()
entity.Properties = make(map[string]interface{})
for _, p := range proto.GetProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
return entity, nil
}
func protoToKey(c appengine.Context, proto *pb.Reference) (*datastore.Key, error) {
var key *datastore.Key
for _, e := range proto.GetPath().GetElement() {
key = datastore.NewKey(c, e.GetType(), e.GetName(), e.GetId(), key)
if _, err := key.GobEncode(); err != nil {
return nil, err
}
}
return key, nil
}
最も注意すべきはdatastore:",noindex"
なプロパティの扱いである。EntityProto.GetProperty
で取得できるのはインデックス有りのプロパティだけであり、noindexなプロパティはEntityProto.GetRawProperty
で取得する必要がある。
for _, p := range proto.GetProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
// no index field
for _, p := range proto.GetRawProperty() {
entity.Properties[p.GetName()] = unwrapPropertyValue(p.GetValue())
}
Call
の前のin
に含まれるエンティティのキーはアプリケーション側でNewKey
もしくはNewIncompleteKey
したままのキーである。NewIncompleteKey
の場合はキーは空であるため、実際にPutされ作成されたKeyを取得するにはCall
の後のout
を見る必要がある。*datastore.PutResponse
はPutされたエンティティのキーだけを返すが、in
と併用することで、自身のキーを持つ完全なエンティティを復元することが可能になる。
func postPut(c appengine.Context, in, out appengine_internal.ProtoMessage) error {
req, ok := in.(*pb.PutRequest)
if !ok {
return fmt.Errorf("Not PutRequest: %#v", in)
}
resp, ok := out.(*pb.PutResponse)
if !ok {
return fmt.Errorf("Not PutResponse: %#v", out)
}
for i, entityProto := range req.GetEntity() {
entity, err := protoToEntity(c, entityProto)
if err != nil {
return err
}
key, err := protoToKey(c, resp.GetKey()[i])
if err != nil {
return err
}
entity.Key = key
fmt.Printf("%+v", entity)
}
return nil
}
Datastore.Get
GetはPutと逆であり、in
にはキーだけがあり、out
にはGetされたエンティティが含まれている。
- in:
*pb.GetRequest
- out:
*pb.GetResponse
詳しくはコードを参考にしてほしい
Datastore.Delete
Deleteではinにキーが入るが、outはキーもエンティティも持たない。
- in:
*pb.DeleteRequest
- out:
*pb.DeleteResponse
BBQ
本来の目的であるDatastore->BigQueryのリアルタイムインサートのためのライブラリを作った
名前は特に意味はなくてbqを含めて3,4文字で済ませたかっただけである。bbqはaespyに依存し、aespyを隠蔽するライブラリで、利用者がappengine_internal
を意識しなくて良いようにしてある。
bbqの実装はソースを読んでもらえばいいとして、アプリケーション側で使う例が次のコードである。
func handleAddNewPerson(w http.ResponseWriter, r *http.Request) {
b := bbq.NewBBQ(&bbq.Option{Log: true})
b.AddKind("Person", "aespy", "person")
c, ch := b.Hook(r)
p := &Person{Name: r.URL.Query().Get("name")}
if err := p.Save(c); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp, err := json.Marshal(p)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := <-ch; err != nil {
c.Debugf(err.Error())
}
w.Write(resp)
}
bbqに関わるのは最初の3行だけである
b := bbq.NewBBQ(&bbq.Option{Log: true})
b.AddKind("Person", "bbqsample", "person")
c, ch := b.Hook(r)
b.AddKind
ではBigQueryへのインサート対象とするKindと、追加先のデータセットIDとテーブルIDを渡す。Kindを追加し終わったら、c, ch := b.Hook(r)
でフックを掛ける。戻り値の1つ目はaespyでラップされたappengine.Context
、2つ目はRPCへのハンドラの返り値を受け取るためのchannelになっている。このchannelを無視すればインサートが終了する前にクライアントとの通信を終わらせ、<-ch
すればインサート処理の終了を待つことが出来る。
実際の運用としてはAddKind
まではアプリケーション中にstaticな場所で用意しておき、各エンドポイントで*http.request
が手に入り次第必要に応じてHookするというのがスマートな気がする。できたばかりのライブラリなので実際に使って便利なAPIになっているかどうかはまだ検討の余地がある。
問題
RPCでやり取りされるpbは完全にgolangから切り離されているので、structの情報は完全に失われている。そのためフック中で使いたいメタデータ的な情報をstructのフィールドタグに持たせておくことはできない。逆に言えば実際にDatastoreに渡されるエンティティそのものを扱えるのでBigQueryへのリアルタイムバックアップという要件でいえば必要十分であるので今のところ特に解決方法は考えていない。
謝辞
aespyとbbqはシルバーウィークもくもく会の時間内で作った。月曜の13時から火曜日の17時まで泊まり込み28時間ぶっ通しのもくもく会(実体は達成目標のないハッカソンに近い)は楽しかったが体力的に厳しかったので次は寝袋を持ち込もうと思う。また参加したい。