Cloud Dataflow Template
Cloud Dataflow for JavaをAppEngine for Goから実行する の続きの記事です。
前回は単純にDataflow Templateを作成し、実行するのみでした。
今回は実行する時にパラメータを渡す方法を説明します。
ValueProviderの作成
実行時パラメータを受け取る場合、ロジック側で受け入れることができるようにValueProviderを定義しておきます。
以下の例では、 InputTable
, OutputProjectID
, OutputKind
の3つの値をパラメータとして受け入れるように書いています。
public interface BigQueryToDatastoreOptions extends PipelineOptions {
@Description("Path of the bigquery table to read from")
@Default.String("cpb101demo1:samples.table")
ValueProvider<String> getInputTable();
@Description("Output destination Datastore ProjectID")
@Default.String("cpb101demo1")
ValueProvider<String> getOutputProjectID();
@Description("Output destination Datastore Kind")
@Default.String("hogeKind")
ValueProvider<String> getOutputKind();
void setInputTable(ValueProvider<String> value);
void setOutputProjectID(ValueProvider<String> value);
void setOutputKind(ValueProvider<String> value);
}
public static void main(String[] args) {
BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(BigQueryToDatastoreOptions.class);
String inputTable = options.getInputTable().get();
String projectID = options.getOutputProjectID().get();
String kind = options.getOutputKind().get();
Pipeline p = Pipeline.create(options);
PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
CreateEntities createEntities = new CreateEntities();
createEntities.setKind(kind);
PCollection<Entity> entities = keywordGroups.apply(createEntities);
entities.apply(DatastoreIO.v1().write().withProjectId(projectID));
p.run();
}
Dataflow Templateの作成
Dataflow Templateは以下のように作成します。
mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
-Dexec.args="--project=cpb101demo1 \
--stagingLocation=gs://cpb101demo1/staging \
--inputTable=cpb101demo1:samples.table \
--outputProjectID=cpb101demo1 \
--outputKind=Utterance \
--dataflowJobFile=gs://cpb101demo1-df-template/BigQueryToDatastore201702021500 \
--runner=TemplatingDataflowPipelineRunner"
Dataflow Templateの実行
実行方法は前回の記事のApp Engineの時と同じです。
実行時パラメータをRequest Bodyの中に含めてあげれば、Dataflow Template実行時にその値が受け渡されます。
{
"jobName": "samplejob",
"gcsPath": "gs://cpb101demo1-df-template/BigQueryToDatastore201703081355",
"parameters": {
"inputTable": "topgate-ai-dev:dialog.utterance_1000",
"outputProjectID": "cpb101demo1",
"outputKind": "sample1356"
},
"environment": {
"tempLocation": "gs://cpb101demo1/staging",
"zone": "us-central1-f"
}
}
パラメータをパイプラインで持ち回る方法
今回、なんだかんだで苦労したのが、実行時に渡されたパラメータをパイプラインで持ち回る方法です。
パラメータはパイプライン実行前に取得できますが、利用するのはパイプラインの途中になります。
Javaのコード上は1つのマシンで動いているように見えるDataflowですが、実際には複数のインスタンスに分かれて実行されるので、値を共有するのは地味に大変です。
ベストなのかは分からないけど、とりあえず成功した方法は、 パイプライン作成後にインスタンス変数に値をセットする
です。
こんな感じで、セッターを用意しておいて...
static class CreateEntityFn extends DoFn<KV<Integer, Iterable<TableRow>>, Entity> {
String kind = "";
public void setKind(String kind) {
this.kind = kind;
}
}
パイプライン作成時に、セットします。
Pipeline p = Pipeline.create(options);
PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
CreateEntities createEntities = new CreateEntities();
createEntities.setKind(kind);
分かってしまえば単純ですが、ここにたどり着くまでに、うろうろしてたので、これからやる人は同じ穴にはまらないように注意してください!
#おわり
実行時に読み込み元のCSVファイルを変更したいとか、出力先のDatastoreのKindを変えたいとかは、よくあるパターンだと思うので、ValueProvider使って、受け渡していきましょう!
今回使ったサンプルコードは github に置いてあります。