1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ksqlDB を使用したストリーミングクエリの作成(前半)

Last updated at Posted at 2021-10-26

ねらい

前回実施したksqlDB Quickstartの続きとして、下記元ネタのストリーミングクエリの作成を実施してみた。初学者が動かした際のメモであるため、元ネタと合わせて参照されたい。なお、日本語の直リンクでは古いバージョンのksqlDBのチュートリアルがトップに来るため注意したほうがよい。(そのせいで物凄い、物凄い、それは物凄い苦労があったが、全て闇に葬った。)

元ネタ:Write streaming queries against Apache Kafka® using ksqlDB (Local)

関連Qiita記事

1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する

環境

  • Ubuntu 20.04 (on WSL2)
  • Confluent Platform Community Edition 6.2.1
  • Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)
  • ksqlDB Serverに加えてSchema Registryも必要。ksql-server.propertiesの以下のコメントアウトを外しておくこと。
ksql-server.properties
#------ Schema Registry -------
# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
ksql.schema.registry.url=http://localhost:8081

ksqlDB を使用したストリーミングクエリの作成(前半)

トピックの作成とデータの生成

Confluent Platformを導入するとksql-datagenというMessageを継続的に吐き出してくれるアプリが使えるようになるので、そいつを動かしてtopicにksql処理のためのデータを吐き出す。(なおhelp表示の際に/usr/logsディレクトリを作ろうとして失敗しているので念のため作っておいた。)

今回quickstartで指定するのはksql-datagenに予めデモ用にプリセットされたusersとpageviewsの2つ。動かしっぱなしは嫌なのですぐ止めたが、後のksql実行時には適時再稼働させている。

pageviews生成ログ
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
...
[1635213107712L] --> ([ 1635213107712L | 'User_4' | 'Page_90' ]) ts:1635213108125
[1635213108142L] --> ([ 1635213108142L | 'User_3' | 'Page_78' ]) ts:1635213108143
[1635213108145L] --> ([ 1635213108145L | 'User_7' | 'Page_50' ]) ts:1635213108145
...
users生成ログ
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
...
['User_6'] --> ([ 1490859483194L | 'User_6' | 'Region_4' | 'OTHER' ]) ts:1635213240163
['User_6'] --> ([ 1500448592685L | 'User_6' | 'Region_5' | 'FEMALE' ]) ts:1635213240523
['User_1'] --> ([ 1490186139886L | 'User_1' | 'Region_8' | 'FEMALE' ]) ts:1635213241484
...

ksqlDB CLI の起動 / SHOW および PRINT ステートメントを使用した Kafka トピックの調査

便利、 kafka-topicsより入力文字数が少ないのがよい。

SHOW_TOPICS応答;
ksql> SHOW TOPICS;

 Kafka Topic                 | Partitions | Partition Replicas
---------------------------------------------------------------
 CURRENTLOCATION             | 1          | 1
 RIDERSNEARMOUNTAINVIEW      | 1          | 1
 default_ksql_processing_log | 1          | 1
 locations                   | 1          | 1
 pageviews                   | 1          | 1
 users                       | 1          | 1
---------------------------------------------------------------

PRINTを試そうと思ったらCurrent Offsetから読むものらしくksql-datagenを停止していると1件も返ってこない。FROM BEGINNINGオプションを付与して無理やり表示してみた。

pageviewsの格納Message(JSON)
ksql> PRINT pageviews FROM BEGINNING;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 01:51:48.125 Z, key: 1635213107712, value: {"viewtime":1635213107712,"userid":"User_4","pageid":"Page_90"}, partition: 0
rowtime: 2021/10/26 01:51:48.143 Z, key: 1635213108142, value: {"viewtime":1635213108142,"userid":"User_3","pageid":"Page_78"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_7","pageid":"Page_50"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_5","pageid":"Page_63"}, partition: 0
...
usersの格納Message(AVRO)
ksql> PRINT users FROM BEGINNING;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 01:54:00.163 Z, key: User_6, value: {"registertime": 1490859483194, "userid": "User_6", "regionid": "Region_4", "gender": "OTHER"}, partition: 0
rowtime: 2021/10/26 01:54:00.523 Z, key: User_6, value: {"registertime": 1500448592685, "userid": "User_6", "regionid": "Region_5", "gender": "FEMALE"}, partition: 0
rowtime: 2021/10/26 01:54:01.484 Z, key: User_1, value: {"registertime": 1490186139886, "userid": "User_1", "regionid": "Region_8", "gender": "FEMALE"}, partition: 0
...

