はじめに
にて kafka-upstreamの試用をしました
今回はkafka-consumeプラグインを使ってみます。
セットアップ
前回記事のセットアップも参照ください
terraform資材は以下にあります(kafka-upstreamと共用)
https://github.com/gnt0608/kong_kafka
Route設定はこう
resource "konnect_gateway_route" "kafka_consume" {
control_plane_id = resource.konnect_gateway_control_plane.control_plane.id
name = "KafkaConsume"
paths = ["/consumer"]
protocols = ["http", "https"]
}
plugin設定はこう
resource "konnect_gateway_plugin_kafka_consume" "kafka_consume" {
config = {
bootstrap_servers = [
{
host = "kafka"
port = 29092
},
]
timeout = 10000
topics = [{name = "test-topic"}]
auto_offset_reset = "latest"
commit_strategy = "auto"
mode = "http-get"
}
control_plane_id = resource.konnect_gateway_control_plane.control_plane.id
protocols = ["http", "https"]
route = {
id = resource.konnect_gateway_route.kafka_consume.id
}
}
今回は /consumer を経路とし、 http-get のmodeを試します
websocket などにも対応しているようです
WebSocketの動作は今度やってみようと思います
取得してみる
$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":1,"last_stable_offset":1,"records":{}}}}}
現時点で recordは 0 です
$ curl -X POST localhost:8000/ -d "test data"
{"message":"message sent"}
一度投げてから
$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":2,"last_stable_offset":2,"records":[{"timestamp":1766129175000,"value":"{\"body_args\":{\"test data\":true},\"body\":\"test data\"}","offset":1,"key":""}]}}}}
取得すると partition: 2 にあるデータが取得できました。
$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":2,"last_stable_offset":2,"records":{}}}}}
もう一回実行すると
recordはなくなりました
設定値について
kafka-consume pluginで設定できるもののうち おそらく以下が特に重要な設定値になるのでここも記録しておきます
auto_offset_reset = "latest"
commit_strategy = "auto"
mode = "http-get"
| 項目 | 設定可能値 | 備考 |
|---|---|---|
| auto_offset_reset | latest, earliest | consumer groupの初期設定時にoffsetのどこから始めるか |
| commit_strategy | auto, off | recordを取得した際のcommit挙動 |
| mode | http-get, websocket, server-sent-events | kafka-consumeの実行方法 |
auto_offset_reset
ここは設計で定める領域になりますが
新しく consumer-group を設定して取得をした場合にoffsetの最初とするか最後とするかになります
latest を設定した場合、取得時までのすべてのレコードが読み飛ばされます
earliest を設定した場合、取得時点で保持しているすべてのレコードが連携されます
(そもそもこのPluginでどうやってconsumer-groupを定めるんだ? わかったら追記します)
(2025/12/22 追記)
どうやらPluginのIDを使って一意になるように設定しているらしいので調べてみました
$ /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:29092 --list
com.konghq.kafka.229de9a158a80519c0339defd7333747
com.konghq.kafka.(plugin_id) の形式になってました
正確には Kong上 IDはUUIDで表現されているので、 UUIDから-を取り除いた形になります
commit_strategy
topicを取得した時点で offsetを進めるか、明示的に進めるかの設定になります
auto を設定した場合、取得時点でoffsetが進みます
off を設定した場合、commitを明示的に行うまではoffsetは進みません
業務アプリなどでは off のほうが好ましいものと思います
アプリケーションが落ちた場合など、autoの場合には再取得が難しくなります
(が、どうやってcommitするんだ? わかったら追記します)
(2025/12/22 追記)
現状は HTTP Requestとしてcommitするすべはないらしいです。
今後の拡張に期待します
mode
consumeの実行方法を定める設定になります
http-get を設定した場合、 HTTP , HTTPS で取得します。
server-sent-events を設定した場合、 Server Side Events(SSE) 形式で取得します。
websocket を設定した場合、 ws , wss で取得します。
(そのまますぎる)
websocketが一番利用としては便利そうではありますがここは使い分けかなと思います
http-get は Clientが能動的に取得したいケース
websocket は Clientが受動的に取得したいケース
server-sent-events は 受動的に取得したいけど、WebSocketまではやりたくない
ときで使い分けるのがよさそうです