Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
Help us understand the problem. What is going on with this article?

Cloud Dataflow for JavaをAppEngine for Goから実行する

More than 3 years have passed since last update.

Cloud Dataflow Template

従来のCloud Dataflowはバッチモードで起動する場合、どこかからデプロイする必要がありました。
しかし、Dataflow Templateを使えば、事前にCloud Storageに配置しておき、任意のタイミングで実行することができます。

Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
              -Dexec.args="--project={your GCP ProjectID} \
              --stagingLocation=gs://{your bucket} \
              --dataflowJobFile=gs://{your output Dataflow Template Path} \
              --runner=TemplatingDataflowPipelineRunner"

Parameter

Dataflow Template作成時に以下のようなパラメータを渡します。
dataflowJobFile がTemplateの名前のようなもので、Templateを実行する時に指定します。

  • project : GCP ProjectID
  • stagingLocation : Dataflowが利用するファイルを置くGCS Bucket
  • dataflowJobFile : Dataflow Templateの設定ファイルを作成するGCS Path

Dataflow Templateの実行

Dataflow Templateを実行する方法は3つあります。

今回は、gcloud sdk からの実行と、 Client Libraryからの実行 について説明します。

gcloud sdkから実行

gcloud sdkで実行する場合は以下のようになります。

{job_name}は起動したDataflowのJobのNameとして設定される任意の値です。

gcloud beta dataflow jobs run {job_name} \
        --gcs-location gs://{your Dataflow Template Path}

Client Libraryから実行

今回は、App Engine Goから実行しました。
以下のような感じで、Client LibraryからDataflowのAPIを叩きます。

import (
    "fmt"
    "io"
    "net/http"

    "golang.org/x/oauth2"
    "golang.org/x/oauth2/google"

    "google.golang.org/appengine"
    "google.golang.org/appengine/log"
    "google.golang.org/appengine/urlfetch"
)

const ProjectID = "your project id"

func init() {
    http.HandleFunc("/cron/start", handler)
}

func handler(w http.ResponseWriter, r *http.Request) {
    ctx := appengine.NewContext(r)

    client := &http.Client{
        Transport: &oauth2.Transport{
            Source: google.AppEngineTokenSource(ctx, "https://www.googleapis.com/auth/cloud-platform"),
            Base:   &urlfetch.Transport{Context: ctx},
        },
    }

    res, err := client.Post(fmt.Sprintf("https://dataflow.googleapis.com/v1b3/projects/%s/templates", ProjectID), "application/json", r.Body)
    if err != nil {
        log.Errorf(ctx, "ERROR dataflow: %s", err)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }

    _, err = io.Copy(w, res.Body)
    if err != nil {
        log.Errorf(ctx, "ERROR Copy API response: %s", err)
        w.WriteHeader(http.StatusInternalServerError)
        return
    }
    w.WriteHeader(res.StatusCode)
}

今回はJSONをぶん投げて、パラメータを渡しています。

{
    "jobName": "{job_name}",
    "gcsPath": "gs://{your Dataflow Template Path}",
    "environment": {
        "tempLocation": "gs://{your bucket}",
        "zone": "us-central1-f"
    }
}

おわり

DataflowはAppEngineの苦手なバッチ処理をやってくれるフルマネージドサービスです。
料金も案外安いので、AppEngineでごりごりTaskQueueでやってたようなCloud StorageからCSVを読み込んで、Datastoreへ書き込むみたいなことは、Dataflowでやった方が簡単かもしれません。
バッチ処理の起動タイミングも、Dataflow Templateで解決しました。
これからは、もりもりDataflowの知見をためていきましょう!

Next

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する

for Python

Cloud Dataflow(Python)をAppEngineから実行する

sinmetal
GCPUG Admin https://gcpug.jp
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away