2
2

More than 1 year has passed since last update.

Cloud FunctionsによるGCSとBigquery連携:GCS-Trigger

Last updated at Posted at 2022-06-29

背景

CloudStorageに毎日アップロードされたデータを簡単なpython処理後、Bigquery上のtableにoutputすることです。
Airflow(CloudComposer)では簡単にできましたが、データ量がそこまで大きくはないので、CloudFunctionsだけで対応できるかなと思います。

目次

1、サンプルデータ
2、事前準備ーGCS&Bigquery&ServiceAccount
3、CloudFunctionsの実装

1、サンプルデータ

プラバシーの考慮で、サンプルデータを利用します。

データセット:
・東京電力使用実績データ:https://www.tepco.co.jp/forecast/html/download-j.html
・東京天気気温データ:https://www.data.jma.go.jp/gmd/risk/obsdl/#
月別データを使用することを想定します。
データフォーマットはJsonlです(業務と一致にします。)。CSVファイルも同じです。

2 準備ーGCSとBigqueryの設定

①outputとして、Bigqueryにdataset及び空白tableを作ります。
②input及びtriggerとして、GCS上bucketを作成します。
③連携用のServiceAccountを作成し、適当な権限を付けます。

3、CloudFunctionsの実装

・functionの設定その1:
gb1.png
・functionの設定その2:
gb2.png
環境変数のところにプロジェクトID及びBigqueryのテーブルIDを設定しました。

・codeの部分
Runtimeはpython3.9で、pythonライブラリを利用するために、requirements.txtにパッケージ名とバージョンを記入します。
requirements.txt

# Function dependencies, for example:
# package>=version

pandas==1.3.5
google-cloud-bigquery==2.0.0
google-cloud-storage==1.32.0
pyarrow==7.0.0

pyarrowがないとエラーになります。
pyarrowとは

Apache Arrow は大規模なデータをメモリに読み込んで処理するためのプラットフォームで、高速なデータ転送やファイル入出力機能など、効 率的なデータ処理に必要な機能を提供してくれます。PyArrowはAppache ArrowのPythonインターフェースで、NumPyやpandasと連携して、Apache Arrowを利用できるようになっています。

main.py

from google.cloud import bigquery
from google.cloud import storage as gcs
from io import BytesIO
import os
import pandas as pd


def input_from_gcs(project_id, bucket_name, filename):
    client = gcs.Client(project_id)
    bucket = client.get_bucket(bucket_name)
    blob = bucket.blob(filename)
    df = pd.read_json(BytesIO(blob.download_as_string()), orient='records', lines=True)
    return df


def deal_data(df):
    '''
    your process code
    '''
    df_output = df
    return df_output


def df_output_to_bigquery(project_id, table_id, df_output):
    client = bigquery.Client(project=project_id)
    job_config = bigquery.LoadJobConfig(autodetect=True, write_disposition='WRITE_APPEND')
    load_job = client.load_table_from_dataframe(df_output, table_id, job_config=job_config)
    load_job.result()
    destination_table = client.get_table(table_id)
    print("Loaded {} rows.".format(destination_table.num_rows))


def main(event, context):
    file = event
    project_id = os.environ.get("project_id")
    table_id = os.environ.get("table_id")
    bucket_name = file["bucket"]
    filename = file["name"]

    df = input_from_gcs(project_id, bucket_name, filename)
    df_output = deal_data(df)
    df_output_to_bigquery(project_id, table_id, df_output)

    print("Task Done")

関数input_from_gcsによりデータを読み込む。
関数deal_dataはデータの処理です。
関数df_output_to_bigqueryはBigqueryに出力する。ちなみにbigquery.LoadJobConfigで、スキーマは自動探索(autodetect=True)、データは上書きではなく追加の形(write_disposition='WRITE_APPEND')と設定されています。その他の設定については公式ドキュメントにご参照ください。
main関数のところは環境変数、またはtriggerによりbucketとfilenameなどの情報を読み込んでいます。

最後

CloudFunctionsのメモリーが調整できますが、でかいデータはdataflowおよびdataprocまで任せた方がよいかな。その限界の議論については多分ネット上どこかにあるだろう。

2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2