16
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

弁護士ドットコムAdvent Calendar 2019

Day 9

GCPを使ってAPIで取得したデータの差分を定期的に BigQuery にインポートする

Last updated at Posted at 2019-12-09

この記事は、「弁護士ドットコム Advent Calendar 2019 - Qiita」の9日目の記事です。
最近やったタスクのGCPを使ってAPIで取得したデータの差分を定期的に BigQuery にインポートするについて書きたいと思います。

途中goのサンプルコードがありますが、あくまでサンプルなのであしからず…

前提

とある日、外部サービスの様々なオブジェクト(ユーザー情報など)のデータをBIツールでみたいという依頼がきました。BIツール上で、この外部サービスのデータが連携できる状態になっておらず、BigQueryにデータを入れる必要がありました。

外部サービスのデータは、APIで取得可能になっており、各オブジェクトを指定して取得できるもので、レコードの更新日時などで絞り込んで検索できるようなものでした。レコードは件数が多く、挿入、更新が随時行われるようなものでした。レコードにはIDと、更新日時が存在します。

独立した環境がよく、運用を楽にしたかったので、GCPのサービスを使って作りました。
BigQueryでは、各オブジェクトあたり1つのテーブルを使ったほうが検索しやすいかなと思ったのと、外部サービスのデータが大容量になったときに仕組みの作り直しが起きないように、テーブルを差分更新する方法を採用しました。

方法

インフラ図

hoge.png

解説

定期的に実行する部分

Cloud Schedulerで1日1回Cloud Pub/Sub にメッセージを送信するジョブを設定しました。
Cloud Pub/SubはCloud Functions のフックのためだけに使うので、送られるメッセージは任意のメッセージにしました。

※ 今はCloud Functionsで直接スケジューリングすることができないので、Cloud Schedulerを経由しました。

差分更新しながらBigQueryにインポートする部分

Cloud Functionsで以下一連の処理を行います。
BigQueryのテーブルは事前に作っておく必要があります(一時テーブルと、以下では既存テーブルと呼んでいる差分更新をしていくテーブル)。

外部サービスのAPIを叩き、結果をCSVにして、Cloud Storageにアップロード

前日との差分を取得するために、APIでオブジェクトを指定し、処理実行日で絞り込みをしてリクエストを行い、レスポンスからCSVを作成します。
CSVのヘッダーはBigQueryに入れ込むときに使うので、列が対応するようになんとかして作っておきます。

上記で取得したCSVを /{処理実行日}/{オブジェクト名}.csv のようなパスでCloudStorageに保存します。

以下がサンプルコード。

package hoge

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/storage"
)

// UploadCSVToStorage はCSVをCloudStorageにアップロードする
// `/{実行日}/{オブジェクト名}.csv`のようなパスでCloudStorageに保存される
func UploadCSVToStorage() error {
	ctx := context.Background()

	execTime := time.Now().Format("2006-01-02")
	objectName := "user"
	bucket := "example-bucket"

	client, err := storage.NewClient(ctx)
	if err != nil {
		return err
	}
	defer client.Close()

	path := fmt.Sprintf("%s/%s.csv", execTime, objectName)

	w := client.Bucket(bucket).Object(path).NewWriter(ctx)
	defer w.Close()

	// CSVにAPIから取ってきた情報を書き込む
	// c := csv.NewWriter(w)
	// defer c.Flush()
	// ...
	// c.Write(row)

	return nil
}

BigQueryに差分更新用の一時テーブルを作成する

BigQueryにはCloud StorageにあるCSVを読み込んでテーブルを作成するといった機能が存在します[参考]
BigQueryで、上記で作ったCloudStorageのファイルを /{処理実行日}/user_*.csv のような条件で読み込ませ、一時テーブルを作成します。
CSVのヘッダーをもとに、BigQueryのカラムに対応するレコードが自動的にはいるようになっています。

以下がサンプルコード。

package hoge

import (
	"context"
	"fmt"
	"time"

	"cloud.google.com/go/bigquery"
)