ストリームおよびテーブルの作成

まずはJSON区切りのpageviewsに対してStreamをCREATEしてDESCRIBEしてみる。このStreamに対しては明示的なKeyを宣言していない。

STREAM作成
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
    (kafka_topic='pageviews', value_format='JSON');

DESCRIBE結果は下記の通り。

DESCRIBE結果
ksql> DESCRIBE pageviews_original;

Name                 : PAGEVIEWS_ORIGINAL
 Field    | Type
----------------------------
 VIEWTIME | BIGINT
 USERID   | VARCHAR(STRING)
 PAGEID   | VARCHAR(STRING)
----------------------------

次はAvro Schema区切りのusersに対してTableを作る。このCREATEの意味は「Message Keyから引っ張ってきた内容をid列にして、Message ValueはAvro Schemaに従って列として付与してね」、ということ。

TABLE作成
CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH
    (kafka_topic='users', value_format='AVRO');

DESCRIBEすると、Primary Keyとしてidが存在し、残りの列がAvro Schemaに基づいて作成されている。先のMessage内容から、ID列とUSERID列は同じ内容を持っていることが分かる。

DESCRIBE結果
ksql> DESCRIBE users_original;

Name                 : USERS_ORIGINAL
 Field        | Type
-----------------------------------------------
 ID           | VARCHAR(STRING)  (primary key)
 REGISTERTIME | BIGINT
 USERID       | VARCHAR(STRING)
 REGIONID     | VARCHAR(STRING)
 GENDER       | VARCHAR(STRING)
-----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

データの表示

users_originalからのSELECT結果は下記の通り。これはPull Query扱いとして記載されているが、EMIT CHANGES指定があるからPush Queryに思えてしょうがない。確かにこちらは特になにもしなくてもTopicの先頭から読みにいっているようではあるが…Tableに対するQueryはPull Query扱いになるのだろうか。ID列に格納されている値はMessage Key由来。

usesrs_originalの内容
ksql> SELECT * FROM users_original EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|ID                               |REGISTERTIME                     |USERID                           |REGIONID                         |GENDER                           |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|User_9                           |1496749900085                    |User_9                           |Region_8                         |OTHER                            |
|User_6                           |1488503784213                    |User_6                           |Region_4                         |OTHER                            |
|User_4                           |1492836860050                    |User_4                           |Region_4                         |MALE                             |
|User_7                           |1505758611024                    |User_7                           |Region_7                         |OTHER                            |
|User_2                           |1513237144819                    |User_2                           |Region_9                         |MALE                             |
Limit Reached
Query terminated

pageviews_originalからのSELECT結果は下記の通り。Push Query扱いとなっているが、こちらは環境変数をいじらない限り(set 'auto.offset.reset'='earliest')、現在のTopic中のOffset以降のMessageしか表示しないようだ。

pageviews_originalの内容
ksql> SELECT * FROM pageviews_original emit changes LIMIT 3;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|VIEWTIME                                                 |USERID                                                   |PAGEID                                                   |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635215795084                                            |User_3                                                   |Page_58                                                  |
|1635215795419                                            |User_6                                                   |Page_49                                                  |
|1635215795419                                            |User_4                                                   |Page_63                                                  |
Limit Reached
Query terminated

クエリの書き込み

