注意
BigQuery Streamingは時々データが消えるような現象があるようです。
このライブラリーのせいかと思ったから、新しいやつを作ってみたけど、それでもダメでした。
AWS Lambdaなどで、普通のStreamingじゃないインサートを推薦します。
本文
スキーマの定義とかは面倒ですよね。
CLIとか使わなくても動いてほしいよね。
Goはせっかく型システムがあるんだから、そっちに任せよう。
rounds/go-bqstreamer
をフォークして、テーブルの自動生成、スキーマの自動変更(現在、フィルドの追加のみ)をするようにハックしました。
そしてreflect
をバリバリ使って、エンコードしてくれるヘルパーライブラリーを作りました。
使い方
BQTabler
というinterfaceを作ります。これでテーブル名が決まります。
type BQTabler interface {
BQTable() string
}
ログをstructとして定義します。
type HogeLog struct {
UserID int `bq:"user_uuid"`
Time time.Time `bq:"time"`
HogeID int `bq:"hoge_id"`
UserAgent string `bq:"user_agent"`
}
func (HogeLog) BQTable() string {
return "hoge" + time.Now().UTC().Format("20060102") // 日別
}
BigQueryに接続します。
type bigquery struct {
projectID string
datasetID string
*bqstreamer.Streamer
}
func newBigquery(ctx context.Context, projectID, datasetID string) bigquery {
key := ...
srv, err := bqstreamer.NewBigQueryService(key)
if err != nil {
panic(err)
}
client, err := bqstreamer.NewStreamer(srv, 500, time.Second, time.Second, 10)
if err != nil {
panic(err)
}
b := bigquery{
projectID: projectID,
datasetID: datasetID,
Streamer: client,
}
b.CreateTables = true // 自動生成オン!
go b.Start()
go b.reportErrors(ctx) // clientのエラーチャンネルを見るgoroutine
go func() {
select {
case <-ctx.Done():
// contextがcancelされたらやめる
b.Stop()
return
}
}()
return b
}
そしてログを流します。
func (b bigquery) post(table string, msg interface{}) error {
row, err := bq.Encode(msg) // guregu/bq
if err != nil {
return err
}
if t, ok := msg.(BQTabler); ok {
table = t.BQTable()
}
// go-bqstreamerのQueueRowを呼ぶ
b.QueueRow(b.projectID, b.datasetID, table, row)
return nil
}
以上でBigQueryを気楽に使えます。
b.post("hoge", HogeLog{...}) // テーブル名はBQTablerによって上書きされる
注意
上記のライブラリーは開発中なので、今は使わない方がいいと思います。
将来的にgo-bqstreamer
をwrapしているようなライブラリーが作りたいですね。