0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kafkaのメッセージにキーを付ける方法(コネクター・SMT編)

Posted at

Kafkaのメッセージはヘッダー、キー、バリューで構成されています。
キーは、メッセージを一意に識別したい場合、Topic内でキー単位で最新のメッセージを有効にしたい場合、Topicのメッセージを元にDB内のレコードをUPDATE・DELETEしたい場合などに必要になるので重要です。

ソースとなるサービスやコネクターがメッセージのキーを扱わないものの場合、何もしなければKafkaにキーのないメッセージが送られてくることになります。
以下では、コネクターのSMT(Single Message Transforms)を使用してメッセージにキーを付与する方法を示しています。
データベースとしてDb2を、コネクターはConfluentのJDBC Source Connectorを使用した例です。

キーがない場合の例1

JDBC Source Connectorは、主キーがあるテーブルから取得したデータをキーがないメッセージとしてTopicに書き込みます。

ソースとなるテーブルの構成(IDが主キー)
$ db2 describe table sample_txt

                                データ・タイ データ・         列の       スケ
列名                            プ・スキーマ タイプ名         長さ       ール  NULL
------------------------------- --------- ------------------- ---------- ----- ------
ID                              SYSIBM    INTEGER                      4     0 いいえ
TEXT1                           SYSIBM    VARCHAR                      5     0 はい  
TEXT2                           SYSIBM    VARCHAR                     10     0 はい  
TEXT3                           SYSIBM    VARCHAR                     15     0 はい 
ソースとなるテーブルのデータ(IDが主キー)
$ db2 "select * from sample_txt"

ID          TEXT1 TEXT2      TEXT3          
----------- ----- ---------- ---------------
          1 a     b          c              
          2 aa    bb         cc             
          3 aaa   bbb        ccc  
JDBC Source Connectorの作成コマンド(IDが主キー)
curl -sX PUT http://localhost:8083/connectors/sample_txt_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://xxxxx:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_topic"
}' | jq
Topicに書き込まれたメッセージ(IDが主キー、だがメッセージにキーがない)
ksql> print 'sample_txt_topic' from beginning;
Key format: ¯\_()_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 20:50:40.163 Z, key: <null>, value: {"ID": 1, "TEXT1": "a", "TEXT2": "b", "TEXT3": "c"}, partition: 0
rowtime: 2023/08/13 20:50:40.164 Z, key: <null>, value: {"ID": 2, "TEXT1": "aa", "TEXT2": "bb", "TEXT3": "cc"}, partition: 0
rowtime: 2023/08/13 20:50:40.164 Z, key: <null>, value: {"ID": 3, "TEXT1": "aaa", "TEXT2": "bbb", "TEXT3": "ccc"}, partition: 0

JDBC Source Connectorの対象とするテーブルに主キーがあっても、取得したデータをもとにTopicに書き込むメッセージにはキーがないことがわかります。

キーがない場合の例2

JDBC Source Connectorは、主キーがないテーブルから取得したデータをキーがないメッセージとしてTopicに書き込みます。

ソースとなるテーブルの構成(主キーなし)
$ db2 describe table sample_txt_nokey

                                データ・タイ データ・         列の       スケ
列名                            プ・スキーマ タイプ名         長さ       ール  NULL
------------------------------- --------- ------------------- ---------- ----- ------
TEXT1                           SYSIBM    VARCHAR                      5     0 はい  
TEXT2                           SYSIBM    VARCHAR                     10     0 はい  
TEXT3                           SYSIBM    VARCHAR                     15     0 はい  
ソースとなるテーブルのデータ(主キーなし)
db2 "select * from sample_txt_nokey"

TEXT1 TEXT2      TEXT3          
----- ---------- ---------------
x     y          z              
xx    yy         zz             
xxx   yyy        zzz    
JDBC Source Connectorの作成コマンド(主キーなし)
curl -sX PUT http://localhost:8083/connectors/sample_txt_nokey_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_nokey_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://xxxxx:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt_nokey",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_nokey_topic"
}' | jq

メッセージにキーがない場合、key.converterやkey.converter.schema.registry.urlを指定してもエラーはなりませんが使用されることもありません。

Topicに書き込まれたメッセー(キーなし)
ksql> print 'sample_txt_nokey_topic' from beginning;
Key format: ¯\_()_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 21:01:33.682 Z, key: <null>, value: {"TEXT1": "x", "TEXT2": "y", "TEXT3": "z"}, partition: 0
rowtime: 2023/08/13 21:01:33.683 Z, key: <null>, value: {"TEXT1": "xx", "TEXT2": "yy", "TEXT3": "zz"}, partition: 0
rowtime: 2023/08/13 21:01:33.683 Z, key: <null>, value: {"TEXT1": "xxx", "TEXT2": "yyy", "TEXT3": "zzz"}, partition: 0

メッセージにキーがないことがわかります。

メッセージにキーを付与する例

SMT(Single Message Transforms)を使えば簡単にコネクターを通過するメッセージを加工することができます。ここではメッセージにキーを付与するValueToKeyを使用します。

JDBC Source Connectorの作成コマンド(SMTでキーを追加)
curl -sX PUT http://localhost:8083/connectors/sample_txt_addkey_connector/config \
-H "Content-Type: application/json" \
--data '{
"name":"sample_txt_addkey_connector",
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max":"1",
"key.converter":"io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url":"http://localhost:8081",
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://localhost:8081",
"connection.url":"jdbc:db2://vm1:50000/dbname",
"connection.user":"******",
"connection.password":"*********",
"mode":"bulk",
"query":"select * from schema1.sample_txt_nokey",
"poll.interval.ms":"1000000",
"topic.prefix":"sample_txt_addkey_topic",
"transforms":"ValueToKey",
"transforms.ValueToKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ValueToKey.fields":"TEXT1"
}' | jq

transformsで始まる3行がSMTに該当するプロパティです。これを追加するだけでメッセージにキーが追加できます。
この例ではTEXT1という項目をキーに設定しています。

Topicに書き込まれたメッセージ(SMTでキーを追加)
ksql> print 'sample_txt_addkey_topic' from beginning;
Key format: AVRO or KAFKA_STRING
Value format: AVRO or KAFKA_STRING
rowtime: 2023/08/13 21:19:52.400 Z, key: {"TEXT1": "x"}, value: {"TEXT1": "x", "TEXT2": "y", "TEXT3": "z"}, partition: 0
rowtime: 2023/08/13 21:19:52.403 Z, key: {"TEXT1": "xx"}, value: {"TEXT1": "xx", "TEXT2": "yy", "TEXT3": "zz"}, partition: 0
rowtime: 2023/08/13 21:19:52.403 Z, key: {"TEXT1": "xxx"}, value: {"TEXT1": "xxx", "TEXT2": "yyy", "TEXT3": "zzz"}, partition: 0

これでメッセージにキーが追加されました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?