GoogleCloudPlatform
GoogleCloudDataflow
ApacheBeam
AkatsukiDay 14

Beam SQL DSL ファーストインプレッション

More than 1 year has passed since last update.

はいどーも。 apstndb です。A.I.Channel はまだあまり観れていません。
この記事は Akatsuki Advent Calendar 2017 の14日目の記事です。
先日 Apache Beam 2.2.0 の新機能について触れたので、今回はまだ誰も触れているのを見たことがない Beam SQL DSL についていくらか触って現状を確かめてみようかと思います。

何の話?

Apache Beam については私が説明するよりも関連資料をちゃんと噛み砕いて書かれた Googleが考えるストリームデータ処理とは?という記事がオススメです。より詳しくは Apache Beam の公式 とフルマネージドな処理サービスである Cloud Dataflow を調べてみてください。

この記事で触れる Beam SQL は最適化された Beam のパイプラインに変換して処理される SQL 方言の DSL です。 Apache Calcite で実装されているようです。
現在は Java SDK からしか使えないのでこの記事も必然的にジャバみがあります。

環境情報

macOS Sierra
Apache Beam 2.2.0
Java 8
Apache Maven 3.5.0

状況

正直に言って、リリースノートに項目としては書かれていてメインブランチにマージもされているものの公式にあまり説明されていないのでよくわかりません。
ただ確かなのは、現時点だと @Experimental が付いている API も多く、触れるようにはなったものの実用段階ではなさそうです。

2.2.0 時点だと Google Cloud Dataflow のドキュメント内の Release Notes: Dataflow SDK 2.x for Java でも Experimental でありサポートしないと書かれてしまっています。

Known issue: SQL support is not included in this release because it is experimental and not tested on Cloud Dataflow. Using SQL is not recommended.

うーむ。

これからの展望は JIRA の sql-dsl を見るとある程度見えてきます。

使ってみる

実際に使おうとした際に参照する情報として Beam SQL というドキュメントはありますが、12/13に見た時点ではまだ SQL_DSL ブランチはマージされていないということが書いてあったり、今でもサンプルコードに間違いがあったりするのでそれなりの割合を手探りで進める必要があります。

Javadoc もどうやらマージ前のプロトタイプ段階でのクラス名で書いてあったりするようで信用できません。例えば BeamSql では実際には BeamRecord を扱うのに BeamSqlRow と書かれていたりします。

というわけで、 BeamSqlExample.java がもっとも信用できそうです。

プロジェクトを作っていきます。サポートしていないとは言われたものの、試してみたいので DataflowRunner が入っている google-cloud-dataflow-java-archetypes-starter archetype を使います。デフォルトでは Java 7 になってしまうので -DtargetPlatform=1.8 は付け足しました。なお Java 9 では私は動かせませんでした。

$ mvn archetype:generate \
    -DarchetypeArtifactId=google-cloud-dataflow-java-archetypes-starter \
    -DarchetypeGroupId=com.google.cloud.dataflow \
    -DarchetypeVersion=2.2.0 \
    -DgroupId=com.example \
    -DartifactId=first-dataflow \
    -Dversion="0.1" \
    -DinteractiveMode=false \
    -Dpackage=com.example \
    -DtargetPlatform=1.8

普段だとこれで大体のクラスは使えるようになるのですが、 Beam SQL 関係は beam-sdks-java-extensions-sql という別パッケージになっています。

pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-extensions-sql -->
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-extensions-sql</artifactId>
    <version>2.2.0</version>
</dependency>

試すために BeamSqlExample.java を持ってきましょう。(パッケージ名は修正しています)

src/main/java/com/example/BeamSqlExample.java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.example;

import java.sql.Types;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.BeamRecordSqlType;
import org.apache.beam.sdk.extensions.sql.BeamSql;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.BeamRecord;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
 * This is a quick example, which uses Beam SQL DSL to create a data pipeline.
 *
 * <p>Run the example with
 * <pre>
 * mvn -pl sdks/java/extensions/sql \
 *   compile exec:java -Dexec.mainClass=org.apache.beam.sdk.extensions.sql.example.BeamSqlExample \
 *   -Dexec.args="--runner=DirectRunner" -Pdirect-runner
 * </pre>
 */
