概要
Google BigQuery I/O connector を利用してDataflowからBigqueryにデータを書き込む
https://beam.apache.org/documentation/io/built-in/google-bigquery/
[{key:Value01, key2:value01},{key:Value02, key2:value01}]
なpcollectionを以下な感じでBigqueryに書き込めます
[pCollection] | beam.io.WriteToBigQuery(
table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)
設定
BigQueryについて
今回BiqQueryを触るのが初めてだったこの間の自分向けに雑な用語解説
機械翻訳のせいかドキュメントとか設定画面の表記とかで表記ゆれがあるのでそのあたりは大目に見てください・・・
- そもそもBigQueryって何?: Googleのすごい技術で作られてる超大容量データを超高速で処理できるフルマネージドなDBみたいななにか。SQLで色々処理できるし、GCPサービスとかその他サービスといい感じに連携して色々できる
- プロジェクト: RDBサーバーのインスタンスみたいな位置づけ
- データセット: RDBで言うDB名みたいなやつ
- テーブル: RDBで言う書くDB内のテーブルみたいなやつ
- テーブルスキーマ: そのままテーブルのスキーマ 以下の設定を行う
- フィールド名(列名): カラムの名前みたいなやつ
- 種類(Data Type): IntegerとかStringとか設定する
- モード: このカラムにはnullを入れていいとかだめとかそういうの
- Nullable: nullを許可する
- Required: nullを許可しない
- Repeated: 種類で指定した値の配列を入れるモード
table_specの設定
table_spec
に書き込む先のBigQueryのテーブル情報を設定する
https://beam.apache.org/documentation/io/built-in/google-bigquery/#table-names
table_spec = '[project_id]:[dataset_id].[table_id]'
[project_id]を省略した場合、パイプラインオプションからデフォルトのプロジェクトIDが利用されるようです。
schemaの設定
書き込む対象のテーブルのテーブルスキーマに合わせて設定を行う
table_schema = {
'fields': [{
'name': 'source', 'type': 'STRING', 'mode': 'NULLABLE'
}, {
'name': 'quote', 'type': 'STRING', 'mode': 'REQUIRED'
}]
}
[{key:Value01, key2:value01},{key:Value02, key2:value01}]
なデータが入れられるスキーマは多分以下のような感じになる(keyはNullを許可せず、key2はnullを許容し、どちらも文字が入る場合)
table_schema = {
'fields': [{
'name': 'key', 'type': 'STRING', 'mode': 'REQUIRED'
}, {
'name': 'key2', 'type': 'STRING', 'mode': 'NULLABLE'
}]
}
create_dispositionの設定
BigQueryにデータを書き込むとき、もし書き込む対象として指定したテーブルが存在しなかった場合の動作を設定します
-
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
- テーブルが存在しなかった場合、新しくテーブルを作成します
-
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER
- テーブルが存在しなかった場合、書き込み処理をfailさせます
write_dispositionの設定
既存のテーブルへデータを書き込むときに、既存のデータをどのように扱うか(エラーにする、削除する、追記する)設定します
-
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY
- 既存のデータがテーブル上に存在する場合、処理をfailさせます
-
write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE
- 既存のデータがテーブル上に存在する場合、全て削除してからデータを書き込みます
-
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND
- 既存のデータがテーブル上に存在する場合、その後ろの行へデータを追記します
その他設定
inserting methodの設定
BigQueryへデータを書き込むときにいくつかメソッドがあるようですが BigQuery I/O connector では load jobs
とtreaming inserts
が選択できるようです。
GCPのドキュメントを読むと、前者は大量のデータを一括で書き込むときに、後者はストリーミング処理のような細かいデータを継続的にリアルタイムに書き込むのに使うみたいなことが書いてあります。
https://beam.apache.org/documentation/io/built-in/google-bigquery/#setting-the-insertion-method
https://cloud.google.com/bigquery/docs/loading-data
どちらが使われるか、また明示的に指定する方法は以下のとおりです。またどちらのメソッドを使用する場合でも、bigqueryにそれぞれbigqueryのAPIに読み出し制限があるので注意が必要です。
-
load jobs
- 有限個数のPcollectionをBigQueryIOに渡した場合
- WriteToBigQuery(method = 'FILE_LOADS')を指定した場合
- quotas and limits
-
treaming inserts
- 無制限の個数のPcollectionをBigQueryIOに渡した場合
- WriteToBigQuery(method='STREAMING_INSERTS') を指定した場合
- quotas and limits
参考
https://beam.apache.org/documentation/io/built-in/google-bigquery/
https://dev.classmethod.jp/articles/google-bigquery-debut/