3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Apache Kafka で CDC を動かしみた

Last updated at Posted at 2024-12-21

はじめに

レバウェル開発部アドベントカレンダー22日目です。

初めまして。レバウェル開発部でバックエンド開発を担当しています。
私が所属するチームでは、バックエンドのデータベース(以降、DB) データを新しい DB へ移行する計画をしています。
データ移行のツールの候補の一つとして、Apache Kafka(以降、Kafka) があることを初めて知ったので、どのように利用できるか、調査しました。
本記事では PostgreSQL に対する変更を Debezium でキャプチャして、Kafka のトピックにメッセージが書き込まれること、そのメッセージが消費されることをローカル環境で確認してみます。

Kafka とは?

Kafka とは、端的に言うと、イベントストリーミングプラットフォームです。
Kafka を使用することで、データの取り込みや保存、処理、配信をリアルタイムにすることが可能です。

ByteByteGo の Youtube 動画 "Top Kafka Use Cases You Should Know" では用途として、下記を例に挙げています。

  • ログ分析
  • リアルタイムマシーンラーニング用のパイプライン
  • リアルタイムシステムモニタリング・アラーティング
  • リアルタイムのデータ変更検出( CDC (Change Data Capture))
  • システムマイグレーション

今回、私たちが実施しようとしていることは、まさに、CDC に該当します(動画内の03:39頃)。

Kafka に関連する用語をまとめました。

  • プロデューサー
    kafka にデータ(メッセージ)を送信するアプリケーションのこと。プロデューサーは特定のトピックにデータを送信する。
  • コンシューマー
    トピックからメッセージを読み取るアプリケーションのこと。複数のコンシューマーが同じトピックからメッセージを消費することができる。
  • トピック
    メッセージを送受信するための論理的なチャネルのこと。トピックはメッセージのカテゴリーとして機能して、プロデューサーやコンシューマーがトピックを通じてメッセージをやり取りする。
  • パーティション
    各トピックは複数のパーティションに分割することができる。
  • メッセージ
    トピック内でやり取りされるデータの単位のこと。

CDC とは?

CDC とは Change Data Capture の略称です。トランザクションなど、DB の変更をキャプチャリングすることができます。
CDC の仕組みと Kafka を組み合わせることで、現行の DB に対する変更をキャプチャして、リアルタイムに Kafka へ送信して、新しい DB に書き込むというようなことができそうです。
CDC を実現するツールとしては、Debezium があります。

Debezium とは?

Debezium は DB の変更をキャプチャして、アプリケーションがその変更を確認して対応可能にします。各 DB テーブル内の全ての行レベルの変更イベントをイベントストリームとして記録します。

Debezium の公式ドキュメントにシステム構成図のイメージ画像がありますので、ご参考ください。
イメージ画像では、MySQL や PostgreSQL の変更イベントを Debezium がキャプチャして、Kafka にイベントを送信し、Data Warehouse などの他のシステムに連携している様子が分かります。

実際に動かしてみる

Debezium のチュートリアルのページ最下部に、Dokcer Compose を利用して、Debezium と Kafka を起動する方法が記載されています。
実際に使用するリソースは debezium-examplesにあります。

ここでは、README に記載されている "Using Postgres"を試してみます。

1. ローカルに Docker コンテナを起動する

手元の PC に作業ディレクトリを作成し、サンプルコードを Git Clone した後、tutorial ディレクトリに移動します。

git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/tutorial

PostgreSQL 用に用意されている Docker ファイルを実行します。

export DEBEZIUM_VERSION=2.1
docker-compose -f docker-compose-postgres.yaml up

ここで、Docker コンテナが起動していることを一旦、確認してみます。

 docker-compose -f docker-compose-postgres.yaml ps

connect, kafka, postgres, zookeeper という4つの service が起動していればOKです。

NAME                   IMAGE                                   COMMAND                   SERVICE     CREATED      STATUS      PORTS
tutorial-connect-1     quay.io/debezium/connect:2.1            "/docker-entrypoint.…"   connect     2 days ago   Up 2 days   0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9092/tcp
tutorial-kafka-1       quay.io/debezium/kafka:2.1              "/docker-entrypoint.…"   kafka       2 days ago   Up 2 days   0.0.0.0:9092->9092/tcp, :::9092->9092/tcp
tutorial-postgres-1    quay.io/debezium/example-postgres:2.1   "docker-entrypoint.s…"   postgres    2 days ago   Up 2 days   0.0.0.0:5432->5432/tcp, :::5432->5432/tcp
tutorial-zookeeper-1   quay.io/debezium/zookeeper:2.1          "/docker-entrypoint.…"   zookeeper   2 days ago   Up 2 days   0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 0.0.0.0:2888->2888/tcp, :::2888->2888/tcp, 0.0.0.0:3888->3888/tcp, :::3888->3888/tcp

Debezium はどこにある!? と思った方が多いかと思います。Debeziumは Kafka Connect というコンポーネント上で動作するフレームワークになるそうなので、connectコンテナ上で動作するイメージです。zookeeperについては説明を割愛しますが、Kafka を動かす際の周辺コンポーネントです。

2. PostgresConnector の登録をする

コネクタの設定を登録します。コネクタは接続するリソースによって複数種類ありますが、今回は PostgreSQL を使用するため、PostgresConnector の設定を登録します。

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

(※ tutorial ディレクトリで実行します。)

3. Kafka のトピックを確認してみる

続いて、ここまでの操作で、Kafka の状態がどのように変化しているか見てみます。
下記のコマンドを実行すると、Kafka に作成されたトピック一覧を見ることができます。

docker-compose -f docker-compose-postgres.yaml exec kafka ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --describe

