
Writing data with grpcurl and the BigQuery Storage Write API

Posted at 2023-08-22

BigQuery Storage Write API

You can write data over gRPC.

Generally available since 2021, the Google BigQuery Write API is BigQuery's recommended data ingestion path, delivering high-performance batching and streaming in a single unified API.

The Write API expects binary data in protocol buffer format, which makes it very effective for high-throughput streaming. Working with protocol buffers, however, can be somewhat tricky.

Efficient protocol. The Storage Write API uses gRPC streaming rather than REST over HTTP, which makes it more efficient than the older insertAll method. The Storage Write API supports binary formats via protocol buffers, a more efficient wire format than JSON. Write requests are asynchronous, with guaranteed ordering
...
Lower cost. The Storage Write API is significantly cheaper than the older insertAll streaming API, and you can ingest up to 2 TB per month for free.

API credentials

Using the Storage Write API requires the bigquery.tables.updateData permission, which is included in the following predefined Identity and Access Management (IAM) roles:

  • bigquery.dataEditor
  • bigquery.dataOwner
  • bigquery.admin

The credentials are obtained as a JSON key file like the following.

key.json
{
  "type": "service_account",
  "project_id": "xxx",
  "private_key_id": "xxx",
  "private_key": "-----BEGIN PRIVATE KEY-----xxx-----END PRIVATE KEY-----\n",
  "client_email": "${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com",
  "client_id": "xxx",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/${SERVICE_ACCOUNT_NAME}%40${PROJECT_ID}.iam.gserviceaccount.com",
  "universe_domain": "googleapis.com"
}
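If you already have a key file on hand, a quick sanity check (a sketch, assuming jq is installed, as used later in this article) is to confirm which service account it belongs to:

```shell
# Print the service account email the key file was issued for.
jq -r '.client_email' key.json
```

If the printed email is not the account you expect, create or download a new key as described below.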

Creating a new service account

You can create one with the following commands.

export SERVICE_ACCOUNT_NAME=YOUR_SERVICE_ACCOUNT_NAME
export PROJECT_ID=YOUR_PROJECT_ID

gcloud iam service-accounts create ${SERVICE_ACCOUNT_NAME} \
--display-name ${SERVICE_ACCOUNT_NAME}
gcloud projects add-iam-policy-binding ${PROJECT_ID} \
--member serviceAccount:${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com \
--role roles/bigquery.dataEditor

gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format='table(bindings.role)' \
--filter="bindings.members:${SERVICE_ACCOUNT_NAME}"

gcloud iam service-accounts keys create key.json \
--iam-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com 

Using an existing service account

To find a suitable account among your existing service accounts, you can use the following commands.

export SERVICE_ACCOUNT_NAME=YOUR_SERVICE_ACCOUNT_NAME
export PROJECT_ID=YOUR_PROJECT_ID

gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.dataEditor"
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.dataOwner"
gcloud projects get-iam-policy ${PROJECT_ID} \
--flatten="bindings[].members" \
--format="table(bindings.members)" \
--filter="bindings.role:bigquery.admin"

gcloud iam service-accounts keys create key.json \
--iam-account=${SERVICE_ACCOUNT_NAME}@${PROJECT_ID}.iam.gserviceaccount.com 

Creating the dataset, table, and schema

Create a sample table as follows.

bq mk kyouhei

bq query --nouse_legacy_sql '
CREATE TABLE
  kyouhei.sample_table
(
  id STRING NOT NULL,
  user_uuid STRING NOT NULL,
  action_type INT64 NOT NULL
)
'

Writing data with grpcurl

Write the data using grpcurl.

brew install grpcurl

Prepare the JSON data you want to write as row.json.

row.json
{"id":"testId","user_uuid":"testUuid","action_type":1}

Convert it to a row.bin binary using sample.proto and proto-convert.

sample.proto
syntax = "proto3";

enum ActionType {
  ACTION_UNSPECIFIED = 0;
  ACTION_CREATE = 1;
  ACTION_UPDATE = 2;
  ACTION_DELETE = 3;
}

message SampleTable {
  string id = 1;
  string user_uuid = 2;
  ActionType action_type = 3;
}

proto-convert -m j2b -p sample.proto -t SampleTable -i row.json -o row.bin

Convert the row.bin binary to base64-encoded text in row.txt.

base64 -i row.bin -o row.txt
# cat row.txt --> CgZ0ZXN0SWQSCHRlc3RVdWlkGAE=
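As an optional sanity check (a sketch; openssl and od are assumed available), you can decode the base64 payload and inspect the raw protocol buffer wire bytes: tag 0x0a starts field 1 (id), 0x12 starts field 2 (user_uuid), and 0x18 followed by 0x01 encodes field 3 (action_type) with varint value 1.

```shell
# Decode the base64 row and dump the raw proto wire bytes.
# 0a 06 -> field 1 (string, len 6), 12 08 -> field 2 (string, len 8),
# 18 01 -> field 3 (varint 1).
echo 'CgZ0ZXN0SWQSCHRlc3RVdWlkGAE=' | openssl base64 -d | od -An -tx1
```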

Put the contents of row.txt into serializedRows and create test.json.
We write to the default stream.

test.json
{
  "writeStream": "projects/${PROJECT_ID}/datasets/kyouhei/tables/sample_table/_default",
  "protoRows": {
    "writerSchema": {
      "protoDescriptor": {
        "name": "sampleTable",
        "field": [
          {
            "name": "id",
            "number": 1,
            "type": 9
          },
          {
            "name": "user_uuid",
            "number": 2,
            "type": 9
          },
          {
            "name": "action_type",
            "number": 3,
            "type": 3
          }
        ]
      }
    },
    "rows": {
      "serializedRows": [
        "CgZ0ZXN0SWQSCHRlc3RVdWlkGAE="
      ]
    }
  }
}
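The numeric type values in protoDescriptor follow google.protobuf.FieldDescriptorProto.Type, where TYPE_INT64 = 3 and TYPE_STRING = 9. A quick way to double-check the descriptor you built (a sketch, using jq as elsewhere in this article):

```shell
# List each field's name, field number, and FieldDescriptorProto type code
# (TYPE_STRING = 9, TYPE_INT64 = 3) from test.json.
jq -r '.protoRows.writerSchema.protoDescriptor.field[]
       | "\(.name)\t\(.number)\t\(.type)"' test.json
```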

Alternatively, you can start with an empty serializedRows array and append the row with the following command.

jq '.protoRows.rows.serializedRows |= .+ ["'$(cat row.txt)'"]' test.json > tmp && mv tmp test.json
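To illustrate the append in isolation (a standalone sketch that builds a minimal test.json, not the full descriptor above):

```shell
# Start from an empty serializedRows array.
echo '{"protoRows":{"rows":{"serializedRows":[]}}}' > test.json
echo 'CgZ0ZXN0SWQSCHRlc3RVdWlkGAE=' > row.txt
# Append the base64 row from row.txt into the array in place.
jq '.protoRows.rows.serializedRows |= .+ ["'$(cat row.txt)'"]' test.json > tmp && mv tmp test.json
cat test.json
```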

The storage.proto file is required, so download it in advance.

git clone https://github.com/googleapis/googleapis.git ~/googleapis

With test.json and an auth token, you can write the data using grpcurl.

export GOOGLE_APPLICATION_CREDENTIALS=key.json
TOKEN=$(gcloud auth application-default print-access-token)
echo $TOKEN

grpcurl -vv \
-import-path ~/googleapis \
-proto google/cloud/bigquery/storage/v1/storage.proto \
-H "Authorization: Bearer ${TOKEN}" \
-d @ \
bigquerystorage.googleapis.com:443 \
google.cloud.bigquery.storage.v1.BigQueryWrite.AppendRows < test.json
result
Resolved method descriptor:
// Appends data to the given stream.
//
// If `offset` is specified, the `offset` is checked against the end of
// stream. The server returns `OUT_OF_RANGE` in `AppendRowsResponse` if an
// attempt is made to append to an offset beyond the current end of the stream
// or `ALREADY_EXISTS` if user provides an `offset` that has already been
// written to. User can retry with adjusted offset within the same RPC
// connection. If `offset` is not specified, append happens at the end of the
// stream.
//
// The response contains an optional offset at which the append
// happened.  No offset information will be returned for appends to a
// default stream.
//
// Responses are received in the same order in which requests are sent.
// There will be one response for each successful inserted request.  Responses
// may optionally embed error information if the originating AppendRequest was
// not successfully processed.
//
// The specifics of when successfully appended data is made visible to the
// table are governed by the type of stream:
//
// * For COMMITTED streams (which includes the default stream), data is
// visible immediately upon successful append.
//
// * For BUFFERED streams, data is made visible via a subsequent `FlushRows`
// rpc which advances a cursor to a newer offset in the stream.
//
// * For PENDING streams, data is not made visible until the stream itself is
// finalized (via the `FinalizeWriteStream` rpc), and the stream is explicitly
// committed via the `BatchCommitWriteStreams` rpc.
rpc AppendRows ( stream .google.cloud.bigquery.storage.v1.AppendRowsRequest ) returns ( stream .google.cloud.bigquery.storage.v1.AppendRowsResponse ) {
  option (.google.api.http) = {
    post: "/v1/{write_stream=projects/*/datasets/*/tables/*/streams/*}"
    body: "*"
  };
  option (.google.api.method_signature) = "write_stream";
}

Request metadata to send:
authorization: Bearer xxx

Response headers received:
alt-svc: h3=":443"; ma=2592000,h3-29=":443"; ma=2592000
content-disposition: attachment
content-type: application/grpc
date: Tue, 22 Aug 2023 08:10:36 GMT

Estimated response size: 74 bytes

Response contents:
{
  "appendResult": {
    
  },
  "writeStream": "projects/${PROJECT_ID}/datasets/kyouhei/tables/sample_table/_default"
}

Response trailers received:
content-disposition: attachment
Sent 1 request and received 1 response

You can confirm that the data was written with the following query.

bq query --nouse_legacy_sql '
SELECT * FROM kyouhei.sample_table
'
+--------+-----------+-------------+
|   id   | user_uuid | action_type |
+--------+-----------+-------------+
| testId | testUuid  |           1 |
+--------+-----------+-------------+
