Help us understand the problem. What is going on with this article?

Dataflowでデータに応じてBigQueryのテーブルを振り分け

More than 1 year has passed since last update.

Google Cloud Dataflowでは、パイプラインを分岐することでデータを複数の場所に出力できます。しかし、この方法では事前に出力先が決まっていなければならず、例えば「データ内で指定された任意のBigQueryテーブルへ出力」みたいなことはできません(多分)。そのような動的な出力振り分けを実現する方法を探していたところ、汎用的なものとして DynamicDestinations というクラスを利用すればいいことが分かりました。

本記事ではこの DynamicDestinations を用いた方法および簡略化された方法の2通りでテーブル振り分けを試してみます。

サンプル構成

今回は出力のみ興味があるので、余計なデータ処理を挟まず「Pub/Subから取得したテキストをそのままBigQueryに保存する」ことをします。ただし、通常の単一テーブルへの出力と比較できるよう、パイプラインを分岐して2種類の出力処理をつけます。

  • 片方は、全てのテキストを指定したテーブルへ出力します。
  • もう片方は、テキストの先頭文字に応じてテーブルを変更します。

dd-pipeline.png

BigQueryに保存されたデータの具体例は以下の通りです。

$ table="<project_id>:<dataset_id>.<table_id>"
$ bq head -n 3 "${table}"
+-----------+
|   text    |
+-----------+
| Charizard |
| Alakazam  |
| Nidoqueen |
+-----------+
$ bq head -n 3 "${table}_C"
+------------+
|    text    |
+------------+
| Cloyster   |
| Clefairy   |
| Charmander |
+------------+

${table} には全ての入力データが入っているのに対し、例えば ${table}_C には C で始まるテキストのみが入っています。振り分け用の各テーブルは、コード内で生成した名前で、必要になったものだけ実行時に作られます。

コード

src/main/java/com/example/dataflow/DynamicDestinationsPipeline.java
package com.example.dataflow;

import java.util.ArrayList;
import java.util.List;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.ValueInSingleWindow;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;

/**
 * An example of BigQuery dynamic destinations.
 *
 * <p>To run this example using managed resource in Google Cloud
 * Platform, you should specify the following command-line options:
 *   --project=<YOUR_PROJECT_ID>
 *   --jobName=<JOB_NAME>
 *   --stagingLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --tempLocation=<STAGING_LOCATION_IN_CLOUD_STORAGE>
 *   --runner=DataflowRunner
 *   --input=<PUBSUB_INPUT_TOPIC>
 *   --output=<BIGQUERY_OUTPUT_TABLE>
 */
public class DynamicDestinationsPipeline {
  /**
   * Add commandline options: input (Pub/Sub topic ID) and output (BigQuery table ID).
   */
  public interface MyOptions extends PipelineOptions {
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
    @Validation.Required
    String getInput();
    void setInput(String value);

    @Description("BigQuery table to write to, specified as <project_id>:<dataset_id>.<table_id>")
    @Validation.Required
    String getOutput();
    void setOutput(String value);
  }

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinations extends DynamicDestinations<String, String> {
    private final String tablePrefix;

    MyDestinations(String tableId) {
      tablePrefix = tableId + "_";
    }

    /**
     * Returns a destination table specifier: initial for input string.
     */
    @Override
    public String getDestination(ValueInSingleWindow<String> element) {
      return element.getValue().substring(0, 1);
    }

    /**
     * Returns a TableDestination object for the destination.
     * The table name will be {@code <table_id>_<initial for input string>}.
     */
    @Override
    public TableDestination getTable(String destination) {
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }

    /**
     * Returns a table schema for the destination.
     * The table has only one column: text as STRING.
     */
    @Override
    public TableSchema getSchema(String destination) {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("text").setType("STRING"));
      return new TableSchema().setFields(fields);
    }
  }

  /**
   * A formatter to convert an input text to a BigQuery table row.
   */
  static class MyFormatFunction implements SerializableFunction<String, TableRow> {
    @Override
    public TableRow apply(String input) {
      return new TableRow().set("text", input);
    }
  }

  /**
   * Run a pipeline.
   * It reads texts from a Pub/Sub topic and writes them to BigQuery tables.
   * The master table saves all the texts, and other tables save the texts with same initials.
   */
  public static void main(String[] args) {
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("text").setType("STRING"));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);

    PCollection<String> texts = p.apply("ReadFromPubSub", PubsubIO.readStrings()
        .fromTopic(options.getInput()));
    texts.apply("WriteToBigQuery", BigQueryIO.<String>write()
        .to(options.getOutput())
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinations(options.getOutput()))
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
  • 単一テーブル出力と比較したとき、BigQueryIO.Write<T> に対する設定変更は僅かです。
    • to() の引数を DynamicDestinations<T,?> に変更します。今回は基となるテーブル名をクラスに持たせるために、継承した MyDestinations を引数としています。
    • withSchema() は不要です。DynamicDestinations の中に含んでいます。
  • DynamicDestinations の抽象メソッドをオーバーライドします。
    • getDestination() は、データの振り分けを識別するためのオブジェクト(destination)を返します。今回は「テキストのイニシャル」であり、型は String としています。
    • getTable() は、destinationに対応するテーブルを返します。今回は基となるテーブル名にイニシャルを付加しています1
    • getSchema() は、destinationに対応するテーブルのスキーマを返します。今回は全テーブル共通なので、全く同じものを作成しています。(※ここで詰まりました。次節参照)

