LoginSignup
3
1

More than 3 years have passed since last update.

この記事は、Apache Flink 基本チュートリアルシリーズの一部で、5 つの例を使って Flink SQL プログラミングの実践に焦点を当てています。

本ブログは英語版からの翻訳です。オリジナルはこちらからご確認いただけます。一部機械翻訳を使用しております。翻訳の間違いがありましたら、ご指摘いただけると幸いです。

ジャーク・ウーより

この記事では、Ververicaのオープンソースのsql-trainingプロジェクト、およびFlink 1.7.2をベースにしたチュートリアルの練習を説明します。FlinkのSQLプログラミング実習全体を通して5つの例を使用しており、主に以下の点をカバーしています。

  • SQL CLIクライアントの使い方。
  • ストリーム上でSQLクエリを実行する方法。
  • ウィンドウアグリゲートと非ウィンドウアグリゲートを実行して、それらの違いを理解する。
  • SQLを使用してKafkaデータを消費する方法。
  • SQLを使用してKafkaとElasticSearchに結果を書き込む方法。
  • この記事では、すでに基本的なSQLの知識があることを前提としています。

環境を整える

このチュートリアルはDockerをベースにしているので、他のプログラムや追加のプログラムをインストールする必要はありません。この演習はJava環境、Scala、IDEに依存しません。

注意: デフォルトでDockerに設定されているリソースが十分でない可能性があり、そのためにFlinkジョブの実行中にクラッシュする可能性があります。そのため、Docker内のリソースを3~4GB、CPUを3~4台に設定することをお勧めします。

image.png

今回のチュートリアルでは、様々なサービスのコンテナを収容するDocker Composeを使って環境をインストールしました。

  • Flink SQL Client:クエリを送信し、結果を可視化します。
  • Flink JobManagerとTaskManager:Flink SQLタスクを実行します。
  • Apache Kafka:入力ストリームの生成と結果ストリームの書き込み。

image.png

  • Apache ZooKeeper: Kafkaの依存関係。
  • ElasticSearch: 結果を書き込む。

Docker Composeの設定ファイルが用意されています。docker-compose.ymlファイルは直接ダウンロードできます。

次に、コマンドラインウィンドウを開き、docker-compose.ymlファイルが保存されているディレクトリを入力し、以下のコマンドを実行します。

  • Linux & MacOSの場合
docker-compose up -d
  • Windows
set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d

docker-composeコマンドは、必要なコンテナごとに起動します。Dockerは初回起動時にDocker Hubから自動的にイメージ(2.3GB近く)をダウンロードしますが、これには時間がかかるかもしれません。その後はあっという間に起動します。操作が成功すると、コマンドラインに以下のような出力が表示されます。http://localhost:8081 の Flink Web UI にもアクセスできます。

image.png

Flink SQL CLI クライアントの実行

以下のコマンドを実行して、Flink SQL CLIに入ります。

docker-compose exec sql-client ./sql-client.sh

このコマンドを実行すると、コンテナ内のFlink SQL CLIクライアントが起動します。すると、以下のような「welcome」インターフェースが表示されます。

スクリーンショット 2020-08-24 11.51.06.png

データ紹介

一部のテーブルやデータはDocker Composeにあらかじめ登録されているので、show tablesを実行することで見ることができます。記事中のRidesテーブルのデータを利用することで、タクシーの走行記録データのストリームを、時間や場所などを含めて確認することができます。DESCRIBE Rides;コマンドを実行することでテーブル構造を見ることができます。

Flink SQL> DESCRIBE Rides;
root
 |-- rideId: Long           // Ride ID (including two records, an input and an output)
 |-- taxiId: Long           // Taxi ID 
 |-- isStart: Boolean       // Start or end
 |-- lon: Float             // Longitude
 |-- lat: Float             // Latitude
 |-- rideTime: TimeIndicatorTypeInfo(rowtime)     // Time
 |-- psgCnt: Integer        // Number of passengers

Rides テーブルの詳細については、training-config.yaml を参照してください。

例 1. フィルタ

今、ニューヨークの運転記録を調べたいとしましょう。

注意:Docker環境では、ニューヨークに緯度経度があるかどうかを調べるisInNYC(lon, lat)や、特定の地域に変換するtoAreaId(lon, lat)など、いくつかのビルトイン関数があらかじめ定義されています。

