1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

PythonでGCSをトリガーにBQデータ格納

Posted at

やること

GCSにファイルがアップロードされたことをトリガーに、そのファイルをBQに格納する

GCS

GCSのコンソール上で「バケットを作成」
※今回は名前がtest
その下の階層に「フォルダを作成」
※こちらも名前がtest

あげるファイル
test20220606.csv

date,id,name,click
2022-06-06,111111,Aki,4
2022-06-06,222222,Haru,10
2022-06-06,333333,Natu,23
2022-06-06,444444,Huyu,19
2022-06-06,555555,Oto,1

BigQuery

プロジェクトからデータセットだけ作成
データセット名:test

コード

requirements.txt

google-cloud-bigquery

pip install -r requirements.txt

main.py

from google.cloud import bigquery

def import_bq(event, context):
    file = event['name']
    bucket = event['bucket']
    bq_client = bigquery.Client()
    
    # スキーマの設定
    schema_json = [
        bigquery.SchemaField("date", "DATE"),
        bigquery.SchemaField("id", "STRING"),
        bigquery.SchemaField("name", "STRING"),
        bigquery.SchemaField("click", "INTEGER"),
    ]
    
    job_config = bigquery.LoadJobConfig(
        schema=schema_json,
        # 宛先テーブルがすでに存在する場合、上書き
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        # CSV形式でBQに格納
        source_format=bigquery.SourceFormat.CSV,
        # null許容フィールドをスキーマに追加できるようにする
        schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
        # 今回CSVなのでカラムの部分をスキップ
        skip_leading_rows=1,
        # パーティションを日毎で分割
        time_partitioning=bigquery.TimePartitioning(
            type_=bigquery.TimePartitioningType.DAY,
            field="date", 
        ),
    )
    # TODO $20220606でパーティションを切っているので変数にする必要あり
    table_id = bq_client.dataset('テーブルセット名').table('report$20220606')
    uri = f'gs://{bucket}/{file}'
    load_job = bq_client.load_table_from_uri(
        uri, table_id, job_config=job_config
    )
    return load_job.result()

Function

プロジェクト設定

gcloud config set project プロジェクト名

下記コマンドでデプロイ

gcloud functions deploy import_bq \
--runtime python39 \
--trigger-resource GCSのバケット名 \
--trigger-event google.storage.object.finalize \
--project プロジェクト名 \
--region us-central1 \
--allow-unauthenticated \
--memory 512MB

確認

GCSのコンソールからtestフォルダの中にtest20220606.csvのファイルをアップロード
BQのデータセット名:'test'の中にreportが入っていてデータがあればOK!

最後に

ドキュメント探すのに時間かかったので、使ったものは全て参考文献に記載しました。
(また、コードにも何をしているかコメント残してます)

参考文献

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?