はじめに / 経緯
こんにちは、株式会社unerryでデータエンジニアをしているJanosと申します。
弊社ではGoogle Cloudを中心にビッグデータの連携を日々行っているのですが、今回位置情報に関するデータを受領しました。
1日ごとに先方のS3 bucketに置かれるため、日次のバッチ処理でS3 → GCSへのコピーからデータサイエンティストやアナリストが使うBigQueryへのloadまでの処理が必要となります。
例として1日あたりのデータは以下のような量でした:
例: 1日あたり(11/15) total 2.4 GiB, 138555 file
総データ量が2~3 GiBではビッグデータと呼ぶには到底小さすぎると思います。
ところがこのデータの厄介なところはデータを寸断しすぎており、1ファイルあたり数十 KiBのファイルが140,000ほどあって、おそらくはIO処理のため処理に多大な時間がかかっていました。
実際S3からGCSへ引っ張ってくるのにも大体1時間弱ほどかかっています。
またファイルの状態として以下のような問題点がありました:
- データのcolumnの値がファイル名として使われており、そのファイル内のデータにそのcolumnがない (column欠損)
- zip形式で保存されているが、BigQueryへのloadには対応していないのでgzip形式に直す必要がある (ファイルフォーマット変換)
これらの点を変更して適切なデータフォーマットに直すためtransformの処理が必要です。
大量のファイルに迅速かつ効率良くTransform処理をかけるためにDataflowを使うことにしました。
Flex Template
Dataflowへジョブを投げるのに今回はone-offではないバッチ処理ということもあり、コードから直接デプロイするのではなくflex templateの形にしてクラウドに置き、日付のパラメータだけyyyyMMdd
で渡せるようにすることにしました。
実践
flex templateはあまりネットに情報がなく、公式ドキュメントの説明もストリーミング処理なので難しいかなと思っていたのですが、要点を掴めば意外とすんなりできました。
まずはディレクトリ構造です:
dataflow
├── Dockerfile
├── metadata.json
├── requirements.txt
└── src
├── dofns
│ ├── __init__.py
│ └── dofns.py
├── main.py
└── setup.py
dofns.py
でbeams.ParDo
で使う並列処理の各実装を行なっています。
例としてこのように実装しています:
import apache_beam as beam
from google.cloud import storage
class Unzip(beam.DoFn):
def __init__(self, bucket_name: str):
super().__init__()
self.bucket_name = bucket_name
def setup(self):
self.storage_client = storage.Client()
def process(self, gcs_uri: str):
# 処理を実装
pass
これらDofnを継承した処理をmain.py
でimportしてmain()
でパイプラインを構築します:
from dofns import dofns
import datetime as dt
def main(target_date: dt.date):
bucket_name = "******"
options = PipelineOptions(
# 省略
)
options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=options) as p:
(p
| beam.Create(get_all_path(bucket_name, target_date))
| beam.ParDo(dofns.Unzip(bucket_name))
| beam.ParDo(dofns.FormatCSV())
| beam.ParDo(dofns.Gzip(bucket_name))
)
さて問題のflex templateですが実体はDockerコンテナです。
次のようなDockerfileを使ってコンテナイメージを作成しました:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}
COPY ./requirements.txt .
COPY ./src ./src
ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/src/main.py"
ENV FLEX_TEMPLATE_PYTHON_SETUP_FILE="${WORKDIR}/src/setup.py"
RUN pip install -U apache-beam[gcp]==2.41.0
RUN pip install -U -r ./requirements.txt
作成したコンテナイメージはArtifact Registryに置きます。
また同時にmetadata.json
を作成してflex template実行時にカスタムパラメータが渡せるようにします。
ここで同時にカスタムパラメータに正規表現でvalidateをかけることができます:
{
"name": "*****_transform_pipeline",
"description": "The pipeline that transforms zip files to gzip files.",
"parameters": [
{
"name": "target_date",
"label": "Target date of zip files.",
"helpText": "Date of the target zip files.",
"regexes": [
"^[0-9]{4}(0[1-9]|1[0-2])(0[1-9]|[12][0-9]|3[01])$"
]
}
]
}
gcloud dataflow flex-template build
コマンドを実行してflex template json fileを作成しCloud Storageに配置します:
gcloud dataflow flex-template build <path to the flex template json file to be located> \
--image "<path to the docker image located in Artifact Registry>" \
--sdk-language "PYTHON" \
--metadata-file "metadata.json"
これでパラメータを指定してtemplateを呼び出すことができるようになりました!
flex templateを使ったジョブの作成はgcloud dataflow flex-template run
コマンドを用いるか、またREST APIを使用することもできます。
最後に
このflex templateを使ったジョブは毎日20分未満で完了しており、とても満足しています!
あと余談ですが、コマンドでもRESTでもジョブをQueueに投げるとすぐにreturnしてしまうので、ジョブの監視は別途Bash scriptを組んで対応しました。今更ながらjqって本当に便利ですね!