ねらい
前半に続いて、下記元ネタにおける「ksqlDB でのネスト化されたスキーマ(STRUCT)の使用」以降を実施してみる。(こちらも古いリンクのせいでそれはもう筆舌に尽くしがたい苦しみを味わったのだが、全て闇に葬った。)
元ネタ:Write streaming queries against Apache Kafka® using ksqlDB (Local)
関連Qiita記事
1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する
ksqlDB を使用したストリーミングクエリの作成(後半)
ksqlDB でのネスト化されたスキーマ(STRUCT)の使用
今回Stream/Tableの元ネタになるのはksql-datagenのプリセットの一つのordersである。
bin/ksql-datagen quickstart=orders format=avro topic=orders msgRate=1
ksql-datagenの出力ログは下記の通り。
[0] --> ([ 1504174143763L | 0 | 'Item_878' | 4.036972663514597 | Struct{city=City_67,state=State_25,zipcode=71437} ]) ts:1635225586518
[1] --> ([ 1509355271065L | 1 | 'Item_670' | 9.154643686738641 | Struct{city=City_32,state=State_25,zipcode=81180} ]) ts:1635225586796
[2] --> ([ 1487743369101L | 2 | 'Item_682' | 9.913937197671148 | Struct{city=City_99,state=State_25,zipcode=99831} ]) ts:1635225587794
[3] --> ([ 1514028266544L | 3 | 'Item_915' | 7.027966334147111 | Struct{city=City_68,state=State_78,zipcode=47239} ]) ts:1635225588795
[4] --> ([ 1500111527090L | 4 | 'Item_533' | 9.268916469646818 | Struct{city=City_71,state=State_72,zipcode=15953} ]) ts:1635225589795
[5] --> ([ 1492193039132L | 5 | 'Item_294' | 7.781222807562362 | Struct{city=City_46,state=State_66,zipcode=29977} ]) ts:1635225590795
[6] --> ([ 1495043772684L | 6 | 'Item_322' | 1.2480686742580827 | Struct{city=City_86,state=State_69,zipcode=18948} ]) ts:1635225591795
[7] --> ([ 1505289388278L | 7 | 'Item_233' | 0.15332131177604907 | Struct{city=City_29,state=State_98,zipcode=64327} ]) ts:1635225592795
[8] --> ([ 1515383680311L | 8 | 'Item_521' | 6.810614048446687 | Struct{city=City_63,state=State_28,zipcode=32247} ]) ts:1635225593795
[9] --> ([ 1515133573377L | 9 | 'Item_981' | 4.464408695223682 | Struct{city=City_81,state=State_13,zipcode=87019} ]) ts:1635225594795
[10] --> ([ 1506411308569L | 10 | 'Item_366' | 4.2732457996601845 | Struct{city=City_29,state=State_95,zipcode=93103} ]) ts:1635225595795
[11] --> ([ 1492697874803L | 11 | 'Item_688' | 4.155429455625357 | Struct{city=City_77,state=State_47,zipcode=48054} ]) ts:1635225596795
[12] --> ([ 1498066277947L | 12 | 'Item_438' | 8.852426005487066 | Struct{city=City_35,state=State_52,zipcode=36616} ]) ts:1635225597795
Schema RegistryのログからSubject名を調べて、そこから以下のAPIでSchemaを探ってみる。jqの使い方は先達に教えてもらった。
curl -X GET http://localhost:8081/subjects/orders-value/versions/1 | jq .schema | sed -e 's/^"//' -e 's/"$//' -e 's/\\//g' | jq .
Avro Schema構造を見ると、KsqlDataSourceSchema
Recordのaddress要素の中にKsqlDataSourceSchema_address
というRecordがあり、それがcity, state, zipcode
というフィールドを保持しているようだ。
{
"type": "record",
"name": "KsqlDataSourceSchema",
"namespace": "io.confluent.ksql.avro_schemas",
"fields": [
{
"name": "ordertime",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "orderid",
"type": [
"null",
"int"
],
"default": null
},
{
"name": "itemid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "orderunits",
"type": [
"null",
"double"
],
"default": null
},
{
"name": "address",
"type": [
"null",
{
"type": "record",
"name": "KsqlDataSourceSchema_address",
"fields": [
{
"name": "city",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "state",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "zipcode",
"type": [
"null",
"long"
],
"default": null
}
]
}
],
"default": null
}
]
}
ksqlDB CLIのPRINTでメッセージを観察してみる。Message Keyにはorderidがそのまま入り、address要素の中にネストされてcity, state, zipcode
が入っていることが分かる。
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 05:19:46.518 Z, key: 0, value: {"ordertime": 1504174143763, "orderid": 0, "itemid": "Item_878", "orderunits": 4.036972663514597, "address": {"city": "City_67", "state": "State_25", "zipcode": 71437}}, partition: 0
...
次に発行するCREATEの意味が悩ましかったが、せっかくのAvro Schemaを参照せずにaddress要素のネスト構造を手作業で宣言するにはこうやるんですよ、というサンプルだろうと解釈した。
CREATE STREAM ORDERS
(
ORDERTIME BGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
DESCRIBEした結果は以下のとおり。
ksql> DESCRIBE ORDERS;
Name : ORDERS
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
実はAvro Schemaから直接スキーマ構造を読み取ることも可能。StreamなのでKey指定も必須ではない。
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');
これで作成しても、DESCRIBEするとちゃんとAVRO Schemaからネスト構造を引張ってきていた。
ksql> DESCRIBE ORDERS;
Name : ORDERS
Field | Type
----------------------------------------------------------------------------------
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
StreamのSELECTも成功した。ネストされた子要素を拾いたいときはアスキーアート的矢印->
を入れろというのは面白い。
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|ORDERID |CITY |
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|0 |City_75 |
|1 |City_29 |
|2 |City_75 |
|3 |City_24 |
|4 |City_19 |
Limit Reached
Query terminated
INSERT INTO
例によってksql-datagenを用いてプリセットordersを使用した2つのTopicを作成する。
bin/ksql-datagen quickstart=orders format=json topic=orders_local msgRate=2
bin/ksql-datagen quickstart=orders format=json topic=orders_3rdparty msgRate=2
ksqlDB CLIを用いて中身を見ると、JSONで格納されていることが分かる。(Avroと見た目は一緒だが)
ksql> PRINT orders_local FROM BEGINNING;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 05:37:41.369 Z, key: 0, value: {"ordertime":1503864575926,"orderid":0,"itemid":"Item_434","orderunits":4.2864657664768755,"address":{"city":"City_69","state":"State_35","zipcode":46591}}, partition: 0
....
次にこれらのTopicに対応したStreamを作成する。
CREATE STREAM ORDERS_SRC_LOCAL
(
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');
CREATE STREAM ORDERS_SRC_3RDPARTY
(
ORDERTIME BIGINT,
ORDERID INT,
ITEMID STRING,
ORDERUNITS DOUBLE,
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
)
WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');
両StreamともSELECTで内容を見ることが出来たので問題なさそう。次にStreamのORDERS_SRC_LOCALからStreamのALL_ORDERSを作る。この際にSRC列とそこに含める'LOCAL'という文字列を追加しておく。
CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;
DESCRIBEで調べるとSRC列が追加されていることが分かる。
ksql> DESCRIBE ALL_ORDERS;
Name : ALL_ORDERS
Field | Type
----------------------------------------------------------------------------------
SRC | VARCHAR(STRING)
ORDERTIME | BIGINT
ORDERID | INTEGER
ITEMID | VARCHAR(STRING)
ORDERUNITS | DOUBLE
ADDRESS | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;
次に作成したStream ALL_ORDERSにORDERS_3RDPARTYをINSERTで追加する。INSERTの引数にSELECTが入るのがすごく気持ち悪い。
INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;
DESCRIBEしても特に結果は変わらなかった。しかしALL_ORDERSをSELECTすると、2つのStreamが統合されているのがわかる。
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SRC |ORDERTIME |ORDERID |ITEMID |ORDERUNITS |ADDRESS |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|LOCAL |1491607148202 |0 |Item_898 |2.295635561371047 |{CITY=City_75, STATE=State_|
| | | | | |51, ZIPCODE=99961} |
|LOCAL |1502213077209 |1 |Item_140 |2.5099291074319723 |{CITY=City_31, STATE=State_|
| | | | | |85, ZIPCODE=60896} |
|LOCAL |1499994360002 |2 |Item_389 |0.2058594684735549 |{CITY=City_87, STATE=State_|
| | | | | |73, ZIPCODE=21220} |
|LOCAL |1496100207758 |3 |Item_847 |7.477474294032186 |{CITY=City_86, STATE=State_|
| | | | | |32, ZIPCODE=22647} |
|LOCAL |1505599782301 |4 |Item_315 |8.881110008286903 |{CITY=City_96, STATE=State_|
| | | | | |92, ZIPCODE=55875} |
|LOCAL |1513625276385 |5 |Item_125 |1.951656779215225 |{CITY=City_68, STATE=State_|
| | | | | |48, ZIPCODE=48094} |
|3RD PARTY |1516534876769 |0 |Item_276 |9.00734689040726 |{CITY=City_79, STATE=State_|
| | | | | |69, ZIPCODE=90948} |
|3RD PARTY |1491684084899 |1 |Item_724 |9.804594098542523 |{CITY=City_85, STATE=State_|
| | | | | |44, ZIPCODE=41963} |
|LOCAL |1493513975029 |6 |Item_364 |0.3857448600632095 |{CITY=City_83, STATE=State_|
| | | | | |48, ZIPCODE=91698} |
|3RD PARTY |1510339405388 |2 |Item_184 |5.594021107440936 |{CITY=City_56, STATE=State_|
まとめ(後半)
- ネストされた子構造はCREATE時の列名として
ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
のようSTRUCT<>
を用いて指定するが、Avro Shcema利用ならそのまま引張ってきてもよい。 - ネストされた子構造をSELECTする際は列名として
ADDRESS->CITY
のように->
を用いて指定する。 - INSERT INTO + SELECTによって、あるStreamに別のStreamを統合することが出来る(が表構造の一致が前提なんだろうか)。