一覧の中に、dbserver1.inventory.customers というトピックがありますので、試しにピックアップしてみます。

docker-compose -f docker-compose-postgres.yaml exec kafka ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic dbserver1.inventory.customers

下記のようにdbserver1.inventory.customersというトピックの状態を知ることができます。パーティション数が1になっているようです。

Topic: dbserver1.inventory.customers	TopicId: yHsbzUCtRyO2wCDT4vynVA	PartitionCount: 1	ReplicationFactor: 1	Configs:
	Topic: dbserver1.inventory.customers	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

トピックは PostgreSQL のテーブルと1対1に対応するように作成されます。PostgreSQL にcustomersテーブルがあるか確かめてみます。
下記コマンドを実行して、PostgreSQL に接続します。

docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'

テーブル一覧を出力するコマンドを実行すると、customersテーブルが確かにありました。

postgres=# \dt
               List of relations
 Schema   |       Name       | Type  |  Owner
-----------+------------------+-------+----------
inventory | customers        | table | postgres
inventory | geom             | table | postgres
inventory | orders           | table | postgres
inventory | products         | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys  | table | postgres
(6 rows)

postgres=#

customersテーブルに保存されているレコードを先に確認しておきます。id=1001からid=1004までの4レコードが保存されています。

postgres=# select * from customers;
  id  | first_name | last_name |         email
------+------------+-----------+-----------------------
 1001 | Sally      | Thomas    | sally.thomas@acme.com
 1002 | George     | Bailey    | gbailey@foobar.com
 1003 | Edward     | Walker    | ed@walker.com
 1004 | Anne       | Kretchmar | annek@noanswer.org
(4 rows)

postgres=#

4. トピックに格納されているメッセージを出力する

dbserver1.inventory.customersにどんなメッセージが格納されているか、下記のコマンドで出力してみます。

docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
   --bootstrap-server kafka:9092 \
   --topic dbserver1.inventory.customers \
   --group my-consumer-group \
   --from-beginning \
   --property print.key=true

出力してみると、customersテーブルに保存されているレコード1行1行に対応してメッセージが JSON 形式で格納されていました。
出力結果が長いので、一部抜粋してみます。

{
   "schema": {
       "type": "struct",
       "fields": [
           {
               "type": "int32",
               "optional": false,
               "default": 0,
               "field": "id"
           }
       ],
       "optional": false,
       "name": "dbserver1.inventory.customers.Key"
   },
   "payload": {
       "id": 1001
   }
}
"payload": {
       "before": null,
       "after": {
           "id": 1001,
           "first_name": "Sally",
           "last_name": "Thomas",
           "email": "sally.thomas@acme.com"
       },
       "source": {
           "version": "2.1.4.Final",
           "connector": "postgresql",
           "name": "dbserver1",
           "ts_ms": 1734335667303,
           "snapshot": "first",
           "db": "postgres",
           "sequence": "[null,\"34466144\"]",
           "schema": "inventory",
           "table": "customers",
           "txId": 766,
           "lsn": 34466144,
           "xmin": null
       },
       "op": "r",
       "ts_ms": 1734335667360,
       "transaction": null
   }

id=1001 のレコードに対しては、before が null、 after が id=1001 のレコードに保存されているデータが入っていることが分かりました。

PostgreSQL に対して、INSERT 文のようなトランザクションを実行していないのに、なぜ Kafka のトピックにメッセージが保存されているのか疑問に思った方もいるかも知れません。
これは、Debezium の機能で、snapshots の設定がデフォルトで initial となっているためだと考えられます。Kafka のトピックにメッセージが存在しない場合は、初回に DB データの snapshot を作成してくれるみたいです。
初回起動時にバッチ処理等でデータを新しい DB に移行しなくて良いのは便利ですね。

5. トランザクションが検知されて、メッセージに保存されるか確かめる

初回はすでに、Kafka トピックにメッセージが保存されていたことが分かりました。
初回以降、PostgreSQL への変更があった場合に、トピックにメッセージが追加されるか確認してみます。

PostgreSQL に対して、INSERT を実行します。

postgres=# insert into customers values (1005, 'leve', 'well', 'levewell@gmail.com');
INSERT 0 1
postgres=#

コンシューマーのログに id=1005 の変更内容が出力されました。
一部抜粋です。

"payload": {
        "before": null,
        "after": {
            "id": 1005,
            "first_name": "leve",
            "last_name": "well",
            "email": "levewell@gmail.com"
        },
        "source": {
            "version": "2.1.4.Final",
            "connector": "postgresql",
            "name": "dbserver1",
            "ts_ms": 1734403280270,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"34471544\",\"34471600\"]",
            "schema": "inventory",
            "table": "customers",
            "txId": 772,
            "lsn": 34471600,
            "xmin": null
        },
        "op": "c",
        "ts_ms": 1734403280627,
        "transaction": null
    }

これで、INSERT で実行された変更を Debezium が検知して、Kafka のトピックのメッセージとして保存されていたことが確認できました。

まとめ

PostgreSQL に対する INSERT を Kafka connecor 上で動作する Debezium で検知して、Kafka トピックにメッセージとして保存されること、さらに、それをコンシューマーが消費することの流れを追ってみました。
DB データ移行をするために、コンシューマーが Kafka トピックのメッセージを受け取って新しい DB へ書き込むような処理が必要になるはずですが、その部分については宿題としたいと思います。
今回は触りの部分の雰囲気を掴むことができました。Kafka はマイクロサービスアーキテクチャの文脈もよく耳にすることがあり、サービス規模をスケールする上で習得したい技術です。

Kafka を全く触ったことない方に、少しでも役に立つ記事になっていましたら幸いです。
最後まで読んでいただきまして、ありがとうございました!

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?