3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kong API Gateway を経由して Kafka のデータを取得する

Last updated at Posted at 2025-12-19

はじめに

にて kafka-upstreamの試用をしました

今回はkafka-consumeプラグインを使ってみます。

セットアップ

前回記事のセットアップも参照ください

terraform資材は以下にあります(kafka-upstreamと共用)
https://github.com/gnt0608/kong_kafka

Route設定はこう

route.tf
resource "konnect_gateway_route" "kafka_consume" {
  control_plane_id           = resource.konnect_gateway_control_plane.control_plane.id
  name                       = "KafkaConsume"
  paths                      = ["/consumer"]
  protocols                  = ["http", "https"]
}

plugin設定はこう

plugin.tf
resource "konnect_gateway_plugin_kafka_consume" "kafka_consume" {
  config = {
    bootstrap_servers = [
      {
        host = "kafka"
        port = 29092
      },
    ]
    timeout           = 10000
    topics             = [{name = "test-topic"}]
    auto_offset_reset = "latest"
    commit_strategy   = "auto"
    mode              = "http-get"
  }
  control_plane_id = resource.konnect_gateway_control_plane.control_plane.id
  protocols        = ["http", "https"]
  route = {
    id = resource.konnect_gateway_route.kafka_consume.id
  }
}

今回は /consumer を経路とし、 http-get のmodeを試します
websocket などにも対応しているようです
WebSocketの動作は今度やってみようと思います

取得してみる

$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":1,"last_stable_offset":1,"records":{}}}}}

現時点で recordは 0 です

$ curl -X POST localhost:8000/ -d "test data"
{"message":"message sent"}

一度投げてから

$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":2,"last_stable_offset":2,"records":[{"timestamp":1766129175000,"value":"{\"body_args\":{\"test data\":true},\"body\":\"test data\"}","offset":1,"key":""}]}}}}

取得すると partition: 2 にあるデータが取得できました。

$ curl localhost:8000/consumer
{"test-topic":{"partitions":{"0":{"aborted_transactions":{},"errcode":0,"high_watermark":6,"last_stable_offset":6,"records":{}},"1":{"aborted_transactions":{},"errcode":0,"high_watermark":7,"last_stable_offset":7,"records":{}},"2":{"aborted_transactions":{},"errcode":0,"high_watermark":2,"last_stable_offset":2,"records":{}}}}}

もう一回実行すると
recordはなくなりました

設定値について

kafka-consume pluginで設定できるもののうち おそらく以下が特に重要な設定値になるのでここも記録しておきます

auto_offset_reset = "latest"
commit_strategy   = "auto"
mode              = "http-get"
項目 設定可能値 備考
auto_offset_reset latest, earliest consumer groupの初期設定時にoffsetのどこから始めるか
commit_strategy auto, off recordを取得した際のcommit挙動
mode http-get, websocket, server-sent-events kafka-consumeの実行方法

auto_offset_reset

ここは設計で定める領域になりますが
新しく consumer-group を設定して取得をした場合にoffsetの最初とするか最後とするかになります
latest を設定した場合、取得時までのすべてのレコードが読み飛ばされます
earliest を設定した場合、取得時点で保持しているすべてのレコードが連携されます
(そもそもこのPluginでどうやってconsumer-groupを定めるんだ? わかったら追記します)

(2025/12/22 追記)
どうやらPluginのIDを使って一意になるように設定しているらしいので調べてみました

$ /opt/kafka/bin/kafka-consumer-groups.sh --bootstrap-server kafka:29092 --list
com.konghq.kafka.229de9a158a80519c0339defd7333747

com.konghq.kafka.(plugin_id) の形式になってました
正確には Kong上 IDはUUIDで表現されているので、 UUIDから-を取り除いた形になります

commit_strategy

topicを取得した時点で offsetを進めるか、明示的に進めるかの設定になります
auto を設定した場合、取得時点でoffsetが進みます
off を設定した場合、commitを明示的に行うまではoffsetは進みません
業務アプリなどでは off のほうが好ましいものと思います
アプリケーションが落ちた場合など、autoの場合には再取得が難しくなります
(が、どうやってcommitするんだ? わかったら追記します)

(2025/12/22 追記)
現状は HTTP Requestとしてcommitするすべはないらしいです。
今後の拡張に期待します

mode

consumeの実行方法を定める設定になります
http-get を設定した場合、 HTTP , HTTPS で取得します。
server-sent-events を設定した場合、 Server Side Events(SSE) 形式で取得します。
websocket を設定した場合、 ws , wss で取得します。
(そのまますぎる)

websocketが一番利用としては便利そうではありますがここは使い分けかなと思います

http-get は Clientが能動的に取得したいケース
websocket は Clientが受動的に取得したいケース
server-sent-events は 受動的に取得したいけど、WebSocketまではやりたくない
ときで使い分けるのがよさそうです

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?