1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

ksqlDB を使用したストリーミングクエリの作成(後半)

Last updated at Posted at 2021-10-26

ねらい

前半に続いて、下記元ネタにおける「ksqlDB でのネスト化されたスキーマ(STRUCT)の使用」以降を実施してみる。(こちらも古いリンクのせいでそれはもう筆舌に尽くしがたい苦しみを味わったのだが、全て闇に葬った。)

元ネタ:Write streaming queries against Apache Kafka® using ksqlDB (Local)

関連Qiita記事

1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する

ksqlDB を使用したストリーミングクエリの作成(後半)

ksqlDB でのネスト化されたスキーマ(STRUCT)の使用

今回Stream/Tableの元ネタになるのはksql-datagenのプリセットの一つのordersである。

ksql_datagenへの指示
bin/ksql-datagen quickstart=orders format=avro topic=orders msgRate=1

ksql-datagenの出力ログは下記の通り。

ksql-datagenのログ
[0] --> ([ 1504174143763L | 0 | 'Item_878' | 4.036972663514597 | Struct{city=City_67,state=State_25,zipcode=71437} ]) ts:1635225586518
[1] --> ([ 1509355271065L | 1 | 'Item_670' | 9.154643686738641 | Struct{city=City_32,state=State_25,zipcode=81180} ]) ts:1635225586796
[2] --> ([ 1487743369101L | 2 | 'Item_682' | 9.913937197671148 | Struct{city=City_99,state=State_25,zipcode=99831} ]) ts:1635225587794
[3] --> ([ 1514028266544L | 3 | 'Item_915' | 7.027966334147111 | Struct{city=City_68,state=State_78,zipcode=47239} ]) ts:1635225588795
[4] --> ([ 1500111527090L | 4 | 'Item_533' | 9.268916469646818 | Struct{city=City_71,state=State_72,zipcode=15953} ]) ts:1635225589795
[5] --> ([ 1492193039132L | 5 | 'Item_294' | 7.781222807562362 | Struct{city=City_46,state=State_66,zipcode=29977} ]) ts:1635225590795
[6] --> ([ 1495043772684L | 6 | 'Item_322' | 1.2480686742580827 | Struct{city=City_86,state=State_69,zipcode=18948} ]) ts:1635225591795
[7] --> ([ 1505289388278L | 7 | 'Item_233' | 0.15332131177604907 | Struct{city=City_29,state=State_98,zipcode=64327} ]) ts:1635225592795
[8] --> ([ 1515383680311L | 8 | 'Item_521' | 6.810614048446687 | Struct{city=City_63,state=State_28,zipcode=32247} ]) ts:1635225593795
[9] --> ([ 1515133573377L | 9 | 'Item_981' | 4.464408695223682 | Struct{city=City_81,state=State_13,zipcode=87019} ]) ts:1635225594795
[10] --> ([ 1506411308569L | 10 | 'Item_366' | 4.2732457996601845 | Struct{city=City_29,state=State_95,zipcode=93103} ]) ts:1635225595795
[11] --> ([ 1492697874803L | 11 | 'Item_688' | 4.155429455625357 | Struct{city=City_77,state=State_47,zipcode=48054} ]) ts:1635225596795
[12] --> ([ 1498066277947L | 12 | 'Item_438' | 8.852426005487066 | Struct{city=City_35,state=State_52,zipcode=36616} ]) ts:1635225597795

Schema RegistryのログからSubject名を調べて、そこから以下のAPIでSchemaを探ってみる。jqの使い方は先達に教えてもらった。

SchemaRegistryAPI
curl -X GET http://localhost:8081/subjects/orders-value/versions/1 | jq .schema | sed -e 's/^"//' -e 's/"$//' -e 's/\\//g' | jq .

Avro Schema構造を見ると、KsqlDataSourceSchema Recordのaddress要素の中にKsqlDataSourceSchema_addressというRecordがあり、それがcity, state, zipcodeというフィールドを保持しているようだ。

