はじめに
この記事は Classi Advent Calendar 2019の23日目の記事です。
こんにちは、ClassiのデータAI部の@tomoyanamekawaです。
普段はGCP上でのデータ分析基盤構築をメインでやっています。
最近、「BigQueryにあるデータを中の値に合わせてファイル分割してGCSに保存したい」ということがあって、その時にCloud Dataflowにお世話になりました。
他の方にも需要ありそうで、かつPythonでの実装例が少なかったのでまとめようと思います。
今回のゴール
BigQueryにある特定のテーブルをGoogle Cloud Storage(GCS)にexportする処理をdailyで実行する。
ただしあるcolumnの値によって保存先のディレクトリを変えたい。
ファイル形式はjson。
例
BigQueryにあるreservationsテーブルを
このように日付/shop_idごとに分けてGCSに保存をしたい。
完成図
環境
- Python 3.7.3
- apache-beam==2.16.0
Cloud Dataflowとは
GCPで提供されているサーバレスにETL処理を行えるサービスです。
裏側ではApache Beamが動いているので、サーバレスにApache Beamを使えるサービスとも言えます。
並列処理を行えるため大規模なデータに対しても高速に処理を行うことができます。
ストリーム処理とバッチ処理の両方に対応していますが、今回はバッチ処理を使います。
詳しくは公式ページで。
とりあえず使えるようになりたいという方はゆずたそさんの発表資料にあるこちらの手順が良いと思います(私もこれでキャッチアップしました)。
カスタムテンプレートの作成
Cloud Dataflowでは「テンプレート」と呼ばれるものを利用してETL処理を作成します。
一般的な処理であればGoogleが提供するテンプレートを使用すればGUIベースで簡単にできます。
ただ今回やりたいことはこちらだとできないのでカスタムテンプレートを自分で作成します。
ちなみにプログラミング言語はJavaかPythonが利用できます。
今回はPythonで書きますが、Javaのほうが機能やドキュメントが豊富なので、自分やチームメンバーがJavaを書けてメンテナンスの問題もない場合はJavaのほうがいいと思います。
カスタムテンプレートの中身はこちらです。
import os
import json
import datetime
import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions
class JsonSink(fileio.TextSink):
def write(self, record):
self._fh.write(json.dumps(record).encode('utf8'))
self._fh.write('\n'.encode('utf8'))
if __name__ == '__main__':
now = datetime.datetime.now().strftime('%Y%m%d')
project_id = 'your_project'
dataset_name = 'your_dataset'
table_name = 'your_table'
bucket_name = 'your_bucket'
# オプション
pipeline_options = PipelineOptions()
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = project_id
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
#パイプラインの作成
pipeline = beam.Pipeline(options=pipeline_options)
(pipeline
| 'read' >> beam.io.Read(beam.io.BigQuerySource(
project=project_id,
use_standard_sql=True,
query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
))
| 'write' >> beam.io.fileio.WriteToFiles(
path=f'gs://{bucket_name}/{now}',
destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
sink=JsonSink(),
file_naming=beam.io.fileio.destination_prefix_naming()
)
)
pipeline.run()
ポイントはこちらのDynamic Destinationsという機能を利用しているところです。
recordという変数の中に各レコードごとの値が入ってくるのでrecord['shop_id']
でレコードごとにdestination(保存先のファイル名)を変えられます。
作成したテンプレートはGCS上に置く必要があるのでこちらのコマンドを実行します。
python -m test_template
そうするとgoogle_cloud_options.template_location
で指定した場所にテンプレートが設置されます。
実行時にテンプレートの置き場所を設定することもできます。
dailyで動くようにする
Cloud Dataflow自体にはscheduler機能がないため、dailyで実行するためには外部から実行する必要があります。
そこで今回はCloud Scheduler + Cloud Pub/Sub + Cloud Functionsでサーバレスに実行できるようにします。
Cloud Functionsに下記のスクリプトを登録します。
このスクリプトがカスタムテンプレートを実行してくれます。
from googleapiclient.discovery import build
def main(data, context):
job = 'my_job'
dataflow = build('dataflow', 'v1b3')
request = dataflow.projects().templates().launch(
projectId='your_project',
gcsPath='gs://your_bucket/templates/test_template'
)
response = request.execute()
Cloud FunctionsのトリガーはPub/Subです。
また、Pub/Subをトリガーにする場合には2つ引数を受け取る必要があるのでmain(data, context)
としています。
あとはトリガーにしてあるPub/SubのTopicを作成して、Cloud SchedulerからdailyでそのTopicがpublishされるようにすれば完成です!
もしCloud Composerやサーバを立ててその他のワークフローエンジンやcronでScheduleする場合は下記のgcloudコマンドからカスタムテンプレートを実行できます。
gcloud dataflow jobs run my_job \
--gcs-location gs://your_bucket/templates/test_template \
--region=asia-northeast1
おわりに
このような処理を大規模を短時間で行えるシステムを自分たちで実装しようと思うと手間が恐ろしいのでCloud Dataflowはとても便利です。
ただちょいとお高いので、「Cloud Dataflowでxx万円とかした」とかならないように使い所は選ぶ必要があるかなと思っています。
あしたは@tetsuya0617さんです。お楽しみに!