4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

datatech-jpAdvent Calendar 2022

Day 13

DataflowのFlex Templateを使ったバッチ処理の実装

Last updated at Posted at 2022-12-12

はじめに / 経緯

こんにちは、株式会社unerryでデータエンジニアをしているJanosと申します。
弊社ではGoogle Cloudを中心にビッグデータの連携を日々行っているのですが、今回位置情報に関するデータを受領しました。
1日ごとに先方のS3 bucketに置かれるため、日次のバッチ処理でS3 → GCSへのコピーからデータサイエンティストやアナリストが使うBigQueryへのloadまでの処理が必要となります。
例として1日あたりのデータは以下のような量でした:

例: 1日あたり(11/15) total 2.4 GiB, 138555 file

総データ量が2~3 GiBではビッグデータと呼ぶには到底小さすぎると思います。
ところがこのデータの厄介なところはデータを寸断しすぎており、1ファイルあたり数十 KiBのファイルが140,000ほどあって、おそらくはIO処理のため処理に多大な時間がかかっていました。
実際S3からGCSへ引っ張ってくるのにも大体1時間弱ほどかかっています。

またファイルの状態として以下のような問題点がありました:

  • データのcolumnの値がファイル名として使われており、そのファイル内のデータにそのcolumnがない (column欠損)
  • zip形式で保存されているが、BigQueryへのloadには対応していないのでgzip形式に直す必要がある (ファイルフォーマット変換)

これらの点を変更して適切なデータフォーマットに直すためtransformの処理が必要です。
大量のファイルに迅速かつ効率良くTransform処理をかけるためにDataflowを使うことにしました。

Flex Template

Dataflowへジョブを投げるのに今回はone-offではないバッチ処理ということもあり、コードから直接デプロイするのではなくflex templateの形にしてクラウドに置き、日付のパラメータだけyyyyMMddで渡せるようにすることにしました。

実践

flex templateはあまりネットに情報がなく、公式ドキュメントの説明もストリーミング処理なので難しいかなと思っていたのですが、要点を掴めば意外とすんなりできました。

まずはディレクトリ構造です:

dataflow
├── Dockerfile
├── metadata.json
├── requirements.txt
└── src
    ├── dofns
    │   ├── __init__.py
    │   └── dofns.py
    ├── main.py
    └── setup.py

dofns.pybeams.ParDoで使う並列処理の各実装を行なっています。
例としてこのように実装しています:

import apache_beam as beam
from google.cloud import storage

class Unzip(beam.DoFn):
    def __init__(self, bucket_name: str):
        super().__init__()
        self.bucket_name = bucket_name

    def setup(self):
        self.storage_client = storage.Client()

    def process(self, gcs_uri: str):
        # 処理を実装
        pass   

これらDofnを継承した処理をmain.pyでimportしてmain()でパイプラインを構築します:

from dofns import dofns
import datetime as dt

def main(target_date: dt.date):
    bucket_name = "******"
    options = PipelineOptions(
        # 省略
    )
    options.view_as(SetupOptions).save_main_session = True

    with beam.Pipeline(options=options) as p:
        (p
         | beam.Create(get_all_path(bucket_name, target_date))
         | beam.ParDo(dofns.Unzip(bucket_name))
         | beam.ParDo(dofns.FormatCSV())
         | beam.ParDo(dofns.Gzip(bucket_name))
         )

さて問題のflex templateですが実体はDockerコンテナです。
次のようなDockerfileを使ってコンテナイメージを作成しました:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY ./requirements.txt .
COPY ./src ./src

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/src/main.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/src/setup.py"

RUN pip install -U apache-beam[gcp]==2.41.0
RUN pip install -U -r ./requirements.txt

作成したコンテナイメージはArtifact Registryに置きます。
また同時にmetadata.jsonを作成してflex template実行時にカスタムパラメータが渡せるようにします。
ここで同時にカスタムパラメータに正規表現でvalidateをかけることができます:

metadata.json
{
  "name": "*****_transform_pipeline",
  "description": "The pipeline that transforms zip files to gzip files.",
  "parameters": [
    {
      "name": "target_date",
      "label": "Target date of zip files.",
      "helpText": "Date of the target zip files.",
      "regexes": [
        "^[0-9]{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$"
      ]
    }
  ]
}

gcloud dataflow flex-template buildコマンドを実行してflex template json fileを作成しCloud Storageに配置します:

gcloud dataflow flex-template build <path to the flex template json file to be located> \
    --image "<path to the docker image located in Artifact Registry>" \
    --sdk-language "PYTHON" \
    --metadata-file "metadata.json"

これでパラメータを指定してtemplateを呼び出すことができるようになりました!
flex templateを使ったジョブの作成はgcloud dataflow flex-template runコマンドを用いるか、またREST APIを使用することもできます。

最後に

このflex templateを使ったジョブは毎日20分未満で完了しており、とても満足しています!
あと余談ですが、コマンドでもRESTでもジョブをQueueに投げるとすぐにreturnしてしまうので、ジョブの監視は別途Bash scriptを組んで対応しました。今更ながらjqって本当に便利ですね!

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?