DebeziumとKafkaで楽にリアルタイムにデータベース間でデータを同期する、しかも異なるデータベース間でもできる!
今回は「Debezium」と「Kafka」を使って、データベース間をリアルタイムにデータを同期する仕組みをご紹介します。この2つの技術を組み合わせることで、まるでデータベース同士が「データのテレパシー」を持っているかのように連携することが可能です。
仕組み
DebeziumとKafkaを使ったリアルタイムDB同期では、ソースDBのトランザクションログをDebeziumが監視し、変更をKafkaトピックにストリームします。そのデータをKafka Sinkコネクタを通じてターゲットDBに反映します。
DebeziumとKafkaとは
Debeziumは、データベースのトランザクションログを監視し、INSERTやUPDATE、DELETEといった変更をキャプチャして、リアルタイムでイベントとしてストリーム(Kafka)に流してくれる便利なツールです。
Kafkaは、分散型のメッセージングシステムで、リアルタイムのデータストリーム処理を可能にするオープンソース技術です。高スループット、低レイテンシーが特徴で、「プロデューサー」と「コンシューマー」間の非同期通信を実現します。データを「トピック」で管理し、複数の「コンシューマー」からデータを消化できます。
今回の試した内容
- MySQL (Source): Sourceデータベース。この中のデータが更新します。
- Debezium Source Connector: MySQLのデータ変更を検知して、Kafkaのトピックに変更データを流す
-
Debezium Sink Connector: KafkaトピックのデータをTargetデータベースに反映する
- MySQL (Target): Target MySQLデータベース
- PostgreSQL (Target): Target PostgreSQL、Sourceデータベースと異なるデータベース!
Docker環境を作る
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.4.3
ports:
- 2181:2181
- 2888:2888
- 3888:3888
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- kafka-net
kafka:
image: quay.io/debezium/kafka:3.0
ports:
- "9092:9092"
links:
- zookeeper
environment:
- ZOOKEEPER_CONNECT=zookeeper:2181
- KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=100
networks:
- kafka-net
mysql-source:
image: quay.io/debezium/example-mysql:3.0
ports:
- "3306:3306"
environment:
MYSQL_ROOT_PASSWORD: debezium
MYSQL_USER: mysqluser
MYSQL_PASSWORD: mysqlpw
stdin_open: true
tty: true
restart: "no"
networks:
- kafka-net
mysql-target:
image: mysql:8.0
ports:
- "3308:3306"
environment:
MYSQL_ROOT_PASSWORD: root
MYSQL_DATABASE: database_b
volumes:
- ./mysql-target-init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- kafka-net
postgres-target:
image: postgres:13
ports:
- "5432:5432"
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
POSTGRES_DB: database_b
volumes:
- ./postgres-target-init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- kafka-net
connect:
image: quay.io/debezium/connect:3.0
container_name: connect
links:
- kafka
- mysql-source
- mysql-target
ports:
- "8083:8083"
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=1
- CONFIG_STORAGE_TOPIC=my_connect_configs
- OFFSET_STORAGE_TOPIC=my_connect_offsets
- STATUS_STORAGE_TOPIC=my_connect_statuses
networks:
- kafka-net
networks:
kafka-net:
driver: bridge
mysql-target-init.sql
CREATE DATABASE IF NOT EXISTS database_b;
USE database_b;
CREATE TABLE users (
id INT PRIMARY KEY AUTO_INCREMENT,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(100)
);
postgres-target-init.sql
CREATE TABLE users (
id SERIAL PRIMARY KEY,
first_name VARCHAR(100),
last_name VARCHAR(100),
email VARCHAR(100)
);
立ち上げる
docker-compose up
Debezium Source ConnectorでMySQLのデータ変更をキャッチ
Source Connectorを登録します。
curl -i -X POST -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql-source",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"topic.prefix": "dbserver1",
"database.include.list": "inventory",
"table.include.list": "inventory.customers",
"schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
"schema.history.internal.kafka.topic": "schemahistory.inventory"
}
}'
Sink ConnectorでTargetデータベースにデータを同期する
MySQLをTargetにSink Connectorを登録します
curl -X POST -H "Content-Type: application/json" --data '{
"name": "mysql-sink-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:mysql://mysql-target:3306/database_b",
"connection.username": "root",
"connection.password": "root",
"topics": "dbserver1.inventory.customers",
"table.name.format": "users",
"auto.create": "true",
"auto.evolve": "true",
"auto.evolve.schemas": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true
}
}' http://localhost:8083/connectors
PostgreSQLをTargetにSink Connectorを登録します
curl -X POST -H "Content-Type: application/json" --data '{
"name": "postgres-sink-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://postgres-target:5432/database_b",
"connection.username": "postgres",
"connection.password": "postgres",
"topics": "dbserver1.inventory.customers",
"table.name.format": "users",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"delete.enabled": "true",
"primary.key.fields": "id",
"primary.key.mode": "record_key",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": true,
"value.converter.schemas.enable": true
}
}' http://localhost:8083/connectors
環境の用意は完了!
テレパシーを体験
ソースのMySQLでデータを追加・更新・削除します。
docker exec -it debezium_mysql-source_1 mysql -u mysqluser -p # password is `mysqlpw`
use inventory;
insert into customers(first_name, last_name, email) values('jerry', 'Mouse', 'jerry@y.com');
update customers set first_name = 'tom';
delete from customers where id=1005;
同期結果をターゲットDBで確認します。
MySQLターゲットの確認
docker exec -it debezium_mysql-target_1 mysql -u root -p # password is `root`
use database_b;
select * from users;
PostgreSQLターゲットの確認
docker exec -it debezium_postgres-target_1 psql -U postgres -d database_b
select * from users;
最後に
DebeziumとKafkaを使えば、こんなことが可能になります
- データベース間のリアルタイム同期
- データウェアハウスの構築
ビジネス分析に必要なデータを瞬時に収集。 - マイクロサービスのデータ共有