7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Go: 努力せずにBigQuery Streaming使いたい

Last updated at Posted at 2015-10-22

注意

BigQuery Streamingは時々データが消えるような現象があるようです。
このライブラリーのせいかと思ったから、新しいやつを作ってみたけど、それでもダメでした。
AWS Lambdaなどで、普通のStreamingじゃないインサートを推薦します。

本文

スキーマの定義とかは面倒ですよね。
CLIとか使わなくても動いてほしいよね。
Goはせっかく型システムがあるんだから、そっちに任せよう。

rounds/go-bqstreamerフォークして、テーブルの自動生成、スキーマの自動変更(現在、フィルドの追加のみ)をするようにハックしました。

そしてreflectをバリバリ使って、エンコードしてくれるヘルパーライブラリーを作りました。

使い方

BQTablerというinterfaceを作ります。これでテーブル名が決まります。

type BQTabler interface {
	BQTable() string
}

ログをstructとして定義します。

type HogeLog struct {
	UserID     int             `bq:"user_uuid"`
	Time       time.Time       `bq:"time"`
	HogeID     int             `bq:"hoge_id"`
	UserAgent  string          `bq:"user_agent"`
}

func (HogeLog) BQTable() string {
	return "hoge" + time.Now().UTC().Format("20060102") // 日別
}

BigQueryに接続します。


type bigquery struct {
	projectID string
	datasetID string

	*bqstreamer.Streamer
}

func newBigquery(ctx context.Context, projectID, datasetID string) bigquery {
	key := ...
	srv, err := bqstreamer.NewBigQueryService(key)
	if err != nil {
		panic(err)
	}
	client, err := bqstreamer.NewStreamer(srv, 500, time.Second, time.Second, 10)
	if err != nil {
		panic(err)
	}

	b := bigquery{
		projectID: projectID,
		datasetID: datasetID,
		Streamer:  client,
	}
	b.CreateTables = true // 自動生成オン!

	go b.Start()
	go b.reportErrors(ctx) // clientのエラーチャンネルを見るgoroutine
	go func() {
		select {
		case <-ctx.Done():
			// contextがcancelされたらやめる
			b.Stop()
			return
		}
	}()

	return b
}

そしてログを流します。

func (b bigquery) post(table string, msg interface{}) error {
	row, err := bq.Encode(msg) // guregu/bq
	if err != nil {
		return err
	}

	if t, ok := msg.(BQTabler); ok {
		table = t.BQTable()
	}

	// go-bqstreamerのQueueRowを呼ぶ
	b.QueueRow(b.projectID, b.datasetID, table, row)
	return nil
}

以上でBigQueryを気楽に使えます。

 b.post("hoge", HogeLog{...}) // テーブル名はBQTablerによって上書きされる

注意

上記のライブラリーは開発中なので、今は使わない方がいいと思います。
将来的にgo-bqstreamerをwrapしているようなライブラリーが作りたいですね。

7
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?