12
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ClassiAdvent Calendar 2019

Day 23

Cloud Dataflowを用いてデータの値によって動的に宛先を変えてGCSに保存する

Last updated at Posted at 2019-12-23

はじめに

この記事は Classi Advent Calendar 2019の23日目の記事です。

こんにちは、ClassiのデータAI部の@tomoyanamekawaです。
普段はGCP上でのデータ分析基盤構築をメインでやっています。

最近、「BigQueryにあるデータを中の値に合わせてファイル分割してGCSに保存したい」ということがあって、その時にCloud Dataflowにお世話になりました。
他の方にも需要ありそうで、かつPythonでの実装例が少なかったのでまとめようと思います。

今回のゴール

BigQueryにある特定のテーブルをGoogle Cloud Storage(GCS)にexportする処理をdailyで実行する。
ただしあるcolumnの値によって保存先のディレクトリを変えたい。
ファイル形式はjson。

BigQueryにあるreservationsテーブルを
reservationsテーブル
このように日付/shop_idごとに分けてGCSに保存をしたい。
reservations_GCS

完成図

完成図

環境

  • Python 3.7.3
  • apache-beam==2.16.0

Cloud Dataflowとは

GCPで提供されているサーバレスにETL処理を行えるサービスです。
裏側ではApache Beamが動いているので、サーバレスにApache Beamを使えるサービスとも言えます。
並列処理を行えるため大規模なデータに対しても高速に処理を行うことができます。

ストリーム処理とバッチ処理の両方に対応していますが、今回はバッチ処理を使います。
詳しくは公式ページで。

とりあえず使えるようになりたいという方はゆずたそさんの発表資料にあるこちらの手順が良いと思います(私もこれでキャッチアップしました)。

カスタムテンプレートの作成

Cloud Dataflowでは「テンプレート」と呼ばれるものを利用してETL処理を作成します。
一般的な処理であればGoogleが提供するテンプレートを使用すればGUIベースで簡単にできます。
ただ今回やりたいことはこちらだとできないのでカスタムテンプレートを自分で作成します。

ちなみにプログラミング言語はJavaかPythonが利用できます。
今回はPythonで書きますが、Javaのほうが機能やドキュメントが豊富なので、自分やチームメンバーがJavaを書けてメンテナンスの問題もない場合はJavaのほうがいいと思います。

カスタムテンプレートの中身はこちらです。

test_template.py
import os
import json
import datetime

import apache_beam as beam
from apache_beam.io import fileio
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions, GoogleCloudOptions

        
class JsonSink(fileio.TextSink):
    def write(self, record):
      self._fh.write(json.dumps(record).encode('utf8'))
      self._fh.write('\n'.encode('utf8'))


if __name__ == '__main__':
    now = datetime.datetime.now().strftime('%Y%m%d')
    project_id = 'your_project'
    dataset_name = 'your_dataset'
    table_name = 'your_table'
    bucket_name = 'your_bucket'

    # オプション
    pipeline_options = PipelineOptions()
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = project_id
    google_cloud_options.job_name = 'myjob'
    google_cloud_options.staging_location = f'gs://{bucket_name}/staging'
    google_cloud_options.temp_location = f'gs://{bucket_name}/temp'
    google_cloud_options.template_location = f'gs://{bucket_name}/templates/test_template'
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
        
    #パイプラインの作成
    pipeline = beam.Pipeline(options=pipeline_options)
    (pipeline 
        | 'read' >> beam.io.Read(beam.io.BigQuerySource(
            project=project_id, 
            use_standard_sql=True, 
            query=f'select * from `{project_id}.{dataset_name}.{table_name}`'
        ))
        | 'write' >> beam.io.fileio.WriteToFiles(
            path=f'gs://{bucket_name}/{now}',
            destination=lambda record, table_name=table_name: f"shop_id_{record['shop_id']}/",
            sink=JsonSink(),
            file_naming=beam.io.fileio.destination_prefix_naming()
        )
    )

    pipeline.run()

ポイントはこちらのDynamic Destinationsという機能を利用しているところです。
recordという変数の中に各レコードごとの値が入ってくるのでrecord['shop_id']でレコードごとにdestination(保存先のファイル名)を変えられます。

作成したテンプレートはGCS上に置く必要があるのでこちらのコマンドを実行します。

python -m test_template

そうするとgoogle_cloud_options.template_locationで指定した場所にテンプレートが設置されます。
実行時にテンプレートの置き場所を設定することもできます

dailyで動くようにする

Cloud Dataflow自体にはscheduler機能がないため、dailyで実行するためには外部から実行する必要があります。
そこで今回はCloud Scheduler + Cloud Pub/Sub + Cloud Functionsでサーバレスに実行できるようにします。
schedulerの構成

Cloud Functionsに下記のスクリプトを登録します。
このスクリプトがカスタムテンプレートを実行してくれます。

from googleapiclient.discovery import build
def main(data, context):
    job = 'my_job'
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().templates().launch(
        projectId='your_project',
        gcsPath='gs://your_bucket/templates/test_template'
    )
    response = request.execute()

Cloud FunctionsのトリガーはPub/Subです。
また、Pub/Subをトリガーにする場合には2つ引数を受け取る必要があるのでmain(data, context)としています。

あとはトリガーにしてあるPub/SubのTopicを作成して、Cloud SchedulerからdailyでそのTopicがpublishされるようにすれば完成です!

もしCloud Composerやサーバを立ててその他のワークフローエンジンやcronでScheduleする場合は下記のgcloudコマンドからカスタムテンプレートを実行できます。

gcloud dataflow jobs run my_job \
        --gcs-location gs://your_bucket/templates/test_template \
        --region=asia-northeast1

おわりに

このような処理を大規模を短時間で行えるシステムを自分たちで実装しようと思うと手間が恐ろしいのでCloud Dataflowはとても便利です。
ただちょいとお高いので、「Cloud Dataflowでxx万円とかした」とかならないように使い所は選ぶ必要があるかなと思っています。

あしたは@tetsuya0617さんです。お楽しみに!

12
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?