12
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Debezium で MySQL の変更データを取得する

Last updated at Posted at 2019-09-29

はじめに

Debezium はDBの変更をイベントストリームに変換することができる分散プラットホームです。例えば MySQL に対して行った Update を 行レベルで確認することができます(Json形式)。

~省略~
  "payload": {
    "before": {
      "id": 100,
      "first_name": "Oliver",
      "last_name": "Jack",
      "email": "jack@test.com"
    },
    "after": {
      "id": 100,
      "first_name": "Oliver",
      "last_name": "Harry",
      "email": "harry@test.com"
    }
~省略~

Debezium は Apache Kafka をベースに実装されており、データ変更の履歴を Kafkaログに記録します。アプリケーションが停止した場合でも、再起動すれば中断したイベントから変更を確認できます。
さらに、MySQL、PostgreSQL、MongoDB、SQL Server、Oracle など多くの DB に対応しています。

チュートリアルがあるので試してみました。
https://debezium.io/documentation/reference/0.9/tutorial.html

チュートリアルが長いので、なるべく簡略化して書きたいと思います。
チュートリアルでは MySQL の変更データを取得する内容になっています。

実行環境

AWS EC2 Linux2
事前に Docker のインストールが必要。

セットアップ

Debezium は ZooKeeper、Kafka、Debeziumコネクタサービスの3つが主なコンポーネントとなります。
動作を確認するために docker run毎に新しいターミナルを立ち上げてください。docker run のオプションで -it を指定することでインタラクティブにターミナルに内容が表示されます。また、--rm を指定することでチュートリアルが終わり停止させたときに削除されます。

zookeeper 起動

zookeeper は、分散処理システムを構築するためのミドルウェアです。
Kafka は複数台の Kafka サーバーでクラスターを組むことができ、各 Kafka サーバーを連携させるのが zookeeperです。

docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.9

Kafka 起動

Kafka は分散メッセージキューです。一時的にデータ保持するミドルウェアです。

docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.9

MySQL 起動

MySQL はリレーショナルデータベースです。
今回はチュートリアルのため既にテーブルなどが作成されたイメージを使用します。

docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.9

MySQL command line client 起動

MySQL を操作するためにコマンドラインクライアントを起動します。

docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'

MySQL command line client にてuse inventory;を実行し、次にshow tables; を実行すると、すでにテーブルが存在することが分かります。

Kafka Connect 起動

Kafka Connect はKafkaと周辺のシステムのストリームデータをやりとりするための通信規格とライブラリとツールです。

docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.9

curl -H "Accept:application/json" localhost:8083/ を実行して値が返ってくれば Kafka Connect REST API が起動しています。

MySQL モニタリング

connector から DB、Kafka に接続するために必要な情報を設定します。

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

HTTP/1.1 201 Created と表示されることを確認します。

コマンドで設定されたことを確認します。

curl -i -X GET -H "Accept:application/json" localhost:8083/connectors/inventory-connector

以上でセットアップは完了です。

データ変更時の動作確認

MySQL のデータを変更して変更内容が取得できるか見てみます。

Kafka にメッセージが入ったかを確認するために watcher を起動します。

docker run -it --name watcher --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k dbserver1.inventory.customers

初期接続時にはテーブル作成時からのすべての変更を読み取るため、全データが変更データとして表示されます。次回からは読み取り済みのログの続きから読み取るようになっています。

WARNING: Using default BROKER_ID=1, which is valid only for non-clustered installations.
Using ZOOKEEPER_CONNECT=172.17.0.2:2181
Using KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.7:9092
Using KAFKA_BROKER=172.17.0.3:9092
Contents of topic dbserver1.inventory.customers:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1001}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1002}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1003}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"dbserver1.inventory.customers.Key"},"payload":{"id":1004}}        {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"dbserver1.inventory.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"server_id"},{"type":"int64","optional":false,"field":"ts_sec"},{"type":"string","optional":true,"field":"gtid"},{"type":"string","optional":false,"field":"file"},{"type":"int64","optional":false,"field":"pos"},{"type":"int32","optional":false,"field":"row"},{"type":"boolean","optional":true,"default":false,"field":"snapshot"},{"type":"int64","optional":true,"field":"thread"},{"type":"string","optional":true,"field":"db"},{"type":"string","optional":true,"field":"table"},{"type":"string","optional":true,"field":"query"}],"optional":false,"name":"io.debezium.connector.mysql.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"dbserver1.inventory.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"mysql","name":"dbserver1","server_id":0,"ts_sec":0,"gtid":null,"file":"mysql-bin.000003","pos":154,"row":0,"snapshot":true,"thread":null,"db":"inventory","table":"customers","query":null},"op":"c","ts_ms":1569744224564}}

データ Update

データを Update してみます。

use inventory;
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
出力結果
{
	"schema": {
		"type": "struct",
		"fields": [
			{
				"type": "struct",
				"fields": [
					{
						"type": "int32",
						"optional": false,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "first_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "last_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					}
				],
				"optional": true,
				"name": "dbserver1.inventory.customers.Value",
				"field": "before"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "int32",
						"optional": false,
						"field": "id"
					},
					{
						"type": "string",
						"optional": false,
						"field": "first_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "last_name"
					},
					{
						"type": "string",
						"optional": false,
						"field": "email"
					}
				],
				"optional": true,
				"name": "dbserver1.inventory.customers.Value",
				"field": "after"
			},
			{
				"type": "struct",
				"fields": [
					{
						"type": "string",
						"optional": true,
						"field": "version"
					},
					{
						"type": "string",
						"optional": true,
						"field": "connector"
					},
					{
						"type": "string",
						"optional": false,
						"field": "name"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "server_id"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "ts_sec"
					},
					{
						"type": "string",
						"optional": true,
						"field": "gtid"
					},
					{
						"type": "string",
						"optional": false,
						"field": "file"
					},
					{
						"type": "int64",
						"optional": false,
						"field": "pos"
					},
					{
						"type": "int32",
						"optional": false,
						"field": "row"
					},
					{
						"type": "boolean",
						"optional": true,
						"default": false,
						"field": "snapshot"
					},
					{
						"type": "int64",
						"optional": true,
						"field": "thread"
					},
					{
						"type": "string",
						"optional": true,
						"field": "db"
					},
					{
						"type": "string",
						"optional": true,
						"field": "table"
					},
					{
						"type": "string",
						"optional": true,
						"field": "query"
					}
				],
				"optional": false,
				"name": "io.debezium.connector.mysql.Source",
				"field": "source"
			},
			{
				"type": "string",
				"optional": false,
				"field": "op"
			},
			{
				"type": "int64",
				"optional": true,
				"field": "ts_ms"
			}
		],
		"optional": false,
		"name": "dbserver1.inventory.customers.Envelope"
	},
	"payload": {
		"before": {
			"id": 1004,
			"first_name": "Anne",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},
		"after": {
			"id": 1004,
			"first_name": "Anne Marie",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},
		"source": {
			"version": "0.9.5.Final",
			"connector": "mysql",
			"name": "dbserver1",
			"server_id": 223344,
			"ts_sec": 1569745138,
			"gtid": null,
			"file": "mysql-bin.000003",
			"pos": 364,
			"row": 0,
			"snapshot": false,
			"thread": 2,
			"db": "inventory",
			"table": "customers",
			"query": null
		},
		"op": "u",
		"ts_ms": 1569745138230
	}
}

実行結果の結果が長いため一部抜粋しました。変更前後のデータが取得できていることが分かります。

	"payload": {
		"before": {
			"id": 1004,
			"first_name": "Anne",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},
		"after": {
			"id": 1004,
			"first_name": "Anne Marie",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},

最後に

チュートリアルが終わったら Docker で起動した各サービスを必要に応じて停止させます。
docker run 時のオプションで --rm を指定したため、停止することで削除されます。

docker stop mysqlterm watcher connect mysql kafka zookeeper

実行しているプロセスがないことを確認して完了です。

docker ps -a
12
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?