背景
CloudStorageに毎日アップロードされたデータを簡単なpython処理後、Bigquery上のtableにoutputすることです。
Airflow(CloudComposer)では簡単にできましたが、データ量がそこまで大きくはないので、CloudFunctionsだけで対応できるかなと思います。
目次
1、サンプルデータ
2、事前準備ーGCS&Bigquery&ServiceAccount
3、CloudFunctionsの実装
1、サンプルデータ
プラバシーの考慮で、サンプルデータを利用します。
データセット:
・東京電力使用実績データ:https://www.tepco.co.jp/forecast/html/download-j.html
・東京天気気温データ:https://www.data.jma.go.jp/gmd/risk/obsdl/#
月別データを使用することを想定します。
データフォーマットはJsonlです(業務と一致にします。)。CSVファイルも同じです。
2 準備ーGCSとBigqueryの設定
①outputとして、Bigqueryにdataset及び空白tableを作ります。
②input及びtriggerとして、GCS上bucketを作成します。
③連携用のServiceAccountを作成し、適当な権限を付けます。
3、CloudFunctionsの実装
・functionの設定その1:
・functionの設定その2:
環境変数のところにプロジェクトID及びBigqueryのテーブルIDを設定しました。
・codeの部分
Runtimeはpython3.9で、pythonライブラリを利用するために、requirements.txtにパッケージ名とバージョンを記入します。
requirements.txt
# Function dependencies, for example:
# package>=version
pandas==1.3.5
google-cloud-bigquery==2.0.0
google-cloud-storage==1.32.0
pyarrow==7.0.0
pyarrowがないとエラーになります。
pyarrowとは
Apache Arrow は大規模なデータをメモリに読み込んで処理するためのプラットフォームで、高速なデータ転送やファイル入出力機能など、効 率的なデータ処理に必要な機能を提供してくれます。PyArrowはAppache ArrowのPythonインターフェースで、NumPyやpandasと連携して、Apache Arrowを利用できるようになっています。
main.py
from google.cloud import bigquery
from google.cloud import storage as gcs
from io import BytesIO
import os
import pandas as pd
def input_from_gcs(project_id, bucket_name, filename):
client = gcs.Client(project_id)
bucket = client.get_bucket(bucket_name)
blob = bucket.blob(filename)
df = pd.read_json(BytesIO(blob.download_as_string()), orient='records', lines=True)
return df
def deal_data(df):
'''
your process code
'''
df_output = df
return df_output
def df_output_to_bigquery(project_id, table_id, df_output):
client = bigquery.Client(project=project_id)
job_config = bigquery.LoadJobConfig(autodetect=True, write_disposition='WRITE_APPEND')
load_job = client.load_table_from_dataframe(df_output, table_id, job_config=job_config)
load_job.result()
destination_table = client.get_table(table_id)
print("Loaded {} rows.".format(destination_table.num_rows))
def main(event, context):
file = event
project_id = os.environ.get("project_id")
table_id = os.environ.get("table_id")
bucket_name = file["bucket"]
filename = file["name"]
df = input_from_gcs(project_id, bucket_name, filename)
df_output = deal_data(df)
df_output_to_bigquery(project_id, table_id, df_output)
print("Task Done")
関数input_from_gcsによりデータを読み込む。
関数deal_dataはデータの処理です。
関数df_output_to_bigqueryはBigqueryに出力する。ちなみにbigquery.LoadJobConfigで、スキーマは自動探索(autodetect=True)、データは上書きではなく追加の形(write_disposition='WRITE_APPEND')と設定されています。その他の設定については公式ドキュメントにご参照ください。
main関数のところは環境変数、またはtriggerによりbucketとfilenameなどの情報を読み込んでいます。
最後
CloudFunctionsのメモリーが調整できますが、でかいデータはdataflowおよびdataprocまで任せた方がよいかな。その限界の議論については多分ネット上どこかにあるだろう。