ねらい
前回実施したksqlDB Quickstartの続きとして、下記元ネタのストリーミングクエリの作成を実施してみた。初学者が動かした際のメモであるため、元ネタと合わせて参照されたい。なお、日本語の直リンクでは古いバージョンのksqlDBのチュートリアルがトップに来るため注意したほうがよい。(そのせいで物凄い、物凄い、それは物凄い苦労があったが、全て闇に葬った。)
元ネタ:Write streaming queries against Apache Kafka® using ksqlDB (Local)
関連Qiita記事
1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する
環境
- Ubuntu 20.04 (on WSL2)
- Confluent Platform Community Edition 6.2.1
- Kafkaクラスターは最小構成で稼働(*.propertiesは殆どいじっていない)
- ksqlDB Serverに加えてSchema Registryも必要。ksql-server.propertiesの以下のコメントアウトを外しておくこと。
#------ Schema Registry -------
# Uncomment and complete the following to enable KSQL's integration to the Confluent Schema Registry:
ksql.schema.registry.url=http://localhost:8081
ksqlDB を使用したストリーミングクエリの作成(前半)
トピックの作成とデータの生成
Confluent Platformを導入するとksql-datagenというMessageを継続的に吐き出してくれるアプリが使えるようになるので、そいつを動かしてtopicにksql処理のためのデータを吐き出す。(なおhelp表示の際に/usr/logsディレクトリを作ろうとして失敗しているので念のため作っておいた。)
今回quickstartで指定するのはksql-datagenに予めデモ用にプリセットされたusersとpageviewsの2つ。動かしっぱなしは嫌なのですぐ止めたが、後のksql実行時には適時再稼働させている。
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=pageviews format=json topic=pageviews msgRate=5
...
[1635213107712L] --> ([ 1635213107712L | 'User_4' | 'Page_90' ]) ts:1635213108125
[1635213108142L] --> ([ 1635213108142L | 'User_3' | 'Page_78' ]) ts:1635213108143
[1635213108145L] --> ([ 1635213108145L | 'User_7' | 'Page_50' ]) ts:1635213108145
...
gen@LAPTOP-O8FG8ES2:~/confluent-6.2.1$ bin/ksql-datagen quickstart=users format=avro topic=users msgRate=1
...
['User_6'] --> ([ 1490859483194L | 'User_6' | 'Region_4' | 'OTHER' ]) ts:1635213240163
['User_6'] --> ([ 1500448592685L | 'User_6' | 'Region_5' | 'FEMALE' ]) ts:1635213240523
['User_1'] --> ([ 1490186139886L | 'User_1' | 'Region_8' | 'FEMALE' ]) ts:1635213241484
...
ksqlDB CLI の起動 / SHOW および PRINT ステートメントを使用した Kafka トピックの調査
便利、 kafka-topicsより入力文字数が少ないのがよい。
ksql> SHOW TOPICS;
Kafka Topic | Partitions | Partition Replicas
---------------------------------------------------------------
CURRENTLOCATION | 1 | 1
RIDERSNEARMOUNTAINVIEW | 1 | 1
default_ksql_processing_log | 1 | 1
locations | 1 | 1
pageviews | 1 | 1
users | 1 | 1
---------------------------------------------------------------
PRINTを試そうと思ったらCurrent Offsetから読むものらしくksql-datagenを停止していると1件も返ってこない。FROM BEGINNINGオプションを付与して無理やり表示してみた。
ksql> PRINT pageviews FROM BEGINNING;
Key format: KAFKA_BIGINT or KAFKA_DOUBLE
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 01:51:48.125 Z, key: 1635213107712, value: {"viewtime":1635213107712,"userid":"User_4","pageid":"Page_90"}, partition: 0
rowtime: 2021/10/26 01:51:48.143 Z, key: 1635213108142, value: {"viewtime":1635213108142,"userid":"User_3","pageid":"Page_78"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_7","pageid":"Page_50"}, partition: 0
rowtime: 2021/10/26 01:51:48.145 Z, key: 1635213108145, value: {"viewtime":1635213108145,"userid":"User_5","pageid":"Page_63"}, partition: 0
...
ksql> PRINT users FROM BEGINNING;
Key format: KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 01:54:00.163 Z, key: User_6, value: {"registertime": 1490859483194, "userid": "User_6", "regionid": "Region_4", "gender": "OTHER"}, partition: 0
rowtime: 2021/10/26 01:54:00.523 Z, key: User_6, value: {"registertime": 1500448592685, "userid": "User_6", "regionid": "Region_5", "gender": "FEMALE"}, partition: 0
rowtime: 2021/10/26 01:54:01.484 Z, key: User_1, value: {"registertime": 1490186139886, "userid": "User_1", "regionid": "Region_8", "gender": "FEMALE"}, partition: 0
...
ストリームおよびテーブルの作成
まずはJSON区切りのpageviewsに対してStreamをCREATEしてDESCRIBEしてみる。このStreamに対しては明示的なKeyを宣言していない。
CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) WITH
(kafka_topic='pageviews', value_format='JSON');
DESCRIBE結果は下記の通り。
ksql> DESCRIBE pageviews_original;
Name : PAGEVIEWS_ORIGINAL
Field | Type
----------------------------
VIEWTIME | BIGINT
USERID | VARCHAR(STRING)
PAGEID | VARCHAR(STRING)
----------------------------
次はAvro Schema区切りのusersに対してTableを作る。このCREATEの意味は「Message Keyから引っ張ってきた内容をid
列にして、Message ValueはAvro Schemaに従って列として付与してね」、ということ。
CREATE TABLE users_original (id VARCHAR PRIMARY KEY) WITH
(kafka_topic='users', value_format='AVRO');
DESCRIBEすると、Primary Keyとしてid
が存在し、残りの列がAvro Schemaに基づいて作成されている。先のMessage内容から、ID
列とUSERID
列は同じ内容を持っていることが分かる。
ksql> DESCRIBE users_original;
Name : USERS_ORIGINAL
Field | Type
-----------------------------------------------
ID | VARCHAR(STRING) (primary key)
REGISTERTIME | BIGINT
USERID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
-----------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
データの表示
users_originalからのSELECT結果は下記の通り。これはPull Query扱いとして記載されているが、EMIT CHANGES
指定があるからPush Queryに思えてしょうがない。確かにこちらは特になにもしなくてもTopicの先頭から読みにいっているようではあるが…Tableに対するQueryはPull Query扱いになるのだろうか。ID
列に格納されている値はMessage Key由来。
ksql> SELECT * FROM users_original EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|ID |REGISTERTIME |USERID |REGIONID |GENDER |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|User_9 |1496749900085 |User_9 |Region_8 |OTHER |
|User_6 |1488503784213 |User_6 |Region_4 |OTHER |
|User_4 |1492836860050 |User_4 |Region_4 |MALE |
|User_7 |1505758611024 |User_7 |Region_7 |OTHER |
|User_2 |1513237144819 |User_2 |Region_9 |MALE |
Limit Reached
Query terminated
pageviews_originalからのSELECT結果は下記の通り。Push Query扱いとなっているが、こちらは環境変数をいじらない限り(set 'auto.offset.reset'='earliest'
)、現在のTopic中のOffset以降のMessageしか表示しないようだ。
ksql> SELECT * FROM pageviews_original emit changes LIMIT 3;
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|VIEWTIME |USERID |PAGEID |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635215795084 |User_3 |Page_58 |
|1635215795419 |User_6 |Page_49 |
|1635215795419 |User_4 |Page_63 |
Limit Reached
Query terminated
クエリの書き込み
pageviews_original(Stream)にusers_original(Table)をLEFT JOINしてユーザー情報を拡張するSELECT文を作成する。SELECTの際にAS
で上書きする元の列はMessage Key由来のものであることが必須のようだ。
SELECT users_original.id AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES
LIMIT 5;
SELECTした結果は下記の通り、JOINによってあるページを見た人の情報にそのリージョンIDと性別をくっつけてることができている。
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_4 |Page_22 |Region_3 |FEMALE |
|User_2 |Page_66 |Region_2 |OTHER |
|User_7 |Page_73 |Region_9 |MALE |
|User_5 |Page_88 |Region_4 |MALE |
|User_6 |Page_99 |Region_1 |MALE |
Limit Reached
Query terminated
このSELECT文をStreamとして永続化する。ON
条件にもMessage Key由来の列が一つは必須だった(気がする)。
CREATE STREAM pageviews_enriched AS
SELECT users_original.id AS userid, pageid, regionid, gender
FROM pageviews_original
LEFT JOIN users_original
ON pageviews_original.userid = users_original.id
EMIT CHANGES;
一応DESCRIBEもしておく。
ksql> DESCRIBE pageviews_enriched;
Name : PAGEVIEWS_ENRICHED
Field | Type
-----------------------------------
USERID | VARCHAR(STRING) (key)
PAGEID | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
GENDER | VARCHAR(STRING)
-----------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
STREAMをSELECTしてみると、先のksqlと同じような結果が得られた。このksqlは延々と実行されStream(Topic)に結果を書き続けるのだろう。
ksql> SELECT * FROM pageviews_enriched EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8 |Page_71 |Region_9 |OTHER |
|User_5 |Page_47 |Region_1 |MALE |
|User_5 |Page_14 |Region_1 |MALE |
|User_2 |Page_34 |Region_2 |OTHER |
|User_2 |Page_61 |Region_2 |OTHER |
Limit Reached
Query terminated
このStreamからさらに女性だけを選択するStream(pageviews_female)を作成する。
CREATE STREAM pageviews_female AS
SELECT * FROM pageviews_enriched
WHERE gender = 'FEMALE'
EMIT CHANGES;
そこから更にRegion_8もしくはRegion_9に住んでいる人のみを選択するStream(pageview_female_like89)を作成する。
CREATE STREAM pageviews_female_like_89
WITH (kafka_topic='pageviews_enriched_r8_r9') AS
SELECT * FROM pageviews_female
WHERE regionid LIKE '%_8' OR regionid LIKE '%_9'
EMIT CHANGES;
SELECTすると以下のようにフェミニンかつ地域限定の結果が得られる。
ksql> select * from pageviews_female_like_89 EMIT CHANGES LIMIT 5;
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|USERID |PAGEID |REGIONID |GENDER |
+------------------------------------------+------------------------------------------+------------------------------------------+------------------------------------------+
|User_8 |Page_77 |Region_9 |FEMALE |
|User_7 |Page_84 |Region_9 |FEMALE |
|User_7 |Page_79 |Region_9 |FEMALE |
|User_7 |Page_33 |Region_9 |FEMALE |
次に性別とリージョンIDでGROUP BYしたTableを作成する。このCREATEは内部的にGROUP BY対象のgender列+regionid列をMessage Keyとして持つため、KEY_FORMAT='json'
指定で複数の列がMessage Key内に存在できるよう定義する必要がある。
CREATE TABLE pageviews_regions
WITH (KEY_FORMAT='json') AS
SELECT gender, regionid , COUNT(*) AS numusers
FROM pageviews_enriched
WINDOW TUMBLING (size 30 second)
GROUP BY gender, regionid
EMIT CHANGES;
DESCRIBEしてみると、以下のようにgender/regionidともにPrimary Keyになっている。JSON形式でMessage Key内に2つの列が格納されているのだろう。
ksql> DESCRIBE pageviews_regions;
Name : PAGEVIEWS_REGIONS
Field | Type
-------------------------------------------------------------------
GENDER | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
REGIONID | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
NUMUSERS | BIGINT
-------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
もう一つのこのCREATEのポイントはWINDOW TUMBLING (size 30 second)
だろう。ksqlドキュメントによるとある一定の時間間隔の集計を行うものらしい。試しにSELECTしてみる。
ksql> SELECT * FROM pageviews_regions EMIT CHANGES LIMIT 5;
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER |REGIONID |WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE |Region_1 |1635222870000 |1635222900000 |1 |
|MALE |Region_8 |1635222870000 |1635222900000 |3 |
|MALE |Region_9 |1635222870000 |1635222900000 |3 |
|MALE |Region_2 |1635222870000 |1635222900000 |5 |
|FEMALE |Region_3 |1635222870000 |1635222900000 |4 |
WINDOWSTART列/WINDOWEND列が勝手に付与されており、これが時間間隔を表している(と思うのがだがタイムスタンプの見方がよく分からない)。
次にPull Queryを発行してみるが、genderとregionidをそれぞれ指定することになる。
SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
ksql> SELECT * FROM pageviews_regions WHERE gender='FEMALE' AND regionid='Region_4';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|GENDER |REGIONID |WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|FEMALE |Region_4 |1635222900000 |1635222930000 |1 |
Query terminated
ちなみに、Pull Queryに対してWHERE指定が無いと以下のように怒られる。どうしてもやりたきゃ
SET 'ksql.query.pull.table.scan.enabled'='true'
を指定してもよいはず。
ksql> SELECT * FROM pageviews_regions;
Missing WHERE clause. See https://cnfl.io/queries for more info.
Add EMIT CHANGES if you intended to issue a push query.
Pull queries require a WHERE clause that:
- includes a key equality expression, e.g. `SELECT * FROM X WHERE <key-column>=Y;`.
- in the case of a multi-column key, is a conjunction of equality expressions that cover all key columns.
- (optionally) limits the time bounds of the windowed table.
Bounds on [`WINDOWSTART`, `WINDOWEND`] are supported
Supported operators are [EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL]
If more flexible queries are needed, table scans can be enabled by setting ksql.query.pull.table.scan.enabled=true.
WINDOWSTART列を指定することも可能。
SELECT NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_1' AND WINDOWSTART=1635222870000;
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|NUMUSERS |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1 |
Query terminated
Query terminated
WINDOWSTART/WINDOWENDの範囲指定も可能。
SELECT WINDOWSTART, WINDOWEND, NUMUSERS FROM pageviews_regions WHERE
gender='FEMALE' AND regionid='Region_3'
AND 1635222870000 <= WINDOWSTART
AND WINDOWSTART <= 1635222930000;
ksql>
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|WINDOWSTART |WINDOWEND |NUMUSERS |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|1635222870000 |1635222900000 |4 |
Query terminated
なお、書き出し先がStreamもしくはTableの永続化されたQueryは以下のコマンドで一覧できる。
ksql> SHOW QUERIES;
Query ID | Query Type | Status | Sink Name | Sink Kafka Topic | Query String
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
CTAS_RIDERSNEARMOUNTAINVIEW_5 | PERSISTENT | RUNNING:1 | RIDERSNEARMOUNTAINVIEW | RIDERSNEARMOUNTAINVIEW | CREATE TABLE RIDERSNEARMOUNTAINVIEW WITH (KAFKA_TOPIC='RIDERSNEARMOUNTAINVIEW', PARTITIONS=1, REPLICAS=1) AS SELECT ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) DISTANCEINMILES, COLLECT_LIST(CURRENTLOCATION.PROFILEID) RIDERS, COUNT(*) COUNT FROM CURRENTLOCATION CURRENTLOCATION GROUP BY ROUND(GEO_DISTANCE(CURRENTLOCATION.LA, CURRENTLOCATION.LO, 37.4133, -122.1162), -1) EMIT CHANGES;
CSAS_PAGEVIEWS_ENRICHED_113 | PERSISTENT | RUNNING:1 | PAGEVIEWS_ENRICHED | PAGEVIEWS_ENRICHED | CREATE STREAM PAGEVIEWS_ENRICHED WITH (KAFKA_TOPIC='PAGEVIEWS_ENRICHED', PARTITIONS=1, REPLICAS=1) AS SELECT USERS_ORIGINAL.ID USERID, PAGEVIEWS_ORIGINAL.PAGEID PAGEID, USERS_ORIGINAL.REGIONID REGIONID, USERS_ORIGINAL.GENDER GENDER FROM PAGEVIEWS_ORIGINAL PAGEVIEWS_ORIGINAL LEFT OUTER JOIN USERS_ORIGINAL USERS_ORIGINAL ON ((PAGEVIEWS_ORIGINAL.USERID = USERS_ORIGINAL.ID)) EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_115 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE | PAGEVIEWS_FEMALE | CREATE STREAM PAGEVIEWS_FEMALE WITH (KAFKA_TOPIC='PAGEVIEWS_FEMALE', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WHERE (PAGEVIEWS_ENRICHED.GENDER = 'FEMALE') EMIT CHANGES;
CTAS_PAGEVIEWS_REGIONS_119 | PERSISTENT | RUNNING:1 | PAGEVIEWS_REGIONS | PAGEVIEWS_REGIONS | CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
CSAS_PAGEVIEWS_FEMALE_LIKE_89_117 | PERSISTENT | RUNNING:1 | PAGEVIEWS_FEMALE_LIKE_89 | pageviews_enriched_r8_r9 | CREATE STREAM PAGEVIEWS_FEMALE_LIKE_89 WITH (KAFKA_TOPIC='pageviews_enriched_r8_r9', PARTITIONS=1, REPLICAS=1) AS SELECT * FROM PAGEVIEWS_FEMALE PAGEVIEWS_FEMALE WHERE ((PAGEVIEWS_FEMALE.REGIONID LIKE '%_8') OR (PAGEVIEWS_FEMALE.REGIONID LIKE '%_9')) EMIT CHANGES;
CTAS_CURRENTLOCATION_3 | PERSISTENT | RUNNING:1 | CURRENTLOCATION | CURRENTLOCATION | CREATE TABLE CURRENTLOCATION WITH (KAFKA_TOPIC='CURRENTLOCATION', PARTITIONS=1, REPLICAS=1) AS SELECT RIDERLOCATIONS.PROFILEID PROFILEID, LATEST_BY_OFFSET(RIDERLOCATIONS.LATITUDE) LA, LATEST_BY_OFFSET(RIDERLOCATIONS.LONGITUDE) LO FROM RIDERLOCATIONS RIDERLOCATIONS GROUP BY RIDERLOCATIONS.PROFILEID EMIT CHANGES;
-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
DESCRIBEにEXTENDEDオプションを付けると、どれぐらいのメッセージを処理した(Consumer Group Summary)のか等が分かる。
ksql> DESCRIBE PAGEVIEWS_REGIONS EXTENDED;
Name : PAGEVIEWS_REGIONS
Type : TABLE
Timestamp field : Not set - using <ROWTIME>
Key format : JSON
Value format : JSON
Kafka topic : PAGEVIEWS_REGIONS (partitions: 1, replication: 1)
Statement : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT
PAGEVIEWS_ENRICHED.GENDER GENDER,
PAGEVIEWS_ENRICHED.REGIONID REGIONID,
COUNT(*) NUMUSERS
FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED
WINDOW TUMBLING ( SIZE 30 SECONDS )
GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID
EMIT CHANGES;
Field | Type
-------------------------------------------------------------------
GENDER | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
REGIONID | VARCHAR(STRING) (primary key) (Window type: TUMBLING)
NUMUSERS | BIGINT
-------------------------------------------------------------------
Queries that write from this TABLE
-----------------------------------
CTAS_PAGEVIEWS_REGIONS_119 (RUNNING) : CREATE TABLE PAGEVIEWS_REGIONS WITH (KAFKA_TOPIC='PAGEVIEWS_REGIONS', KEY_FORMAT='json', PARTITIONS=1, REPLICAS=1) AS SELECT PAGEVIEWS_ENRICHED.GENDER GENDER, PAGEVIEWS_ENRICHED.REGIONID REGIONID, COUNT(*) NUMUSERS FROM PAGEVIEWS_ENRICHED PAGEVIEWS_ENRICHED WINDOW TUMBLING ( SIZE 30 SECONDS ) GROUP BY PAGEVIEWS_ENRICHED.GENDER, PAGEVIEWS_ENRICHED.REGIONID EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Local runtime statistics
------------------------
messages-per-sec: 0 total-messages: 102 last-message: 2021-10-26T04:35:30.3Z
(Statistics of the local KSQL server interaction with the Kafka topic PAGEVIEWS_REGIONS)
Consumer Groups summary:
Consumer Group : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119
Kafka topic : PAGEVIEWS_ENRICHED
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 319 | 319 | 0
------------------------------------------------------
Kafka topic : _confluent-ksql-default_query_CTAS_PAGEVIEWS_REGIONS_119-Aggregate-GroupBy-repartition
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 175 | 175 | 175 | 0
------------------------------------------------------
まとめ(前半)
-ksql処理をCREATEで永続化する際のStreamとTableの使い分けが良く分からない。後者は何らかのKeyが必須になるらしいが…
- Pull Query/Push Queryの明確な定義がよく分からない…とりあえずEMIT CHANGES指定は原則Push Queryと理解しておくが、テスト結果からは何か違う気もする。
-
SELECT WINDOW TUMBLING (size x second)
でx秒間隔での集計が可能。 - GROUP BY対象とした列がそのTableのMessage Keyになるようだ。
ずいぶん長くなったので後半は別文書に・・・・