実行前に、Pub/SubのトピックとBigQueryのデータセットを作成しておいてください。stagingLocation等を指定する場合はStorageのバケットも必要です。

Pub/SubへのpublishはGUIからでもできますが、テキストファイルを流し込むために以下のRubyスクリプトを使用しました。

publish.rb
require "google/cloud/pubsub"

pubsub = Google::Cloud::Pubsub.new(
  project_id: "<project_id>",
  credentials: "credential.json"
)
topic = pubsub.topic "<topic_id>"

while (text = gets)
  topic.publish text.chomp
end
$ head -3 list.txt
Bulbasaur
Ivysaur
Venusaur
$ ruby publish.rb < list.txt

詰まったところ

最初は MyDestinations クラスにschemaも渡していたのですが、実行すると IllegalArgumentException が発生してしまいました。検索したらピンポイントで同じエラーが見つかり、serializeできないものはクラスに入れられないと分かりました。ひとまず getSchema() の中で毎回作成することにしてエラーを回避しました。

簡易振り分け

今回の例に限らず、テーブルは振り分けるがそのスキーマは全く同じということがあるでしょう。この場合は to()SerializableFunction を指定してテーブル名だけ動的にする方法があります(BigQueryIO の "Sharding BigQuery output tables" 参照)。スキーマは普通に withSchema() で指定します。

SerializableFunction の内容は、DynamicDestinationsgetDestination()getTable() をひとまとめにした処理を実装するだけです。以下のコードでは destination というローカル変数を作っているので、前のコードと対応が分かりやすいと思います。

  /**
   * Define BigQuery dynamic destinations for {@code PCollection<String>} input elements.
   * Destination table name will be {@code <table_id>_<initial for input string>}.
   */
  static class MyDestinationsFunction
      implements SerializableFunction<ValueInSingleWindow<String>, TableDestination> {
    private final String tablePrefix;

    MyDestinationsFunction(String tableId) {
      tablePrefix = tableId + "_";
    }

    @Override
    public TableDestination apply(ValueInSingleWindow<String> input) {
      String destination = input.getValue().substring(0, 1);
      return new TableDestination(tablePrefix + destination, "Table for initial " + destination);
    }
  }

...

    texts.apply("WriteToBigQueryShards", BigQueryIO.<String>write()
        .to(new MyDestinationsFunction(options.getOutput()))
        .withSchema(schema)
        .withFormatFunction(new MyFormatFunction())
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

これからしたいこと

  • テーブル振り分けをプロジェクトをまたいで実施したいです。見たところ出力先の指定方法は "<プロジェクトID>:<データセットID>.<テーブルID>" となっているので普通に可能で、権限の設定さえ成功すればいけるのだろうと思います。
  • データ毎でなく、事前にデータをグループ化して振り分けたいです。今の方法だと処理が重いのではないかと心配しています。(裏で最適化してくれているかもしれませんが)
  • Pub/SubやStorageなど他のGCPサービスへも出力振り分けできるか確認したいです。

参考


  1. テーブル名に使えない文字への対処はサボっています。 

HMMNRST
計算資源は有限。効率よく使って、浮いた分で思い切り遊びたい。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away