
Capturing SQL Server change data with Debezium

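Introduction

I previously wrote an article titled "Capturing MySQL change data with Debezium".
As a follow-up, I checked how to capture change data from SQL Server with Debezium.

I referred to the following two resources.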
https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server
https://debezium.io/documentation/reference/0.9/connectors/sqlserver.html

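Environment

AWS EC2 (Amazon Linux 2)
Docker must be installed beforehand. The commands below also use docker-compose.

Setup

This walkthrough uses the files provided in the Debezium examples repository, so download them first.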

curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/register-sqlserver.json
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/docker-compose-sqlserver.yaml
curl -O https://raw.githubusercontent.com/debezium/debezium-examples/master/tutorial/debezium-sqlserver-init/inventory.sql
  • register-sqlserver.json: defines the SQL Server connector instance
  • docker-compose-sqlserver.yaml: starts the zookeeper, kafka, SQL Server, and connect containers
  • debezium-sqlserver-init/inventory.sql: creates the database and tables in SQL Server and inserts the sample data

Starting the containers

Start the zookeeper, kafka, SQL Server, and connect containers.

export DEBEZIUM_VERSION=0.9
docker-compose -f docker-compose-sqlserver.yaml up

Run docker ps to confirm that the containers have started.

CONTAINER ID        IMAGE                                        COMMAND                  CREATED             STATUS              PORTS                                                                                        NAMES
899bba830789        debezium/connect:0.9                         "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        8778/tcp, 9092/tcp, 0.0.0.0:8083->8083/tcp, 9779/tcp                                         ec2-user_connect_1
4fabee9ff69e        debezium/kafka:0.9                           "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        8778/tcp, 9779/tcp, 0.0.0.0:9092->9092/tcp                                                   ec2-user_kafka_1
73765146588s        debezium/zookeeper:0.9                       "/docker-entrypoint.…"   2 minutes ago       Up 2 minutes        0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 8778/tcp, 0.0.0.0:3888->3888/tcp, 9779/tcp   ec2-user_zookeeper_1
cf0b6e715693        microsoft/mssql-server-linux:2017-CU9-GDR2   "/opt/mssql/bin/sqls…"   2 minutes ago       Up 2 minutes        0.0.0.0:1433->1433/tcp                                                                       ec2-user_sqlserver_1
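
A container listed as Up does not necessarily mean that Kafka Connect inside it is ready to accept requests. The following is a minimal Python sketch that polls the Connect REST API before the connector is registered. It assumes the API is reachable at http://localhost:8083 (the port mapping shown above); `connect_is_up`, `wait_for_connect`, and the `probe` parameter are hypothetical names introduced for illustration.

```python
import time
import urllib.error
import urllib.request


def connect_is_up(url="http://localhost:8083/connectors"):
    """Return True if the Kafka Connect REST API answers with HTTP 200."""
    try:
        with urllib.request.urlopen(url, timeout=5) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, etc.: the service is not ready yet.
        return False


def wait_for_connect(probe=connect_is_up, attempts=30, delay=2.0):
    """Call probe() until it returns True or the attempts run out."""
    for attempt in range(attempts):
        if probe():
            return True
        if attempt < attempts - 1:
            time.sleep(delay)
    return False
```

Calling `wait_for_connect()` with the defaults retries for roughly one minute; the `probe` argument exists so the retry loop can be exercised without a running service.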

Creating the database, tables, and data in SQL Server

cat inventory.sql | docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'

The following command confirms that the data was inserted.

docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -q "use testDB; select * from customers;"'

Configuring the SQL Server connector

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json

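register-sqlserver.json has the following structure. The markers (1) to (11) are annotations explained below, not part of the file. The values shown appear to follow the example in the connector documentation and may differ from the downloaded file; for instance, the messages captured later carry the logical server name server1 rather than fullfillment.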

{
  "name": "inventory-connector",  (1)
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "1433", (4)
    "database.user": "sa", (5)
    "database.password": "Password!", (6)
    "database.dbname": "testDB", (7)
    "database.server.name": "fullfillment", (8)
    "table.whitelist": "dbo.customers", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment" (11)
  }
}
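  • (1) Name of the connector when it is registered with the Kafka Connect service.
  • (2) Class name of the SQL Server connector.
  • (3) Address of the SQL Server instance.
  • (4) Port number of the SQL Server instance (1433).
  • (5) SQL Server user name.
  • (6) SQL Server user password.
  • (7) Name of the database whose changes are captured.
  • (8) Logical name of the SQL Server instance/cluster. It is used as the prefix of the Kafka topic names.
  • (9) List of all tables whose changes Debezium should capture.
  • (10) List of Kafka brokers that the connector uses to write DDL statements to, and recover them from, the database history topic.
  • (11) Name of the database history topic to which the connector writes DDL statements and from which it recovers them.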
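
The annotated listing is not valid JSON as printed, because of the (1) to (11) markers. As a rough illustration, the same settings can be expressed as plain data and serialized. The sketch below copies the values from the listing above and also shows how the logical server name (8) determines the topic name in the form `<database.server.name>.<schema>.<table>`. The helper `topic_for` is a hypothetical name, not part of Debezium.

```python
import json

# Values copied from the annotated listing; adjust them to match your own file.
connector = {
    "name": "inventory-connector",
    "config": {
        "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
        "database.hostname": "192.168.99.100",
        "database.port": "1433",
        "database.user": "sa",
        "database.password": "Password!",
        "database.dbname": "testDB",
        "database.server.name": "fullfillment",
        "table.whitelist": "dbo.customers",
        "database.history.kafka.bootstrap.servers": "kafka:9092",
        "database.history.kafka.topic": "dbhistory.fullfillment",
    },
}


def topic_for(config, table):
    """Topic for a captured table given as '<schema>.<table>'."""
    return f'{config["database.server.name"]}.{table}'


# The serialized form is what would be sent as the body of the POST request.
payload = json.dumps(connector, indent=2)
```

With these values `topic_for(connector["config"], "dbo.customers")` yields `fullfillment.dbo.customers`, whereas a logical server name of `server1` yields `server1.dbo.customers`.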

That completes the setup.
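
Before consuming messages, it can be worth confirming that the registered connector is actually running. The Kafka Connect REST API exposes `GET /connectors/<name>/status` for this. The sketch below assumes the API at http://localhost:8083 and the connector name `inventory-connector` used in the listing above; `fetch_status` and `is_running` are hypothetical helper names.

```python
import json
import urllib.request


def fetch_status(name, base_url="http://localhost:8083"):
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{name}/status"
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.load(resp)


def is_running(status):
    """True when the connector and every one of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

A typical call would be `is_running(fetch_status("inventory-connector"))`. A task in the FAILED state usually carries a `trace` field in the same response that explains the failure.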

Retrieving change messages

Now capture the changes as JSON.

docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.dbo.customers

The rows inserted into the customers table are displayed as JSON. Each line holds the message key followed by the message value.

{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1001}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1001,"first_name":"Sally","last_name":"Thomas","email":"sally.thomas@acme.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118560,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118560}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1002}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1002,"first_name":"George","last_name":"Bailey","email":"gbailey@foobar.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1003}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1003,"first_name":"Edward","last_name":"Walker","email":"ed@walker.com"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"}],"optional":false,"name":"server1.dbo.customers.Key"},"payload":{"id":1004}}                                                                            {"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":null,"after":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569814118561,"change_lsn":null,"commit_lsn":"00000027:00000078:0001","snapshot":true},"op":"r","ts_ms":1569814118561}}
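
Each output line above holds two JSON documents, the key and the value, because the consumer was started with `print.key=true`. The following Python sketch splits such a line without relying on the exact separator and maps the `op` code to a readable name. `split_key_value` and `describe_op` are hypothetical helper names.

```python
import json

# Operation codes used in the "op" field of a Debezium change event.
OPS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def split_key_value(line):
    """Split one console-consumer line into (key, value) JSON objects."""
    decoder = json.JSONDecoder()
    text = line.strip()
    # raw_decode parses one JSON document and reports where it ended.
    key, end = decoder.raw_decode(text)
    # Skip the separator (tab or spaces) before decoding the value.
    value, _ = decoder.raw_decode(text[end:].lstrip())
    return key, value


def describe_op(op):
    """Return a readable name for an op code, or the code itself if unknown."""
    return OPS.get(op, op)
```

For the four rows above, `describe_op(value["payload"]["op"])` gives `read (snapshot)`, which matches `"snapshot":true` in the `source` block: these messages come from the initial snapshot rather than from live changes.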

Next, run an UPDATE against customers.
For id=1004, change first_name from "Anne" to "123".

docker exec -i ec2-user_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD -q "use testDB; update customers set first_name = 123 where id=1004;"'

The change was captured.

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"id"},{"type":"string","optional":false,"field":"first_name"},{"type":"string","optional":false,"field":"last_name"},{"type":"string","optional":false,"field":"email"}],"optional":true,"name":"server1.dbo.customers.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":true,"field":"version"},{"type":"string","optional":true,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":true,"field":"ts_ms"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"boolean","optional":true,"field":"snapshot"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"server1.dbo.customers.Envelope"},"payload":{"before":{"id":1004,"first_name":"Anne","last_name":"Kretchmar","email":"annek@noanswer.org"},"after":{"id":1004,"first_name":"123","last_name":"Kretchmar","email":"annek@noanswer.org"},"source":{"version":"0.9.5.Final","connector":"sqlserver","name":"server1","ts_ms":1569825287100,"change_lsn":"00000028:00003640:0002","commit_lsn":"00000028:00003640:0003","snapshot":false},"op":"u","ts_ms":1569825292094}}

Below is an excerpt of the payload. It shows that first_name changed.

	"payload": {
		"before": {
			"id": 1004,
			"first_name": "Anne",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},
		"after": {
			"id": 1004,
			"first_name": "123",
			"last_name": "Kretchmar",
			"email": "annek@noanswer.org"
		},
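
Comparing `before` and `after` by eye works for four columns but becomes tedious for wider tables. A minimal Python sketch of the comparison follows, using the payload values from the update event above; `changed_fields` is a hypothetical helper name.

```python
def changed_fields(payload):
    """Map each column whose value differs to a (before, after) pair."""
    # "before" is null for inserts and snapshot reads, "after" is null for deletes.
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Payload values taken from the update event shown above.
update_payload = {
    "before": {"id": 1004, "first_name": "Anne",
               "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "after": {"id": 1004, "first_name": "123",
              "last_name": "Kretchmar", "email": "annek@noanswer.org"},
    "op": "u",
}

diff = changed_fields(update_payload)  # {'first_name': ('Anne', '123')}
```

Note that the new value is the string "123": the UPDATE statement passed the number 123, and it was stored as text because first_name is a string column.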

Conclusion

With the check complete, remove the containers.

docker-compose -f docker-compose-sqlserver.yaml down