Edited at

Cloud Dataflow(Python)をAppEngineから実行する

More than 1 year has passed since last update.

(5/31 v2.0.0リリース後に更新)

Cloud Dataflow Python版がようやくGAということでTemplate実行がPython版でもできるか試したところ、あらかじめ登録したパイプラインを(3/23時点でパラメータ渡しはまだできないものの)(5/31の2.0.0リリースでパラメータ渡しできるようになりました)AppEngineから起動することができたので手順を共有したいと思います。


Cloud Dataflow Template実行とは

DataflowのパイプラインをあらかじめGCSに登録しておき、任意のタイミングで好きなパラメータを渡して登録したパイプラインを実行することができる機能です。AppEngine経由でTemplate実行を呼び出しすることで、パイプライン起動用のサーバを自分で立てたりすることなくデータ加工や分析用の処理をプログラムから簡単に実行できるようになります。cronも使えばデータ分析パイプラインの定期実行も可能になります。

機械学習で精度を出すための試行錯誤をする段階ではあまり使うことはないかもしれませんが、実運用に入った時にはサーバの運用を気にせずに煩雑なデータ処理パイプラインを実行できるようになると運用者はかなり楽になるのではと思います。しかも試行錯誤時に使っていたパイプラインをそのままに近い形でも実運用にもっていけるようになると開発も楽になりそうです。(高精度のモデルを作る難しさもさることながら作り込んだ機械学習モデルを安定的に稼働するシステムとして形にするのもなかなか苦労するはず)


実行方法

DataflowのTemplate実行は以下のステップを取ります。


  • パイプラインのコードで外から受け取ったパラメータを使うよう処理を変更

  • GCSにDataflowのパイプラインを登録する

  • AppEngineからDataflow APIを叩いて登録したパイプラインを実行する

  • (必要なら)AppEngineからパイプラインを停止する(クラスタを削除する)

以下、ステップごとの手順を説明していきたいと思います。


パイプラインでパラメータを使うよう修正

外から渡されるパラメータを受け取るためのカスタムオプションクラスを定義します。

Beamではプログラムは実行時に外部から渡されるパラメータはValueProviderクラスを経由して参照します。

PipelineOptionsクラスではパラメータをValueProviderとして読み込むためのadd_value_provider_argumentメソッドを持った独自parserを持っています。PipelineOptionsクラスを継承したカスタムオプションクラスを作り、初期化時に呼び出される_add_argparse_argsメソッド内でparserに追加したいパラメータの設定を記述します。

以下の例ではカスタムのパラメータとしてinput,output,dateを指定しています。


pipeline.py

import apache_beam as beam

class MyOptions(PipelineOptions):

@classmethod
def _add_argparse_args(cls, parser):

parser.add_value_provider_argument(
'--input',
default="gs://{mybucket}/{pathtofile}",
help='input gcs file path')

parser.add_value_provider_argument(
'--output',
default="gs://{mybucket}/{pathtofile}",
help='output gcs file path')

parser.add_value_provider_argument(
'--date',
default="20170531",
help='today')

options = MyOptions()


パイプラインの処理の方ではValueProviderの値を使うように修正します。

ValueProviderは先に作成したカスタムOptionの変数として参照することができます。

プログラムはValueProviderを経由して遅延的に値を取得する必要があり値は.get()で取得します。

PTransformやDoFn内部の処理でValueProviderを利用したい場合はコンストラクタでValueProviderを渡して、インスタンス変数として保持しておき、内部で.get()を使うと参照できるようでした。

なおBeamで提供されているクラスReadFromText,WriteToTextは引数として直接ValueProviderを渡すことができます。

以下の例では外部パラメータinputで指定された先のファイルを読み込み、dateで指定された文字列に各行を置き換え、outputで指定されたパスに書き出します。


pipeline.py

class MyDoFn(beam.DoFn):

# コンストラクタでValueProviderを受け取りインスタンス変数にセット
def __init__(self, date):
self._date = date

def process(self, element):
yield self._date.get() # 値は.get()メソッドで取得

p = beam.Pipeline(options=options)

(p | "Read" >> beam.io.ReadFromText(options.input)
| "DoFn" >> beam.ParDo(MyDoFn(options.date)) # DoFnコンストラクタにValueProviderを渡す
| "Write" >> beam.io.WriteToText(options.output))

p.run()



GCSにパイプラインを登録する

GoogleCloudOptionsにTemplateの登録先のGCSパスをつけて実行するとパイプラインが実行される代わりに指定したGCSパスにパイプラインの処理内容を記述したTemplateファイルが登録されます。実行環境はDataflowRunnerでパイプラインが実行可能な場所であればどこでも大丈夫なはずです。


pipeline.py

options = MyOptions()

# runnerにはDataflowRunnerを指定する
options.view_as(beam.options.pipeline_options.StandardOptions).runner = 'DataflowRunner'

# template_locationにTemplateを登録するGCSパスを指定する
google_cloud_options = options.view_as(beam.options.pipeline_options.GoogleCloudOptions)
google_cloud_options.template_location = 'gs://{your bucket}/mytemplate'

