Help us understand the problem. What is going on with this article?

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する

More than 1 year has passed since last update.

Cloud Dataflow Template

Cloud Dataflow for JavaをAppEngine for Goから実行する の続きの記事です。
前回は単純にDataflow Templateを作成し、実行するのみでした。
今回は実行する時にパラメータを渡す方法を説明します。

ValueProviderの作成

実行時パラメータを受け取る場合、ロジック側で受け入れることができるようにValueProviderを定義しておきます。
以下の例では、 InputTable , OutputProjectID , OutputKind の3つの値をパラメータとして受け入れるように書いています。

public interface BigQueryToDatastoreOptions extends PipelineOptions {

        @Description("Path of the bigquery table to read from")
        @Default.String("cpb101demo1:samples.table")
        ValueProvider<String> getInputTable();

        @Description("Output destination Datastore ProjectID")
        @Default.String("cpb101demo1")
        ValueProvider<String> getOutputProjectID();

        @Description("Output destination Datastore Kind")
        @Default.String("hogeKind")
        ValueProvider<String> getOutputKind();

        void setInputTable(ValueProvider<String> value);

        void setOutputProjectID(ValueProvider<String> value);

        void setOutputKind(ValueProvider<String> value);
    }

    public static void main(String[] args) {
        BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(BigQueryToDatastoreOptions.class);

        String inputTable = options.getInputTable().get();
        String projectID = options.getOutputProjectID().get();
        String kind = options.getOutputKind().get();

        Pipeline p = Pipeline.create(options);

        PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
                .apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());

        CreateEntities createEntities = new CreateEntities();
        createEntities.setKind(kind);

        PCollection<Entity> entities = keywordGroups.apply(createEntities);
        entities.apply(DatastoreIO.v1().write().withProjectId(projectID));

        p.run();
    }

Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
              -Dexec.args="--project=cpb101demo1 \
              --stagingLocation=gs://cpb101demo1/staging \
              --inputTable=cpb101demo1:samples.table \
              --outputProjectID=cpb101demo1 \
              --outputKind=Utterance \
              --dataflowJobFile=gs://cpb101demo1-df-template/BigQueryToDatastore201702021500 \
              --runner=TemplatingDataflowPipelineRunner"

Dataflow Templateの実行

実行方法は前回の記事のApp Engineの時と同じです。
実行時パラメータをRequest Bodyの中に含めてあげれば、Dataflow Template実行時にその値が受け渡されます。

{
    "jobName": "samplejob",
    "gcsPath": "gs://cpb101demo1-df-template/BigQueryToDatastore201703081355",
    "parameters": {
        "inputTable": "topgate-ai-dev:dialog.utterance_1000",
        "outputProjectID": "cpb101demo1",
        "outputKind": "sample1356"
    },
    "environment": {
        "tempLocation": "gs://cpb101demo1/staging",
        "zone": "us-central1-f"
    }
}

パラメータをパイプラインで持ち回る方法

今回、なんだかんだで苦労したのが、実行時に渡されたパラメータをパイプラインで持ち回る方法です。
パラメータはパイプライン実行前に取得できますが、利用するのはパイプラインの途中になります。
Javaのコード上は1つのマシンで動いているように見えるDataflowですが、実際には複数のインスタンスに分かれて実行されるので、値を共有するのは地味に大変です。
ベストなのかは分からないけど、とりあえず成功した方法は、 パイプライン作成後にインスタンス変数に値をセットする です。

こんな感じで、セッターを用意しておいて...

static class CreateEntityFn extends DoFn<KV<Integer, Iterable<TableRow>>, Entity> {

    String kind = "";

    public void setKind(String kind) {
        this.kind = kind;
    }
}

パイプライン作成時に、セットします。

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
                .apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());

    CreateEntities createEntities = new CreateEntities();
    createEntities.setKind(kind);

分かってしまえば単純ですが、ここにたどり着くまでに、うろうろしてたので、これからやる人は同じ穴にはまらないように注意してください!

おわり

実行時に読み込み元のCSVファイルを変更したいとか、出力先のDatastoreのKindを変えたいとかは、よくあるパターンだと思うので、ValueProvider使って、受け渡していきましょう!

今回使ったサンプルコードは github に置いてあります。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした