Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する

More than 1 year has passed since last update.


Cloud Dataflow Template

Cloud Dataflow for JavaをAppEngine for Goから実行する の続きの記事です。

前回は単純にDataflow Templateを作成し、実行するのみでした。

今回は実行する時にパラメータを渡す方法を説明します。


ValueProviderの作成

実行時パラメータを受け取る場合、ロジック側で受け入れることができるようにValueProviderを定義しておきます。

以下の例では、 InputTable , OutputProjectID , OutputKind の3つの値をパラメータとして受け入れるように書いています。

public interface BigQueryToDatastoreOptions extends PipelineOptions {

@Description("Path of the bigquery table to read from")
@Default.String("cpb101demo1:samples.table")
ValueProvider<String> getInputTable();

@Description("Output destination Datastore ProjectID")
@Default.String("cpb101demo1")
ValueProvider<String> getOutputProjectID();

@Description("Output destination Datastore Kind")
@Default.String("hogeKind")
ValueProvider<String> getOutputKind();

void setInputTable(ValueProvider<String> value);

void setOutputProjectID(ValueProvider<String> value);

void setOutputKind(ValueProvider<String> value);
}

public static void main(String[] args) {
BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(BigQueryToDatastoreOptions.class);

String inputTable = options.getInputTable().get();
String projectID = options.getOutputProjectID().get();
String kind = options.getOutputKind().get();

Pipeline p = Pipeline.create(options);

PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());

CreateEntities createEntities = new CreateEntities();
createEntities.setKind(kind);

PCollection<Entity> entities = keywordGroups.apply(createEntities);
entities.apply(DatastoreIO.v1().write().withProjectId(projectID));

p.run();
}


Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \

-Dexec.args="--project=cpb101demo1 \
--stagingLocation=gs://cpb101demo1/staging \
--inputTable=cpb101demo1:samples.table \
--outputProjectID=cpb101demo1 \
--outputKind=Utterance \
--dataflowJobFile=gs://cpb101demo1-df-template/BigQueryToDatastore201702021500 \
--runner=TemplatingDataflowPipelineRunner"


Dataflow Templateの実行

実行方法は前回の記事のApp Engineの時と同じです。

実行時パラメータをRequest Bodyの中に含めてあげれば、Dataflow Template実行時にその値が受け渡されます。

{

"jobName": "samplejob",
"gcsPath": "gs://cpb101demo1-df-template/BigQueryToDatastore201703081355",
"parameters": {
"inputTable": "topgate-ai-dev:dialog.utterance_1000",
"outputProjectID": "cpb101demo1",
"outputKind": "sample1356"
},
"environment": {
"tempLocation": "gs://cpb101demo1/staging",
"zone": "us-central1-f"
}
}


パラメータをパイプラインで持ち回る方法

今回、なんだかんだで苦労したのが、実行時に渡されたパラメータをパイプラインで持ち回る方法です。

パラメータはパイプライン実行前に取得できますが、利用するのはパイプラインの途中になります。

Javaのコード上は1つのマシンで動いているように見えるDataflowですが、実際には複数のインスタンスに分かれて実行されるので、値を共有するのは地味に大変です。

ベストなのかは分からないけど、とりあえず成功した方法は、 パイプライン作成後にインスタンス変数に値をセットする です。

こんな感じで、セッターを用意しておいて...

static class CreateEntityFn extends DoFn<KV<Integer, Iterable<TableRow>>, Entity> {

String kind = "";

public void setKind(String kind) {
this.kind = kind;
}
}

パイプライン作成時に、セットします。

    Pipeline p = Pipeline.create(options);

PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());

CreateEntities createEntities = new CreateEntities();
createEntities.setKind(kind);

分かってしまえば単純ですが、ここにたどり着くまでに、うろうろしてたので、これからやる人は同じ穴にはまらないように注意してください!


おわり

実行時に読み込み元のCSVファイルを変更したいとか、出力先のDatastoreのKindを変えたいとかは、よくあるパターンだと思うので、ValueProvider使って、受け渡していきましょう!

今回使ったサンプルコードは github に置いてあります。