16
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Dataflow for JavaをAppEngine for Goから実行する

Last updated at Posted at 2017-04-12

Cloud Dataflow Template

従来のCloud Dataflowはバッチモードで起動する場合、どこかからデプロイする必要がありました。
しかし、Dataflow Templateを使えば、事前にCloud Storageに配置しておき、任意のタイミングで実行することができます。

Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
              -Dexec.args="--project={your GCP ProjectID} \
              --stagingLocation=gs://{your bucket} \
              --dataflowJobFile=gs://{your output Dataflow Template Path} \
              --runner=TemplatingDataflowPipelineRunner"

Parameter

Dataflow Template作成時に以下のようなパラメータを渡します。
dataflowJobFile がTemplateの名前のようなもので、Templateを実行する時に指定します。

  • project : GCP ProjectID
  • stagingLocation : Dataflowが利用するファイルを置くGCS Bucket
  • dataflowJobFile : Dataflow Templateの設定ファイルを作成するGCS Path

Dataflow Templateの実行

Dataflow Templateを実行する方法は3つあります。

今回は、gcloud sdk からの実行と、 Client Libraryからの実行 について説明します。

gcloud sdkから実行

gcloud sdkで実行する場合は以下のようになります。

{job_name}は起動したDataflowのJobのNameとして設定される任意の値です。

gcloud beta dataflow jobs run {job_name} \
        --gcs-location gs://{your Dataflow Template Path}

Client Libraryから実行

今回は、App Engine Goから実行しました。
以下のような感じで、Client LibraryからDataflowのAPIを叩きます。

import (
	"fmt"
	"io"
	"net/http"

	"golang.org/x/oauth2"
	"golang.org/x/oauth2/google"

	"google.golang.org/appengine"
	"google.golang.org/appengine/log"
	"google.golang.org/appengine/urlfetch"
)

const ProjectID = "your project id"

func init() {
	http.HandleFunc("/cron/start", handler)
}

func handler(w http.ResponseWriter, r *http.Request) {
	ctx := appengine.NewContext(r)

	client := &http.Client{
		Transport: &oauth2.Transport{
			Source: google.AppEngineTokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"),
			Base:   &urlfetch.Transport{Context: ctx},
		},
	}

	res, err := client.Post(fmt.Sprintf("https://dataflow.googleapis.com/v1b3/projects/%s/templates", ProjectID), "application/json", r.Body)
	if err != nil {
		log.Errorf(ctx, "ERROR dataflow: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	_, err = io.Copy(w, res.Body)
	if err != nil {
		log.Errorf(ctx, "ERROR Copy API response: %s", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}
	w.WriteHeader(res.StatusCode)
}

今回はJSONをぶん投げて、パラメータを渡しています。

{
	"jobName": "{job_name}",
	"gcsPath": "gs://{your Dataflow Template Path}",
	"environment": {
		"tempLocation": "gs://{your bucket}",
		"zone": "us-central1-f"
	}
}

#おわり

DataflowはAppEngineの苦手なバッチ処理をやってくれるフルマネージドサービスです。
料金も案外安いので、AppEngineでごりごりTaskQueueでやってたようなCloud StorageからCSVを読み込んで、Datastoreへ書き込むみたいなことは、Dataflowでやった方が簡単かもしれません。
バッチ処理の起動タイミングも、Dataflow Templateで解決しました。
これからは、もりもりDataflowの知見をためていきましょう!

#Next

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する

#for Python

Cloud Dataflow(Python)をAppEngineから実行する

16
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?