3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ksqlDB Quickstartを実施してみる

Last updated at Posted at 2021-10-19

ねらい

ksqlDBに触る必要が生じたので、Confluentご謹製のQuickstart実施がてらのメモ作成。
元ネタ:ksqlDB Quickstart

関連Qiita記事

1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する

環境

  • Ubuntu 20.04 (on WSL2)
  • Confluent Platform Community Edition 6.2.1
  • Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)

ksqlDB Quickstart

1. Get Confluent Platform

省略。

2. Start ksqlDB's server

先にzookeeperとbrokerを起動しておく。Dockerは使用していないので直接コマンドで起動する。ksql-server.propertiesは特に変更していない。
ksqlDB サーバーの起動
bin/ksql-server-start etc/ksqldb/ksql-server.properties

サーバーログ抜粋
...
[2021-10-19 14:38:38,630] INFO Waiting until monitored service is ready for metrics collection (io.confluent.support.metrics.BaseMetricsReporter:171)
[2021-10-19 14:38:38,630] INFO Monitored service is now ready (io.confluent.support.metrics.BaseMetricsReporter:183)
[2021-10-19 14:38:38,630] INFO Attempting to collect and submit metrics (io.confluent.support.metrics.BaseMetricsReporter:142)
[2021-10-19 14:38:38,633] INFO ksqlDB API server listening on http://0.0.0.0:8088 (io.confluent.ksql.rest.server.KsqlRestApplication:375)
[2021-10-19 14:38:38,634] INFO Server up and running (io.confluent.ksql.rest.server.KsqlServerMain:90)
[2021-10-19 14:38:39,648] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter:146)

ksqlサーバー起動後、3つほど内部処理用のtopicが作成されていた。

  • transaction_state
  • confluent-ksql-default__command_topic
  • default_ksql_processing_log
topicリスト
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/kafka-topics --list --bootstrap-server localhost:9092
__consumer_offsets
__transaction_state
_confluent-ksql-default__command_topic
_schemas
default_ksql_processing_log

3. Start ksqlDB's interactive CLI

Dockerは使用していないので直接コマンドで起動する。
ksqlDB CLI の起動
上記ガイドどおり、事前にksql_logs保管用のディレクトリを作成し、環境変数で引き渡す。
LOG_DIR=./ksql_logs bin/ksql http://localhost:8088

ksqlcli
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ LOG_DIR=./ksql_logs bin/ksql http://localhost:8088

                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2021 Confluent Inc.

CLI v6.2.1, Server v6.2.1 located at http://localhost:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql>

私こういうアスキーアート好き!(バアァァン)

4. Create a stream

streamというものを作る。kafka topicにschemaをかぶせたようなもの、らしい。以下の属性を与える。

  • 名前: streamの名前
  • kafka-topic: このstreamに対応するtopic名
  • value_format: topicに格納したメッセージのEncoding。JSONって書いとけばよさそう。
  • partitions: topic内のパーティション数

今回ksql CLIから作成するstreamの属性は以下の通り。

  • 名前: riderLocations(ライダーの居場所みたいなもん)
  • 属性値
    • profileId VARCHAR(プロファイルID値)
    • latitude DOUBLE(緯度)
    • longitude DOUBLE(経度)
  • kafka-topic: locations
  • value_format: json
  • partitions: 1
ksqlcli
ksql> CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE)
>  WITH (kafka_topic='locations', value_format='json', partitions=1);

 Message
----------------
 Stream created
----------------

5. Create materialized views

2つテーブルを作る。1つ目のcurrentLocationは先ほど作ったstreamのriderLocationから最新位置だけを格納するもの。LATEST_BY_OFFSETがSQL集計関数として使用されており、これはlatitude/longitudeの(topic/partition中の)最新オフセットの値を返却するもの。EMIT CHANGESはこのSQLがpush queryであり、継続的にSQL処理が実行され続けるようなタイプのqueryであることを表す。

currentLocationテーブル
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderlocations
  GROUP BY profileId
  EMIT CHANGES;

2つ目のテーブルriderNearMoutainViewがややこしいが、頑張って解読してみる。

riderNearMoutainViewテーブル
CREATE TABLE ridersNearMountainView AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

GEO_DISTANCE(la, lo, 37.4133, -122.1162)
緯度と経度から2点間の大円距離をKMで返却するスカラー関数。

ROUND(xxx, -1)
丸め関数。スケールが負数の場合はvalue is rounded to the right of the decimal pointとのことだが、いまいち意味が分からない。