pageviews_original(Stream)にusers_original(Table)をLEFT JOINしてユーザー情報を拡張するSELECT文を作成する。SELECTの際にASで上書きする元の列はMessage Key由来のものであることが必須のようだ。

JOIN処理
SELECT users_original.id AS userid, pageid, regionid, gender
    FROM pageviews_original
    LEFT JOIN users_original
      ON pageviews_original.userid = users_original.id
    EMIT CHANGES
    LIMIT 5;

SELECTした結果は下記の通り、JOINによってあるページを見た人の情報にそのリージョンIDと性別をくっつけてることができている。

SELECT結果
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_4                                    |Page_22                                   |Region_3                                  |FEMALE                                    |
|User_2                                    |Page_66                                   |Region_2                                  |OTHER                                     |
|User_7                                    |Page_73                                   |Region_9                                  |MALE                                      |
|User_5                                    |Page_88                                   |Region_4                                  |MALE                                      |
|User_6                                    |Page_99                                   |Region_1                                  |MALE                                      |
Limit Reached
Query terminated

このSELECT文をStreamとして永続化する。ON条件にもMessage Key由来の列が一つは必須だった(気がする)。

STREAM化
CREATE STREAM pageviews_enriched AS
  SELECT users_original.id AS userid, pageid, regionid, gender
  FROM pageviews_original
  LEFT JOIN users_original
    ON pageviews_original.userid = users_original.id
  EMIT CHANGES;

一応DESCRIBEもしておく。

DESCRIBE結果
ksql> DESCRIBE pageviews_enriched;

Name                 : PAGEVIEWS_ENRICHED
 Field    | Type
-----------------------------------
 USERID   | VARCHAR(STRING)  (key)
 PAGEID   | VARCHAR(STRING)
 REGIONID | VARCHAR(STRING)
 GENDER   | VARCHAR(STRING)
-----------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

STREAMをSELECTしてみると、先のksqlと同じような結果が得られた。このksqlは延々と実行されStream(Topic)に結果を書き続けるのだろう。

SELECT結果
ksql> SELECT * FROM pageviews_enriched EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8                                    |Page_71                                   |Region_9                                  |OTHER                                     |
|User_5                                    |Page_47                                   |Region_1                                  |MALE                                      |
|User_5                                    |Page_14                                   |Region_1                                  |MALE                                      |
|User_2                                    |Page_34                                   |Region_2                                  |OTHER                                     |
|User_2                                    |Page_61                                   |Region_2                                  |OTHER                                     |
Limit Reached
Query terminated

このStreamからさらに女性だけを選択するStream(pageviews_female)を作成する。

pageviews_femaleのCREATE
CREATE STREAM pageviews_female AS
  SELECT * FROM pageviews_enriched
  WHERE gender = 'FEMALE'
  EMIT CHANGES;

そこから更にRegion_8もしくはRegion_9に住んでいる人のみを選択するStream(pageview_female_like89)を作成する。

pageview_female_like89のCREATE
CREATE STREAM pageviews_female_like_89
  WITH (kafka_topic='pageviews_enriched_r8_r9') AS
  SELECT * FROM pageviews_female
  WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
  EMIT CHANGES;

SELECTすると以下のようにフェミニンかつ地域限定の結果が得られる。

SELECT結果
ksql> select * from pageviews_female_like_89 EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID                                    |PAGEID                                    |REGIONID                                  |GENDER                                    |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8                                    |Page_77                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_84                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_79                                   |Region_9                                  |FEMALE                                    |
|User_7                                    |Page_33                                   |Region_9                                  |FEMALE                                    |

次に性別とリージョンIDでGROUP BYしたTableを作成する。このCREATEは内部的にGROUP BY対象のgender列+regionid列をMessage Keyとして持つため、KEY_FORMAT='json'指定で複数の列がMessage Key内に存在できるよう定義する必要がある。

pageviews_regionsのCREATE
CREATE TABLE pageviews_regions
  WITH (KEY_FORMAT='json') AS
SELECT gender, regionid , COUNT(*) AS numusers
FROM pageviews_enriched
  WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid
