ねらい
ksqlDBに触る必要が生じたので、Confluentご謹製のQuickstart実施がてらのメモ作成。
元ネタ:ksqlDB Quickstart
関連Qiita記事
1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する
環境
- Ubuntu 20.04 (on WSL2)
- Confluent Platform Community Edition 6.2.1
- Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)
ksqlDB Quickstart
1. Get Confluent Platform
省略。
2. Start ksqlDB's server
先にzookeeperとbrokerを起動しておく。Dockerは使用していないので直接コマンドで起動する。ksql-server.propertiesは特に変更していない。
ksqlDB サーバーの起動
bin/ksql-server-start etc/ksqldb/ksql-server.properties
...
[2021-10-19 14:38:38,630] INFO Waiting until monitored service is ready for metrics collection (io.confluent.support.metrics.BaseMetricsReporter:171)
[2021-10-19 14:38:38,630] INFO Monitored service is now ready (io.confluent.support.metrics.BaseMetricsReporter:183)
[2021-10-19 14:38:38,630] INFO Attempting to collect and submit metrics (io.confluent.support.metrics.BaseMetricsReporter:142)
[2021-10-19 14:38:38,633] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:375)
[2021-10-19 14:38:38,634] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:90)
[2021-10-19 14:38:39,648] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)
ksqlサーバー起動後、3つほど内部処理用のtopicが作成されていた。
- transaction_state
- confluent-ksql-default__command_topic
- default_ksql_processing_log
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log
3. Start ksqlDB's interactive CLI
Dockerは使用していないので直接コマンドで起動する。
ksqlDB CLI の起動
上記ガイドどおり、事前にksql_logs保管用のディレクトリを作成し、環境変数で引き渡す。
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ LOG_DIR=./ksql_logs bin/ksql http://localhost:8088
===========================================
= _ _ ____ ____ =
= | | _____ __ _| | _ \| __ ) =
= | |/ / __|/ _` | | | | | _ \ =
= | <\__ \ (_| | | |_| | |_) | =
= |_|\_\___/\__, |_|____/|____/ =
= |_| =
= Event Streaming Database purpose-built =
= for stream processing apps =
===========================================
Copyright 2017-2021 Confluent Inc.
CLI v6.2.1, Server v6.2.1 located at http://localhost:8088
Server Status: RUNNING
Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
ksql>
私こういうアスキーアート好き!(バアァァン)
4. Create a stream
streamというものを作る。kafka topicにschemaをかぶせたようなもの、らしい。以下の属性を与える。
- 名前: streamの名前
- kafka-topic: このstreamに対応するtopic名
- value_format: topicに格納したメッセージのEncoding。JSONって書いとけばよさそう。
- partitions: topic内のパーティション数
今回ksql CLIから作成するstreamの属性は以下の通り。
- 名前: riderLocations(ライダーの居場所みたいなもん)
- 属性値
- profileId VARCHAR(プロファイルID値)
- latitude DOUBLE(緯度)
- longitude DOUBLE(経度)
- kafka-topic: locations
- value_format: json
- partitions: 1
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
> WITH (kafka_topic='locations', value_format='json', partitions=1);
Message
----------------
Stream created
----------------
5. Create materialized views
2つテーブルを作る。1つ目のcurrentLocationは先ほど作ったstreamのriderLocationから最新位置だけを格納するもの。LATEST_BY_OFFSET
がSQL集計関数として使用されており、これはlatitude/longitudeの(topic/partition中の)最新オフセットの値を返却するもの。EMIT CHANGES
はこのSQLがpush queryであり、継続的にSQL処理が実行され続けるようなタイプのqueryであることを表す。
CREATE TABLE currentLocation AS
SELECT profileId,
LATEST_BY_OFFSET(latitude) AS la,
LATEST_BY_OFFSET(longitude) AS lo
FROM riderlocations
GROUP BY profileId
EMIT CHANGES;
2つ目のテーブルriderNearMoutainViewがややこしいが、頑張って解読してみる。
CREATE TABLE ridersNearMountainView AS
SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
COLLECT_LIST(profileId) AS riders,
COUNT(*) AS count
FROM currentLocation
GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
GEO_DISTANCE(la, lo, 37.4133, -122.1162)
緯度と経度から2点間の大円距離をKMで返却するスカラー関数。
ROUND(xxx, -1)
丸め関数。スケールが負数の場合はvalue is rounded to the right of the decimal pointとのことだが、いまいち意味が分からない。
COLLECT_LIST(profileId)
値を全て配列として返却する集計関数。
COUNT(*)
行数を返却する集計関数
つまりこのSQLはFROM指定とGROUP BY処理によって**「最新時点(currentLocation)において、ある地点からの距離(distanceInMiles)毎にそこにいる全ライダーのプロファイル名(riders)と人数(count)を返却する」**という処理を行うものに見える。ちなみにある地点(緯度37.4133、経度-122.1162)はカリフォルニアのMountainViewっぽい。ksqlcliによるテーブル作成は特に問題なく終了。
ksql> CREATE TABLE currentLocation AS
> SELECT profileId,
> LATEST_BY_OFFSET(latitude) AS la,
> LATEST_BY_OFFSET(longitude) AS lo
> FROM riderlocations
> GROUP BY profileId
> EMIT CHANGES;
Message
----------------------------------------------
Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
ksql> CREATE TABLE ridersNearMountainView AS
> SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
> COLLECT_LIST(profileId) AS riders,
> COUNT(*) AS count
> FROM currentLocation
> GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);
Message
-----------------------------------------------------
Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------
6. Run a push query over the stream
streamに対してpush queryを発行する。MoutainViewからの対円距離が5KM以内の位置情報を片っ端から表示していく(CofuluentのサイトではMile表記だがGEO_DISTANCE関数のデフォルトはKMであった。)。EMIT CHANGES指定なので実行しっ放しになるようだ。
-----------------------------------------------------
ksql> -- Mountain View lat, long: 37.4133, -122.1162
ksql> SELECT * FROM riderLocations
> WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
7. Start another CLI session
別ウインドウでksqlCLIセッションを上げる。
8. Populate the stream with events
別ウインドウで起動したksqlCLIからstreamにデータを6件挿入する。
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
これにより、ksqlCLIその1にはMoutainViewから5KM以内のレコードだけが表示されている。
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID |LATITUDE |LONGITUDE |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|4ab5cbad |37.3952 |-122.0813 |
|8b6eae59 |37.3944 |-122.0813 |
|4a7c7b41 |37.4049 |-122.0822 |
9. Run a Pull query against the materialized view
最後にridersNearMountainViewテーブルから距離が10km以内のレコードだけを抽出してみる。SETでパラメーターを指定することで、pushではなくpullでSQLを実行する。おまけで全レコードも抽出してみたが、ROUNDの丸め方はやはりよくわからない。
ksql> SET 'ksql.query.pull.table.scan.enabled'='true';
Successfully changed local property 'ksql.query.pull.table.scan.enabled' to 'true'. Use the UNSET command to revert your change.
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
ksql> SELECT * from ridersNearMountainView;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES |RIDERS |COUNT |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0 |[4ab5cbad, 8b6eae59, 4a7c7b41] |3 |
|10.0 |[18f4ea86] |1 |
|50.0 |[c2309eec, 4ddad000] |2 |
まとめ
- 大元のtopicをstreamとして定義
- streamからtableも作れるし、tableからtableも作れる。これをmaterialized viewと呼ぶ?
- ksqlの実行はpush型とpull型の2つがある
- pushで作ったtableをpull→pullとmaterialized viewで引張ったりもできる
- 関数はかなり沢山ありそう