やること
GCSにファイルがアップロードされたことをトリガーに、そのファイルをBQに格納する
GCS
GCSのコンソール上で「バケットを作成」
※今回は名前がtest
その下の階層に「フォルダを作成」
※こちらも名前がtest
あげるファイル
test20220606.csv
date,id,name,click
2022-06-06,111111,Aki,4
2022-06-06,222222,Haru,10
2022-06-06,333333,Natu,23
2022-06-06,444444,Huyu,19
2022-06-06,555555,Oto,1
BigQuery
プロジェクトからデータセットだけ作成
データセット名:test
コード
requirements.txt
google-cloud-bigquery
pip install -r requirements.txt
main.py
from google.cloud import bigquery
def import_bq(event, context):
file = event['name']
bucket = event['bucket']
bq_client = bigquery.Client()
# スキーマの設定
schema_json = [
bigquery.SchemaField("date", "DATE"),
bigquery.SchemaField("id", "STRING"),
bigquery.SchemaField("name", "STRING"),
bigquery.SchemaField("click", "INTEGER"),
]
job_config = bigquery.LoadJobConfig(
schema=schema_json,
# 宛先テーブルがすでに存在する場合、上書き
write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
# CSV形式でBQに格納
source_format=bigquery.SourceFormat.CSV,
# null許容フィールドをスキーマに追加できるようにする
schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
# 今回CSVなのでカラムの部分をスキップ
skip_leading_rows=1,
# パーティションを日毎で分割
time_partitioning=bigquery.TimePartitioning(
type_=bigquery.TimePartitioningType.DAY,
field="date",
),
)
# TODO $20220606でパーティションを切っているので変数にする必要あり
table_id = bq_client.dataset('テーブルセット名').table('report$20220606')
uri = f'gs://{bucket}/{file}'
load_job = bq_client.load_table_from_uri(
uri, table_id, job_config=job_config
)
return load_job.result()
Function
プロジェクト設定
gcloud config set project プロジェクト名
下記コマンドでデプロイ
gcloud functions deploy import_bq \
--runtime python39 \
--trigger-resource GCSのバケット名 \
--trigger-event google.storage.object.finalize \
--project プロジェクト名 \
--region us-central1 \
--allow-unauthenticated \
--memory 512MB
確認
GCSのコンソールからtestフォルダの中にtest20220606.csv
のファイルをアップロード
BQのデータセット名:'test'の中にreport
が入っていてデータがあればOK!
最後に
ドキュメント探すのに時間かかったので、使ったものは全て参考文献に記載しました。
(また、コードにも何をしているかコメント残してます)
参考文献