Cloud Dataflow Template
従来のCloud Dataflowはバッチモードで起動する場合、どこかからデプロイする必要がありました。
しかし、Dataflow Templateを使えば、事前にCloud Storageに配置しておき、任意のタイミングで実行することができます。
Dataflow Templateの作成
Dataflow Templateは以下のように作成します。
mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
-Dexec.args="--project={your GCP ProjectID} \
--stagingLocation=gs://{your bucket} \
--dataflowJobFile=gs://{your output Dataflow Template Path} \
--runner=TemplatingDataflowPipelineRunner"
Parameter
Dataflow Template作成時に以下のようなパラメータを渡します。
dataflowJobFile
がTemplateの名前のようなもので、Templateを実行する時に指定します。
- project : GCP ProjectID
- stagingLocation : Dataflowが利用するファイルを置くGCS Bucket
- dataflowJobFile : Dataflow Templateの設定ファイルを作成するGCS Path
Dataflow Templateの実行
Dataflow Templateを実行する方法は3つあります。
今回は、gcloud sdk
からの実行と、 Client Libraryからの実行
について説明します。
gcloud sdkから実行
gcloud sdkで実行する場合は以下のようになります。
{job_name}は起動したDataflowのJobのNameとして設定される任意の値です。
gcloud beta dataflow jobs run {job_name} \
--gcs-location gs://{your Dataflow Template Path}
Client Libraryから実行
今回は、App Engine Goから実行しました。
以下のような感じで、Client LibraryからDataflowのAPIを叩きます。
import (
"fmt"
"io"
"net/http"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/appengine"
"google.golang.org/appengine/log"
"google.golang.org/appengine/urlfetch"
)
const ProjectID = "your project id"
func init() {
http.HandleFunc("/cron/start", handler)
}
func handler(w http.ResponseWriter, r *http.Request) {
ctx := appengine.NewContext(r)
client := &http.Client{
Transport: &oauth2.Transport{
Source: google.AppEngineTokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"),
Base: &urlfetch.Transport{Context: ctx},
},
}
res, err := client.Post(fmt.Sprintf("https://dataflow.googleapis.com/v1b3/projects/%s/templates", ProjectID), "application/json", r.Body)
if err != nil {
log.Errorf(ctx, "ERROR dataflow: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
_, err = io.Copy(w, res.Body)
if err != nil {
log.Errorf(ctx, "ERROR Copy API response: %s", err)
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(res.StatusCode)
}
今回はJSONをぶん投げて、パラメータを渡しています。
{
"jobName": "{job_name}",
"gcsPath": "gs://{your Dataflow Template Path}",
"environment": {
"tempLocation": "gs://{your bucket}",
"zone": "us-central1-f"
}
}
#おわり
DataflowはAppEngineの苦手なバッチ処理をやってくれるフルマネージドサービスです。
料金も案外安いので、AppEngineでごりごりTaskQueueでやってたようなCloud StorageからCSVを読み込んで、Datastoreへ書き込むみたいなことは、Dataflowでやった方が簡単かもしれません。
バッチ処理の起動タイミングも、Dataflow Templateで解決しました。
これからは、もりもりDataflowの知見をためていきましょう!
#Next
Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する
#for Python