概要
Apache Kafka (Confluent Platform環境) にて、簡単なトピック作成からメッセージの送受信までの方法について記載します。
初心者向けに書いているつもりです。
前提環境
前に投稿しましたDockerでConfluent(Kafka)の開発環境を簡単構築で作成したConfluent 開発環境上で実行するように記載しています。
ファイルやコマンドは全てDockerホスト上のbashから行うように記載しています。
Confluentについては、Confluent Platfromの魅力について書いてみたでも解説しています。
構成・用語の解説
- メッセージとは
Kafkaで交換されるデータのレコードの事です。
メッセージの中身に制限は特になく、テキストだけではなく画像ファイルのデータなどバイナリデータも入れることができ
その内容に制限はありません。
1メッセージの大きさは、デフォルトでは最大1MBになっていますが、拡張することができるのでサイズの大きな動画などでも
入れることができますし。
JSONやAVROなどを利用することで、1つのメッセージに住所+名前といった複数項目の情報を入れることができます。
※ Kafkaではキー(key)値(value)といったフィールドが用意されておりどちらもバイナリデータを自由に入れることができます。
難しくなるので今回はキー/値の区別せず記述しています。
- ブローカーとは
メッセージを仲介するサーバの事です。受けたメッセージを一時的に保存したり、保存されているメッセージを取り出したりすることができます。
※ 本番環境では3台以上でクラスターを組んで利用するのが一般的です。今回の環境では1台構成で動作させています。
- プロヂューサーとは
メッセージをブローカーに送るプログラムを指します。
- コンシューマーとは
メッセージをプローカーから受け取るプログラムを指します。
- トピックとは
ブローカー(クラスタ内)にあるメッセージを入れるキューの事です。
1つのクラスタ内に複数のトピックを作成することができ、トピックはトピック名で識別して利用します。
※ 1つのプログラム内に、コンシューマーとプロヂューサーの機能の両方を持たせて、メッセージを受信=>処理=>結果を別のメッセージ送信することができます。この場合、受信トピックと送信トピックを分ける必要があります。
- REST-PROXYについて
Confluent社が作っているPROXYサーバで、JSON形式のREST-APIを利用してプロヂューサーとコンシューマー及びトピックのコンフィグレーションを行えるようにしたものです。
REST-APIがほぼ全てのプログラミング言語から利用できるので、実質プログラム言語を選ばずに利用可能にするサーバです。
REST-PROXYの利用方法
3つのHTTPメソッドを利用してAPIを利用します。
・GETメソッド 情報を取得する
・POSTメソッド 情報を送り、送った情報の結果を取得する
・DELETEメソッド コンテンツを消す
メソッドとお合わせてHTTPのAcceptヘッダを付けてアクセスします。
Accept: application/vnd.kafka.v2+json
※ Acceptを付けないと正しく動作しないことがあります。
本書は、シェル上から以下の2つのコマンドを使って操作を行います。また、POSTメソッドの送信内容をわかりやすくするためファイルに中身を記載する方法をとっています。
- curlコマンド REST-APIへのアクセス用
- jqコマンド Jsonの結果をわかりやすく
メッセージ送信方法(プロデューサ)
今回は トピック名: demo.topic1 に対して、任意の文字列を送ります。
送信するRESTの中身を作成します。
{
"records": [
{
"value": "初めてのKafka!!"
}
]
}
送信する内容はJSON形式で記載します。recordsとなっている部分にメッセージを配列で記載します。今回は1メッセージのため配列は1つです。その中身に、valueに本文の文字列を入れます。
※ valueの中身はjson形式で記載ができますが、今回はわかりやすく単純な文字列のみを入れています。
作成した中身をcurlコマンドから送信します。
$ curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" \
-H "Accept: application/vnd.kafka.v2+json" \
-d @demo.topic1.msg0.json "http://localhost:8082/topics/demo.topic1" | jq
Content-Typeはjson形式で送信する場合の定型文です。つけない場合はエラーになります。
宛先は REST-PROXYサーバの8082ポートにHTTPで送信します。パスのtopicsはプロデューサとしてメッセージを送る場合の固定文字列でそのあとの'demo.topic1'がトピック名です。トピックが無い場合は自動的にトピックが作られます(デフォルトの設定)
正しくメッセージを送信すると以下のレスポンスが返ってきます。
{
"offsets": [
{
"partition": 0,
"offset": 0,
"error_code": null,
"error": null
}
],
"key_schema_id": null,
"value_schema_id": null
}
partition: パーティション番号 今回は必ず0になります。パーティション機能については別の機会に記事を書く予定ですので今回は省略します。
offset: Kafkaブローカーに保存されたメッセージの番号です。0から始まり1,2,3...と自動的に採番されます。
error_code: エラーがあった場合のエラーコード番号、無い場合はnullを返します。
error: エラーがあった場合のエラーメッセージ、無い場合はnullを返します。
key_schema_id: avroという機能を使った場合にID番号が表示されます。今回は利用していないためnullになります。
value_schema_id: avroという機能を使った場合にID番号が表示されます。今回は利用していないためnullになります。※ avroに関しても別の機会に記事を書く予定です。
コンシューマ(受信側プログラム)
送ったメッセージをコンシューマから取り出します。
コンシューマーは少し複雑で、まずはメッセージを取り出すグループとインスタンスを作ります。そのあとで、インスタンスに対応するメッセージを取得します。
※ グループの考え方は後述に記載します。
※ インスタンスはパーティションと関連がありますので別の機会に記事を書く予定です。今回は任意のインスタンスを作ってから受信すると覚えてください。
グループとインスタンスの初期化
POST内容のファイルを作ります。
{
"name": "instance0",
"format": "json",
"auto.offset.reset": "earliest"
}
name: インスタンス名を文字列で指定します。
format: 出力する形式を選びます。今回はjsonで保存したため同じjsonを選びます。ほかにはavro
binary
が選べます。
auto.offset.reset: コンシューマグループを初めて作成した時に読みこむメッセージの場所を指定します。earliestを選んだ場合は一番初めのメッセージから読み込まれます。latestを選んだ場合は、トピックの内部に存在するメッセージは新たに受信するメッセージから表示します。
作成したJSONの内容をREST PROXYへPOSTしてインスタンスを初期化します。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" \
--data @demo.topic1.consumer_init.json \
http://localhost:8082/consumers/cgroup1 | jq
パスのconsumersはコンシューマを利用する場合に利用する固定文字列です。
cgroup1はコンシューマーグループ名です。任意の文字列として利用します。
成功すると、以下のような結果が表示されます。
{
"instance_id": "instance0",
"base_uri": "http://rest-proxy:8082/consumers/cgroup1/instances/instance0"
}
instance_id: インスタンス名を表示します
base_uri: インスタンスへアクセスを行うためのURIを表示します。
(注意)ホスト名がrest-proxy
になっています。Dockerホストから利用する場合は、localhost
へ書き換えて利用します。
トピックの選択
POST内容のファイルを作ります。
{
"topics": ["topic1"]
}
topics: メッセージを受信したいトピック名一覧を配列で記載します。
作成したJSONの内容をREST PROXYへPOSTしてインスタンスを初期化します。
$ curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data @demo.topic1.consumer.settopic.json \
http://localhost:8082/consumers/cgroup1/instances/instance0/subscription -i
コンテンツが出力されないため、確認のためにHTTPのレスポンスヘッダを表示するオプション-i
を追加しています。
パスのinstancesはインスタンス関連のアクションをする場合の固定文字列です。
instance0は、インスタンス名で任意の名前を付けます。
subscriptionは、トピックとの関連付けを行うときに利用します。追加(POST)、表示(GET)、削除(DELETE)ができます。
応答の確認
HTTP/1.1 204 No Content
Date: Wed, 16 Oct 2019 06:52:07 GMT
Server: Jetty(9.4.18.v20190429)
コマンドが成功すると204で応答します。
※ 選択済みのトピック一覧はhttp://localhost:8082/consumers/cgroup1/instances/instance0/subscription
をGETすることで確認できます。
メッセージの読み取り
GETでメッセージを取得します。
curl -X GET -H "Accept: application/vnd.kafka.json.v2+json" \
http://localhost:8082/consumers/cgroup1/instances/instance0/records | jq
パスのrecordsはレコードを読み取るときに利用する固定文字列です。
応答確認
[
{
"topic": "topic1",
"key": null,
"value": "初めてのKafka!!",
"partition": 0,
"offset": 0
}
]
topic: メッセージのトピック名
key: キーの内容(今回は利用していないためnull)
value: メッセージの値の内容
partition: パーティション番号
offset: オフセット番号
今回の手順では1つのメッセージしかないため1メッセージの表示ですが、
複数のメッセージが存在する場合は、複数のメッセージ分だけ表示されます。
インスタンスの削除
利用し終わったインスタンスをDELETEメソッドで削除します。
プログラムの終了時などインスタンスは必ず削除しましょう。削除しないと、メッセージを正しく取り出せなかったり正常にオフセットを情報を保存できずに同じレコードを何度も読み込むなどの問題が発生します。
※ 今回の環境ではインスタンスに対して5分以上APIアクセスが無い場合は自動的に削除されます。
curl -X DELETE -H "Content-Type: application/vnd.kafka.v2+json" \
http://localhost:8082/consumers/cgroup1/instances/instance0 -i
コンシューマグループの役割(コンシューマーの再開)
コンシューマグループは、同じトピックに保存されたメッセージ列をどこまで読み込んだかを管理しているグループです。
グループを分けることで、同じトピック上の同じメッセージをそれぞれのグループから読み込むことができるようになります。
グループが同じ場合は排他となり同じメッセージを読み込まなくなります。
コンシューマグループは、メッセージのオフセット情報を独立して管理しており、グループごとにどのメッセージまで取り出したかをオフセットとして管理しています。(保存場所はzookeeper)
そのため、グループを分けることでプログラムごとにどこまで読み込んだのかを管理することができるようになります。
※ REST-PROXYでは、recodes
でメッセージを取り出した後、再度インスタンスに対してAPIアクセスを行ったときにオフセットを更新します。APIアクセスが無くタイムアウトした場合はrecodesは失敗したとみなされ同じオフセットからメッセージを送ります。
(補足)コンシューマグループの確認方法(コマンド)
コンシューマーリスト一覧表示
# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --list
現在のオフセット情報の表示
# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --describe --group cgroup1
コンシューマグループの削除
# docker-compose exec broker kafka-consumer-groups --bootstrap-server broker:9092 --delete --group cgroup1