Google Cloud Dataflowでは、パイプラインを分岐することでデータを複数の場所に出力できます。しかし、この方法では事前に出力先が決まっていなければならず、例えば「データ内で指定された任意のBigQueryテーブルへ出力」みたいなことはできません(多分)。そのような動的な出力振り分けを実現する方法を探していたところ、汎用的なものとして DynamicDestinations
というクラスを利用すればいいことが分かりました。
本記事ではこの DynamicDestinations
を用いた方法および簡略化された方法の2通りでテーブル振り分けを試してみます。
サンプル構成
今回は出力のみ興味があるので、余計なデータ処理を挟まず「Pub/Subから取得したテキストをそのままBigQueryに保存する」ことをします。ただし、通常の単一テーブルへの出力と比較できるよう、パイプラインを分岐して2種類の出力処理をつけます。
- 片方は、全てのテキストを指定したテーブルへ出力します。
- もう片方は、テキストの先頭文字に応じてテーブルを変更します。
BigQueryに保存されたデータの具体例は以下の通りです。
$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
| text |
+-----------+
| Charizard |
| Alakazam |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
| text |
+------------+
| Cloyster |
| Clefairy |
| Charmander |
+------------+
${table}
には全ての入力データが入っているのに対し、例えば ${table}_C
には C で始まるテキストのみが入っています。振り分け用の各テーブルは、コード内で生成した名前で、必要になったものだけ実行時に作られます。
コード
package com.example.dataflow;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
/**
* An example of BigQuery dynamic destinations.
*
* <p>To run this example using managed resource in Google Cloud
* Platform, you should specify the following command-line options:
* --project=<YOUR_PROJECT_ID>
* --jobName=<JOB_NAME>
* --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
* --runner=DataflowRunner
* --input=<PUBSUB_INPUT_TOPIC>
* --output=<BIGQUERY_OUTPUT_TABLE>
*/
public class DynamicDestinationsPipeline {
/**
* Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
*/
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
String getInput();
void setInput(String value);
@Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
@Validation.Required
String getOutput();
void setOutput(String value);
}
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinations extends DynamicDestinations<String, String> {
private final String tablePrefix;
MyDestinations(String tableId) {
tablePrefix = tableId + "_";
}
/**
* Returns a destination table specifier: initial for input string.
*/
@Override
public String getDestination(ValueInSingleWindow<String> element) {
return element.getValue().substring(0, 1);
}
/**
* Returns a TableDestination object for the destination.
* The table name will be {@code <table_id>_<initial for input string>}.
*/
@Override
public TableDestination getTable(String destination) {
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
/**
* Returns a table schema for the destination.
* The table has only one column: text as STRING.
*/
@Override
public TableSchema getSchema(String destination) {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
return new TableSchema().setFields(fields);
}
}
/**
* A formatter to convert an input text to a BigQuery table row.
*/
static class MyFormatFunction implements SerializableFunction<String, TableRow> {
@Override
public TableRow apply(String input) {
return new TableRow().set("text", input);
}
}
/**
* Run a pipeline.
* It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
* The master table saves all the texts, and other tables save the texts with same initials.
*/
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("text").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
Pipeline p = Pipeline.create(options);
PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
.fromTopic(options.getInput()));
texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
.to(options.getOutput())
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinations(options.getOutput()))
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
p.run();
}
}
- 単一テーブル出力と比較したとき、
BigQueryIO.Write<T>
に対する設定変更は僅かです。-
to()
の引数をDynamicDestinations<T,?>
に変更します。今回は基となるテーブル名をクラスに持たせるために、継承したMyDestinations
を引数としています。 -
withSchema()
は不要です。DynamicDestinations
の中に含んでいます。
-
-
DynamicDestinations
の抽象メソッドをオーバーライドします。
実行前に、Pub/SubのトピックとBigQueryのデータセットを作成しておいてください。stagingLocation等を指定する場合はStorageのバケットも必要です。
Pub/SubへのpublishはGUIからでもできますが、テキストファイルを流し込むために以下のRubyスクリプトを使用しました。
require "google/cloud/pubsub"
pubsub = Google::Cloud::Pubsub.new(
project_id: "<project_id>",
credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"
while (text = gets)
topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt
詰まったところ
最初は MyDestinations
クラスにschemaも渡していたのですが、実行すると IllegalArgumentException
が発生してしまいました。検索したらピンポイントで同じエラーが見つかり、serializeできないものはクラスに入れられないと分かりました。ひとまず getSchema()
の中で毎回作成することにしてエラーを回避しました。
簡易振り分け
今回の例に限らず、テーブルは振り分けるがそのスキーマは全く同じということがあるでしょう。この場合は to()
に SerializableFunction
を指定してテーブル名だけ動的にする方法があります(BigQueryIO の "Sharding BigQuery output tables" 参照)。スキーマは普通に withSchema()
で指定します。
SerializableFunction
の内容は、DynamicDestinations
の getDestination()
と getTable()
をひとまとめにした処理を実装するだけです。以下のコードでは destination
というローカル変数を作っているので、前のコードと対応が分かりやすいと思います。
/**
* Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
* Destination table name will be {@code <table_id>_<initial for input string>}.
*/
static class MyDestinationsFunction
implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
private final String tablePrefix;
MyDestinationsFunction(String tableId) {
tablePrefix = tableId + "_";
}
@Override
public TableDestination apply(ValueInSingleWindow<String> input) {
String destination = input.getValue().substring(0, 1);
return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
}
}
...
texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
.to(new MyDestinationsFunction(options.getOutput()))
.withSchema(schema)
.withFormatFunction(new MyFormatFunction())
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
これからしたいこと
- テーブル振り分けをプロジェクトをまたいで実施したいです。見たところ出力先の指定方法は
"<プロジェクトID>:<データセットID>.<テーブルID>"
となっているので普通に可能で、権限の設定さえ成功すればいけるのだろうと思います。 - データ毎でなく、事前にデータをグループ化して振り分けたいです。今の方法だと処理が重いのではないかと心配しています。(裏で最適化してくれているかもしれませんが)
- Pub/SubやStorageなど他のGCPサービスへも出力振り分けできるか確認したいです。
参考
- Apache Beam SDK ドキュメント
- Dataflow DynamicDestinations unable to serialize org.apache.beam.sdk.io.gcp.bigquery.PrepareWrite - Stack Overflow
- Pokémon List Generator (入力データの生成に使用しました)
-
テーブル名に使えない文字への対処はサボっています。 ↩