1.Debeziumとは何か?
Debeziumはオープンソースのデータ変更キャプチャ(Change Data Capture, CDC)プラットフォームであり、データベースで発生するすべての変更(挿入、更新、削除など)をリアルタイムで追跡し、他のシステムに転送することができます。DebeziumはKafkaをベースに動作し、Kafkaコネクターとして提供されており、イベント駆動のデータストリーミングが必要な環境で広く使用されています。
Debeziumの主な機能:
- リアルタイムデータストリーミング: データベース内のすべての変更をキャプチャしてリアルタイムでストリーミングし、最新のデータを維持しながらシステム間でデータ同期が可能です。
- データ整合性の保証: トランザクション単位でデータを送信して整合性を保証し、Kafkaの強力なデータ処理機能と組み合わせることで、障害発生時にもデータの損失なく復旧が容易です。
- 複雑なETL作業の簡素化: データ変更のみをストリーミングすることで、大容量のデータベースでも効率的にデータ処理が可能です。
Debeziumの代表的なコネクター:
- MySQLコネクター: MySQLデータベースの変更をキャプチャし、ストリーミングします。
- PostgreSQLコネクター: PostgreSQLの論理レプリケーションを利用して変更をキャプチャします。
- MongoDBコネクター: MongoDBの操作ログ(OpLog)を読み取り、データ変更を追跡します。
- SQL Serverコネクター: SQL ServerのCDC機能を利用して変更をキャプチャします。
- Oracleコネクター: Oracleデータベースの変更をキャプチャするコネクターです。
その他のコネクター:
Debezium以外にも、Kafka Connectエコシステムにはさまざまなデータソースとシンクをサポートするコネクターが存在します。例えば:
- Confluentコネクター: Confluentが提供するKafkaコネクターで、Salesforce、Google BigQuery、ElasticSearchなど、さまざまな外部システムとの統合をサポートするコネクターがあります。
- JDBCコネクター: 一般的なデータベースとの接続が可能で、JDBCドライバーを使用するデータソースとの連携ができます。
- Elasticsearchコネクター: KafkaストリームデータをElasticsearchに転送して、検索機能に活用できます。
- AWS S3コネクター: KafkaデータをAWS S3に転送し、長期保存やアーカイブに使用します。
Debeziumは特に データベースとイベント駆動アーキテクチャ間のリアルタイムデータ同期に適しており、リアルタイム分析、モニタリング、マイクロサービスアーキテクチャなどで多く利用されています。
2.DebeziumはKafkaでのみ使用可能か?
Debeziumは基本的に Kafkaを通じてデータ変更イベントを伝達するよう設計されていますが、Kafkaなしでも他のメッセージングシステムやデータ処理システムと接続できるように、いくつかの拡張オプションが存在します。
KafkaなしでDebeziumを使用する方法:
-
Debezium Server:
- DebeziumはKafka以外のシステムにデータをストリーミングできるように Debezium Serverを提供しています。
- Debezium Serverは基本的にKafkaコネクターフレームワークを維持しつつ、Kafka以外のメッセージブローカーやデータベースにCDCイベントを転送できます。
- サポートされる出力先: Amazon Kinesis、Google Pub/Sub、Apache Pulsar、Redis Streamsなどです。このオプションを使用すると、Kafka環境なしでさまざまなシステムにCDCイベントを転送できます。
-
Kafka Connectを通じたHTTP Sink:
- Kafkaを使用しない場合でも、HTTP APIを利用して特定のアプリケーションにCDCイベントを送信できます。
- この方法では Kafka ConnectのHTTP Sinkコネクターを使用して、データ変更をRESTful HTTP APIに転送できます。
-
カスタムSinkの開発:
- Debeziumのデータを他のシステムに送信するために カスタムSinkアプリケーションを開発することも可能です。
- たとえば、DebeziumがCDCイベントをKafkaに送信するよう設定し、Kafkaコンシューマーアプリケーションを開発してデータを特定のアプリケーションやデータベースに直接転送することができます。
アーキテクチャ
サンプルコード
1. サービス登録
version: "3.7"
services:
postgres:
image: debezium/postgres:13
ports:
- 5432:5432
environment:
- POSTGRES_USER=docker
- POSTGRES_PASSWORD=docker
- POSTGRES_DB=exampledb
networks:
- cdc_network
zookeeper:
image: confluentinc/cp-zookeeper:5.5.3
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- cdc_network
kafka:
image: confluentinc/cp-enterprise-kafka:5.5.3
depends_on: [zookeeper]
environment:
KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
KAFKA_BROKER_ID: 1
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_JMX_PORT: 9991
ports:
- 9092:9092
networks:
- cdc_network
debezium:
image: debezium/connect:1.4
environment:
BOOTSTRAP_SERVERS: kafka:9092
GROUP_ID: 1
CONFIG_STORAGE_TOPIC: connect_configs
OFFSET_STORAGE_TOPIC: connect_offsets
STATUS_STORAGE_TOPIC: connect_statuses
KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
CONNECT_PLUGIN_PATH: /kafka/connect
CONNECT_SECURITY_PROTOCOL: PLAINTEXT
depends_on: [kafka]
ports:
- 8083:8083
networks:
- cdc_network
schema-registry:
image: confluentinc/cp-schema-registry:5.5.3
environment:
- SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
- SCHEMA_REGISTRY_HOST_NAME=schema-registry
- SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
ports:
- 8081:8081
depends_on: [zookeeper, kafka]
networks:
- cdc_network
networks:
cdc_network:
driver: bridge
# psql -U docker -d exampledb -w
# exampledb=# CREATE TABLE student (id integer primary key, name varchar);
# CREATE TABLE
# exampledb=# ALTER TABLE public.student REPLICA IDENTITY FULL;
# ALTER TABLE
# exampledb=# INSERT INTO student (id,name) VALUES (1, 'bob');
# INSERT 0 1
# exampledb=# select * from student;
# id | name