0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Confluent Platformを使ってPostgreSQLからElasticsearchへのデータストリームを試してみる2

Last updated at Posted at 2023-09-21

この記事の目的は、Confluent Platform(Apache Kafka)に関する技術ブログを投稿し、自分の理解を整理することです。加えて、これから使ってみようと考えているエンジニアの助力になればと考えています。
記事はいくつかのパートに別けて構成する予定です。
きちんと精査しきれていないため、不正確な内容が含まれているかもしれません。その場合はコメント頂けると嬉しいです。

前置き

当記事ではPostgreSQL、Confluent Platform、Elasticsearchのシステムを構築します。
システム構成の概要図は前回の記事を参照してください。

環境構築

環境構築は、気軽に使い捨てができるdockerを利用します。

docker-compose

今回利用するdocker-composeをGitHubに投稿しました。全体を確認したい場合は、リンク先のgistを参照してください。

参考にしたサイト

docker-composeを作るにあたり、下記のサイトを参考にさせて頂きました。

  • PostgreSQL

  • Elasticsearch

  • Confluent Platform

説明

docker-composeファイルのうち、今回の検証に関わる部分について説明します。

PostgreSQL

別記事でも説明する予定ですが、PostgreSQLと接続するConnectorプラグインは Debezium connector for PostgreSQLを利用します。また、論理デコーダーのプラグインはPostgreSQL+10からデフォルトで提供されているpgoutputを利用します。
論理デコーダーについてもっと詳しい内容を知りたいという方はPostgreSQLの公式ドキュメントや、Debeziumの公式ドキュメントが参考になります。

pgoutputの論理デコーダーを利用するために、PostgreSQLのレプリケーション設定をlogicalに変更しています。

  postgres:
...
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical"
      - "-c"
      - "max_replication_slots=5"
      - "-c"
      - "max_wal_senders=5"

レプリケーション設定が変更されているかどうかは次のSQLクエリを実行し、簡単に確認できます。
以下はクエリの実行結果です。

postgres=# SELECT name, setting, unit, short_desc FROM pg_settings WHERE name = 'wal_level';
   name    | setting | unit |                    short_desc                    
-----------+---------+------+--------------------------------------------------
 wal_level | logical |      | Set the level of information written to the WAL.
(1 row)

Elasticsearch

Elasticsearchは複数のノードでクラスタ構成を組むことができますが、今回はクラスタ構成は必要ないのでdiscovery.typesingle-node(シングルノード)を指定して、単一ノードで構成するようにしています。

  elasticsearch:
...
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node

Kibana

Kibanaは必須ではありませんが、Elasticsearchに登録されたインデックス設定やマッピング定義、ドキュメントを手軽に確認できるため導入しました。

Zookeeper

Zookeeperでは、コンテナのヘルスチェックと起動順序を制御するためにhealthcheckdepends_onを定義しています。これはZookeeperだけでなく、Confluent Platformの他のコンテナも同様に定義しています。

  zookeeper:
...
    healthcheck:
      test: "nc -z localhost 2181 || exit -1"
      interval: 10s
      timeout: 5s
      retries: 5
      start_period: 10s

Broker(Kafka)

BrokerはApache KafkaのKafkaノードの別称です。複数ノードでクラスタ構成を組むことができますが、今回は検証目的であり、クラスタ構成は必要ないので単一ノードで構成するようにしています。
そのため、Topicのレプリケーションファクターの数(KAFKA_OFFSETS_TOPIC_REPLICATION_FACTORKAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR)を1としています。