ordersのSchema構造
{
  "type": "record",
  "name": "KsqlDataSourceSchema",
  "namespace": "io.confluent.ksql.avro_schemas",
  "fields": [
    {
      "name": "ordertime",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "orderid",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "itemid",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "orderunits",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "address",
      "type": [
        "null",
        {
          "type": "record",
          "name": "KsqlDataSourceSchema_address",
          "fields": [
            {
              "name": "city",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "state",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "zipcode",
              "type": [
                "null",
                "long"
              ],
              "default": null
            }
          ]
        }
      ],
      "default": null
    }
  ]
}

ksqlDB CLIのPRINTでメッセージを観察してみる。Message Keyにはorderidがそのまま入り、address要素の中にネストされてcity, state, zipcodeが入っていることが分かる。

Message内容
Key format: KAFKA_INT or KAFKA_STRING
Value format: AVRO
rowtime: 2021/10/26 05:19:46.518 Z, key: 0, value: {"ordertime": 1504174143763, "orderid": 0, "itemid": "Item_878", "orderunits": 4.036972663514597, "address": {"city": "City_67", "state": "State_25", "zipcode": 71437}}, partition: 0
...

次に発行するCREATEの意味が悩ましかったが、せっかくのAvro Schemaを参照せずにaddress要素のネスト構造を手作業で宣言するにはこうやるんですよ、というサンプルだろうと解釈した。

CREATEその1
CREATE STREAM ORDERS
  (
    ORDERTIME BGINT,
    ORDERID INT,
    ITEMID STRING,
    ORDERUNITS DOUBLE,
    ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
  )
   WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');

DESCRIBEした結果は以下のとおり。

DESCRIBE結果その1
ksql> DESCRIBE ORDERS;

Name                 : ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------

実はAvro Schemaから直接スキーマ構造を読み取ることも可能。StreamなのでKey指定も必須ではない。

CREATEその2
CREATE STREAM ORDERS WITH (KAFKA_TOPIC='orders', VALUE_FORMAT='avro');

これで作成しても、DESCRIBEするとちゃんとAVRO Schemaからネスト構造を引張ってきていた。

DESCRIBE結果その2
ksql> DESCRIBE ORDERS;

Name                 : ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

StreamのSELECTも成功した。ネストされた子要素を拾いたいときはアスキーアート的矢印->を入れろというのは面白い。

ネスト構造のSELECT
SELECT ORDERID, ADDRESS->CITY FROM ORDERS EMIT CHANGES LIMIT 5;
子要素の表示
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|ORDERID                                                                                |CITY                                                                                   |
+---------------------------------------------------------------------------------------+---------------------------------------------------------------------------------------+
|0                                                                                      |City_75                                                                                |
|1                                                                                      |City_29                                                                                |
|2                                                                                      |City_75                                                                                |
|3                                                                                      |City_24                                                                                |
|4                                                                                      |City_19                                                                                |
Limit Reached
Query terminated

INSERT INTO

例によってksql-datagenを用いてプリセットordersを使用した2つのTopicを作成する。

ksql-datagenへの指示
bin/ksql-datagen quickstart=orders format=json topic=orders_local msgRate=2
bin/ksql-datagen quickstart=orders format=json topic=orders_3rdparty msgRate=2

ksqlDB CLIを用いて中身を見ると、JSONで格納されていることが分かる。(Avroと見た目は一緒だが)

Message内容
ksql> PRINT orders_local FROM BEGINNING;
Key format: KAFKA_INT or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2021/10/26 05:37:41.369 Z, key: 0, value: {"ordertime":1503864575926,"orderid":0,"itemid":"Item_434","orderunits":4.2864657664768755,"address":{"city":"City_69","state":"State_35","zipcode":46591}}, partition: 0
....

次にこれらのTopicに対応したStreamを作成する。

