1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GLOBISAdvent Calendar 2024

Day 21

DebeziumとKafkaでリアルタイムDB同期を実現しよう

Posted at

DebeziumとKafkaで楽にリアルタイムにデータベース間でデータを同期する、しかも異なるデータベース間でもできる!

今回は「Debezium」と「Kafka」を使って、データベース間をリアルタイムにデータを同期する仕組みをご紹介します。この2つの技術を組み合わせることで、まるでデータベース同士が「データのテレパシー」を持っているかのように連携することが可能です。


仕組み

DebeziumとKafkaを使ったリアルタイムDB同期では、ソースDBのトランザクションログをDebeziumが監視し、変更をKafkaトピックにストリームします。そのデータをKafka Sinkコネクタを通じてターゲットDBに反映します。

DebeziumとKafkaとは

Debeziumは、データベースのトランザクションログを監視し、INSERTやUPDATE、DELETEといった変更をキャプチャして、リアルタイムでイベントとしてストリーム(Kafka)に流してくれる便利なツールです。

Kafkaは、分散型のメッセージングシステムで、リアルタイムのデータストリーム処理を可能にするオープンソース技術です。高スループット、低レイテンシーが特徴で、「プロデューサー」と「コンシューマー」間の非同期通信を実現します。データを「トピック」で管理し、複数の「コンシューマー」からデータを消化できます。


今回の試した内容

  • MySQL (Source): Sourceデータベース。この中のデータが更新します。
  • Debezium Source Connector: MySQLのデータ変更を検知して、Kafkaのトピックに変更データを流す
  • Debezium Sink Connector: KafkaトピックのデータをTargetデータベースに反映する
    • MySQL (Target): Target MySQLデータベース
    • PostgreSQL (Target): Target PostgreSQL、Sourceデータベースと異なるデータベース!

Docker環境を作る

version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.4.3
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - kafka-net

  kafka:
    image: quay.io/debezium/kafka:3.0
    ports:
      - "9092:9092"
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
     - KAFKA_GROUP_MIN_SESSION_TIMEOUT_MS=100
    networks:
      - kafka-net

  mysql-source:
    image: quay.io/debezium/example-mysql:3.0
    ports:
      - "3306:3306"
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw
    stdin_open: true
    tty: true
    restart: "no"
    networks:
      - kafka-net

  mysql-target:
    image: mysql:8.0
    ports:
      - "3308:3306"
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: database_b
    volumes:
      - ./mysql-target-init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - kafka-net

  postgres-target:
    image: postgres:13
    ports:
      - "5432:5432"
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
      POSTGRES_DB: database_b
    volumes:
      - ./postgres-target-init.sql:/docker-entrypoint-initdb.d/init.sql
    networks:
      - kafka-net

  connect:
    image: quay.io/debezium/connect:3.0
    container_name: connect
    links:
      - kafka
      - mysql-source
      - mysql-target
    ports:
      - "8083:8083"
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
    networks:
      - kafka-net

networks:
  kafka-net:
    driver: bridge

mysql-target-init.sql

CREATE DATABASE IF NOT EXISTS database_b;
USE database_b;
CREATE TABLE users (
  id INT PRIMARY KEY AUTO_INCREMENT,
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  email VARCHAR(100)
);

postgres-target-init.sql

CREATE TABLE users (
  id SERIAL PRIMARY KEY,
  first_name VARCHAR(100),
  last_name VARCHAR(100),
  email VARCHAR(100)
);

立ち上げる

docker-compose up

Debezium Source ConnectorでMySQLのデータ変更をキャッチ

Source Connectorを登録します。

curl -i -X POST -H "Content-Type:application/json" localhost:8083/connectors/ -d '{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql-source",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "topic.prefix": "dbserver1",
    "database.include.list": "inventory",
    "table.include.list": "inventory.customers",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schemahistory.inventory"
  }
}'

Sink ConnectorでTargetデータベースにデータを同期する

MySQLをTargetにSink Connectorを登録します

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "mysql-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://mysql-target:3306/database_b",
    "connection.username": "root",
    "connection.password": "root",
    "topics": "dbserver1.inventory.customers",
    "table.name.format": "users",
    "auto.create": "true",
    "auto.evolve": "true",
    "auto.evolve.schemas": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.fields": "id",
    "primary.key.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true
  }
}' http://localhost:8083/connectors

PostgreSQLをTargetにSink Connectorを登録します

curl -X POST -H "Content-Type: application/json" --data '{
  "name": "postgres-sink-connector",
  "config": {
    "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://postgres-target:5432/database_b",
    "connection.username": "postgres",
    "connection.password": "postgres",
    "topics": "dbserver1.inventory.customers",
    "table.name.format": "users",
    "auto.create": "true",
    "auto.evolve": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "primary.key.fields": "id",
    "primary.key.mode": "record_key",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "value.converter.schemas.enable": true
  }
}' http://localhost:8083/connectors

環境の用意は完了!

テレパシーを体験

ソースのMySQLでデータを追加・更新・削除します。

docker exec -it debezium_mysql-source_1 mysql -u mysqluser -p # password is `mysqlpw`
use inventory;
insert into customers(first_name, last_name, email) values('jerry', 'Mouse', 'jerry@y.com');
update customers set first_name = 'tom';
delete from customers where id=1005;

同期結果をターゲットDBで確認します。
MySQLターゲットの確認

docker exec -it debezium_mysql-target_1 mysql -u root -p # password is `root`
use database_b;
select * from users; 

PostgreSQLターゲットの確認

docker exec -it debezium_postgres-target_1 psql -U postgres -d database_b
select * from users;

最後に

DebeziumとKafkaを使えば、こんなことが可能になります

  • データベース間のリアルタイム同期
  • データウェアハウスの構築
    ビジネス分析に必要なデータを瞬時に収集。
  • マイクロサービスのデータ共有
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?