KAFKA_LOG_RETENTION_MINUTESKAFKA_LOG_CLEANUP_POLICYKAFKA_COMPRESSION_TYPEは、Topicに登録されるログセグメントの生存期間とクリーンアップ方針を定義しています。これは主にチューニングに関する項目です。

  • KAFKA_LOG_RETENTION_MINUTES
    Topicに登録されたログセグメントは1分経過後に破棄されます。通常であれば7日間(デフォルト値)など、もっと長い期間を指定しますが、今回は検証目的であるためこの値で充分だと判断しました。
  • KAFKA_LOG_CLEANUP_POLICY
    deleteは、古いログをクリーンアップする際に完全に削除します。別の設定値としてcompactを指定できます。compactの場合は、古いログを削除せず圧縮して保持します。
  • KAFKA_COMPRESSION_TYPE
    ログセグメントの圧縮ポリシーです。uncompressedは圧縮しないで保持します。他にgzipsnappylz4zstdなど圧縮方式を選択することもできます。(参考サイト
  broker:
...
    environment:
...
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
...
      KAFKA_LOG_RETENTION_MINUTES: 1
      KAFKA_LOG_CLEANUP_POLICY: delete
      KAFKA_COMPRESSION_TYPE: uncompressed

Tips

Confluent PlatformのKafkaコンテナのimageの仕様上、コンフィグの設定値はプレフィックスとしてKAFKA_を付与し、区切り文字_で定義した環境変数を指定しています。

Schema Registry

Schema Registryのdocker-composeの定義に対して言及する点はありません。

Rest Proxy

Rest Proxyのdocker-composeの定義に対して言及する点はありません。

Connector

レプリケーションファクター

Connectorは複数のノードでグループを構成することができますが、今回は検証目的であるため単一ノードで構成するように定義しています。
そのため、Broker(Kafka)のセクションで述べたことと同様にTopicのレプリケーションファクターの数を1としています。また、Connectorは自信のコンフィグ設定の情報や状態、履歴をTopicに保存しています。CONNECT_CONFIG_STORAGE_TOPICCONNECT_OFFSET_STORAGE_TOPICCONNECT_STATUS_STORAGE_TOPICが該当します。これらは障害耐性を上げるための設定値です。
前述した通り、今回は検証目的であるため、全て設定値を1としています。

  connect:
...
    environment:
...
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1

コンバーター

今回の検証ではKey/Valueのコンバーターはorg.apache.kafka.connect.json.JsonConverterを指定しています。他にも複数のコンバーターが用意されています。詳しくは公式ドキュメントを参照してください。

実際に使ってみた感覚で言えば、Single Message Transformを利用する場合はorg.apache.kafka.connect.json.JsonConverterよりもorg.apache.kafka.connect.storage.StringConverterの方が制御しやすい印象があります。ただし、網羅的に検証できていないので不正確かもしれません。

Tips

コンバーターの選択で迷った場合、公式から技術ブログが発行されています。こちらも参考になるかもしれません。

プラグインパス

CONNECT_PLUGIN_PATHでは、Connectorのプラグインを格納するディレクトリパスを指定しています。
別の記事で紹介しますが、今回の検証ではDebeziumから提供されているSMTと自作のSMTを利用したイベント変換を検証したいため追加の指定があります。

  • /usr/share/java
    デフォルトで定義されている値。変更しなくてよいもの。
  • /usr/share/confluent-hub-components
    confluet-hubクライアントでConnectorプラグインをインストールする際の、デフォルトのインストール先。特に理由がなければ変更しなくてよいもの。
  • /usr/share/plugins/debezium
    Debeziumから提供されている、Debezium Scriptingと依存関係にあるGroovyのライブラリを格納するディレクトリパスです。Debezium Scriptingは、Content-base RoutingのSMTを利用するためのものです。
  • /usr/share/plugins/custom
    自作のSMTを格納するディレクトリパスです。
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/usr/share/plugins/debezium,/usr/share/plugins/custom"

リモートデバッグ用のポート公開

Connectorをリモートデバッグするためのポートを5005で公開します。クライアントからの接続を待つ場合は、suspend=yに変更してConnectorを再起動します。

KAFKA_OPTS: -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005

Tips

Confluent PlatformのConnectorのimageの仕様上、コンフィグの設定値はプレフィックスとしてCONNECT_を付与し、区切り文字_で定義した環境変数を指定しています。
SMTとは、Single Message Transformationの略称です。(参考サイト)

まとめ

dockerコンテナを利用して環境を構築しました。
docker-composeの定義について説明しました。
次回の記事では、Connectorプラグインを利用してPostgreSQLからElasticsearchにデータストリームを行います。

  • 投稿済みの記事と投稿予定の記事

    • 検証用のシステム構成を紹介
    • 環境構築 ★イマココ
    • Connectorプラグインの登録、データストリーム
    • DebeziumのSMTを利用したメッセージ変換
    • 自作のSMTを利用したメッセージ変換
    • ElasticsearchのIngest Pipelineを利用した変換
  • 投稿するかもしれない記事

    • Confluent Platformの解説
    • KafkaとConnectorのパフォーマンスチューニング
    • Elasticsearchの解説
    • Elasticsearchのインデックス設定、マッピング定義の解説
    • Elasticsearchの search APIの解説
    • Elasticsearchをクラスタ構成に変更
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?