EMIT CHANGES;

DESCRIBEしてみると、以下のようにgender/regionidともにPrimary Keyになっている。JSON形式でMessage Key内に2つの列が格納されているのだろう。

DESCRIBE結果
ksql> DESCRIBE pageviews_regions;

Name                 : PAGEVIEWS_REGIONS
 Field    | Type
-------------------------------------------------------------------
 GENDER   | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 REGIONID | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 NUMUSERS | BIGINT
-------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

もう一つのこのCREATEのポイントはWINDOW TUMBLING (size 30 second)だろう。ksqlドキュメントによるとある一定の時間間隔の集計を行うものらしい。試しにSELECTしてみる。

ksql> SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER                           |REGIONID                         |WINDOWSTART                      |WINDOWEND                        |NUMUSERS                         |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE                           |Region_1                         |1635222870000                    |1635222900000                    |1                                |
|MALE                             |Region_8                         |1635222870000                    |1635222900000                    |3                                |
|MALE                             |Region_9                         |1635222870000                    |1635222900000                    |3                                |
|MALE                             |Region_2                         |1635222870000                    |1635222900000                    |5                                |
|FEMALE                           |Region_3                         |1635222870000                    |1635222900000                    |4                                |

WINDOWSTART列/WINDOWEND列が勝手に付与されており、これが時間間隔を表している(と思うのがだがタイムスタンプの見方がよく分からない)。

次にPull Queryを発行してみるが、genderとregionidをそれぞれ指定することになる。

Pull_Queryの発行
SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
Pull_Query応答
ksql> SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER                           |REGIONID                         |WINDOWSTART                      |WINDOWEND                        |NUMUSERS                         |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE                           |Region_4                         |1635222900000                    |1635222930000                    |1                                |
Query terminated

ちなみに、Pull Queryに対してWHERE指定が無いと以下のように怒られる。どうしてもやりたきゃ
SET 'ksql.query.pull.table.scan.enabled'='true'
を指定してもよいはず。

WHEREを指定しないとこうなる
ksql> SELECT * FROM pageviews_regions;
Missing WHERE clause.  See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Pull queries require a WHERE clause that:
 - includes a key equality expression, e.g. `SELECT * FROM X WHERE <key-column>=Y;`.
 - in the case of a multi-column key, is a conjunction of equality expressions that cover all key columns.
 - (optionally) limits the time bounds of the windowed table.
         Bounds on [`WINDOWSTART`, `WINDOWEND`] are supported
         Supported operators are [EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL]
If more flexible queries are needed, table scans can be enabled by setting ksql.query.pull.table.scan.enabled=true.

WINDOWSTART列を指定することも可能。

WINDOWSTART修飾
SELECT NUMUSERS FROM pageviews_regions WHERE
  gender='FEMALE' AND regionid='Region_1' AND WINDOWSTART=1635222870000;
WINDOWSTART修飾への応答
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NUMUSERS                                                                                                                                                                         |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1                                                                                                                                                                                |
Query terminated
  Query terminated

WINDOWSTART/WINDOWENDの範囲指定も可能。

時間範囲指定
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_3' 
AND 1635222870000 <= WINDOWSTART 
AND WINDOWSTART <= 1635222930000; 
時間範囲指定ヘの応答
ksql> 
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|WINDOWSTART                                              |WINDOWEND                                                |NUMUSERS                                                 |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635222870000                                            |1635222900000                                            |4                                                        |
Query terminated

なお、書き出し先がStreamもしくはTableの永続化されたQueryは以下のコマンドで一覧できる。

