Edited at

Cloud Dataflow for JavaをAppEngine for Goから実行する

More than 1 year has passed since last update.


Cloud Dataflow Template

従来のCloud Dataflowはバッチモードで起動する場合、どこかからデプロイする必要がありました。

しかし、Dataflow Templateを使えば、事前にCloud Storageに配置しておき、任意のタイミングで実行することができます。


Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \

-Dexec.args="--project={your GCP ProjectID} \
--stagingLocation=gs://{your bucket} \
--dataflowJobFile=gs://{your output Dataflow Template Path} \
--runner=TemplatingDataflowPipelineRunner"


Parameter

Dataflow Template作成時に以下のようなパラメータを渡します。

dataflowJobFile がTemplateの名前のようなもので、Templateを実行する時に指定します。


  • project : GCP ProjectID

  • stagingLocation : Dataflowが利用するファイルを置くGCS Bucket

  • dataflowJobFile : Dataflow Templateの設定ファイルを作成するGCS Path


Dataflow Templateの実行

Dataflow Templateを実行する方法は3つあります。

今回は、gcloud sdk からの実行と、 Client Libraryからの実行 について説明します。


gcloud sdkから実行

gcloud sdkで実行する場合は以下のようになります。

{job_name}は起動したDataflowのJobのNameとして設定される任意の値です。

gcloud beta dataflow jobs run {job_name} \

--gcs-location gs://{your Dataflow Template Path}


Client Libraryから実行

今回は、App Engine Goから実行しました。

以下のような感じで、Client LibraryからDataflowのAPIを叩きます。

import (

"fmt"
"io"
"net/http"

"golang.org/x/oauth2"
"golang.org/x/oauth2/google"

"google.golang.org/appengine"
"google.golang.org/appengine/log"
"google.golang.org/appengine/urlfetch"
)

const ProjectID = "your project id"

func init() {
http.HandleFunc("/cron/start", handler)
}

func handler(w http.ResponseWriter, r *http.Request) {
ctx := appengine.NewContext(r)

client := &http.Client{
Transport: &oauth2.Transport{
Source: google.AppEngineTokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"),
Base: &urlfetch.Transport{Context: ctx},
},
}

res, err := client.Post(fmt.Sprintf("https://dataflow.googleapis.com/v1b3/projects/%s/templates", ProjectID), "application/json", r.Body)
if err != nil {
log.Errorf(ctx, "ERROR dataflow: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}

_, err = io.Copy(w, res.Body)
if err != nil {
log.Errorf(ctx, "ERROR Copy API response: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(res.StatusCode)
}

今回はJSONをぶん投げて、パラメータを渡しています。

{

"jobName": "{job_name}",
"gcsPath": "gs://{your Dataflow Template Path}",
"environment": {
"tempLocation": "gs://{your bucket}",
"zone": "us-central1-f"
}
}


おわり

DataflowはAppEngineの苦手なバッチ処理をやってくれるフルマネージドサービスです。

料金も案外安いので、AppEngineでごりごりTaskQueueでやってたようなCloud StorageからCSVを読み込んで、Datastoreへ書き込むみたいなことは、Dataflowでやった方が簡単かもしれません。

バッチ処理の起動タイミングも、Dataflow Templateで解決しました。

これからは、もりもりDataflowの知見をためていきましょう!


Next

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する


for Python

Cloud Dataflow(Python)をAppEngineから実行する