9
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

はじめに

本記事は ぷりぷりあぷりけーしょんず Advent Calendar 2019 の6日目の記事です。

今回はMLエンジニアとして業務を行っている筆者が普段利用している Dataflow(ApacheBeam) + Python3 を使った分散処理で大量のデータを処理する方法を紹介します。

今回紹介すること

  • ApacheBeam の Python SDK を使って Dataflow を動作させる方法
  • BigQuery から BigQuery へ大量のデータを処理してインサートする方法

今回紹介しないこと

  • ApacheBeam の詳しい概念
  • BigQuery以外の入出力を使っての処理

Dataflowとは? ApacheBeamとは?

DataflowとはGoogle Could Platform で提供されているサービスの1つです。 Dataflow公式サイト
分散処理を手軽に実装することができることやBigQueryとの連携も簡単なので、分析の基板などに多く利用されています。
Dataflowは内部的にパイプライン処理を実現するためのフレームワークであるApacheBeamを使って実装されています。
そのため、実際に開発を行う時には ApacheBeam の SDK を利用することになるでしょう。 ApacheBeam公式サイト
Dataflow では ApachBeam の SDK は Java と Python の2種類がサポートされています。

実際に触ってみる

実際に環境構築からDataflow上で処理を動かすまでの流れをみていきます。

環境構築

Pipenvを使って開発用の仮想環境を構築しましょう。(Pythonやその他ライブラリのVersionが同じであれば、Pipenvを利用しなくても大丈夫です。)
以下のようなPipfileを使って環境を立ち上げます。

[[source]]
name = "pypi"
url = "https://pypi.org/simple"
verify_ssl = true

[dev-packages]

[packages]
apache-beam=="1.14.*"

[requires]
python_version = "3.7"

気を付けるべきポイント

  • ApacheBeamSDKのVersion
    DataflowがサポートしているSDKのバージョンは 1.14 までです。
  • Pythonのversion
    現時点(2019/12/06)で Dataflow がサポートしている PythonSDK の version は 2系 となっており、Python3系を利用すると警告がでます。
    しかし、私の経験則から言うと、不具合が起きるのはごく稀なケースで本番で運用しても差し支えないレベルだと思います。(ただし責任は取れませんので、最終的にはご自身の判断でご利用ください)

Setup.py を作成する

Dataflow上で templeate として保存して実行する際には setup.py が必要になります。
ここに、実行時の依存関係を記述していきます。
entry_points ばご自身のパッケージ構成に合わせて指定してください。

setup.py
PACKAGES = [
    "apache-beam[gcp]==2.14.*",
]

setup(
    name='dataflow-sample',
    url='',
    author='',
    author_email='',
    version='0.1',
    install_requires=REQUIRED_PACKAGES,
    packages=find_packages(),
    include_package_data=True,
    entry_points=dict(console_scripts=[
        'sample=sample:main'
    ]),
    description='dataflow sample',
)

パイプラインのセットアップをする

ApacheBeamのパイプラインを動かすためのセットアップとmain関数の実装を行います。
必要な手順は以下です。

  • runnerを設定する
  • Piplineのインスタンスを生成する
setup_sample.py
import sys
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def setup(args):
    runner = "DirectRunner" 
    return beam.Pipeline(runner, options=PipelineOptions(args))


def main():
    pipeline = setup(sys.args)
    with pipeline as p:
        # pipelineを記載する
        pass

オプションを定義する

実際にDataflowで動かすためには SDK で指定されている様々なオプションを設定することが必要です。
代表的なものを列挙してみると以下のようになります。

StandardOptions

name type description
streaming boolean ストリーミングモード or バッチモードを選択

SetupOptions

name type description
setup_file string setup.pyのpathを指定する

GoogleCouldOptions

name type description
region string 使うリージョンを指定する
project string 使うプロジェクトIDを指定する
job_name string ジョブを実行した時の名前を指定する(任意の値)
template_location string templateを保存するGCPのpathを指定する

これらのオプションをコード上か実行時のコマンドライン引数で指定する必要があります。
コードで指定する場合には以下のようになります。

options_sample.py
def option_setting(options: PipelineOptions) -> PipelineOptions:
    cloud_options = options.view_as(GoogleCloudOptions)
    cloud_options.region = "asia-northeast1"
    cloud_options.project = "your project id"
    
    setup_options = options.view_as(SetupOptions)
    setup_options.setup_file = "specify your setup.py path"
    return options

def setup(args):
    runner = "DirectRunner" 
    options = option_setting(PipelineOptions(args))
    return beam.Pipeline(runner, options=options)

基本的には、 PipelineOptions.view_as() で設定したい Options として振舞わせます。
あとは、指定したいプロパティに対して値を設定すれば良いだけです。

また、実行時に必要な設定値がある場合には独自のカスタムオプションを作成することも可能です。
実装は PipelineOptions を継承して、必要なメソッドをオーバーライドするだけです。

costom_options_sample.py
class CostomOptions(PipelineOptions):

    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            '--hoge',
            type=int,
            default=0,
            help='This is Costom Value'
        )

BigQuery to BigQuery パイプラインを定義する

実際にBigQueryからデータを読み取ってBigQueryへ格納するパイプラインを定義してみます。
簡単な例として、Userテーブルからidのみを抽出して別のテーブルにInsertする処理を実装してみます。

pipeline.py
def b2b_pipline(pipe: PCollection):

    # 実行した SQL を記載する
    query = "SELECT id, name, age FROM sample.users"
    
    _ = (pipe
        | "Read from BigQuery" >> beam.io.Read(BigQuerySource(query=query, use_standard_sql=True))
        | "Preprocess" >> beam.Map(lambda data: data["id"])
        | "Write to BigQuery" >> apache_beam.io.WriteToBigQuery(
                table="user_ids",
                schema="id:INTEGER",
                create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
                write_disposition=BigQueryDisposition.WRITE_APPEND,
            ) 
        )

Piplineの処理では、入力, 中間操作, 出力の3つの操作を行っています。
今回紹介したもの意外にも種類がいろいろあるので、 公式のリファレンス などを参考してカスタマイズすることが可能です。

実際に動かす環境について

実装したパイプラインをLocalやGCP上で動かしてみます。

ApacheBeamを動かすときには、いくつか環境を選べます。

  • Localで動かす
  • DataflowでTemplate保存しないで動かす
  • DataflowでTemplate保存して動かす
ユースケース Runner template_location
Localで動かす DirectRunner None
Dataflowで動かす DataflowRunner None
Dataflowでtemplateとして動かす DataflowRunner テンプレートを保存するGCSのパスを指定する

Templateを保存すると、PipelineをGCSに保存することができて、コンソールやコマンドラインなどから起動することができます。Pipelineを定時実行したいときなどに非常に有用です。

終わりに

今回は、 ApcheBeam を Python で実装して、 Dataflow で動かすまでを紹介してみました。
あなたの参考になれば幸いです。

9
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?