# パイプラインを実行する
p = beam.Pipeline(options=options)

~パイプライン処理コード~

p.run().wait_until_finish()


上記実行すると指定したGCSのパスにパイプラインの処理内容が記述されたTemplateファイルが生成されています。


登録したパイプラインを実行する

登録したTemplateの実行はGoogle REST APIに指示を送ります。

Google Cloud Client Libraryは(2017/3時点で)まだ未対応のようなのでここではGoogle APIs Client Libraryを利用します。利用にあたりDataflowのAPI Client Libraryをインストールしておきます(versionは3/23時点ではv1b3が最新のようでした)。上記実行で作成されたGCSパスをリクエストパラメータのbodyのgcsPathに指定して実行するとTemplateからジョブが生成・実行されます。

以下GoとPythonのサンプルコードを載せていますが、その他言語のLibraryからも実行できるはずです。

(Python版はローカルで試したもののAppEngine上では試していないので問題あればお知らせください)


Go

import (

"net/http"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/appengine"
dataflow "google.golang.org/api/dataflow/v1b3"
"google.golang.org/appengine/urlfetch"
)
//中略
func handler(w http.ResponseWriter, r *http.Request) {
c := appengine.NewContext(r)
client := &http.Client{
Transport: &oauth2.Transport{
Source: google.AppEngineTokenSource(c, "https://www.googleapis.com/auth/cloud-platform"),
Base: &urlfetch.Transport{Context: c},
},
}

service, err := dataflow.New(client)
templates := dataflow.NewProjectsTemplatesService(service)
req := &dataflow.CreateJobFromTemplateRequest{
GcsPath: "gs://{your bucket}/mytemplate",
JobName: "sklearn",
Parameters: map[string]string{
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
}
job, err := templates.Create("{your project}", req).Do()
//中略
}



Python

from oauth2client.client import GoogleCredentials

from oauth2client.service_account import ServiceAccountCredentials
from apiclient.discovery import build

credentials = GoogleCredentials.get_application_default()
service = build("dataflow", "v1b3", credentials=credentials)
templates = service.projects().templates()

body = {
"environment": {
"bypassTempDirValidation": False,
"tempLocation": "gs://{your bucket}/temp",
#"serviceAccountEmail": "A String",
#"zone": "us-central1-f",
"maxWorkers": 1,
},
"gcsPath": "gs://{your bucket}/mytemplate",
"parameters": {
"input": "gs://{yourbucket}/{pathtofile1}",
"output": "gs://{yourbucket}/{pathtofile2}",
"date": "20170601",
},
"jobName": "sklearn",
}
req = templates.create(projectId="{your project}", body=body)
res = req.execute()


bodyの中で必須項目はgcsPathとjobNameだけのようです。jobNameには実行中のjobでユニークとなる文字列を入れておけばよいようです。parameterは実行時にパイプラインに渡したい実行時パラメータを指定します。

レスポンスにはjobのIDが含まれているので後にジョブをキャンセルする場合には保持しておきます。

ちなみに登録したパイプラインはコンソール上からも実行できます。

Dataflowのコンソール画面の上にある[+ RUN JOB]から遷移する下記画面でカスタムテンプレートを選択し、登録したTemplateのGCSパスを指定してjobを実行することもできます。

dataflowjob.png


実行中のパイプラインを停止する

パイプラインのjobを起動したものの問題が発覚した場合や、streaming modeで所定の時間帯だけ起動したいような場合にはTemplateからのjob起動だけでなくjob停止する必要が出てきます。停止する際は同じDataflow REST APIでstateを"JOB_STATE_CANCELLED"に指定してjobをupdateします。以下Pythonコード例です。


Python

jobs= service.projects().jobs()

body = {
"requestedState": "JOB_STATE_CANCELLED"
}
req = jobs.update(projectId={your project}, jobId={job ID}, body=body)
res = req.execute()


これでjobはキャンセルされ、起動したクラスタも削除されます。


おわりに

AppEngineからcronなどを使うことであらかじめ作っておいたデータ分析パイプラインを定期実行したりもできます。これでデータ分析の検証フェーズでの前処理実行だけでなく、運用フェーズでのデータの収集・加工用途に対してもDataflow活用の幅が広がってきました。

簡単にパイプラインを作ることができるため、データ分析システムの検証フェーズでも運用時のデータ収集を想定した前処理ワークフローを書いておくことで、検証時に利用を想定していたデータがシステム開発時に取得コストが思いのほか高いのがわかって泣く泣くモデリングをやり直すなどの問題を早めに洗い出しやすくすることができます。

GCPの特徴の一つとして、クラウド側が面倒なインフラ構築や運用を担ってくれることで、アプリケーション開発者、機械学習エンジニアが開発・データ分析に集中することができること、が挙げられのではと思っています。Webアプリケーション開発にAppEngineがあったように、データ分析で面倒になりがちなデータ処理パイプラインの構築・運用の役割をDataflowが担ってくれるものと期待しています。


for Java

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する