CREATE処理
CREATE STREAM ORDERS_SRC_LOCAL
 (
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_local', VALUE_FORMAT='JSON');

CREATE STREAM ORDERS_SRC_3RDPARTY
 (
   ORDERTIME BIGINT,
   ORDERID INT,
   ITEMID STRING,
   ORDERUNITS DOUBLE,
   ADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>
 )
  WITH (KAFKA_TOPIC='orders_3rdparty', VALUE_FORMAT='JSON');

両StreamともSELECTで内容を見ることが出来たので問題なさそう。次にStreamのORDERS_SRC_LOCALからStreamのALL_ORDERSを作る。この際にSRC列とそこに含める'LOCAL'という文字列を追加しておく。

ALL_ORDERS作成
CREATE STREAM ALL_ORDERS AS SELECT 'LOCAL' AS SRC, * FROM ORDERS_SRC_LOCAL EMIT CHANGES;

DESCRIBEで調べるとSRC列が追加されていることが分かる。

DESCRIBE結果
ksql> DESCRIBE ALL_ORDERS;

Name                 : ALL_ORDERS
 Field      | Type
----------------------------------------------------------------------------------
 SRC        | VARCHAR(STRING)
 ORDERTIME  | BIGINT
 ORDERID    | INTEGER
 ITEMID     | VARCHAR(STRING)
 ORDERUNITS | DOUBLE
 ADDRESS    | STRUCT<CITY VARCHAR(STRING), STATE VARCHAR(STRING), ZIPCODE BIGINT>
----------------------------------------------------------------------------------
For runtime statistics and query details run: DESCRIBE <Stream,Table> EXTENDED;

次に作成したStream ALL_ORDERSにORDERS_3RDPARTYをINSERTで追加する。INSERTの引数にSELECTが入るのがすごく気持ち悪い。

STREAMへのINSERT処理
INSERT INTO ALL_ORDERS SELECT '3RD PARTY' AS SRC, * FROM ORDERS_SRC_3RDPARTY EMIT CHANGES;

DESCRIBEしても特に結果は変わらなかった。しかしALL_ORDERSをSELECTすると、2つのStreamが統合されているのがわかる。

ALL_ORDEERSのSELECT結果
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|SRC                        |ORDERTIME                  |ORDERID                    |ITEMID                     |ORDERUNITS                 |ADDRESS                    |
+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+---------------------------+
|LOCAL                      |1491607148202              |0                          |Item_898                   |2.295635561371047          |{CITY=City_75, STATE=State_|
|                           |                           |                           |                           |                           |51, ZIPCODE=99961}         |
|LOCAL                      |1502213077209              |1                          |Item_140                   |2.5099291074319723         |{CITY=City_31, STATE=State_|
|                           |                           |                           |                           |                           |85, ZIPCODE=60896}         |
|LOCAL                      |1499994360002              |2                          |Item_389                   |0.2058594684735549         |{CITY=City_87, STATE=State_|
|                           |                           |                           |                           |                           |73, ZIPCODE=21220}         |
|LOCAL                      |1496100207758              |3                          |Item_847                   |7.477474294032186          |{CITY=City_86, STATE=State_|
|                           |                           |                           |                           |                           |32, ZIPCODE=22647}         |
|LOCAL                      |1505599782301              |4                          |Item_315                   |8.881110008286903          |{CITY=City_96, STATE=State_|
|                           |                           |                           |                           |                           |92, ZIPCODE=55875}         |
|LOCAL                      |1513625276385              |5                          |Item_125                   |1.951656779215225          |{CITY=City_68, STATE=State_|
|                           |                           |                           |                           |                           |48, ZIPCODE=48094}         |
|3RD PARTY                  |1516534876769              |0                          |Item_276                   |9.00734689040726           |{CITY=City_79, STATE=State_|
|                           |                           |                           |                           |                           |69, ZIPCODE=90948}         |
|3RD PARTY                  |1491684084899              |1                          |Item_724                   |9.804594098542523          |{CITY=City_85, STATE=State_|
|                           |                           |                           |                           |                           |44, ZIPCODE=41963}         |
|LOCAL                      |1493513975029              |6                          |Item_364                   |0.3857448600632095         |{CITY=City_83, STATE=State_|
|                           |                           |                           |                           |                           |48, ZIPCODE=91698}         |
|3RD PARTY                  |1510339405388              |2                          |Item_184                   |5.594021107440936          |{CITY=City_56, STATE=State_|

まとめ(後半)

  • ネストされた子構造はCREATE時の列名としてADDRESS STRUCT<CITY STRING, STATE STRING, ZIPCODE BIGINT>のようSTRUCT<>を用いて指定するが、Avro Shcema利用ならそのまま引張ってきてもよい。
  • ネストされた子構造をSELECTする際は列名としてADDRESS->CITYのように->を用いて指定する。
  • INSERT INTO + SELECTによって、あるStreamに別のStreamを統合することが出来る(が表構造の一致が前提なんだろうか)。
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?