1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Debezium - データソースの変更を検知して Kafka Topics に格納されたメッセージを確認する

Last updated at Posted at 2021-12-23

概要

マイクロサービス化案件に携わっており、Debezium を触ったので、明日のためにメモしときます。

登場人物

Kafka

オープンソースの分散メッセージングシステム。メッセージキューって言ってよいのかな。
メッセージキューはマイクロサービスにおけるバックエンドの要。

Kafka Connect

Kafka と他のシステム間でデータをスケーラブルかつ、ストリーミングするためのソフトウェア。
Kafka との接続方法やリバランスなど、手間のかかる問題を処理してくれる。

Debezium

データの変更をキャプチャするためのオープンソースの分散プラットフォーム。
データソース (DBなど) の変更を監視し、変更内容を Kafka Topics に格納する。
Kafka 上に構築されている。
KafKa Connect が使用されている。
トランザクションログを監視している。

ZooKeeper

大規模分散システムでよく利用される、設定情報の集中管理や名前付けなどのサービスを提供するソフトウェア。
kafka の実行に必要なようで。

引用: [DEBEZIUM アーキテクチャー](https://access.redhat.com/documentation/ja-jp/red_hat_integration/2020-q2/html/debezium_user_guide/debezium-architecture) ## 手順 最初は [Tutorial](https://debezium.io/documentation/reference/tutorial.html) に沿ってやっていきます。

1. ZooKeeperの起動

$ docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:1.8

2. Kafkaの起動

$ docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:1.8

3. MySQLの起動

サンプルスキーマが含まれているイメージです。

$ docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:1.8

4. MySQLコマンドラインクライアントの起動

まだ使わないけど、起動しておきます。

$ docker run -it --rm --name mysqlterm --link mysql --rm mysql:8.0 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

5. KafkaConnectを起動

Kafka と MySQL のデータ連携に必要です。
API で操作可能なので curl 使います。

$ docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:1.8

DebeziumMySQLコネクタを登録します。

$ curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.include.list": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

ここまでで、画像のように起動できていればOK

6. 登録されたメッセージを確認する

Kafka 実行中のコンテナに入る。

$ docker exec -it kafka bash

以下のコマンドを使用すると、Kafka に入ってきたメッセージの内容をリアルタイムに確認できます。

[kafka@851e7f6e1d62 ~]$ bin/kafka-console-consumer.sh --topic dbserver1.inventory.customers --bootstrap-server kafka:9092

[--topic dbserver1.inventory.customers]
以下のコマンドで確認。
$ curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector/topics

[--from-beginning]
今回使用していないが、このパラメーターを使用することで、全メッセージを確認できる。

7. データに変更を加える

MySQLコマンドラインクライアントで

use inventory;
UPDATE customers SET first_name='Anne123' WHERE id=1004;

customers テーブルへ変更を加えると、kafka-console-consumer.sh の画面に変更内容のメッセージキューが入ってくることが確認できる。

{
  "schema": {
    "type": "struct",
    "fields": [
      {
          // 省略...
      }
  },
  "payload": {
    "before": {
      "id": 1004,
      "first_name": "Anne", // ★変更前
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "after": {
      "id": 1004,
      "first_name": "Anne123", // ★変更後
      "last_name": "Kretchmar",
      "email": "annek@noanswer.org"
    },
    "source": {
      "version": "1.8.0.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1640265034000,
      "snapshot": "false",
      "db": "inventory",
      "sequence": null,
      "table": "customers",
      "server_id": 223344,
      "gtid": null,
      "file": "mysql-bin.000003",
      "pos": 1599,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "u",
    "ts_ms": 1640265034477,
    "transaction": null
  }
}

続き

明日は、上記の環境を windows PC で構築し、メッセージキューに入ってきた内容で lambda を叩いてみます。
よく使う環境なら docker-compose にするのが良いんだろうな…

2021-12-24 追記
Windows でうまくコマンドを実行できないときは PowerShell を使うと良い。
Windows には Curl は別途インストールが必要。(または Postman などの API Client)

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?