はじめに
レバウェル開発部アドベントカレンダー22日目です。
初めまして。レバウェル開発部でバックエンド開発を担当しています。
私が所属するチームでは、バックエンドのデータベース(以降、DB) データを新しい DB へ移行する計画をしています。
データ移行のツールの候補の一つとして、Apache Kafka(以降、Kafka) があることを初めて知ったので、どのように利用できるか、調査しました。
本記事では PostgreSQL に対する変更を Debezium でキャプチャして、Kafka のトピックにメッセージが書き込まれること、そのメッセージが消費されることをローカル環境で確認してみます。
Kafka とは?
Kafka とは、端的に言うと、イベントストリーミングプラットフォームです。
Kafka を使用することで、データの取り込みや保存、処理、配信をリアルタイムにすることが可能です。
ByteByteGo の Youtube 動画 "Top Kafka Use Cases You Should Know" では用途として、下記を例に挙げています。
- ログ分析
- リアルタイムマシーンラーニング用のパイプライン
- リアルタイムシステムモニタリング・アラーティング
- リアルタイムのデータ変更検出( CDC (Change Data Capture))
- システムマイグレーション
今回、私たちが実施しようとしていることは、まさに、CDC に該当します(動画内の03:39頃)。
Kafka に関連する用語をまとめました。
- プロデューサー
kafka にデータ(メッセージ)を送信するアプリケーションのこと。プロデューサーは特定のトピックにデータを送信する。 - コンシューマー
トピックからメッセージを読み取るアプリケーションのこと。複数のコンシューマーが同じトピックからメッセージを消費することができる。 - トピック
メッセージを送受信するための論理的なチャネルのこと。トピックはメッセージのカテゴリーとして機能して、プロデューサーやコンシューマーがトピックを通じてメッセージをやり取りする。 - パーティション
各トピックは複数のパーティションに分割することができる。 - メッセージ
トピック内でやり取りされるデータの単位のこと。
CDC とは?
CDC とは Change Data Capture の略称です。トランザクションなど、DB の変更をキャプチャリングすることができます。
CDC の仕組みと Kafka を組み合わせることで、現行の DB に対する変更をキャプチャして、リアルタイムに Kafka へ送信して、新しい DB に書き込むというようなことができそうです。
CDC を実現するツールとしては、Debezium があります。
Debezium とは?
Debezium は DB の変更をキャプチャして、アプリケーションがその変更を確認して対応可能にします。各 DB テーブル内の全ての行レベルの変更イベントをイベントストリームとして記録します。
Debezium の公式ドキュメントにシステム構成図のイメージ画像がありますので、ご参考ください。
イメージ画像では、MySQL や PostgreSQL の変更イベントを Debezium がキャプチャして、Kafka にイベントを送信し、Data Warehouse などの他のシステムに連携している様子が分かります。
実際に動かしてみる
Debezium のチュートリアルのページ最下部に、Dokcer Compose を利用して、Debezium と Kafka を起動する方法が記載されています。
実際に使用するリソースは debezium-examplesにあります。
ここでは、README に記載されている "Using Postgres"を試してみます。
1. ローカルに Docker コンテナを起動する
手元の PC に作業ディレクトリを作成し、サンプルコードを Git Clone した後、tutorial ディレクトリに移動します。
git clone https://github.com/debezium/debezium-examples.git
cd debezium-examples/tutorial
PostgreSQL 用に用意されている Docker ファイルを実行します。
export DEBEZIUM_VERSION=2.1
docker-compose -f docker-compose-postgres.yaml up
ここで、Docker コンテナが起動していることを一旦、確認してみます。
docker-compose -f docker-compose-postgres.yaml ps
connect
, kafka
, postgres
, zookeeper
という4つの service が起動していればOKです。
NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS
tutorial-connect-1 quay.io/debezium/connect:2.1 "/docker-entrypoint.…" connect 2 days ago Up 2 days 0.0.0.0:8083->8083/tcp, :::8083->8083/tcp, 9092/tcp
tutorial-kafka-1 quay.io/debezium/kafka:2.1 "/docker-entrypoint.…" kafka 2 days ago Up 2 days 0.0.0.0:9092->9092/tcp, :::9092->9092/tcp
tutorial-postgres-1 quay.io/debezium/example-postgres:2.1 "docker-entrypoint.s…" postgres 2 days ago Up 2 days 0.0.0.0:5432->5432/tcp, :::5432->5432/tcp
tutorial-zookeeper-1 quay.io/debezium/zookeeper:2.1 "/docker-entrypoint.…" zookeeper 2 days ago Up 2 days 0.0.0.0:2181->2181/tcp, :::2181->2181/tcp, 0.0.0.0:2888->2888/tcp, :::2888->2888/tcp, 0.0.0.0:3888->3888/tcp, :::3888->3888/tcp
Debezium
はどこにある!? と思った方が多いかと思います。Debezium
は Kafka Connect というコンポーネント上で動作するフレームワークになるそうなので、connect
コンテナ上で動作するイメージです。zookeeper
については説明を割愛しますが、Kafka を動かす際の周辺コンポーネントです。
2. PostgresConnector の登録をする
コネクタの設定を登録します。コネクタは接続するリソースによって複数種類ありますが、今回は PostgreSQL を使用するため、PostgresConnector の設定を登録します。
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json
(※ tutorial ディレクトリで実行します。)
3. Kafka のトピックを確認してみる
続いて、ここまでの操作で、Kafka の状態がどのように変化しているか見てみます。
下記のコマンドを実行すると、Kafka に作成されたトピック一覧を見ることができます。
docker-compose -f docker-compose-postgres.yaml exec kafka ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --describe
一覧の中に、dbserver1.inventory.customers
というトピックがありますので、試しにピックアップしてみます。
docker-compose -f docker-compose-postgres.yaml exec kafka ./bin/kafka-topics.sh --bootstrap-server kafka:9092 --describe --topic dbserver1.inventory.customers
下記のようにdbserver1.inventory.customers
というトピックの状態を知ることができます。パーティション数が1
になっているようです。
Topic: dbserver1.inventory.customers TopicId: yHsbzUCtRyO2wCDT4vynVA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: dbserver1.inventory.customers Partition: 0 Leader: 1 Replicas: 1 Isr: 1
トピックは PostgreSQL のテーブルと1対1に対応するように作成されます。PostgreSQL にcustomers
テーブルがあるか確かめてみます。
下記コマンドを実行して、PostgreSQL に接続します。
docker-compose -f docker-compose-postgres.yaml exec postgres env PGOPTIONS="--search_path=inventory" bash -c 'psql -U $POSTGRES_USER postgres'
テーブル一覧を出力するコマンドを実行すると、customers
テーブルが確かにありました。
postgres=# \dt
List of relations
Schema | Name | Type | Owner
-----------+------------------+-------+----------
inventory | customers | table | postgres
inventory | geom | table | postgres
inventory | orders | table | postgres
inventory | products | table | postgres
inventory | products_on_hand | table | postgres
inventory | spatial_ref_sys | table | postgres
(6 rows)
postgres=#
customers
テーブルに保存されているレコードを先に確認しておきます。id=1001からid=1004までの4レコードが保存されています。
postgres=# select * from customers;
id | first_name | last_name | email
------+------------+-----------+-----------------------
1001 | Sally | Thomas | sally.thomas@acme.com
1002 | George | Bailey | gbailey@foobar.com
1003 | Edward | Walker | ed@walker.com
1004 | Anne | Kretchmar | annek@noanswer.org
(4 rows)
postgres=#
4. トピックに格納されているメッセージを出力する
dbserver1.inventory.customers
にどんなメッセージが格納されているか、下記のコマンドで出力してみます。
docker-compose -f docker-compose-postgres.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--topic dbserver1.inventory.customers \
--group my-consumer-group \
--from-beginning \
--property print.key=true
出力してみると、customers
テーブルに保存されているレコード1行1行に対応してメッセージが JSON 形式で格納されていました。
出力結果が長いので、一部抜粋してみます。
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"default": 0,
"field": "id"
}
],
"optional": false,
"name": "dbserver1.inventory.customers.Key"
},
"payload": {
"id": 1001
}
}
"payload": {
"before": null,
"after": {
"id": 1001,
"first_name": "Sally",
"last_name": "Thomas",
"email": "sally.thomas@acme.com"
},
"source": {
"version": "2.1.4.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1734335667303,
"snapshot": "first",
"db": "postgres",
"sequence": "[null,\"34466144\"]",
"schema": "inventory",
"table": "customers",
"txId": 766,
"lsn": 34466144,
"xmin": null
},
"op": "r",
"ts_ms": 1734335667360,
"transaction": null
}
id=1001 のレコードに対しては、before が null、 after が id=1001 のレコードに保存されているデータが入っていることが分かりました。
PostgreSQL に対して、INSERT 文のようなトランザクションを実行していないのに、なぜ Kafka のトピックにメッセージが保存されているのか疑問に思った方もいるかも知れません。
これは、Debezium の機能で、snapshots の設定がデフォルトで initial
となっているためだと考えられます。Kafka のトピックにメッセージが存在しない場合は、初回に DB データの snapshot を作成してくれるみたいです。
初回起動時にバッチ処理等でデータを新しい DB に移行しなくて良いのは便利ですね。
5. トランザクションが検知されて、メッセージに保存されるか確かめる
初回はすでに、Kafka トピックにメッセージが保存されていたことが分かりました。
初回以降、PostgreSQL への変更があった場合に、トピックにメッセージが追加されるか確認してみます。
PostgreSQL に対して、INSERT を実行します。
postgres=# insert into customers values (1005, 'leve', 'well', 'levewell@gmail.com');
INSERT 0 1
postgres=#
コンシューマーのログに id=1005 の変更内容が出力されました。
一部抜粋です。
"payload": {
"before": null,
"after": {
"id": 1005,
"first_name": "leve",
"last_name": "well",
"email": "levewell@gmail.com"
},
"source": {
"version": "2.1.4.Final",
"connector": "postgresql",
"name": "dbserver1",
"ts_ms": 1734403280270,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"34471544\",\"34471600\"]",
"schema": "inventory",
"table": "customers",
"txId": 772,
"lsn": 34471600,
"xmin": null
},
"op": "c",
"ts_ms": 1734403280627,
"transaction": null
}
これで、INSERT で実行された変更を Debezium が検知して、Kafka のトピックのメッセージとして保存されていたことが確認できました。
まとめ
PostgreSQL に対する INSERT を Kafka connecor 上で動作する Debezium で検知して、Kafka トピックにメッセージとして保存されること、さらに、それをコンシューマーが消費することの流れを追ってみました。
DB データ移行をするために、コンシューマーが Kafka トピックのメッセージを受け取って新しい DB へ書き込むような処理が必要になるはずですが、その部分については宿題としたいと思います。
今回は触りの部分の雰囲気を掴むことができました。Kafka はマイクロサービスアーキテクチャの文脈もよく耳にすることがあり、サービス規模をスケールする上で習得したい技術です。
Kafka を全く触ったことない方に、少しでも役に立つ記事になっていましたら幸いです。
最後まで読んでいただきまして、ありがとうございました!