そのため、isInNYCを使用して、ニューヨークの運転記録を素早くフィルタリングすることができます。SQL CLIで以下のクエリを実行します。

SELECT * FROM Rides WHERE isInNYC(lon, lat)

SQL CLIはSQLタスクをDockerクラスタにサブミットします; それは継続的にデータソース(RidesのストリームはKafkaにあります)からデータをプルし、isInNYCを介して必要なデータをフィルタリングします。また、SQL CLIは可視化モードに入り、フィルタリングされた結果を常にリフレッシュしてリアルタイムで表示します。

スクリーンショット 2020-08-24 11.53.33.png

また、http://localhost:8081 にアクセスして、Flink ジョブの実行状況を確認することもできます。

例 2: グループ集計

もう一つの要件は、異なる数の乗客を乗せた運転イベントの数を計算することです。例えば、1人の乗客を運ぶドライビングイベントの数、2人の乗客を運ぶドライビングイベントの数などです。

乗客数をpsgCntでグループ化し、COUNT(*)を使用して各グループのイベント数を計算します。グループ化する前に、ニューヨークで発生した運転記録データ(isInNYC)をフィルタリングする必要があることに注意してください。SQL CLIで以下のクエリを実行します。

SELECT psgCnt, COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat)
GROUP BY psgCnt;

SQL CLIで可視化された結果は以下のようになっています;結果は1秒ごとに変化していますが、乗客の最大数は6人を超えることはできません。

スクリーンショット 2020-08-24 11.54.14.png

例 3: Windowの集計

ニューヨークの交通の流れを継続的に監視するには、5分ごとに各エリアに入る車の数を計算する必要があります。少なくとも5台の車が入ってくるエリアにのみ、そして主に焦点を当てたいとします。

この処理にはウィンドウの集約(5分ごと)が必要なので、Tumbling Window構文が必要です。

各エリアについては、進入した車の数に基づいてAreaIdでグループ化する必要があります。グループ化する前に、isStartフィールドを使用して入力された車両の走行記録をフィルタリングし、COUNT(*)を使用して車両数をカウントする必要があります。

車両数が5台以上のエリア これは、SQL HAVING句を使用して統計値に基づいてフィルタ条件を設定したものです。

最終的なクエリは以下のようになります。

SELECT 
  toAreaId(lon, lat) AS area, 
  TUMBLE_END(rideTime, INTERVAL '5' MINUTE) AS window_end, 
  COUNT(*) AS cnt 
FROM Rides 
WHERE isInNYC(lon, lat) and isStart
GROUP BY 
  toAreaId(lon, lat), 
  TUMBLE(rideTime, INTERVAL '5' MINUTE) 
HAVING COUNT(*) >= 5;

SQL CLIでクエリを実行すると、可視化された結果は以下のようになります。エリアごと、window_endごとの結果は出力後も変化はありませんが、5分ごとに新しいウィンドウのバッチが生成された結果が出力されます。

Docker環境でソースを読み込むことで10倍速を実行しているため、デモ中は30秒ごとに新しいウィンドウのバッチが生成されています(本来の速度との相対的な比較)。

スクリーンショット 2020-08-24 13.05.53.png

Window AggregateとGroup Aggregatの違い

例2と例3の結果から、Window AggregateとGroup Aggregateが明らかな違いを表示していることがわかります。主な違いは、Window Aggregateの出力結果はウィンドウが終了したときにのみ表示され、出力結果は最終的な値であり、変更されていないことです。出力ストリームはAppendストリームです。

しかし、Group Aggregateは、データの一部が処理されるたびに最新の結果を出力します。結果は、データベース内のデータと同様に更新されたステータスを持ち、その出力ストリームは更新ストリームです。

もう一つの違いは、ウィンドウに透かしがあることです。この機能を使えば、安定した状態サイズを確保するために、期限切れ状態をクリアするためにどのウィンドウが期限切れになったかを正確に知ることができます。

しかし、Group Aggregateの場合、どのデータが期限切れになったかを知ることができないため、ステートサイズが無限に大きくなってしまい、本番運用では安定した状態ではありません。そのため、グループアグリゲートのジョブでは、ステートTTLを設定してください。

スクリーンショット 2020-08-24 13.08.21.png

