Help us understand the problem. What is going on with this article?

有り金溶かさないように Dataflow で BigQuery テーブルを分割する

More than 1 year has passed since last update.

追記: この記事ではパーティションデコレータを使って ingestion-time partitioned table に振り分けていますが、 Column-based partitioned table が Cloud Dataflow/Apache Beam 2.4.0 からサポートされたので、今はこちらを使うのが無難です。

この記事は Google Cloud Platform Advent Calendar 2017 の10日目の記事です。

昨年に GCP 東京リージョンが来ましたが、今年は日本で GCP が盛り上がった一年でしたね。東京リージョンが来る前からユーザが多かった GAE と BigQuery 以外の GCP のサービスの事例が非常に多くありました。私自身も Google 認定プロフェッショナル クラウドアーキテクト取得してから色々なサービスの実経験を積むことができました。

話は変わりますが、私は最近マギアレコード 魔法少女まどか☆マギカ外伝をプレイしています。天音姉妹いいですよね。ねー。
まどか☆マギカといえば魔法少女があんなことになって絶望する話でしたが、 GCP における有名な絶望エピソードとして「BigQueryで150万円溶かした人の顔」という記事が話題になったことがありました。一部で BigQuery に対するネガティブな印象も残ってしまいましたが、想定外の課金が発生しないように正しくクラウドサービスの料金体系を把握することの大切さが啓蒙される機会であったとは思います。

ところで、クエリの料金体系を理解してお金を溶かすことが無くなったとしても、あの記事の著者が元々行おうとしていたテーブル分割という問題をどのように解決するのが良いのでしょうか。
分割されていないモノリシックなテーブルに既に大量のデータが溜まっていたとして、タグフィールドの値によって異なるテーブルに分割しつつ、そのテーブルを現在のベストプラクティスである日付分割テーブルにすることは容易なことではありません。

この記事では複数の日付分割テーブルへの分割問題を GCP のフルマネージドなデータ処理サービスである Cloud Dataflow を用いることで、ワンストップに解決を試みます。

問題設定

入力

  • 下記の schema.json で表されるテーブルの中に大量のデータ
    • time はミリ秒単位の UNIX タイムスタンプ
    • tag は enum 的な限られた種類の文字列
schema.json
[
  {
    "name": "time",
    "type": "INTEGER"
  },
  {
    "name": "tag",
    "type": "STRING"
  },
  {
    "name": "message",
    "type": "STRING"
  }
]

出力

  • 同一のスキーマで新しく作られた tag の値を suffix としたテーブルに各行を振り分け
    • 更に各行は time に対応するパーティションに振り分け

実行方法

下記の Maven の一度の実行で全てを行う

$ mvn compile exec:java -Dexec.mainClass=net.apstndb.BigQuerySplitter \
 -Dexec.args="--runner=DataflowRunner --project=apstndb-sandbox --dataset=split_example --tempLocation=gs://apstndb-sandbox-dataflow/temp --schemaFile=schema.json --inputTable=monolith_table --outputTableBasename=split_table --timeField=time --tagField=tag"

実装

今回の記事では2017年12月8日に Cloud Dataflow でサポートされたばかりの Apache Beam 2.2.0 を使います。それというのも実は Dataflow から日付分割されたテーブルを作ることができなかったのが Beam-2390(allow user to use .setTimePartitioning in BigQueryIO.write) として修正されたためです。
私もあまり Java が好きというわけではありませんが、新しい Beam SDK の機能を使うには Java SDK を使うのも仕方ないので Java 8 を使っています。

ここから、パイプラインの主要な部分を解説していきます。全体はこの記事の最後に載せます。

入力

Pipeline を作り、BigQuery の対象のテーブルを読み込みます。

final Pipeline p = Pipeline.create(options);
p.apply("ReadFromBQ", BigQueryIO.readTableRows().from(inputTableSpec))

特に気をつけることはないですが BigQueryIO.read() が 2.2.0 で deprecated になったので、 BigQueryIO.readTableRows を使っています。

実際の動作としては GCS に Avro フォーマットのオブジェクトとして全体をエクスポートしてから読み込むので、 BigQuery の読み込みはエクスポート1回分だけ課金されます。

タイムスタンプの抽出

パーティションに使われる日時は出力のタイミングで取得しても良いのですが、 Beam はタイムスタンプを使ってウィンドウ処理などをするモデルなので、先にタイムスタンプを抽出しておきます。

.apply("SetTimestamp", WithTimestamps.of((SerializableFunction<TableRow, Instant>) input ->
        new Instant(Long.parseLong(input.get(timeFieldName).toString()))))

ParDo ではなく WithTimestamps を使うことでラムダを使うことができます。TableRow から BigQuery の INTEGER 型のフィールドを取り出す際には Java の String 型になってしまうので、 Long.parseLong を使っていること以外は特に問題ないですね。

出力

ここまでは単純でしたが、振り分けはテーブルとパーティション両方絡んでくるので比較的複雑です。

.apply("WriteToBQ", BigQueryIO.writeTableRows()
        .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input ->
                new TableDestination(
                        String.format("%s:%s.%s$%s",
                                projectName,
                                datasetName,
                                outputTableBasename + "_" + input.getValue().get(tagFieldName),
                                ISODateTimeFormat.basicDate().print(input.getTimestamp())),
                        null,
                        new TimePartitioning().setType("DAY")))
        .withSchema(tableSchema)
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));