COLLECT_LIST(profileId)
値を全て配列として返却する集計関数。

COUNT(*)
行数を返却する集計関数

つまりこのSQLはFROM指定とGROUP BY処理によって**「最新時点(currentLocation)において、ある地点からの距離(distanceInMiles)毎にそこにいる全ライダーのプロファイル名(riders)と人数(count)を返却する」**という処理を行うものに見える。ちなみにある地点(緯度37.4133、経度-122.1162)はカリフォルニアのMountainViewっぽい。ksqlcliによるテーブル作成は特に問題なく終了。

ksqlcli
ksql> CREATE TABLE currentLocation AS
>  SELECT profileId,
>         LATEST_BY_OFFSET(latitude) AS la,
>         LATEST_BY_OFFSET(longitude) AS lo
>  FROM riderlocations
>  GROUP BY profileId
>  EMIT CHANGES;

 Message
----------------------------------------------
 Created query with ID CTAS_CURRENTLOCATION_3
----------------------------------------------
ksql> CREATE TABLE ridersNearMountainView AS
>  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1) AS distanceInMiles,
>         COLLECT_LIST(profileId) AS riders,
>         COUNT(*) AS count
>  FROM currentLocation
>  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162), -1);

 Message
-----------------------------------------------------
 Created query with ID CTAS_RIDERSNEARMOUNTAINVIEW_5
-----------------------------------------------------

6. Run a push query over the stream

streamに対してpush queryを発行する。MoutainViewからの対円距離が5KM以内の位置情報を片っ端から表示していく(CofuluentのサイトではMile表記だがGEO_DISTANCE関数のデフォルトはKMであった。)。EMIT CHANGES指定なので実行しっ放しになるようだ。

ksqlcli
-----------------------------------------------------
ksql> -- Mountain View lat, long: 37.4133, -122.1162
ksql> SELECT * FROM riderLocations
>  WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID                                                                    |LATITUDE                                                                     |LONGITUDE                                                                    |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+

7. Start another CLI session

別ウインドウでksqlCLIセッションを上げる。

8. Populate the stream with events

別ウインドウで起動したksqlCLIからstreamにデータを6件挿入する。

ksqlcliその2
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
ksql> INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);

これにより、ksqlCLIその1にはMoutainViewから5KM以内のレコードだけが表示されている。

ksqlcliその1
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|PROFILEID                                                                    |LATITUDE                                                                     |LONGITUDE                                                                    |
+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+-----------------------------------------------------------------------------+
|4ab5cbad                                                                     |37.3952                                                                      |-122.0813                                                                    |
|8b6eae59                                                                     |37.3944                                                                      |-122.0813                                                                    |
|4a7c7b41                                                                     |37.4049                                                                      |-122.0822                                                                    |

9. Run a Pull query against the materialized view

最後にridersNearMountainViewテーブルから距離が10km以内のレコードだけを抽出してみる。SETでパラメーターを指定することで、pushではなくpullでSQLを実行する。おまけで全レコードも抽出してみたが、ROUNDの丸め方はやはりよくわからない。

ksqlcliその2
ksql> SET 'ksql.query.pull.table.scan.enabled'='true';
Successfully changed local property 'ksql.query.pull.table.scan.enabled' to 'true'. Use the UNSET command to revert your change.
ksql> SELECT * from ridersNearMountainView WHERE distanceInMiles <= 10;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES                                          |RIDERS                                                   |COUNT                                                    |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0                                                      |[4ab5cbad, 8b6eae59, 4a7c7b41]                           |3                                                        |
|10.0                                                     |[18f4ea86]                                               |1                                                        |
ksql> SELECT * from ridersNearMountainView;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|DISTANCEINMILES                                          |RIDERS                                                   |COUNT                                                    |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|0.0                                                      |[4ab5cbad, 8b6eae59, 4a7c7b41]                           |3                                                        |
|10.0                                                     |[18f4ea86]                                               |1                                                        |
|50.0                                                     |[c2309eec, 4ddad000]                                     |2                                                        |

まとめ

  • 大元のtopicをstreamとして定義
  • streamからtableも作れるし、tableからtableも作れる。これをmaterialized viewと呼ぶ?
  • ksqlの実行はpush型とpull型の2つがある
  • pushで作ったtableをpull→pullとmaterialized viewで引張ったりもできる
  • 関数はかなり沢山ありそう
3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?