例えば、各店舗のリアルタイムPVを毎日カウントするには、1日前の状態は一般的には使われないので、TTLを24時間以上に設定する必要があります。

SELECT  DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id, COUNT(*) as pv
FROM T
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd'), shop_id

TTL値が小さすぎると、有用な状態やデータのクリアが行われ、データの精度の問題が発生します。これもユーザーが考慮する必要があるパラメータです。

例 4: Kafka へのアペンドストリームの書き込み

前項では、Window AggregateとGroup Aggregateの違い、AppendストリームとUpdateストリームの違いについて説明しました。Flinkでは、MySQL、HBase、ElasticSearchなどのアップデートをサポートする外部ストレージシステムにのみアップデートストリームを書き込むことができます。

While Appendストリームを任意のストレージシステムやKafkaなどのログシステムに書き込むことは可能です。

例えば、「10分ごとの乗客数」というストリームをKafkaに書きたいとします。

Kafka の結果テーブル Sink_TenMinPsgCnts があらかじめ定義されています (テーブルの完全な定義については training-config.yaml を参照してください)。

クエリを実行する前に、以下のコマンドを実行して、TenMinPsgCntsトピックに書き込まれたデータを監視します。

docker-compose exec sql-client /opt/kafka-client/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic TenMinPsgCnts --from-beginning

タンブリングウィンドウには、10分ごとの乗客数を記述することができます。クエリ結果を結果テーブルに直接書き込むには、INSERT INTO Sink_TenMinPsgCntsコマンドを使用することができます。

INSERT INTO Sink_TenMinPsgCnts 
SELECT 
  TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart,  
  TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd,
  CAST(SUM(psgCnt) AS BIGINT) AS cnt 
FROM Rides 
GROUP BY TUMBLE(rideTime, INTERVAL '10' MINUTE);

上で見たように、TenMinPsgCntsのデータトピックをJSON形式でKafkaに書き込みます。

スクリーンショット 2020-08-24 13.10.35.png

例5: ElasticSearchに更新ストリームを書き込む

最後に、継続的に更新される更新ストリームをElasticSearch(ES)に書きます。ESには「"各エリア発車数"」のストリームを書きたいと思います。

ElasticSearchの結果テーブルSink_AreaCntsが定義されています(テーブルの完全な定義については、training-config.yamlを参照してください)。このテーブルには、areaId と cnt の 2 つのフィールドのみが含まれています。

同様に、INSERT INTOを使用して、クエリ結果をSink_AreaCnts tableに直接書き込むこともできます。

INSERT INTO Sink_AreaCnts 
SELECT toAreaId(lon, lat) AS areaId, COUNT(*) AS cnt 
FROM Rides 
WHERE isStart
GROUP BY toAreaId(lon, lat);

SQL CLIで先行するクエリを実行すると、Elasticsearchは自動的にarea-cntsインデックスを作成します。ElasticsearchはREST APIを提供しています。以下のURLにアクセスできます。

  • area-cnts インデックスを表示するには: http://localhost:9200/area-cnts
  • area-cntsインデックスの統計情報を表示するには: http://localhost:9200/area-cnts/_stats
  • area-cntsインデックスの内容を返すには: http://localhost:9200/area-cnts/_search
  • エリア 49791 の車両数を表示するには: http://localhost:9200/area-cnts/_search?q=areaId:49791

クエリを実行し続けると、いくつかの統計値(_all.primaries.docs.count and _all.primaries.docs.deleted)が常に増加していることがわかります: http://localhost:9200/area-cnts/_stats

概要

この記事では、Docker Composeを使ってFlinkのSQLプログラミングをすぐに始めるためのガイドをしました。また、Window AggregateとGroup Aggregateの違いを比較し、この2種類のジョブを外部システムに書き込む方法も紹介しています。

興味があれば、独自のUDF、UDTF、UDAFを実行したり、他の組み込みのソーステーブルをクエリするなど、Docker環境をベースにしたより深い実践を行うことができます。

アリババクラウドは日本に2つのデータセンターを有し、世界で60を超えるアベラビリティーゾーンを有するアジア太平洋地域No.1(2019ガートナー)のクラウドインフラ事業者です。
アリババクラウドの詳細は、こちらからご覧ください。
アリババクラウドジャパン公式ページ

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1