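Introduction
I previously wrote an article titled "Capturing MySQL change data with Debezium".
As a follow-up, this article walks through capturing change data from SQL Server with Debezium.
I used the following two resources as references.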
https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server
https://debezium.io/documentation/reference/0.9/connectors/sqlserver.html
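Environment
AWS EC2 (Amazon Linux 2)
Docker and Docker Compose must be installed beforehand (the commands below use docker-compose).
Setup
This walkthrough uses the files provided in the debezium-examples repository, so start by downloading them.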
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/register-sqlserver.json
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/docker-compose-sqlserver.yaml
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
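- register-sqlserver.json: defines the SQL Server connector instance to register
- docker-compose-sqlserver.yaml: starts the zookeeper, kafka, SQL Server, and connect containers
- inventory.sql (debezium-sqlserver-init/inventory.sql in the repository): creates the database and tables in SQL Server and inserts the sample data
Starting the containers
Start the zookeeper, kafka, SQL Server, and connect containers.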
export DEBEZIUM_VERSION=0.9
docker-compose -f docker-compose-sqlserver.yaml up
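Confirm that all four containers are up. Because docker-compose up without -d stays in the foreground, run this from another terminal:
docker ps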
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
899bba830789 debezium/connect:0.9 "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp ec2-user_connect_1
4fabee9ff69e debezium/kafka:0.9 "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp ec2-user_kafka_1
73765146588s debezium/zookeeper:0.9 "/docker-entrypoint.…" 2 minutes ago Up 2 minutes 0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, 9779/tcp ec2-user_zookeeper_1
cf0b6e715693 microsoft/mssql-server-linux:2017-CU9-GDR2 "/opt/mssql/bin/sqls…" 2 minutes ago Up 2 minutes 0.0.0.0:1433->1433/tcp ec2-user_sqlserver_1
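The same check can be scripted. The following is a minimal sketch, assuming the output of `docker ps --format '{{.Names}}\t{{.Status}}'` has been captured as text. The helper name `missing_services` and the match on `_<service>_` in the container name (the naming seen in the listing above) are illustrative assumptions, not part of the Debezium example.

```python
# Services started by docker-compose-sqlserver.yaml, as listed in this article.
EXPECTED_SERVICES = ("zookeeper", "kafka", "sqlserver", "connect")


def missing_services(ps_output, expected=EXPECTED_SERVICES):
    r"""Return the expected services that have no container in an 'Up' state.

    ps_output is the text produced by:
        docker ps --format '{{.Names}}\t{{.Status}}'
    """
    running = []
    for line in ps_output.splitlines():
        if "\t" not in line:
            continue  # skip blank or malformed lines
        name, status = line.split("\t", 1)
        if status.startswith("Up"):
            running.append(name)
    # Assumption: containers are named <project>_<service>_<index>.
    return [svc for svc in expected
            if not any(f"_{svc}_" in name for name in running)]
```

An empty list means every expected container is running. The text can be captured with `subprocess.run([...], capture_output=True, text=True).stdout`.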
Creating the database, tables, and data in SQL Server
cat inventory.sql | docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
The following command confirms that the data was inserted. The -Q option runs the query and then exits sqlcmd.
docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -Q "use testDB; select * from customers;"'
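Configuring the SQL Server connector
Register the connector by posting register-sqlserver.json to the Kafka Connect REST API on port 8083.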
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
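For reference, the same request can be issued from a script. This is a rough sketch using only the Python standard library. The function names are hypothetical, and the base URL simply mirrors the curl command above.

```python
import json
import urllib.request


def build_register_request(config, base_url="http://localhost:8083"):
    """Build (but do not send) the POST that registers a connector."""
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors/",
        data=body,
        method="POST",
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
    )


def register_from_file(path="register-sqlserver.json"):
    """Send the request; urlopen raises HTTPError on an error response."""
    with open(path, encoding="utf-8") as f:
        config = json.load(f)
    with urllib.request.urlopen(build_register_request(config)) as resp:
        return resp.status, json.load(resp)
```

Splitting the request construction from the network call keeps the first function easy to inspect without a running Kafka Connect instance.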
register-sqlserver.json has the following general structure. The listing follows the annotated example in the connector documentation, so the (n) markers are annotations rather than JSON, and some values differ from this run: the events captured later show the logical server name server1, not fullfillment.
{
  "name": "inventory-connector", (1)
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "1433", (4)
    "database.user": "sa", (5)
    "database.password": "Password!", (6)
    "database.dbname": "testDB", (7)
    "database.server.name": "fullfillment", (8)
    "table.whitelist": "dbo.customers", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
  }
}
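- (1) Name of the connector when it is registered with the Kafka Connect service.
- (2) Name of the SQL Server connector class.
- (3) Address of the SQL Server instance.
- (4) Port number of the SQL Server instance (1433).
- (5) SQL Server user name.
- (6) Password for the SQL Server user.
- (7) Name of the database to capture changes from.
- (8) Logical name of the SQL Server instance/cluster. It becomes the prefix of the Kafka topic names.
- (9) List of all tables whose changes Debezium should capture.
- (10) List of Kafka brokers that the connector uses to write DDL statements to, and recover them from, the database history topic.
- (11) Name of the database history topic where the connector writes and recovers DDL statements.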
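To illustrate how items (8) and (9) determine where events end up, here is a small sketch that derives topic names from a connector config. It assumes the `<database.server.name>.<schema>.<table>` pattern visible in the topic consumed in this article (server1.dbo.customers) and treats each whitelist entry as a literal table name. The function name `change_topics` is hypothetical.

```python
def change_topics(config):
    """Derive change-event topic names from a connector 'config' mapping.

    Assumes topics are named <database.server.name>.<schema>.<table> and
    that table.whitelist holds plain, comma-separated table names.
    """
    server = config["database.server.name"]
    whitelist = config.get("table.whitelist", "")
    tables = [t.strip() for t in whitelist.split(",") if t.strip()]
    return [f"{server}.{table}" for table in tables]
```

With the logical name server1 and the table dbo.customers, this yields the topic name passed to the console consumer in this article.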
Reference: the Debezium SQL Server connector documentation listed in the introduction.
That completes the setup.
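Before consuming events, it can help to confirm that the connector actually started. The sketch below reads the Kafka Connect REST API's status endpoint (`GET /connectors/<name>/status`). The connector name is taken from the listing above, the function names are illustrative, and the base URL assumes the port mapping used in this article.

```python
import json
import urllib.request


def connector_is_running(status):
    """True when the connector and every one of its tasks report RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)


def fetch_status(name="inventory-connector", base_url="http://localhost:8083"):
    """GET /connectors/<name>/status and return the decoded JSON body."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)
```

A typical use would be `connector_is_running(fetch_status())`. A task in the FAILED state makes the check return False, and the status body then usually carries a trace explaining why.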
Retrieving change messages
Now consume the change events, which are emitted as JSON.
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
--bootstrap-server kafka:9092 \
--from-beginning \
--property print.key=true \
--topic server1.dbo.customers
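The rows inserted into the customers table are displayed as JSON. Each line holds the message key followed by the message value. Here op is "r" and snapshot is true because these rows were read by the initial snapshot.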
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1001}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118560,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118560}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1002}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1003}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1004}} {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
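These lines are easier to work with once parsed. The following sketch splits one console-consumer line into its key and value and pulls out the operation and the new row. It assumes the key and value are separated by a tab (the console consumer's default key separator) and that both carry the schema/payload wrapper shown above. The function name is hypothetical, and delete tombstones (a null value) are not handled.

```python
import json

# Debezium operation codes carried in payload.op.
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def parse_consumer_line(line, key_separator="\t"):
    """Split one console-consumer line into (key, operation, after row)."""
    raw_key, raw_value = line.rstrip("\n").split(key_separator, 1)
    key = json.loads(raw_key)["payload"]
    payload = json.loads(raw_value)["payload"]
    operation = OPERATIONS.get(payload["op"], payload["op"])
    return key, operation, payload["after"]
```

Applied to the snapshot lines above, the operation comes back as "read (snapshot)" and the third element is the row as it exists in the table.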
Next, run an UPDATE against customers.
For id=1004, change first_name from "Anne" to "123".
docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -Q "use testDB; update customers set first_name = 123 where id=1004;"'
The change was captured. The unquoted 123 was stored as the string "123", and op is now "u".
{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"123","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569825287100,"change_lsn":"00000028:00003640:0002","commit_lsn":"00000028:00003640:0003","snapshot":false},"op":"u","ts_ms":1569825292094}}
Excerpt of the changed part. Only first_name differs between before and after.
"payload": {
  "before": {
    "id": 1004,
    "first_name": "Anne",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
  "after": {
    "id": 1004,
    "first_name": "123",
    "last_name": "Kretchmar",
    "email": "annek@noanswer.org"
  },
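Spotting the difference by eye gets harder with wider tables. As a small sketch, the comparison of before and after can be automated. The function name `changed_fields` is an illustrative choice, and the input is assumed to be the decoded payload object of one change event.

```python
def changed_fields(payload):
    """Map each column whose value changed to its (before, after) pair."""
    before = payload.get("before") or {}  # null for inserts and snapshot reads
    after = payload.get("after") or {}    # null for deletes
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(before.keys() | after.keys())
        if before.get(column) != after.get(column)
    }
```

For the update above, only first_name is reported, with the pair ("Anne", "123"). For an insert or snapshot read, where before is null, every column appears with None as its old value.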
Wrapping up
Now that the check is complete, remove the containers.
docker-compose -f docker-compose-sqlserver.yaml down