BigQuery Storage Write API
gRPC でデータを書き込むことができます。
2021 年に一般提供が開始された Google BigQuery Write API は、1 つの統合型 API で高性能なバッチ処理とストリーミングを実現する、BigQuery の推奨データ取り込みパスです。
Write API は、プロトコル バッファ形式のバイナリデータの使用を想定しています。そのため、この API は高スループットのストリーミングに非常に有効です。しかし、プロトコル バッファを扱うのは若干難しい場合もあります。
効率的なプロトコル。Storage Write API は、HTTP over REST ではなく gRPC ストリーミングを使用します。このため、以前の insertAll メソッドよりも効率的です。Storage Write API は、プロトコル バッファを使用してバイナリ形式をサポートします。これは JSON よりも効率的な転送方式です。書き込みリクエストは非同期ですが、順序指定が保証されます
...
費用低減。Storage Write API は、以前の insertAll ストリーミング API よりも大幅に低コストです。また、1 か月あたり最大 2 TB を無料で取り込めます。
API 認証情報
Storage Write API を使用するには、bigquery.tables.updateData 権限が必要です。
bigquery.tables.updateData 権限は、以下の Identity and Access Management(IAM)事前定義ロールに含まれています。
- bigquery.dataEditor
- bigquery.dataOwner
- bigquery.admin
以下のような JSON ファイルで取得できます。
{
"type": "service_account",
"project_id": "xxx",
"private_key_id": "xxx",
"private_key": "-----BEGIN PRIVATE KEY-----xxx-----END PRIVATE KEY-----\n",
"client_email": "${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com",
"client_id": "xxx",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/${SERVICE_ACCOUNT_NAME}%40${PROJECT_ID}.iam.gserviceaccount.com",
"universe_domain": "googleapis.com"
}
新規作成
以下のコマンドで作成できます。
export SERVICE_ACCOUNT_NAME=YOUR_SERVICE_ACCOUNT_NAME
export PROJECT_ID=YOUR_PROJECT_ID
gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} \
--display-name ${SERVICE_ACCOUNT_NAME}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
--role roles/bigquery.dataEditor
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format='table(bindings.role)' \
--filter="bindings.members:${SERVICE_ACCOUNT_NAME}"
gcloud iam service-accounts keys create key.json \
--iam-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com
既存利用
すでにあるサービスアカウントから探すには以下のコマンドが使えます。
export SERVICE_ACCOUNT_NAME=YOUR_SERVICE_ACCOUNT_NAME
export PROJECT_ID=YOUR_PROJECT_ID
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.dataEditor"
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.dataOwner"
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.admin"
gcloud iam service-accounts keys create key.json \
--iam-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com
データセット・テーブル・スキーマ作成
サンプルテーブルを以下のように作成します。
bq mk kyouhei
bq query --nouse_legacy_sql '
CREATE TABLE
kyouhei.sample_table
(
id STRING NOT NULL,
user_uuid STRING NOT NULL,
action_type INT64 NOT NULL,
)
'
grpcurl でテータ書き込み
grpcurl
を使ってデータを書き込みます。
brew install grpcurl
row.json
として書き込みたい JSON データを準備します。
{"id":"testId","user_uuid":"testUuid","action_type":1}
sample.proto
と proto-convert
を使って row.bin
バイナリに変換します。
syntax = "proto3";
enum ActionType {
ACTION_UNSPECIFIED = 0;
ACTION_CREATE = 1;
ACTION_UPDATE = 2;
ACTION_DELETE = 3;
}
message SampleTable {
string id = 1;
string user_uuid = 2;
ActionType action_type = 3;
}
proto-convert -m j2b -p sample.proto -t SampleTable -i row.json -o row.bin
row.bin
バイナリから row.txt
base64 エンコードテキストに変換します。
base64 -i row.bin -o row.txt
# cat row.txt --> CgZ0ZXN0SWQSCHRlc3RVdWlkGAE=
serializedRows
に row.txt
の内容を入れ、test.json
を作成します。
デフォルトストリームに書き込む形をとります。
{
"writeStream": "projects/${PROJECT_ID}/datasets/kyouhei/tables/sample_table/_default",
"protoRows": {
"writerSchema": {
"protoDescriptor": {
"name": "sampleTable",
"field": [
{
"name": "id",
"number": 1,
"type": 9
},
{
"name": "user_uuid",
"number": 2,
"type": 9
},
{
"name": "action_type",
"number": 3,
"type": 3
}
]
}
},
"rows": {
"serializedRows": [
"CgZ0ZXN0SWQSCHRlc3RVdWlkGAE="
]
}
}
}
空の serializedRows
を作って、以下のコマンドを使って挿入してもいいです。
jq '.protoRows.rows.serializedRows |= .+ ["'$(cat row.txt)'"]' test.json > tmp && mv tmp test.json
storage.proto
ファイルが必要なので、ダウンロードしておきます。
git clone https://github.com/googleapis/googleapis.git
test.json
と認証トークンを使って grpcurl
でデータを書き込むことがます。
export GOOGLE_APPLICATION_CREDENTIALS=key.json
TOKEN=$(gcloud auth application-default print-access-token)
echo $TOKEN
grpcurl -vv \
-import-path ~/googleapis \
-proto google/cloud/bigquery/storage/v1/storage.proto \
-H "Authorization: Bearer ${TOKEN}" \
-d @ \
bigquerystorage.googleapis.com:443 \
google.cloud.bigquery.storage.v1.BigQueryWrite.AppendRows < test.json
Resolved method descriptor:
// Appends data to the given stream.
//
// If `offset` is specified, the `offset` is checked against the end of
// stream. The server returns `OUT_OF_RANGE` in `AppendRowsResponse` if an
// attempt is made to append to an offset beyond the current end of the stream
// or `ALREADY_EXISTS` if user provides an `offset` that has already been
// written to. User can retry with adjusted offset within the same RPC
// connection. If `offset` is not specified, append happens at the end of the
// stream.
//
// The response contains an optional offset at which the append
// happened. No offset information will be returned for appends to a
// default stream.
//
// Responses are received in the same order in which requests are sent.
// There will be one response for each successful inserted request. Responses
// may optionally embed error information if the originating AppendRequest was
// not successfully processed.
//
// The specifics of when successfully appended data is made visible to the
// table are governed by the type of stream:
//
// * For COMMITTED streams (which includes the default stream), data is
// visible immediately upon successful append.
//
// * For BUFFERED streams, data is made visible via a subsequent `FlushRows`
// rpc which advances a cursor to a newer offset in the stream.
//
// * For PENDING streams, data is not made visible until the stream itself is
// finalized (via the `FinalizeWriteStream` rpc), and the stream is explicitly
// committed via the `BatchCommitWriteStreams` rpc.
rpc AppendRows ( stream .google.cloud.bigquery.storage.v1.AppendRowsRequest ) returns ( stream .google.cloud.bigquery.storage.v1.AppendRowsResponse ) {
option (.google.api.http) = {
post: "/v1/{write_stream=projects/*/datasets/*/tables/*/streams/*}"
body: "*"
};
option (.google.api.method_signature) = "write_stream";
}
Request metadata to send:
authorization: Bearer xxx
Response headers received:
alt-svc: h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
content-disposition: attachment
content-type: application/grpc
date: Tue, 22 Aug 2023 08:10:36 GMT
Estimated response size: 74 bytes
Response contents:
{
"appendResult": {
},
"writeStream": "projects/${PROJECT_ID}/datasets/kyouhei/tables/sample_table/_default"
}
Response trailers received:
content-disposition: attachment
Sent 1 request and received 1 response
以下のクエリでデータが書き込まれたことが確認できます。
bq query --nouse_legacy_sql '
SELECT * FROM kyouhei.sample_table
'
+--------+-----------+-------------+
| id | user_uuid | action_type |
+--------+-----------+-------------+
| testId | testUuid | 1 |
+--------+-----------+-------------+