0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Debezium CDC (Change Data Capture)

Posted at

1.Debeziumとは何か?

Debeziumはオープンソースのデータ変更キャプチャ(Change Data Capture, CDC)プラットフォームであり、データベースで発生するすべての変更(挿入、更新、削除など)をリアルタイムで追跡し、他のシステムに転送することができます。DebeziumはKafkaをベースに動作し、Kafkaコネクターとして提供されており、イベント駆動のデータストリーミングが必要な環境で広く使用されています。

Debeziumの主な機能

  1. リアルタイムデータストリーミング: データベース内のすべての変更をキャプチャしてリアルタイムでストリーミングし、最新のデータを維持しながらシステム間でデータ同期が可能です。
  2. データ整合性の保証: トランザクション単位でデータを送信して整合性を保証し、Kafkaの強力なデータ処理機能と組み合わせることで、障害発生時にもデータの損失なく復旧が容易です。
  3. 複雑なETL作業の簡素化: データ変更のみをストリーミングすることで、大容量のデータベースでも効率的にデータ処理が可能です。

Debeziumの代表的なコネクター

  • MySQLコネクター: MySQLデータベースの変更をキャプチャし、ストリーミングします。
  • PostgreSQLコネクター: PostgreSQLの論理レプリケーションを利用して変更をキャプチャします。
  • MongoDBコネクター: MongoDBの操作ログ(OpLog)を読み取り、データ変更を追跡します。
  • SQL Serverコネクター: SQL ServerのCDC機能を利用して変更をキャプチャします。
  • Oracleコネクター: Oracleデータベースの変更をキャプチャするコネクターです。

その他のコネクター

Debezium以外にも、Kafka Connectエコシステムにはさまざまなデータソースとシンクをサポートするコネクターが存在します。例えば:

  1. Confluentコネクター: Confluentが提供するKafkaコネクターで、Salesforce、Google BigQuery、ElasticSearchなど、さまざまな外部システムとの統合をサポートするコネクターがあります。
  2. JDBCコネクター: 一般的なデータベースとの接続が可能で、JDBCドライバーを使用するデータソースとの連携ができます。
  3. Elasticsearchコネクター: KafkaストリームデータをElasticsearchに転送して、検索機能に活用できます。
  4. AWS S3コネクター: KafkaデータをAWS S3に転送し、長期保存やアーカイブに使用します。

Debeziumは特に データベースとイベント駆動アーキテクチャ間のリアルタイムデータ同期に適しており、リアルタイム分析、モニタリング、マイクロサービスアーキテクチャなどで多く利用されています。

2.DebeziumはKafkaでのみ使用可能か?

Debeziumは基本的に Kafkaを通じてデータ変更イベントを伝達するよう設計されていますが、Kafkaなしでも他のメッセージングシステムやデータ処理システムと接続できるように、いくつかの拡張オプションが存在します。

KafkaなしでDebeziumを使用する方法

  1. Debezium Server:

    • DebeziumはKafka以外のシステムにデータをストリーミングできるように Debezium Serverを提供しています。
    • Debezium Serverは基本的にKafkaコネクターフレームワークを維持しつつ、Kafka以外のメッセージブローカーやデータベースにCDCイベントを転送できます。
    • サポートされる出力先: Amazon KinesisGoogle Pub/SubApache PulsarRedis Streamsなどです。このオプションを使用すると、Kafka環境なしでさまざまなシステムにCDCイベントを転送できます。
  2. Kafka Connectを通じたHTTP Sink:

    • Kafkaを使用しない場合でも、HTTP APIを利用して特定のアプリケーションにCDCイベントを送信できます。
    • この方法では Kafka ConnectのHTTP Sinkコネクターを使用して、データ変更をRESTful HTTP APIに転送できます。
  3. カスタムSinkの開発:

    • Debeziumのデータを他のシステムに送信するために カスタムSinkアプリケーションを開発することも可能です。
    • たとえば、DebeziumがCDCイベントをKafkaに送信するよう設定し、Kafkaコンシューマーアプリケーションを開発してデータを特定のアプリケーションやデータベースに直接転送することができます。

アーキテクチャ

image.png

サンプルコード

1. サービス登録

image.png

version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=exampledb
    networks:
      - cdc_network

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
    networks:
      - cdc_network

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
    networks:
      - cdc_network

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_PLUGIN_PATH: /kafka/connect
      CONNECT_SECURITY_PROTOCOL: PLAINTEXT
    depends_on: [kafka]
    ports:
      - 8083:8083
    networks:
      - cdc_network

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
    networks:
      - cdc_network

networks:
  cdc_network:
    driver: bridge

# psql -U docker -d exampledb -w
# exampledb=# CREATE TABLE student (id integer primary key, name varchar);
# CREATE TABLE
# exampledb=# ALTER TABLE public.student REPLICA IDENTITY FULL;
# ALTER TABLE
# exampledb=# INSERT INTO student (id,name) VALUES (1, 'bob');
# INSERT 0 1
# exampledb=# select * from student;
#  id | name
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?