// UpdateTemporaryTable はCloudStorageにある各ファイルからBigQueryの差分更新用のテンポラリテーブルを更新する
func UpdateTemporaryTable() error {
	ctx := context.Background()
	pID := "example-project-id"
	bucket := "example-bucket"
	datasetID := "example-dataset-id"
	execTime := time.Now().Format("2006-01-02")
	objectName := "user"
        tmpTableName := "tmp_user"

	client, err := bigquery.NewClient(ctx, pID)
	if err != nil {
		return err
	}

	gcsPath := fmt.Sprintf("gs://%s/%s/%s_*.csv", bucket, execTime, objectName)
	gcsRef := bigquery.NewGCSReference(gcsPath)
	gcsRef.AllowJaggedRows = true
	gcsRef.AllowQuotedNewlines = true
	gcsRef.FieldDelimiter = "\t"
	gcsRef.SourceFormat = bigquery.CSV

	loader := client.Dataset(datasetID).Table(tmpTableName).LoaderFrom(gcsRef)
	loader.WriteDisposition = bigquery.WriteTruncate

	job, err := loader.Run(ctx)
	if err != nil {
		return err
	}

	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if err := status.Err(); err != nil {
		return err
	}

	return nil
}

一時テーブルを使って、既存のテーブルの差分更新を行う

BigQueryにはクエリの結果をテーブルに書き込むといった機能があります[参考]。これをつかって既存のテーブルをクエリの結果で更新することもできます。1

クエリの内容としては、既存のテーブルと差分更新用のテーブルをマージし、レコードのIDごとで、更新日時降順に並べて、一番新しいレコードで絞り込むといったものになります。
これらを使って差分更新を実現しています。

以下がサンプルコードとなります。サンプルとして、テーブルのスキーマには、idと更新日時(updated_at) があるものとします。

package hoge

import (
	"context"
	"fmt"

	"cloud.google.com/go/bigquery"
)

// 一時テーブルと既存のテーブルをマージしてセレクトするクエリ
// このクエリの結果を使って既存のテーブルを更新する
// 各テーブル部分が変数になっている
// ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rowNumber でidごとでupdated_at降順に並べて行番号を振っている
// その後 WHERE rowNumber = 1 をして、updated_atが一番新しいもののみを取得するようにしている
const mergeQuery = `
	SELECT
		  * EXCEPT(rowNumber)
		FROM (
		  SELECT
		    *,
		    ROW_NUMBER() OVER (PARTITION BY id ORDER BY updated_at DESC) AS rowNumber
		  FROM (
		    SELECT
		      *
		    FROM
		      %s
		    UNION ALL
		    SELECT
		      *
		    FROM
			  %s
		  )
		)  WHERE rowNumber = 1;
`

// UpdateMainTable は一時テーブルを使って既存のテーブルを更新します
func UpdateMainTable(ctx context.Context) error {
	pID := "example-project-id"
	datasetID := "example-dataset-id"
	objectName := "user"
	mainTable := "example-data-set-id.user"
	tmpTable := "example-data-set-id.tmp_user"

	client, err := bigquery.NewClient(ctx, pID)
	if err != nil {
		return err
	}

	q := client.Query(fmt.Sprintf(mergeQuery, mainTable, tmpTable))
	q.QueryConfig.Dst = client.Dataset(datasetID).Table(objectName)
	q.WriteDisposition = bigquery.WriteTruncate

	job, err := q.Run(ctx)
	if err != nil {
		return err
	}

	status, err := job.Wait(ctx)
	if err != nil {
		return err
	}

	if err := status.Err(); err != nil {
		return err
	}

	return nil
}

これで完了!

さいごに

https://tech.mercari.com/entry/2018/06/28/100000
BigQuery差分更新の方法に関して、こちらの記事を大変参考にさせていただきました。ありがとうございました。

  1. クエリ結果の書き込み時のオプションでWriteTruncateてのがあるので、実際は空っぽにして書き込みしてる?

16
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?