1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ksqlDBでメッセージを加工してTombstoneレコードに変換するテクニック

Posted at

Kafkaにおけるメッセージは、連携元で発生するイベントそのものであったり、CDCによってキャプチャーされたソーステーブルの変更内容を示すものとして利用されたりします。
ソーステーブルの変更内容として利用される場合、ソーステーブル側でのINSERT/UPDATE/DELETEがメッセージに体現されますが、DELETEのときのメッセージの内容はすこし特殊です。

image.png

メッセージの内容はCDC製品仕様に依存しますが、大体上記の通りになります。

ポイント:

  • メッセージはキー部とバリュー部に分かれます。
  • INSERT/UPDATEがあると、更新後のレコードの主キーをキー部に、全カラムをバリュー部にセットしたメッセージがKafkaに連携されます。
  • DELETEがあると、削除されたレコードの主キーをキー部に、バリュー部はnullにしたメッセージがKafkaに連携されます。

DELETEによるメッセージはTombstoneレコードと呼ばれます。
Tombstoneレコードは、Kafkaのストリームにおけるテーブル(KTable)やDBのテーブルの、キーが同じレコードを削除する振る舞いをします。

Kafkaにメッセージを送るコンポーネントによっては、削除の振る舞いをさせるためにメッセージを加工してTombostoneレコードを自力で造成する必要が出てくることもあります。
海外サイトを中心に調べ、トライ&エラーを繰り返して辿り着いた方法ですこしマニアックな内容ですが、誰かの何かのためになれば幸いです。

やろうとしていること

image.png

  • kafka-avro-console-producerでTopic1にメッセージを書き込みます。メッセージはキーを示す「ID」、更新区分を示す項目「TYPE」、値を示す「VAL」で構成します。
  • Topic1に対するストリームStream1を作成し、加工処理を行うストリームから参照できるようにします。
  • Stream2を作成しTopic1のメッセージの更新区分「TYPE」が'DEL'のメッセージをTombstoneレコードに変換します。変換後のメッセージはTopic2に書き込まれます。
  • Topic2に対してJDBC Sink ConnectorをアタッチしてDB内のTable1が更新されるようにします。結果的に、Talbe1のレコードが上記図で示す通りになっていれば成功です。

1. kafka-avro-console-producerでTopic1にメッセージを書き込む

Topic1へのAvro形式のメッセージ書き込み
kafka-avro-console-producer \
  --broker-list localhost:9092 \
  --property schema.registry.url=http://localhost:8081 \
  --topic topic1 \
  --property parse.key=true \
  --property key.schema='{"type":"record",
                          "name":"myrecord",
                          "fields":[{"name":"ID","type":"string"}]}' \
  --property key.separator=" " \
  --property value.schema='{"type":"record",
                            "name":"myrecord",
                            "fields":[{"name":"ID","type":"string"},
                                      {"name":"TYPE","type":"string"},
                                      {"name":"VAL","type":"string"}]}'
上のコマンドにより対話モードになるのでメッセージを入力する(1行ずつEnter)
{"ID":"1"} {"ID":"1","TYPE":"INS","VAL":"aa"}
{"ID":"2"} {"ID":"2","TYPE":"INS","VAL":"bb"}
{"ID":"3"} {"ID":"3","TYPE":"INS","VAL":"cc"}
{"ID":"2"} {"ID":"2","TYPE":"UPD","VAL":"bbb"}
{"ID":"3"} {"ID":"3","TYPE":"DEL","VAL":"cc"}
Topic1に書き込まれたメッセージの確認
ksql> print 'topic1' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "TYPE": "INS", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "INS", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "INS", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "UPD", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "DEL", "VAL": "cc"}, partition: 0

2. Stream1を作成する(メッセージを加工する準備)

Stream1の作成
CREATE STREAM STREAM1
  WITH (KAFKA_TOPIC='topic1', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');

3. Stream2でTopic1のメッセージの一部をTombstoneレコードに加工する

Stream2の作成
CREATE STREAM STREAM2
  WITH (WRAP_SINGLE_VALUE=FALSE,
        KAFKA_TOPIC='topic2',
        KEY_FORMAT='AVRO',
        VALUE_FORMAT='AVRO')
  AS SELECT
    ROWKEY,
    CASE WHEN TYPE='DEL' 
      THEN CAST(NULL AS STRUCT<ID STRING, VAL STRING>)
      ELSE STRUCT(ID:=STREAM1.ID, VAL:=STREAM1.VAL)
    END
  FROM
    STREAM1
  EMIT CHANGES;
  • WITH句の「WRAP_SINGLE_VALUE」がポイントです。
    • TRUE(デフォルト):ksqlDB は列がレコード内の名前付き列としてシリアル化されていることを期待します。
    • FALSE:ksqlDB は列が匿名値としてシリアル化されていると想定します。名前のついた列ではなくJSON構造になるSTRUCTもしくはnullを抽出対象としてSELECTで指定できます。
  • SELECT句のCASE式もポイントです。
    • TYPEが'DEL'(削除)なら、バリュー部をnullにします。通常のRDBMSのSQLのようにCASE式の複数条件結果はどれも同じ型である必要があります。メッセージのバリューに複数の項目がある場合は上記のようにNULLをSTRUCT型でCASTしたものを抽出対象とします。
    • TYPEが'DEL'(削除)以外なら、メッセージのバリューに複数の項目がある場合は、上記のようにSTRUCTに値をセットした結果を抽出対象とします。
Topic2に書き込まれたメッセージの確認
ksql> print 'topic2' from beginning;
Key format: AVRO or KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: <null>, partition: 0

5本目のメッセージがTombstoneレコードになっていることを確認できました。

4. Topic2に対してJDBC Sink Connectorをアタッチして後続の連携先であるDBに更新を反映する

JDBC Sink Connectorの作成・起動
curl -X PUT localhost:8083/connectors/table1-connector/config -H "Content-Type: application/json" \--data '{
"name":"table1-connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"topic2",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:postgresql://localhost:5432/testdb1",
"connection.user":"******",
"connection.password":"*********",
"insert.mode":"upsert",
"auto.create":"true",
"delete.enabled":"true",
"table.name.format":"public.table1",
"pk.mode":"record_key",
"pk.fields":"ID",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor": "1"
}' | jq

JDBC Sink Connectorが正常に稼働している場合は、後続DBのテーブルにデータが反映されます。
以下は、今回の検証で採用しているPostgreSQLのDBを確認した結果です。

PostgreSQL
testdb1=# \d table1;
              テーブル "public.table1"
   |    | 照合順序 | Null 値を許容 | デフォルト 
-----+------+----------+---------------+------------
 ID  | text |          | not null      | 
 VAL | text |          |               | 
インデックス:
    "table1_pkey" PRIMARY KEY, btree ("ID")

testdb1=# select * from table1;
 ID | VAL 
----+-----
 1  | aa
 2  | bbb
(2 )

想定通りの結果となりました。
image.png

ksqlDBでメッセージを加工してTombstoneレコードに変換することができました。
ksqlDBであれば、プログラミング言語でアプリケーションを実装することなく、SQLライクな実装で簡単にこのようなことが実現できます。

よく参考にしているサイト:ksqlDB Reference

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?