この記事の目的は、Confluent Platform(Apache Kafka)に関する技術ブログを投稿し、自分の理解を整理することです。加えて、これから使ってみようと考えているエンジニアの助力になればと考えています。
記事はいくつかのパートに別けて構成する予定です。
きちんと精査しきれていないため、不正確な内容が含まれているかもしれません。その場合はコメント頂けると嬉しいです。
前置き
当記事ではPostgreSQL、Confluent Platform、Elasticsearchのシステムを構築します。
システム構成の概要図は前回の記事を参照してください。
環境構築
環境構築は、気軽に使い捨てができるdockerを利用します。
docker-compose
今回利用するdocker-composeをGitHubに投稿しました。全体を確認したい場合は、リンク先のgistを参照してください。
参考にしたサイト
docker-composeを作るにあたり、下記のサイトを参考にさせて頂きました。
- PostgreSQL
- Elasticsearch
- Confluent Platform
説明
docker-compose
ファイルのうち、今回の検証に関わる部分について説明します。
PostgreSQL
別記事でも説明する予定ですが、PostgreSQLと接続するConnectorプラグインは Debezium connector for PostgreSQLを利用します。また、論理デコーダーのプラグインはPostgreSQL+10からデフォルトで提供されているpgoutput
を利用します。
論理デコーダーについてもっと詳しい内容を知りたいという方はPostgreSQLの公式ドキュメントや、Debeziumの公式ドキュメントが参考になります。
pgoutput
の論理デコーダーを利用するために、PostgreSQLのレプリケーション設定をlogical
に変更しています。
postgres:
...
command:
- "postgres"
- "-c"
- "wal_level=logical"
- "-c"
- "max_replication_slots=5"
- "-c"
- "max_wal_senders=5"
レプリケーション設定が変更されているかどうかは次のSQLクエリを実行し、簡単に確認できます。
以下はクエリの実行結果です。
postgres=# SELECT name, setting, unit, short_desc FROM pg_settings WHERE name = 'wal_level';
name | setting | unit | short_desc
-----------+---------+------+--------------------------------------------------
wal_level | logical | | Set the level of information written to the WAL.
(1 row)
Elasticsearch
Elasticsearchは複数のノードでクラスタ構成を組むことができますが、今回はクラスタ構成は必要ないのでdiscovery.type
にsingle-node
(シングルノード)を指定して、単一ノードで構成するようにしています。
elasticsearch:
...
environment:
- xpack.security.enabled=false
- discovery.type=single-node
Kibana
Kibanaは必須ではありませんが、Elasticsearchに登録されたインデックス設定やマッピング定義、ドキュメントを手軽に確認できるため導入しました。
Zookeeper
Zookeeperでは、コンテナのヘルスチェックと起動順序を制御するためにhealthcheck
とdepends_on
を定義しています。これはZookeeperだけでなく、Confluent Platformの他のコンテナも同様に定義しています。
zookeeper:
...
healthcheck:
test: "nc -z localhost 2181 || exit -1"
interval: 10s
timeout: 5s
retries: 5
start_period: 10s
Broker(Kafka)
BrokerはApache KafkaのKafkaノードの別称です。複数ノードでクラスタ構成を組むことができますが、今回は検証目的であり、クラスタ構成は必要ないので単一ノードで構成するようにしています。
そのため、Topicのレプリケーションファクターの数(KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
、KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
)を1
としています。
KAFKA_LOG_RETENTION_MINUTES
、KAFKA_LOG_CLEANUP_POLICY
、KAFKA_COMPRESSION_TYPE
は、Topicに登録されるログセグメントの生存期間とクリーンアップ方針を定義しています。これは主にチューニングに関する項目です。
-
KAFKA_LOG_RETENTION_MINUTES
Topicに登録されたログセグメントは1
分経過後に破棄されます。通常であれば7
日間(デフォルト値)など、もっと長い期間を指定しますが、今回は検証目的であるためこの値で充分だと判断しました。 -
KAFKA_LOG_CLEANUP_POLICY
delete
は、古いログをクリーンアップする際に完全に削除します。別の設定値としてcompact
を指定できます。compact
の場合は、古いログを削除せず圧縮して保持します。 -
KAFKA_COMPRESSION_TYPE
ログセグメントの圧縮ポリシーです。uncompressed
は圧縮しないで保持します。他にgzip
やsnappy
、lz4
、zstd
など圧縮方式を選択することもできます。(参考サイト)
broker:
...
environment:
...
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
...
KAFKA_LOG_RETENTION_MINUTES: 1
KAFKA_LOG_CLEANUP_POLICY: delete
KAFKA_COMPRESSION_TYPE: uncompressed
Tips
Confluent PlatformのKafkaコンテナのimageの仕様上、コンフィグの設定値はプレフィックスとして
KAFKA_
を付与し、区切り文字_
で定義した環境変数を指定しています。
Schema Registry
Schema Registryのdocker-composeの定義に対して言及する点はありません。
Rest Proxy
Rest Proxyのdocker-composeの定義に対して言及する点はありません。
Connector
レプリケーションファクター
Connectorは複数のノードでグループを構成することができますが、今回は検証目的であるため単一ノードで構成するように定義しています。
そのため、Broker(Kafka)のセクションで述べたことと同様にTopicのレプリケーションファクターの数を1としています。また、Connectorは自信のコンフィグ設定の情報や状態、履歴をTopicに保存しています。CONNECT_CONFIG_STORAGE_TOPIC
、CONNECT_OFFSET_STORAGE_TOPIC
、CONNECT_STATUS_STORAGE_TOPIC
が該当します。これらは障害耐性を上げるための設定値です。
前述した通り、今回は検証目的であるため、全て設定値を1としています。
connect:
...
environment:
...
CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
コンバーター
今回の検証ではKey/Valueのコンバーターはorg.apache.kafka.connect.json.JsonConverter
を指定しています。他にも複数のコンバーターが用意されています。詳しくは公式ドキュメントを参照してください。
実際に使ってみた感覚で言えば、Single Message Transformを利用する場合はorg.apache.kafka.connect.json.JsonConverter
よりもorg.apache.kafka.connect.storage.StringConverter
の方が制御しやすい印象があります。ただし、網羅的に検証できていないので不正確かもしれません。
Tips
コンバーターの選択で迷った場合、公式から技術ブログが発行されています。こちらも参考になるかもしれません。
プラグインパス
CONNECT_PLUGIN_PATH
では、Connectorのプラグインを格納するディレクトリパスを指定しています。
別の記事で紹介しますが、今回の検証ではDebeziumから提供されているSMTと自作のSMTを利用したイベント変換を検証したいため追加の指定があります。
-
/usr/share/java
デフォルトで定義されている値。変更しなくてよいもの。 -
/usr/share/confluent-hub-components
confluet-hubクライアントでConnectorプラグインをインストールする際の、デフォルトのインストール先。特に理由がなければ変更しなくてよいもの。 -
/usr/share/plugins/debezium
Debeziumから提供されている、Debezium Scripting
と依存関係にあるGroovy
のライブラリを格納するディレクトリパスです。Debezium Scripting
は、Content-base RoutingのSMTを利用するためのものです。 -
/usr/share/plugins/custom
自作のSMTを格納するディレクトリパスです。
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/plugins/debezium,/usr/share/plugins/custom"
リモートデバッグ用のポート公開
Connectorをリモートデバッグするためのポートを5005
で公開します。クライアントからの接続を待つ場合は、suspend=y
に変更してConnectorを再起動します。
KAFKA_OPTS: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005
Tips
Confluent PlatformのConnectorのimageの仕様上、コンフィグの設定値はプレフィックスとして
CONNECT_
を付与し、区切り文字_
で定義した環境変数を指定しています。
SMTとは、Single Message Transformationの略称です。(参考サイト)
まとめ
dockerコンテナを利用して環境を構築しました。
docker-composeの定義について説明しました。
次回の記事では、Connectorプラグインを利用してPostgreSQLからElasticsearchにデータストリームを行います。
-
投稿済みの記事と投稿予定の記事
- 検証用のシステム構成を紹介
- 環境構築 ★イマココ
- Connectorプラグインの登録、データストリーム
- DebeziumのSMTを利用したメッセージ変換
- 自作のSMTを利用したメッセージ変換
- ElasticsearchのIngest Pipelineを利用した変換
-
投稿するかもしれない記事
- Confluent Platformの解説
- KafkaとConnectorのパフォーマンスチューニング
- Elasticsearchの解説
- Elasticsearchのインデックス設定、マッピング定義の解説
- Elasticsearchの search APIの解説
- Elasticsearchをクラスタ構成に変更