今回は入力と出力が同じスキーマなので、 BigQueryIO.Write<TableRow>.to(SerializableFunction<ValueInSingleWindow<TableRow>,TableDestination> を使うことができます。 SerializableFunction の出力である TableDestination に殆どの情報が含まれています。
コンストラクタの引数はそれぞれ次のようになります。
- 第一引数は出力先のテーブルのパーティションをパーティションデコレータとして指定
- 第二引数の tableDescription は日付パーティションでは null 固定
- 第三引数は TimePartitioning.setTime("DAY") することで日付分割を指示

その他はテーブルスキーマと、テーブルが存在しなかった場合に新規作成し、パーティションを新しいデータに置き換えることを指示しています。

p.run().waitUntilFinish();

最後にパイプラインの終了を待って、完成です。
実際の動作としては、各パーティションへの書き込みは個別の BigQuery の Job として処理されます。クオータ等は少し気にする必要がありそうです。

実装の全ソースコード

実装全体としては下記のようになります。オプションの処理などがかなり行数を使っていますが、実装本体は Java 8 のおかげもあってわりと簡潔だと言えるんじゃないでしょうか。

BigQuerySplitter.java
package net.apstndb;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.api.services.bigquery.model.*;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.options.*;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.joda.time.Instant;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.joda.time.format.ISODateTimeFormat;

public class BigQuerySplitter {
    public interface BigQuerySplitterOptions extends GcpOptions {
        @Description("BigQuery dataset name")
        @Default.String("")
        String getDataset();
        void setDataset(String dataset);

        @Description("BigQuery input table name")
        String getInputTable();
        void setInputTable(String table);

        @Description("BigQuery table schema file")
        String getSchemaFile();
        void setSchemaFile(String schemaFile);

        @Description("BigQuery output table basename")
        String getOutputTableBasename();
        void setOutputTableBasename(String outputTableBasename);

        @Description("BigQuery field name contains unix time in milliseconds")
        String getTimeField();
        void setTimeField(String field);

        @Description("BigQuery field name contains tag")
        String getTagField();
        void setTagField(String field);
    }

    public static void main(String[] args) throws IOException {
        final BigQuerySplitterOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(BigQuerySplitterOptions.class);
        final String projectName = options.getProject();
        final String datasetName = options.getDataset();
        final String timeFieldName = options.getTimeField();
        final String tagFieldName = options.getTagField();
        final String inputTableSpec = String.format("%s:%s.%s", projectName, datasetName, options.getInputTable());
        final String outputTableBasename = options.getOutputTableBasename();

        final String tableSchemaJson = new String(Files.readAllBytes(Paths.get(options.getSchemaFile())), Charset.forName("UTF-8"));
        final TableSchema tableSchema = new TableSchema().setFields(
                new ObjectMapper().reader().forType(new TypeReference<List<TableFieldSchema>>() {
                }).readValue(tableSchemaJson));

        final Pipeline p = Pipeline.create(options);
        p.apply("ReadFromBQ", BigQueryIO.readTableRows().from(inputTableSpec))
                .apply("SetTimestamp", WithTimestamps.of((SerializableFunction<TableRow, Instant>) input ->
                        new Instant(Long.parseLong(input.get(timeFieldName).toString()))))
                .apply("WriteToBQ", BigQueryIO.writeTableRows()
                        .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) input ->
                                new TableDestination(
                                        String.format("%s:%s.%s$%s",
                                                projectName,
                                                datasetName,
                                                outputTableBasename + "_" + input.getValue().get(tagFieldName),
                                                ISODateTimeFormat.basicDate().print(input.getTimestamp())),
                                        null,
                                        new TimePartitioning().setType("DAY")))
                        .withSchema(tableSchema)
                        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_EMPTY));
        p.run().waitUntilFinish();
    }
}

まとめ

Cloud Dataflow を使うことで BigQuery のテーブル分割を一括して行うことができました。BigQuery に対して繰り返される抽出のためのクエリを生まれる前に消し去りたいという祈りにより、絶望する必要が無くなることが期待できます。

また、この記事ではリリースされたばかりの新機能を使って日付分割テーブルの動的生成を実現できたように Apache Beam も高速に進化しており、実行エンジンの Cloud Dataflow と合わせて GCP の差別化に寄与するものと考えられます。
Spotify では Scala バインディングの Scio を使って1割のストリーミングを含む1300のパイプラインを処理しているとのことです。
来年は日本でもストリーミングの事例が更に増えることが期待できるため Cloud Dataflow と Apache Beam に注目していくつもりです。

追記: 入力を少し変えれば BigQuery 以外に蓄積されているものを日付分割された複数のテーブルに振り分けることもできますね。大量のファイルや他のデータベースからの ETL というユースケースはわりとありそうなので合成可能なパイプラインモデルのメリットが生きそうです。

参考記事

Dataflowでデータに応じてBigQueryのテーブルを振り分け: テーブル分割については被っていたのでこの記事ではあまり説明しませんでした。
FUN WITH SERIALIZABLE FUNCTIONS AND DYNAMIC DESTINATIONS IN CLOUD DATAFLOW: Beam 2.2.0 直前の記事でテーブルの振り分けと日付分割の振り分けを両方合わせた例は出てきませんが、複数の例が出てきます。
Field based partitioning: BigQuery は将来的にはフィールドの値でパーティションできるようになる計画があるようです。これが入ると更に楽になりそうですね。追記: Dataflow でもサポートされた。

apstndb
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away