Kafkaにおけるメッセージは、連携元で発生するイベントそのものであったり、CDCによってキャプチャーされたソーステーブルの変更内容を示すものとして利用されたりします。
ソーステーブルの変更内容として利用される場合、ソーステーブル側でのINSERT/UPDATE/DELETEがメッセージに体現されますが、DELETEのときのメッセージの内容はすこし特殊です。
メッセージの内容はCDC製品仕様に依存しますが、大体上記の通りになります。
ポイント:
- メッセージはキー部とバリュー部に分かれます。
- INSERT/UPDATEがあると、更新後のレコードの主キーをキー部に、全カラムをバリュー部にセットしたメッセージがKafkaに連携されます。
- DELETEがあると、削除されたレコードの主キーをキー部に、バリュー部はnullにしたメッセージがKafkaに連携されます。
DELETEによるメッセージはTombstoneレコードと呼ばれます。
Tombstoneレコードは、Kafkaのストリームにおけるテーブル(KTable)やDBのテーブルの、キーが同じレコードを削除する振る舞いをします。
Kafkaにメッセージを送るコンポーネントによっては、削除の振る舞いをさせるためにメッセージを加工してTombostoneレコードを自力で造成する必要が出てくることもあります。
海外サイトを中心に調べ、トライ&エラーを繰り返して辿り着いた方法ですこしマニアックな内容ですが、誰かの何かのためになれば幸いです。
やろうとしていること
- kafka-avro-console-producerでTopic1にメッセージを書き込みます。メッセージはキーを示す「ID」、更新区分を示す項目「TYPE」、値を示す「VAL」で構成します。
- Topic1に対するストリームStream1を作成し、加工処理を行うストリームから参照できるようにします。
- Stream2を作成しTopic1のメッセージの更新区分「TYPE」が'DEL'のメッセージをTombstoneレコードに変換します。変換後のメッセージはTopic2に書き込まれます。
- Topic2に対してJDBC Sink ConnectorをアタッチしてDB内のTable1が更新されるようにします。結果的に、Talbe1のレコードが上記図で示す通りになっていれば成功です。
1. kafka-avro-console-producerでTopic1にメッセージを書き込む
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic topic1 \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"ID","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"ID","type":"string"},
{"name":"TYPE","type":"string"},
{"name":"VAL","type":"string"}]}'
{"ID":"1"} {"ID":"1","TYPE":"INS","VAL":"aa"}
{"ID":"2"} {"ID":"2","TYPE":"INS","VAL":"bb"}
{"ID":"3"} {"ID":"3","TYPE":"INS","VAL":"cc"}
{"ID":"2"} {"ID":"2","TYPE":"UPD","VAL":"bbb"}
{"ID":"3"} {"ID":"3","TYPE":"DEL","VAL":"cc"}
ksql> print 'topic1' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "TYPE": "INS", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "INS", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "INS", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "TYPE": "UPD", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: {"ID": "3", "TYPE": "DEL", "VAL": "cc"}, partition: 0
2. Stream1を作成する(メッセージを加工する準備)
CREATE STREAM STREAM1
WITH (KAFKA_TOPIC='topic1', KEY_FORMAT='AVRO', VALUE_FORMAT='AVRO');
3. Stream2でTopic1のメッセージの一部をTombstoneレコードに加工する
CREATE STREAM STREAM2
WITH (WRAP_SINGLE_VALUE=FALSE,
KAFKA_TOPIC='topic2',
KEY_FORMAT='AVRO',
VALUE_FORMAT='AVRO')
AS SELECT
ROWKEY,
CASE WHEN TYPE='DEL'
THEN CAST(NULL AS STRUCT<ID STRING, VAL STRING>)
ELSE STRUCT(ID:=STREAM1.ID, VAL:=STREAM1.VAL)
END
FROM
STREAM1
EMIT CHANGES;
- WITH句の「WRAP_SINGLE_VALUE」がポイントです。
- TRUE(デフォルト):ksqlDB は列がレコード内の名前付き列としてシリアル化されていることを期待します。
- FALSE:ksqlDB は列が匿名値としてシリアル化されていると想定します。名前のついた列ではなくJSON構造になるSTRUCTもしくはnullを抽出対象としてSELECTで指定できます。
- SELECT句のCASE式もポイントです。
- TYPEが'DEL'(削除)なら、バリュー部をnullにします。通常のRDBMSのSQLのようにCASE式の複数条件結果はどれも同じ型である必要があります。メッセージのバリューに複数の項目がある場合は上記のようにNULLをSTRUCT型でCASTしたものを抽出対象とします。
- TYPEが'DEL'(削除)以外なら、メッセージのバリューに複数の項目がある場合は、上記のようにSTRUCTに値をセットした結果を抽出対象とします。
ksql> print 'topic2' from beginning;
Key format: AVRO or KAFKA_BIGINT or KAFKA_DOUBLE or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/14 11:21:32.621 Z, key: {"ID": "1"}, value: {"ID": "1", "VAL": "aa"}, partition: 0
rowtime: 2023/08/14 11:21:38.634 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bb"}, partition: 0
rowtime: 2023/08/14 11:21:44.117 Z, key: {"ID": "3"}, value: {"ID": "3", "VAL": "cc"}, partition: 0
rowtime: 2023/08/14 11:21:51.773 Z, key: {"ID": "2"}, value: {"ID": "2", "VAL": "bbb"}, partition: 0
rowtime: 2023/08/14 11:21:56.328 Z, key: {"ID": "3"}, value: <null>, partition: 0
5本目のメッセージがTombstoneレコードになっていることを確認できました。
4. Topic2に対してJDBC Sink Connectorをアタッチして後続の連携先であるDBに更新を反映する
curl -X PUT localhost:8083/connectors/table1-connector/config -H "Content-Type: application/json" \--data '{
"name":"table1-connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics":"topic2",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:postgresql://localhost:5432/testdb1",
"connection.user":"******",
"connection.password":"*********",
"insert.mode":"upsert",
"auto.create":"true",
"delete.enabled":"true",
"table.name.format":"public.table1",
"pk.mode":"record_key",
"pk.fields":"ID",
"confluent.topic.bootstrap.servers":"localhost:9092",
"confluent.topic.replication.factor": "1"
}' | jq
JDBC Sink Connectorが正常に稼働している場合は、後続DBのテーブルにデータが反映されます。
以下は、今回の検証で採用しているPostgreSQLのDBを確認した結果です。
testdb1=# \d table1;
テーブル "public.table1"
列 | 型 | 照合順序 | Null 値を許容 | デフォルト
-----+------+----------+---------------+------------
ID | text | | not null |
VAL | text | | |
インデックス:
"table1_pkey" PRIMARY KEY, btree ("ID")
testdb1=# select * from table1;
ID | VAL
----+-----
1 | aa
2 | bbb
(2 行)
ksqlDBでメッセージを加工してTombstoneレコードに変換することができました。
ksqlDBであれば、プログラミング言語でアプリケーションを実装することなく、SQLライクな実装で簡単にこのようなことが実現できます。
よく参考にしているサイト:ksqlDB Reference