Kafkaのメッセージはヘッダー、キー、バリューで構成されています。
キーは、メッセージを一意に識別したい場合、Topic内でキー単位で最新のメッセージを有効にしたい場合、Topicのメッセージを元にDB内のレコードをUPDATE・DELETEしたい場合などに必要になるので重要です。
ソースとなるサービスやコネクターがメッセージのキーを扱わないものの場合、何もしなければKafkaにキーのないメッセージが送られてくることになります。
以下では、コネクターのSMT(Single Message Transforms)を使用してメッセージにキーを付与する方法を示しています。
データベースとしてDb2を、コネクターはConfluentのJDBC Source Connectorを使用した例です。
キーがない場合の例1
JDBC Source Connectorは、主キーがあるテーブルから取得したデータをキーがないメッセージとしてTopicに書き込みます。
$ db2 describe table sample_txt
データ・タイ データ・ 列の スケ
列名 プ・スキーマ タイプ名 長さ ール NULL
------------------------------- --------- ------------------- ---------- ----- ------
ID SYSIBM INTEGER 4 0 いいえ
TEXT1 SYSIBM VARCHAR 5 0 はい
TEXT2 SYSIBM VARCHAR 10 0 はい
TEXT3 SYSIBM VARCHAR 15 0 はい
$ db2 "select * from sample_txt"
ID TEXT1 TEXT2 TEXT3
----------- ----- ---------- ---------------
1 a b c
2 aa bb cc
3 aaa bbb ccc
curl -sX PUT http://localhost:8083/connectors/sample_txt_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://xxxxx:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_topic"
}' | jq
ksql> print 'sample_txt_topic' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 20:50:40.163 Z, key: <null>, value: {"ID": 1, "TEXT1": "a", "TEXT2": "b", "TEXT3": "c"}, partition: 0
rowtime: 2023/08/13 20:50:40.164 Z, key: <null>, value: {"ID": 2, "TEXT1": "aa", "TEXT2": "bb", "TEXT3": "cc"}, partition: 0
rowtime: 2023/08/13 20:50:40.164 Z, key: <null>, value: {"ID": 3, "TEXT1": "aaa", "TEXT2": "bbb", "TEXT3": "ccc"}, partition: 0
JDBC Source Connectorの対象とするテーブルに主キーがあっても、取得したデータをもとにTopicに書き込むメッセージにはキーがないことがわかります。
キーがない場合の例2
JDBC Source Connectorは、主キーがないテーブルから取得したデータをキーがないメッセージとしてTopicに書き込みます。
$ db2 describe table sample_txt_nokey
データ・タイ データ・ 列の スケ
列名 プ・スキーマ タイプ名 長さ ール NULL
------------------------------- --------- ------------------- ---------- ----- ------
TEXT1 SYSIBM VARCHAR 5 0 はい
TEXT2 SYSIBM VARCHAR 10 0 はい
TEXT3 SYSIBM VARCHAR 15 0 はい
db2 "select * from sample_txt_nokey"
TEXT1 TEXT2 TEXT3
----- ---------- ---------------
x y z
xx yy zz
xxx yyy zzz
curl -sX PUT http://localhost:8083/connectors/sample_txt_nokey_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_nokey_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://xxxxx:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt_nokey",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_nokey_topic"
}' | jq
メッセージにキーがない場合、key.converterやkey.converter.schema.registry.urlを指定してもエラーはなりませんが使用されることもありません。
ksql> print 'sample_txt_nokey_topic' from beginning;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 21:01:33.682 Z, key: <null>, value: {"TEXT1": "x", "TEXT2": "y", "TEXT3": "z"}, partition: 0
rowtime: 2023/08/13 21:01:33.683 Z, key: <null>, value: {"TEXT1": "xx", "TEXT2": "yy", "TEXT3": "zz"}, partition: 0
rowtime: 2023/08/13 21:01:33.683 Z, key: <null>, value: {"TEXT1": "xxx", "TEXT2": "yyy", "TEXT3": "zzz"}, partition: 0
メッセージにキーがないことがわかります。
メッセージにキーを付与する例
SMT(Single Message Transforms)を使えば簡単にコネクターを通過するメッセージを加工することができます。ここではメッセージにキーを付与するValueToKeyを使用します。
curl -sX PUT http://localhost:8083/connectors/sample_txt_addkey_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_addkey_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://vm1:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt_nokey",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_addkey_topic",
"transforms":"ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields":"TEXT1"
}' | jq
transformsで始まる3行がSMTに該当するプロパティです。これを追加するだけでメッセージにキーが追加できます。
この例ではTEXT1という項目をキーに設定しています。
ksql> print 'sample_txt_addkey_topic' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 21:19:52.400 Z, key: {"TEXT1": "x"}, value: {"TEXT1": "x", "TEXT2": "y", "TEXT3": "z"}, partition: 0
rowtime: 2023/08/13 21:19:52.403 Z, key: {"TEXT1": "xx"}, value: {"TEXT1": "xx", "TEXT2": "yy", "TEXT3": "zz"}, partition: 0
rowtime: 2023/08/13 21:19:52.403 Z, key: {"TEXT1": "xxx"}, value: {"TEXT1": "xxx", "TEXT2": "yyy", "TEXT3": "zzz"}, partition: 0
これでメッセージにキーが追加されました。