背景
-
BigQuery Storage Write API でデフォルトストリームを使いたかった(使用言語: python)
-
が、公式ドキュメントを読んでもデフォルトストリームを使用する場合の具体的な実装方法がわからなかった
- 日本語版公式ドキュメントにはJavaのサンプルコードしかない。
- Githubのpythonのサンプルコードも、pendingタイプのストリームを使う場合の実装方法しかサンプルがなかった
ストリームについて
Storage Write APIには「ストリーム」という概念があり、これによって、BigQueryテーブルにデータが書き込まれます。
大きく分けると、以下の種別があります。
詳しくは公式ドキュメントを参照。
https://cloud.google.com/bigquery/docs/write-api?hl=ja#overview
デフォルトストリーム
基本的にこちらが推奨されている。
特徴としては以下のとおり(公式docsより)。
- デフォルト ストリームに書き込まれたデータは、すぐにクエリで使用できます。
- デフォルト ストリームは、at-least-once(少なくとも 1 回)セマンティクスをサポートしています。
- デフォルト ストリームを明示的に作成する必要はありません。
アプリケーションで作成したストリーム
アプリケーション側で明示的にストリームを作成する。
以下の種類がありますが、ここでは割愛します。詳しくは公式ドキュメントを参照してください。
- 保留タイプ (pending type)
- コミットタイプ (commited type)
- バッファタイプ (buffered type)
結論
そもそも、デフォルトストリームは「ストリームを明示的に作成しなくてよい」ということを理解できていませんでした。
ストリーム名をデフォルトのストリーム名にすることがポイントです。
実装方法
- stream名を
projects/${projectId}/datasets/${datasetId}/tables/${tableId}/streams/_default
にする -
AppendRows
のみを呼び出す-
CreateWriteStream
は呼ばない
-
※ 実は、AppendRows
だけでよいことは公式ドキュメントの「APIフロー」に書かれている(推測難しい…)。
公式のpendingタイプのサンプルコードをdefaultにアレンジするとこのようになる。
def append_rows_default(project_id: str, dataset_id: str, table_id: str):
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path(project_id, dataset_id, table_id)
request_template = types.AppendRowsRequest()
# デフォルトのストリーム名を指定する
request_template.write_stream = f"{parent}/streams/_default"
proto_schema = types.ProtoSchema()
proto_descriptor = descriptor_pb2.DescriptorProto()
customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema.proto_descriptor = proto_descriptor
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema
request_template.proto_rows = proto_data
append_rows_stream = writer.AppendRowsStream(write_client, request_template)
# proto_rowsに行データを追加する
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
proto_rows.serialized_rows.append(create_row_data(2, "Bob"))
request = types.AppendRowsRequest()
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data
response_future_1 = append_rows_stream.send(request)
print(response_future_1.result())
append_rows_stream.close()