技術検証や調査の過程などで、とにかくクイックにKafkaのTopicにAvro形式のメッセージを送りたいという場面で役にたつサンプルコマンドです。
Avro形式のメッセージ(キーあり)をTopicに書き込む
コマンド
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic sample-topic \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"},
{"name":"col2","type":"string"},
{"name":"col3","type":"string"}]}'
上記コマンド実行後、ターミナルで入力を求められるので以下のメッセージを入力してメッセージをKafkaのTopicに送る。※1メッセージ毎にEnter(改行)
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}
入力が終わったらCtrl+Cで終了。
(ワンライナーでやる場合)
コマンド
echo '{"col1":"a4"} {"col1":"a4","col2":"aaaa","col3":"test4"}' | \
kafka-avro-console-producer \
--broker-list localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--topic sample-topic \
--property parse.key=true \
--property key.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"}]}' \
--property key.separator=" " \
--property value.schema='{"type":"record",
"name":"myrecord",
"fields":[{"name":"col1","type":"string"},
{"name":"col2","type":"string"},
{"name":"col3","type":"string"}]}'
ループさせて断続的にメッセージを書き込んだりしたい場合はこれを参考に組み立てる。
Avro形式のメッセージ(キーあり)をTopicから読む
コマンド
kafka-avro-console-consumer \
--from-beginning --topic sample-topic \
--bootstrap-server localhost:9092 \
--property print.key=true \
--property schema.registry.url=http://localhost:8081
出力例
{"col1":"a1"} {"col1":"a1","col2":"a","col3":"test1"}
{"col1":"a2"} {"col1":"a2","col2":"aa","col3":"test2"}
{"col1":"a3"} {"col1":"a3","col2":"aaa","col3":"test3"}
{"col1":"a4"} {"col1":"a4","col2":"aaaa","col3":"test4"}
Ctrl+Cで終了。
Topicに書き込まれたメッセージのスキーマを確認してみよう
Confluentのスキーマレジストリを使用。
サブジェクトの確認
コマンド
curl -sX GET http://localhost:8081/subjects | jq . | grep sample-topic
出力例
"sample-topic-key",
"sample-topic-value",
キーのスキーマを確認
コマンド
curl -sX GET http://localhost:8081/subjects/sample-topic-key/versions/latest | jq .schema -r | jq
出力例
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "col1",
"type": "string"
}
]
}
バリューのスキーマを確認
コマンド
curl -sX GET http://localhost:8081/subjects/sample-topic-value/versions/latest | jq .schema -r | jq
出力例
{
"type": "record",
"name": "myrecord",
"fields": [
{
"name": "col1",
"type": "string"
},
{
"name": "col2",
"type": "string"
},
{
"name": "col3",
"type": "string"
}
]
}