0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

# DataflowからBigQueryへデータを書き込むときのメモ

Posted at

概要

Google BigQuery I/O connector を利用してDataflowからBigqueryにデータを書き込む

https://beam.apache.org/documentation/io/built-in/google-bigquery/

[{key:Value01, key2:value01},{key:Value02, key2:value01}] なpcollectionを以下な感じでBigqueryに書き込めます

[pCollection] | beam.io.WriteToBigQuery(
    table_spec,
    schema=table_schema,
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
  )

設定

BigQueryについて

今回BiqQueryを触るのが初めてだったこの間の自分向けに雑な用語解説
機械翻訳のせいかドキュメントとか設定画面の表記とかで表記ゆれがあるのでそのあたりは大目に見てください・・・

  • そもそもBigQueryって何?: Googleのすごい技術で作られてる超大容量データを超高速で処理できるフルマネージドなDBみたいななにか。SQLで色々処理できるし、GCPサービスとかその他サービスといい感じに連携して色々できる
  • プロジェクト: RDBサーバーのインスタンスみたいな位置づけ
  • データセット: RDBで言うDB名みたいなやつ
  • テーブル: RDBで言う書くDB内のテーブルみたいなやつ
  • テーブルスキーマ: そのままテーブルのスキーマ 以下の設定を行う
    • フィールド名(列名): カラムの名前みたいなやつ
    • 種類(Data Type): IntegerとかStringとか設定する
    • モード: このカラムにはnullを入れていいとかだめとかそういうの
      • Nullable: nullを許可する
      • Required: nullを許可しない
      • Repeated: 種類で指定した値の配列を入れるモード

table_specの設定

table_specに書き込む先のBigQueryのテーブル情報を設定する
https://beam.apache.org/documentation/io/built-in/google-bigquery/#table-names

    table_spec = '[project_id]:[dataset_id].[table_id]'

[project_id]を省略した場合、パイプラインオプションからデフォルトのプロジェクトIDが利用されるようです。

schemaの設定

書き込む対象のテーブルのテーブルスキーマに合わせて設定を行う

table_schema = {
    'fields': [{
        'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
    }, {
        'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
    }]
}

[{key:Value01, key2:value01},{key:Value02, key2:value01}] なデータが入れられるスキーマは多分以下のような感じになる(keyはNullを許可せず、key2はnullを許容し、どちらも文字が入る場合)

table_schema = {
    'fields': [{
        'name': 'key', 'type': 'STRING', 'mode': 'REQUIRED'
    }, {
        'name': 'key2', 'type': 'STRING', 'mode': 'NULLABLE'
    }]
}

create_dispositionの設定

BigQueryにデータを書き込むとき、もし書き込む対象として指定したテーブルが存在しなかった場合の動作を設定します

  • create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    • テーブルが存在しなかった場合、新しくテーブルを作成します
  • create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
    • テーブルが存在しなかった場合、書き込み処理をfailさせます

write_dispositionの設定

既存のテーブルへデータを書き込むときに、既存のデータをどのように扱うか(エラーにする、削除する、追記する)設定します

  • write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY
    • 既存のデータがテーブル上に存在する場合、処理をfailさせます
  • write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
    • 既存のデータがテーブル上に存在する場合、全て削除してからデータを書き込みます
  • write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
    • 既存のデータがテーブル上に存在する場合、その後ろの行へデータを追記します

その他設定

inserting methodの設定

BigQueryへデータを書き込むときにいくつかメソッドがあるようですが BigQuery I/O connector では load jobstreaming insertsが選択できるようです。
GCPのドキュメントを読むと、前者は大量のデータを一括で書き込むときに、後者はストリーミング処理のような細かいデータを継続的にリアルタイムに書き込むのに使うみたいなことが書いてあります。

https://beam.apache.org/documentation/io/built-in/google-bigquery/#setting-the-insertion-method
https://cloud.google.com/bigquery/docs/loading-data

どちらが使われるか、また明示的に指定する方法は以下のとおりです。またどちらのメソッドを使用する場合でも、bigqueryにそれぞれbigqueryのAPIに読み出し制限があるので注意が必要です。

  • load jobs
    • 有限個数のPcollectionをBigQueryIOに渡した場合
    • WriteToBigQuery(method = 'FILE_LOADS')を指定した場合
    • quotas and limits
  • treaming inserts
    • 無制限の個数のPcollectionをBigQueryIOに渡した場合
    • WriteToBigQuery(method='STREAMING_INSERTS') を指定した場合
    • quotas and limits

参考

https://beam.apache.org/documentation/io/built-in/google-bigquery/
https://dev.classmethod.jp/articles/google-bigquery-debut/

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?