class BeamSqlExample {
    public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).as(PipelineOptions.class);
        Pipeline p = Pipeline.create(options);

        //define the input row format
        List<String> fieldNames = Arrays.asList("c1", "c2", "c3");
        List<Integer> fieldTypes = Arrays.asList(Types.INTEGER, Types.VARCHAR, Types.DOUBLE);
        BeamRecordSqlType type = BeamRecordSqlType.create(fieldNames, fieldTypes);
        BeamRecord row1 = new BeamRecord(type, 1, "row", 1.0);
        BeamRecord row2 = new BeamRecord(type, 2, "row", 2.0);
        BeamRecord row3 = new BeamRecord(type, 3, "row", 3.0);

        //create a source PCollection with Create.of();
        PCollection<BeamRecord> inputTable = PBegin.in(p).apply(Create.of(row1, row2, row3)
                .withCoder(type.getRecordCoder()));

        //Case 1. run a simple SQL query over input PCollection with BeamSql.simpleQuery;
        PCollection<BeamRecord> outputStream = inputTable.apply(
                BeamSql.query("select c1, c2, c3 from PCOLLECTION where c1 > 1"));

        //print the output record of case 1;
        outputStream.apply("log_result",
                MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
                    public Void apply(BeamRecord input) {
                        //expect output:
                        //  PCOLLECTION: [3, row, 3.0]
                        //  PCOLLECTION: [2, row, 2.0]
                        System.out.println("PCOLLECTION: " + input.getDataValues());
                        return null;
                    }
                }));

        //Case 2. run the query with BeamSql.query over result PCollection of case 1.
        PCollection<BeamRecord> outputStream2 =
                PCollectionTuple.of(new TupleTag<BeamRecord>("CASE1_RESULT"), outputStream)
                        .apply(BeamSql.queryMulti("select c2, sum(c3) from CASE1_RESULT group by c2"));

        //print the output record of case 2;
        outputStream2.apply("log_result",
                MapElements.<BeamRecord, Void>via(new SimpleFunction<BeamRecord, Void>() {
                    @Override
                    public Void apply(BeamRecord input) {
                        //expect output:
                        //  CASE1_RESULT: [row, 5.0]
                        System.out.println("CASE1_RESULT: " + input.getDataValues());
                        return null;
                    }
                }));

        p.run().waitUntilFinish();
    }
}

実行してみます。

$ mvn compile exec:java -Dexec.mainClass=com.example.BeamSqlExample -Dexec.args="--runner=DirectRunner"
()
[WARNING] 
java.lang.NoClassDefFoundError: org/apache/commons/lang3/tuple/Triple
()

どうやら依存関係に漏れがあるようです。commons-lang3 を追加しましょう。

pom.xml
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-lang3 -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-lang3</artifactId>
    <version>3.7</version>
</dependency>

再度実行すると動くようになります。やったぜ。

$ mvn compile exec:java -Dexec.mainClass=com.example.BeamSqlExample -Dexec.args="--runner=DirectRunner"
(略)
2 14, 2017 1:23:20 午後 org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner validateAndConvert
情報: SQL:
SELECT `PCOLLECTION`.`c1`, `PCOLLECTION`.`c2`, `PCOLLECTION`.`c3`
FROM `PCOLLECTION` AS `PCOLLECTION`
WHERE `PCOLLECTION`.`c1` > 1
12 14, 2017 1:23:21 午後 org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner convertToBeamRel
情報: SQLPlan>
LogicalProject(c1=[$0], c2=[$1], c3=[$2])
  LogicalFilter(condition=[>($0, 1)])
    LogicalTableScan(table=[[PCOLLECTION]])

