はじめに
本記事は ぷりぷりあぷりけーしょんず Advent Calendar 2019 の6日目の記事です。
今回はMLエンジニアとして業務を行っている筆者が普段利用している Dataflow(ApacheBeam) + Python3 を使った分散処理で大量のデータを処理する方法を紹介します。
今回紹介すること
- ApacheBeam の Python SDK を使って Dataflow を動作させる方法
- BigQuery から BigQuery へ大量のデータを処理してインサートする方法
今回紹介しないこと
- ApacheBeam の詳しい概念
- BigQuery以外の入出力を使っての処理
Dataflowとは? ApacheBeamとは?
DataflowとはGoogle Could Platform で提供されているサービスの1つです。 Dataflow公式サイト
分散処理を手軽に実装することができることやBigQueryとの連携も簡単なので、分析の基板などに多く利用されています。
Dataflowは内部的にパイプライン処理を実現するためのフレームワークであるApacheBeamを使って実装されています。
そのため、実際に開発を行う時には ApacheBeam の SDK を利用することになるでしょう。 ApacheBeam公式サイト
Dataflow では ApachBeam の SDK は Java と Python の2種類がサポートされています。
実際に触ってみる
実際に環境構築からDataflow上で処理を動かすまでの流れをみていきます。
環境構築
Pipenvを使って開発用の仮想環境を構築しましょう。(Pythonやその他ライブラリのVersionが同じであれば、Pipenvを利用しなくても大丈夫です。)
以下のようなPipfileを使って環境を立ち上げます。
[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true
[dev-packages]
[packages]
apache-beam=="1.14.*"
[requires]
python_version = "3.7"
気を付けるべきポイント
- ApacheBeamSDKのVersion
DataflowがサポートしているSDKのバージョンは1.14
までです。 - Pythonのversion
現時点(2019/12/06)で Dataflow がサポートしている PythonSDK の version は 2系 となっており、Python3系を利用すると警告がでます。
しかし、私の経験則から言うと、不具合が起きるのはごく稀なケースで本番で運用しても差し支えないレベルだと思います。(ただし責任は取れませんので、最終的にはご自身の判断でご利用ください)
Setup.py を作成する
Dataflow上で templeate として保存して実行する際には setup.py
が必要になります。
ここに、実行時の依存関係を記述していきます。
entry_points
ばご自身のパッケージ構成に合わせて指定してください。
PACKAGES = [
"apache-beam[gcp]==2.14.*",
]
setup(
name='dataflow-sample',
url='',
author='',
author_email='',
version='0.1',
install_requires=REQUIRED_PACKAGES,
packages=find_packages(),
include_package_data=True,
entry_points=dict(console_scripts=[
'sample=sample:main'
]),
description='dataflow sample',
)
パイプラインのセットアップをする
ApacheBeamのパイプラインを動かすためのセットアップとmain関数の実装を行います。
必要な手順は以下です。
- runnerを設定する
- Piplineのインスタンスを生成する
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
def setup(args):
runner = "DirectRunner"
return beam.Pipeline(runner, options=PipelineOptions(args))
def main():
pipeline = setup(sys.args)
with pipeline as p:
# pipelineを記載する
pass
オプションを定義する
実際にDataflowで動かすためには SDK で指定されている様々なオプションを設定することが必要です。
代表的なものを列挙してみると以下のようになります。
StandardOptions
name | type | description |
---|---|---|
streaming | boolean | ストリーミングモード or バッチモードを選択 |
SetupOptions
name | type | description |
---|---|---|
setup_file | string | setup.pyのpathを指定する |
GoogleCouldOptions
name | type | description |
---|---|---|
region | string | 使うリージョンを指定する |
project | string | 使うプロジェクトIDを指定する |
job_name | string | ジョブを実行した時の名前を指定する(任意の値) |
template_location | string | templateを保存するGCPのpathを指定する |
これらのオプションをコード上か実行時のコマンドライン引数で指定する必要があります。
コードで指定する場合には以下のようになります。
def option_setting(options: PipelineOptions) -> PipelineOptions:
cloud_options = options.view_as(GoogleCloudOptions)
cloud_options.region = "asia-northeast1"
cloud_options.project = "your project id"
setup_options = options.view_as(SetupOptions)
setup_options.setup_file = "specify your setup.py path"
return options
def setup(args):
runner = "DirectRunner"
options = option_setting(PipelineOptions(args))
return beam.Pipeline(runner, options=options)
基本的には、 PipelineOptions.view_as()
で設定したい Options
として振舞わせます。
あとは、指定したいプロパティに対して値を設定すれば良いだけです。
また、実行時に必要な設定値がある場合には独自のカスタムオプションを作成することも可能です。
実装は PipelineOptions
を継承して、必要なメソッドをオーバーライドするだけです。
class CostomOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
'--hoge',
type=int,
default=0,
help='This is Costom Value'
)
BigQuery to BigQuery パイプラインを定義する
実際にBigQueryからデータを読み取ってBigQueryへ格納するパイプラインを定義してみます。
簡単な例として、Userテーブルからidのみを抽出して別のテーブルにInsertする処理を実装してみます。
def b2b_pipline(pipe: PCollection):
# 実行した SQL を記載する
query = "SELECT id, name, age FROM sample.users"
_ = (pipe
| "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
| "Preprocess" >> beam.Map(lambda data: data["id"])
| "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
table="user_ids",
schema="id:INTEGER",
create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=BigQueryDisposition.WRITE_APPEND,
)
)
Piplineの処理では、入力, 中間操作, 出力の3つの操作を行っています。
今回紹介したもの意外にも種類がいろいろあるので、 公式のリファレンス などを参考してカスタマイズすることが可能です。
実際に動かす環境について
実装したパイプラインをLocalやGCP上で動かしてみます。
ApacheBeamを動かすときには、いくつか環境を選べます。
- Localで動かす
- DataflowでTemplate保存しないで動かす
- DataflowでTemplate保存して動かす
ユースケース | Runner | template_location |
---|---|---|
Localで動かす | DirectRunner | None |
Dataflowで動かす | DataflowRunner | None |
Dataflowでtemplateとして動かす | DataflowRunner | テンプレートを保存するGCSのパスを指定する |
Templateを保存すると、PipelineをGCSに保存することができて、コンソールやコマンドラインなどから起動することができます。Pipelineを定時実行したいときなどに非常に有用です。
終わりに
今回は、 ApcheBeam を Python で実装して、 Dataflow で動かすまでを紹介してみました。
あなたの参考になれば幸いです。