4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

BigQueryのテーブルをPythonクライアントから操作する

Posted at

事前準備

1.クレデンシャルの取得

あらかじめBigQueryのAPIを利用するためのサービスアカウントを作成し、クレデンシャル(JSON)をダウンロードしておきます。
https://cloud.google.com/docs/authentication/getting-started?hl=ja

2.必要なパッケージのインストール

Pipfile
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
google-cloud-bigquery = "*"
google-cloud-bigquery-datatransfer = "*"

[requires]
python_version = "3.8"

テーブルの再作成(Drop⇒Create)

1.クライアントの初期化

migrate_table.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

[PATH TO CREDENTIAL]にはクレデンシャルのJSONのパスを指定。

2.テーブルの再生成

migrate_table.py
table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

# 既存テーブルの削除
client.delete_table(table, not_found_ok=True)

# テーブルの作成
schema = [
    bigquery.SchemaField('id', 'INT64'),
    bigquery.SchemaField('name', 'STRING')
]
client.create_table(bigquery.Table(table, schema=schema))

[DATASET NAME][TABLE NAME]には作成先のデータセット名とテーブル名を指定します。

既存テーブルを削除する際にnot_found_okフラグがFalseだとテーブルが存在しない場合にエラーになるのでTrueにしておきます。DDLでいうとDROP TABLE IF EXISTSみたいなイメージ。

テーブルの作成時にはスキーマ定義としてカラム名と型を指定する必要があります。デフォルトではNullableなカラムになるのでNot Nullなカラムにする場合はモードを指定して

bigquery.SchemaField('id', 'INT64', mode='REQUIRED')

とします。

3.データのインポート

初期データとしてCSVのデータをインポートします。
あらかじめスキーマ定義の型に合わせたデータをCSVで用意しておきます。

import.csv
id, name
1, hogehoge
2, fugafuga
migrate_table.py
# 初期データのインポート
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.skip_leading_rows = 1
with open('import.csv', 'rb') as sourceData:
    job = client.load_table_from_file(sourceData, table, job_config=job_config)
job.result()

CSVのヘッダを読み飛ばす場合はskip_leading_rowsでスキップ行数を指定すればOK。

Viewの再生成

migrate_view.py
import json

from google.cloud import bigquery
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery.Client(
    credentials=credentials,
    project=credentials.project_id,
)

table = client.dataset('[DATASET NAME]').table('[TABLE NAME]')

client.delete_table(table, not_found_ok=True)
view = bigquery.Table(table)
view.view_query = 'SELECT * FROM dataset_name.table_name'
client.create_table(view)

ビューの場合も実体テーブルとほぼ流れは同じ。
違う点は、ビューの場合はview_queryにSQLのクエリを直接指定する点。
スキーマ定義はSQLのクエリから自動的に構成されるため指定する必要はありません。

スケジュールの更新

Scheduled Queryを利用して定期的にデータの洗い替えを行うようなシーンで、実行クエリやスケジューリングの設定を変更したい場合。

migrate_schedule.py

import json

from google.cloud import bigquery
from google.oauth2 import service_account

from google.cloud import bigquery_datatransfer_v1
import google.protobuf.json_format

credentials = service_account.Credentials.from_service_account_file(
    '[PATH TO CREDENTIAL]',
    scopes=["https://www.googleapis.com/auth/cloud-platform"],
)

client = bigquery_datatransfer_v1.DataTransferServiceClient(
    credentials=credentials
)

config = google.protobuf.json_format.ParseDict(
    {
        "name": '[RESOURCE NAME]',
        "destination_dataset_id": '[DATASET NAME]',
        "display_name": '[DISPLAY NAME]',
        "data_source_id": "scheduled_query",
        "params": {
            "query": "SELECT * FROM dataset_name.table_name",
            "destination_table_name_template": '[TABLE NAME]',
            "write_disposition": "WRITE_TRUNCATE",
            "partitioning_field": "",
        },
        "schedule": "every 24 hours",
    },
    bigquery_datatransfer_v1.types.TransferConfig(),
)

update_mask = {"paths": ["display_name", "params", "schedule"]}

response = client.update_transfer_config(
    config, update_mask 
) 

スケジュールの設定内容(config)とアップデートマスク(update_mask)をupdate_transfer_configに渡してあげます。
update_maskconfigの中でどのフィールドを更新するかを指定するものです。update_maskに含まれない設定値は更新の対象外となります。

configの設定値の詳細は以下の通り。

name

スケジュールのリソース名を指定します。
リソース名はBigQueryのコンソールの「スケジュールされたクエリ」からスケジュールの一覧が表示されるので、該当のスケジュールの構成情報から確認できます。
bigquery.png

destination_dataset_id

スケジュールで実行したクエリ結果の保存先データセット名を指定します。

display_name

スケジュールの表示名なので任意な名前を付けます。

params.query

実行するクエリを指定します。

params.destination_table_name_template

スケジュールで実行したクエリ結果の保存先テーブル名を指定します。

params.write_disposition

テーブルへの保存方法を指定します。
WRITE_TRUNCATEを指定した場合、既存のテーブルは実行クエリの結果で置換されます。
WRITE_APPENDを指定した場合、既存のテーブルに対して実行クエリの結果が追加されます。

schedule

スケジュールの実行タイミングを指定します。
ex.
1時間ごとに実行・・・every 1 hour
毎日0時に実行・・・every day 00:00

おわりに

BigQueryをデータマート的に使う場合、コンソールからテーブルを作りまくると変更管理がしにくいのでコードに落としておくとgitで管理できてオススメです。

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?