12 14, 2017 1:23:21 午後 org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner validateAndConvert
情報: SQL:
SELECT `CASE1_RESULT`.`c2`, SUM(`CASE1_RESULT`.`c3`)
FROM `CASE1_RESULT` AS `CASE1_RESULT`
GROUP BY `CASE1_RESULT`.`c2`
12 14, 2017 1:23:21 午後 org.apache.beam.sdk.extensions.sql.impl.planner.BeamQueryPlanner convertToBeamRel
情報: SQLPlan>
LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
  LogicalProject(c2=[$1], c3=[$2])
    LogicalTableScan(table=[[CASE1_RESULT]])

12 14, 2017 1:23:21 午後 org.apache.beam.sdk.Pipeline validate
警告: The following transforms do not have stable unique names: log_result2
PCOLLECTION: [2, row, 2.0]
PCOLLECTION: [3, row, 3.0]
CASE1_RESULT: [row, 5.0]
(略)

Pipeline の実行の前に、一度 BeamQueryPlanner によって実行計画が作られ、最適化された PTransform が作られるという挙動になるようです。
Dataflow のドキュメントではサポートしないと書かれていましたが、このサンプルなら Cloud Dataflow でも実行可能でした。動かなかったり何かが起こっても誰も助けてはくれないので注意しましょう。

$ mvn compile exec:java -Dexec.mainClass=com.example.BeamSqlExample -Dexec.args="--runner=DataflowRunner"

上の実行計画に相当するパイプラインが実際に動いていることが Dataflow パイプライン上からも分かります。(全展開したのでかなり縦長)

スクリーンショット 2017-12-14 13.36.10.png

機能について

現在 Experimental ということでサポートされている DML は SELECT 文だけですが、上の BeamSqlExample.java を見て分かるように、 GROUP BYWHERE, 各種集計関数など、基本的なところはあるようです。
PCollectionTuple の入力を増やすことで一部の JOIN もできました。

PCollection<BeamRecord> outputStream =
        PCollectionTuple.of(new TupleTag<>("INPUT1"), inputTable1)
                .and(new TupleTag<>("INPUT2"), inputTable2)
                .apply(BeamSql.queryMulti("select INPUT2.c1, INPUT1.c3 * INPUT2.c3 from INPUT1 join INPUT2 on INPUT1.c1 = INPUT2.c1"));

上記のパイプラインからは下記のような実行計画のパイプラインが出力されます。

LogicalProject(c1=[$3], EXPR$1=[*($2, $5)])
  LogicalJoin(condition=[=($0, $3)], joinType=[inner])
    LogicalTableScan(table=[[INPUT1]])
    LogicalTableScan(table=[[INPUT2]])

実行結果もそれっぽかったので良いですね。

Kinesis Analytics SQL Reference と読み比べるとまだ足りないところは色々とありますが、必要なら Java Beam SDK と組み合わせることもできますし、Issue に書かれているものを見ると中長期的には解決しそうです。

データの入力のために BeamRecordSqlType を定義し、他の入力から PCollection<BeamRecord> を作るのが少し大変ですが、 BigQueryIO の TableRow との変換などは機械的にできそうではあります。また、BeamTextCSVTable というクラスがあったりと、標準でも改善されそうです。なお、私はしばらく試行錯誤しましたが使えませんでした。

まとめ

Apache Beam 2.2.0 で Beam SQL DSL がマージされました。
まだ Experimental で仕様の変更がありそうですが、これから機能が充実していくことと考えられます。
今までデータ処理というのはインフラとしての構築・運用が難しく、それぞれで大きく異なるモデルの修得も難しいものでしたが、フルマネージドサービスの Cloud Dataflow を含む複数の実行エンジン上で処理可能な一つの SQL 方言で記述できるようにする方向に期待したいですね。

今年は GCP ユーザの中で Cloud Dataflow を使うための Apache Beam の事例をかなり見ました。
来年は Beam SQL に関する情報が増えていくのではないかと思うので Apache Beam Advent Calendar 2018 が楽しみです。