SHOW_QUERIES応答
ksql> SHOW QUERIES;

 Query ID                          | Query Type | Status    | Sink Name                | Sink Kafka Topic         | Query String                                                                                                                                                                                                                                                                                                                                                                                                                         
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 CTAS_RIDERSNEARMOUNTAINVIEW_5     | PERSISTENT | RUNNING:1 | RIDERSNEARMOUNTAINVIEW   | RIDERSNEARMOUNTAINVIEW   | CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (KAFKA_TOPIC='RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=1) AS SELECT   ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES,   COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS,   COUNT(*) COUNT FROM CURRENTLOCATION CURRENTLOCATION GROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) EMIT CHANGES;
 CSAS_PAGEVIEWS_ENRICHED_113       | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED       | PAGEVIEWS_ENRICHED       | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT   USERS_ORIGINAL.ID USERID,   PAGEVIEWS_ORIGINAL.PAGEID PAGEID,   USERS_ORIGINAL.REGIONID REGIONID,   USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;                          
 CSAS_PAGEVIEWS_FEMALE_115         | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE         | PAGEVIEWS_FEMALE         | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;                                                                                                                                                                                                                     
 CTAS_PAGEVIEWS_REGIONS_119        | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS        | PAGEVIEWS_REGIONS        | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT   PAGEVIEWS_ENRICHED.GENDER GENDER,   PAGEVIEWS_ENRICHED.REGIONID REGIONID,   COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS )  GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;                                           
 CSAS_PAGEVIEWS_FEMALE_LIKE_89_117 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;                                                                                                                                                             
 CTAS_CURRENTLOCATION_3            | PERSISTENT | RUNNING:1 | CURRENTLOCATION          | CURRENTLOCATION          | CREATE TABLE CURRENTLOCATION WITH (KAFKA_TOPIC='CURRENTLOCATION', PARTITIONS=1, REPLICAS=1) AS SELECT   RIDERLOCATIONS.PROFILEID PROFILEID,   LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA,   LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO FROM RIDERLOCATIONS RIDERLOCATIONS GROUP BY RIDERLOCATIONS.PROFILEID EMIT CHANGES;                                                                                                       
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

DESCRIBEにEXTENDEDオプションを付けると、どれぐらいのメッセージを処理した(Consumer Group Summary)のか等が分かる。

EXTENDEDオプション付きDESCRIBE応答
ksql> DESCRIBE PAGEVIEWS_REGIONS EXTENDED;

Name                 : PAGEVIEWS_REGIONS
Type                 : TABLE
Timestamp field      : Not set - using <ROWTIME>
Key format           : JSON
Value format         : JSON
Kafka topic          : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement            : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT
  PAGEVIEWS_ENRICHED.GENDER GENDER,
  PAGEVIEWS_ENRICHED.REGIONID REGIONID,
  COUNT(*) NUMUSERS
FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
EMIT CHANGES;

 Field    | Type
-------------------------------------------------------------------
 GENDER   | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 REGIONID | VARCHAR(STRING)  (primary key) (Window type: TUMBLING)
 NUMUSERS | BIGINT
-------------------------------------------------------------------

Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_119 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT   PAGEVIEWS_ENRICHED.GENDER GENDER,   PAGEVIEWS_ENRICHED.REGIONID REGIONID,   COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS )  GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Local runtime statistics
------------------------
messages-per-sec:         0   total-messages:       102     last-message: 2021-10-26T04:35:30.3Z

(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119

Kafka topic          : PAGEVIEWS_ENRICHED
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 0            | 319        | 319    | 0
------------------------------------------------------

Kafka topic          : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119-Aggregate-GroupBy-repartition
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
 0         | 175          | 175        | 175    | 0
------------------------------------------------------

まとめ(前半)

-ksql処理をCREATEで永続化する際のStreamとTableの使い分けが良く分からない。後者は何らかのKeyが必須になるらしいが…

  • Pull Query/Push Queryの明確な定義がよく分からない…とりあえずEMIT CHANGES指定は原則Push Queryと理解しておくが、テスト結果からは何か違う気もする。
  • SELECT WINDOW TUMBLING (size x second)でx秒間隔での集計が可能。
  • GROUP BY対象とした列がそのTableのMessage Keyになるようだ。

ずいぶん長くなったので後半は別文書に・・・・

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?