0
0

python + BigQuery Storage Write API でデフォルトストリームを使用する

Last updated at Posted at 2024-07-18

背景

  • BigQuery Storage Write API でデフォルトストリームを使いたかった(使用言語: python)

  • が、公式ドキュメントを読んでもデフォルトストリームを使用する場合の具体的な実装方法がわからなかった

ストリームについて

Storage Write APIには「ストリーム」という概念があり、これによって、BigQueryテーブルにデータが書き込まれます。
大きく分けると、以下の種別があります。

詳しくは公式ドキュメントを参照。
https://cloud.google.com/bigquery/docs/write-api?hl=ja#overview

デフォルトストリーム

基本的にこちらが推奨されている。
特徴としては以下のとおり(公式docsより)。
- デフォルト ストリームに書き込まれたデータは、すぐにクエリで使用できます。
- デフォルト ストリームは、at-least-once(少なくとも 1 回)セマンティクスをサポートしています。
- デフォルト ストリームを明示的に作成する必要はありません。

アプリケーションで作成したストリーム

アプリケーション側で明示的にストリームを作成する。
以下の種類がありますが、ここでは割愛します。詳しくは公式ドキュメントを参照してください。

  • 保留タイプ (pending type)
  • コミットタイプ (commited type)
  • バッファタイプ (buffered type)

結論

そもそも、デフォルトストリームは「ストリームを明示的に作成しなくてよい」ということを理解できていませんでした。

ストリーム名をデフォルトのストリーム名にすることがポイントです。

実装方法

※ 実は、AppendRowsだけでよいことは公式ドキュメントの「APIフロー」に書かれている(推測難しい…)。

公式のpendingタイプのサンプルコードをdefaultにアレンジするとこのようになる。

def append_rows_default(project_id: str, dataset_id: str, table_id: str):
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)

    request_template = types.AppendRowsRequest()
    # デフォルトのストリーム名を指定する
    request_template.write_stream = f"{parent}/streams/_default"

    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # proto_rowsに行データを追加する
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
    proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)
    print(response_future_1.result())

    append_rows_stream.close()

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0