9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Cloud Dataflow for Javaを実行時パラメータ付きでAppEngine for Goから実行する

Last updated at Posted at 2017-04-12

Cloud Dataflow Template

Cloud Dataflow for JavaをAppEngine for Goから実行する の続きの記事です。
前回は単純にDataflow Templateを作成し、実行するのみでした。
今回は実行する時にパラメータを渡す方法を説明します。

ValueProviderの作成

実行時パラメータを受け取る場合、ロジック側で受け入れることができるようにValueProviderを定義しておきます。
以下の例では、 InputTable , OutputProjectID , OutputKind の3つの値をパラメータとして受け入れるように書いています。

public interface BigQueryToDatastoreOptions extends PipelineOptions {

		@Description("Path of the bigquery table to read from")
		@Default.String("cpb101demo1:samples.table")
		ValueProvider<String> getInputTable();

		@Description("Output destination Datastore ProjectID")
		@Default.String("cpb101demo1")
		ValueProvider<String> getOutputProjectID();

		@Description("Output destination Datastore Kind")
		@Default.String("hogeKind")
		ValueProvider<String> getOutputKind();

		void setInputTable(ValueProvider<String> value);

		void setOutputProjectID(ValueProvider<String> value);

		void setOutputKind(ValueProvider<String> value);
	}

	public static void main(String[] args) {
		BigQueryToDatastoreOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
				.as(BigQueryToDatastoreOptions.class);

		String inputTable = options.getInputTable().get();
		String projectID = options.getOutputProjectID().get();
		String kind = options.getOutputKind().get();

		Pipeline p = Pipeline.create(options);

		PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
				.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
		
		CreateEntities createEntities = new CreateEntities();
		createEntities.setKind(kind);
		
		PCollection<Entity> entities = keywordGroups.apply(createEntities);
		entities.apply(DatastoreIO.v1().write().withProjectId(projectID));

		p.run();
	}

Dataflow Templateの作成

Dataflow Templateは以下のように作成します。

mvn compile exec:java -Dexec.mainClass=org.sinmetal.flow.BigQueryToDatastore \
              -Dexec.args="--project=cpb101demo1 \
              --stagingLocation=gs://cpb101demo1/staging \
              --inputTable=cpb101demo1:samples.table \
              --outputProjectID=cpb101demo1 \
              --outputKind=Utterance \
              --dataflowJobFile=gs://cpb101demo1-df-template/BigQueryToDatastore201702021500 \
              --runner=TemplatingDataflowPipelineRunner"

Dataflow Templateの実行

実行方法は前回の記事のApp Engineの時と同じです。
実行時パラメータをRequest Bodyの中に含めてあげれば、Dataflow Template実行時にその値が受け渡されます。

{
	"jobName": "samplejob",
	"gcsPath": "gs://cpb101demo1-df-template/BigQueryToDatastore201703081355",
	"parameters": {
		"inputTable": "topgate-ai-dev:dialog.utterance_1000",
		"outputProjectID": "cpb101demo1",
        "outputKind": "sample1356"
	},
	"environment": {
		"tempLocation": "gs://cpb101demo1/staging",
		"zone": "us-central1-f"
	}
}

パラメータをパイプラインで持ち回る方法

今回、なんだかんだで苦労したのが、実行時に渡されたパラメータをパイプラインで持ち回る方法です。
パラメータはパイプライン実行前に取得できますが、利用するのはパイプラインの途中になります。
Javaのコード上は1つのマシンで動いているように見えるDataflowですが、実際には複数のインスタンスに分かれて実行されるので、値を共有するのは地味に大変です。
ベストなのかは分からないけど、とりあえず成功した方法は、 パイプライン作成後にインスタンス変数に値をセットする です。

こんな感じで、セッターを用意しておいて...

static class CreateEntityFn extends DoFn<KV<Integer, Iterable<TableRow>>, Entity> {
	
    String kind = "";

    public void setKind(String kind) {
        this.kind = kind;
    }
}

パイプライン作成時に、セットします。

    Pipeline p = Pipeline.create(options);

    PCollection<KV<Integer, Iterable<TableRow>>> keywordGroups = p
				.apply(BigQueryIO.Read.named("ReadUtterance").from(inputTable)).apply(new GroupKeywords());
		
    CreateEntities createEntities = new CreateEntities();
    createEntities.setKind(kind);

分かってしまえば単純ですが、ここにたどり着くまでに、うろうろしてたので、これからやる人は同じ穴にはまらないように注意してください!

#おわり

実行時に読み込み元のCSVファイルを変更したいとか、出力先のDatastoreのKindを変えたいとかは、よくあるパターンだと思うので、ValueProvider使って、受け渡していきましょう!

今回使ったサンプルコードは github に置いてあります。

9
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?