Introduction
In this post, I try linking data between MySQL and Confluent Platform, both running on OpenShift, using Debezium's MySQL Source Connector.
Prerequisites
- OpenShift Container Platform (OCP) 4.7 set up on IBM Cloud® Virtual Private Cloud (VPC)
- Confluent Platform 6.0.0 set up on OCP (using Confluent Operator 1.7.0)
- MySQL 8.0 set up on OCP
References
Debezium MySQL connector configuration example
Debezium MySQL Source Connector Configuration Properties - Database History Parameters
Processing MySQL table updates with ksql on Kafka
Checking the environment
Confirm that Confluent Platform is up and running.
>oc get all -n confluent-namespace
I0912 23:39:24.059883 37555 request.go:621] Throttling request took 1.090165589s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/packages.operators.coreos.com/v1?timeout=32s
NAME                               READY   STATUS    RESTARTS   AGE
pod/cc-operator-6ffb5f5489-b6l6r   1/1     Running   0          13h
pod/connectors-0                   1/1     Running   0          13h
pod/connectors-1                   1/1     Running   0          13h
pod/controlcenter-0                1/1     Running   0          13h
pod/kafka-0                        1/1     Running   0          13h
pod/kafka-1                        1/1     Running   0          13h
pod/kafka-2                        1/1     Running   0          13h
pod/ksql-0                         1/1     Running   0          13h
pod/ksql-1                         1/1     Running   0          13h
pod/replicator-0                   1/1     Running   0          13h
pod/replicator-1                   1/1     Running   0          13h
pod/schemaregistry-0               1/1     Running   0          13h
pod/zookeeper-0                    1/1     Running   0          13h
pod/zookeeper-1                    1/1     Running   0          13h
pod/zookeeper-2                    1/1     Running   0          13h
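(remaining output omitted)
Note: Set the Kafka broker property auto.create.topics.enable to true. The connector relies on it because the change-event topics (such as dbserver1.opdb.menus) are created automatically when the connector first writes to them.
Confirm that MySQL is running.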
>oc get all -n shoheim
I0912 23:02:28.265544 36803 request.go:621] Throttling request took 1.148324247s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/monitoring.operator.ibm.com/v1alpha1?timeout=32s
NAME                 READY   STATUS      RESTARTS   AGE
pod/mysql-1-deploy   0/1     Completed   0          45s
pod/mysql-1-z54sl    1/1     Running     0          38s
(remaining output omitted)
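You can script the same readiness check instead of reading the oc get all output by eye. The following is a minimal sketch. It assumes Python 3.9+ and an oc client that is already logged in to the cluster. get_pods and unready_pods are hypothetical helper names. The sketch reads `oc get pods -o json` and reports every pod that has not completed and is not running with all of its containers ready.

```python
import json
import subprocess


def get_pods(namespace: str) -> dict:
    """Run `oc get pods -n <namespace> -o json` and decode the result."""
    result = subprocess.run(
        ["oc", "get", "pods", "-n", namespace, "-o", "json"],
        check=True,
        capture_output=True,
        text=True,
    )
    return json.loads(result.stdout)


def unready_pods(pods: dict) -> list[str]:
    """Return names of pods that are not finished and not fully ready."""
    problems = []
    for pod in pods.get("items", []):
        name = pod["metadata"]["name"]
        status = pod.get("status", {})
        if status.get("phase") == "Succeeded":
            # Completed helper pods (e.g. mysql-1-deploy) are expected.
            continue
        containers = status.get("containerStatuses", [])
        all_ready = bool(containers) and all(c.get("ready") for c in containers)
        if status.get("phase") != "Running" or not all_ready:
            problems.append(name)
    return problems
```

For example, unready_pods(get_pods("confluent-namespace")) should return an empty list when every Confluent Platform pod is ready. Completed pods such as mysql-1-deploy have the phase Succeeded, so the check skips them.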
Note: Configure the MySQL user privileges and the binary log settings beforehand, following:
Debezium connector for MySQL - Set up
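One of the settings described on that page is the binary log. It must be enabled, in row-based format, with full row images: log_bin=ON, binlog_format=ROW and binlog_row_image=FULL. These are the defaults in MySQL 8.0.

Below is a minimal sketch of a check for these three values. It assumes you have already run a query such as SHOW GLOBAL VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format', 'binlog_row_image') with a MySQL client of your choice and collected the results into a dict. binlog_problems is a hypothetical helper, not part of Debezium.

```python
# Binary log settings required by the Debezium MySQL connector.
REQUIRED_BINLOG_SETTINGS = {
    "log_bin": "ON",
    "binlog_format": "ROW",
    "binlog_row_image": "FULL",
}


def binlog_problems(variables: dict[str, str]) -> list[str]:
    """Return a description of every required variable with a wrong or missing value.

    `variables` maps Variable_name -> Value, as returned by SHOW GLOBAL VARIABLES.
    """
    problems = []
    for name, expected in REQUIRED_BINLOG_SETTINGS.items():
        actual = variables.get(name)
        if actual is None or actual.upper() != expected:
            problems.append(f"{name}={actual!r} (expected {expected})")
    return problems
```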
Create the following database and table in MySQL beforehand.
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| opdb               |
+--------------------+
mysql> SHOW TABLES;
+----------------+
| Tables_in_opdb |
+----------------+
| menus          |
+----------------+
1 row in set (0.00 sec)
mysql> SELECT * from menus;
+------+-----------+
| id   | name      |
+------+-----------+
|    1 | curry     |
|    2 | hamburger |
+------+-----------+
2 rows in set (0.00 sec)
Configuring the MySQL Source Connector
1. Installing the MySQL Connector plugin
Use the Confluent Hub client (the confluent-hub command) to install the Debezium MySQL Connector plugin into the Connectors pod.
>oc exec -it connectors-0 -n confluent-namespace -- /bin/bash
bash-4.4$ confluent-hub install debezium/debezium-connector-mysql:1.6.0
The component can be installed in any of the following Confluent Platform installations:
1. / (installed rpm/deb package)
2. / (where this tool is installed)
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/share/confluent-hub-components? (yN) y
Component's license:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
I agree to the software license agreement (yN) y
You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub.
Do you want to continue? (yN) y
Downloading component Debezium MySQL CDC Connector 1.6.0, provided by Debezium Community from Confluent Hub and installing into /usr/share/confluent-hub-components
Detected Worker's configs:
1. Standard: /etc/kafka/connect-distributed.properties
2. Standard: /etc/kafka/connect-standalone.properties
3. Standard: /etc/schema-registry/connect-avro-distributed.properties
4. Standard: /etc/schema-registry/connect-avro-standalone.properties
5. Used by Connect process with PID : /opt/confluent/etc/connect/connect.properties
Do you want to update all detected configs? (yN) y
Adding installation directory to plugin path in the following files:
/etc/kafka/connect-distributed.properties
/etc/kafka/connect-standalone.properties
/etc/schema-registry/connect-avro-distributed.properties
/etc/schema-registry/connect-avro-standalone.properties
/opt/confluent/etc/connect/connect.properties
Completed
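Note: Here the plugin is installed into /usr/share/confluent-hub-components. For this reason, a Persistent Volume mounted at that directory was configured on the Connectors pods beforehand, so that the plugin is not lost when the pod is restarted in the next step. Kafka Connect can schedule connectors and tasks on any worker. Every Connectors pod (here connectors-0 and connectors-1) therefore needs the plugin, either by repeating the installation on each pod or by using a shared volume.
Check what was installed: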
bash-4.4$ cd /usr/share/confluent-hub-components
bash-4.4$ ls -ltr
total 0
drwxrwxrwx. 5 1001 root 63 Oct 25 02:26 debezium-debezium-connector-mysql
bash-4.4$ ls -l debezium-debezium-connector-mysql
total 8
drwxrwxrwx. 2 1001 root 59 Oct 25 02:26 assets
drwxrwxrwx. 2 1001 root 175 Oct 25 02:26 doc
drwxrwxrwx. 2 1001 root 4096 Oct 25 02:26 lib
-rw-rw-rw-. 1 1001 root 2868 Oct 25 02:26 manifest.json
bash-4.4$ ls -l debezium-debezium-connector-mysql/lib
total 9624
-rw-rw-rw-. 1 1001 root 337864 Oct 25 02:26 antlr4-runtime-4.8.jar
-rw-rw-rw-. 1 1001 root 20743 Oct 25 02:26 debezium-api-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 376353 Oct 25 02:26 debezium-connector-mysql-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 866648 Oct 25 02:26 debezium-core-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 2777855 Oct 25 02:26 debezium-ddl-parser-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 4617 Oct 25 02:26 failureaccess-1.0.1.jar
-rw-rw-rw-. 1 1001 root 2858426 Oct 25 02:26 guava-30.0-jre.jar
-rw-rw-rw-. 1 1001 root 192762 Oct 25 02:26 mysql-binlog-connector-java-0.25.1.jar
-rw-rw-rw-. 1 1001 root 2397321 Oct 25 02:26 mysql-connector-java-8.0.21.jar
Restart the Connectors pods so that the Connect workers load the new plugin. Then check the available plugins from the local machine through the REST API.
>curl -s -k -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connector-plugins | jq
[
{
"class": "io.debezium.connector.mysql.MySqlConnector",
"type": "source",
"version": "1.6.0.Final"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "6.1.0-ce"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "6.1.0-ce"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
io.debezium.connector.mysql.MySqlConnector is listed. (The other plugins were included by default.)
The plugin is also visible in Confluent Control Center (C3).
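You can also script the plugin check against the Kafka Connect REST API. Below is a minimal sketch that uses only the Python standard library. get_json and plugin_version are hypothetical helpers. verify_tls=False mirrors the curl -k option used above and is only appropriate for a test environment like this one.

```python
import json
import ssl
import urllib.request
from typing import Optional

MYSQL_CONNECTOR = "io.debezium.connector.mysql.MySqlConnector"


def get_json(url: str, verify_tls: bool = True):
    """GET a Kafka Connect REST endpoint and decode the JSON body."""
    ctx = ssl.create_default_context()
    if not verify_tls:
        # Same effect as curl -k: skip hostname and certificate checks.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    request = urllib.request.Request(url, headers={"Accept": "application/json"})
    with urllib.request.urlopen(request, context=ctx) as response:
        return json.load(response)


def plugin_version(plugins: list[dict], class_name: str = MYSQL_CONNECTOR) -> Optional[str]:
    """Return the version of class_name in /connector-plugins output, or None if absent."""
    for plugin in plugins:
        if plugin.get("class") == class_name:
            return plugin.get("version")
    return None
```

For example, plugin_version(get_json(base_url + "/connector-plugins", verify_tls=False)) should return "1.6.0.Final" once the restarted worker has loaded the plugin. Here base_url is the Connectors route URL used above.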
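2. Configuring the MySQL Source Connector
The connector can also be created from C3, but here it is created from the local machine. The configuration is POSTed to the /connectors endpoint of the REST API.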
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -k https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/ -d '{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "172.21.17.38", "database.port": "3306", "database.user": "user", "database.password": "pass", "database.server.id": "184055", "database.server.name": "dbserver1", "database.include.list": "opdb", "database.history.kafka.bootstrap.servers": "kafka:9071", "database.history.kafka.topic": "dbhistory.opdb", "table.include.list": "opdb.menus", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "include.schema.changes": "true" } }'
The request body, pretty-printed:
{
"name": "mysql-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "172.21.17.38",
"database.port": "3306",
"database.user": "user",
"database.password": "pass",
"database.server.id": "184055",
"database.server.name": "dbserver1",
"database.include.list": "opdb",
"database.history.kafka.bootstrap.servers": "kafka:9071",
"database.history.kafka.topic": "dbhistory.opdb",
"table.include.list": "opdb.menus",
"database.history.consumer.security.protocol": "SASL_SSL",
"database.history.consumer.sasl.mechanism": "PLAIN",
"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
"database.history.producer.security.protocol": "SASL_SSL",
"database.history.producer.sasl.mechanism": "PLAIN",
"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
"include.schema.changes": "true"
}
}
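For the meaning of each property, see:
MySQL connector configuration example
Note: Properties can change between connector versions, so check the documentation for the version you are using. This article uses 1.6.
The basic properties are covered by the link above. In addition, I added the database.history.producer.* and database.history.consumer.* properties so that the connector connects to the Kafka brokers with SASL/PLAIN. They are needed because the connector reads and writes the database history topic with its own Kafka producer and consumer, which do not inherit the Connect worker's security settings. These properties are described in: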
Debezium MySQL Source Connector Configuration Properties - Database History Parameters
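Escaping the JAAS strings inside a one-line curl command is error-prone. As an alternative, you can build the same request body in Python, which handles the JSON escaping. In the sketch below, one helper generates the SASL/PLAIN settings for both the producer and the consumer prefixes. The function names are hypothetical. The property names and values are those of the configuration above.

```python
import json
import ssl
import urllib.request


def sasl_plain_props(prefix: str, username: str, password: str) -> dict[str, str]:
    """SASL_SSL + PLAIN client settings under one property prefix."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        f"{prefix}.security.protocol": "SASL_SSL",
        f"{prefix}.sasl.mechanism": "PLAIN",
        f"{prefix}.sasl.jaas.config": jaas,
    }


def mysql_connector_request(db_user: str, db_password: str,
                            kafka_user: str, kafka_password: str) -> dict:
    """Build the POST /connectors body used in this article."""
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "172.21.17.38",
        "database.port": "3306",
        "database.user": db_user,
        "database.password": db_password,
        "database.server.id": "184055",
        "database.server.name": "dbserver1",
        "database.include.list": "opdb",
        "database.history.kafka.bootstrap.servers": "kafka:9071",
        "database.history.kafka.topic": "dbhistory.opdb",
        "table.include.list": "opdb.menus",
        "include.schema.changes": "true",
    }
    # The history topic is accessed by the connector's own producer and
    # consumer, so both get the same SASL/PLAIN settings.
    for prefix in ("database.history.producer", "database.history.consumer"):
        config.update(sasl_plain_props(prefix, kafka_user, kafka_password))
    return {"name": "mysql-connector", "config": config}


def create_connector(base_url: str, body: dict, verify_tls: bool = True) -> dict:
    """POST the body to <base_url>/connectors and return the decoded response."""
    ctx = ssl.create_default_context()
    if not verify_tls:
        # Equivalent of curl -k; acceptable only in a test environment.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
    request = urllib.request.Request(
        base_url.rstrip("/") + "/connectors",
        data=json.dumps(body).encode("utf-8"),
        headers={"Accept": "application/json",
                 "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request, context=ctx) as response:
        return json.load(response)
```

Calling create_connector(base_url, mysql_connector_request("user", "pass", "user", "password"), verify_tls=False) sends the same request as the curl command above.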
Confirm that the connector is running.
>curl -s -k -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/mysql-connector/status | jq
{
"name": "mysql-connector",
"connector": {
"state": "RUNNING",
"worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
}
],
"type": "source"
}
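When you automate the deployment, you can poll the same status endpoint until the connector and all of its tasks report RUNNING. Below is a minimal sketch of the evaluation logic. It assumes the response has already been decoded, for example with a GET like the one above. connector_healthy and failed_task_traces are hypothetical helpers. For a task in the FAILED state, Kafka Connect adds a trace field containing the stack trace. That is usually the quickest way to find configuration errors such as wrong SASL credentials.

```python
def connector_healthy(status: dict) -> bool:
    """True if the connector and every one of its tasks are RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector that has no tasks yet is not treated as healthy.
    return bool(tasks) and all(t.get("state") == "RUNNING" for t in tasks)


def failed_task_traces(status: dict) -> dict[int, str]:
    """Map task id -> stack trace for tasks in the FAILED state."""
    return {
        task["id"]: task.get("trace", "")
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    }
```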
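Checking the data in the topics
Confirm that the connector has sent the MySQL data to Kafka topics.
First, check the topics that were created on Kafka.
The following data has been sent to the dbserver1.opdb.menus topic. Change-event topics are named <database.server.name>.<database>.<table>.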
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 0,
"timestamp": 1639280629029,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 1,
"name": "curry"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628022,
"snapshot": "true",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1639280628024,
"transaction": null
}
},
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 1,
"timestamp": 1639280629030,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 2,
"name": "hamburger"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628026,
"snapshot": "true",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 0,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "r",
"ts_ms": 1639280628026,
"transaction": null
}
}
]
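Offset 0 holds the first row of the menus table and offset 1 holds the second row. Both records come from the initial snapshot that the connector takes when it starts ("op": "r", "snapshot": "true"). The key is null because the menus table has no primary key.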
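You can use the op field to tell snapshot records apart from changes streamed from the binlog. In Debezium, op is "r" (read) only for snapshot records, while streamed changes use "c", "u" and "d".

Below is a small sketch that assumes the records have been exported as JSON like the listing above. change_topic and split_snapshot are hypothetical helpers.

```python
def change_topic(server_name: str, database: str, table: str) -> str:
    """Debezium MySQL topic naming: <database.server.name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def split_snapshot(records: list[dict]) -> tuple[list[int], list[int]]:
    """Split exported records into (snapshot offsets, streamed offsets) by op."""
    snapshot, streamed = [], []
    for record in records:
        value = record.get("value")
        if value is None:
            continue  # tombstone records carry no change event
        target = snapshot if value.get("op") == "r" else streamed
        target.append(record["offset"])
    return snapshot, streamed
```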
When a row is inserted into the menus table, a new message is sent to the topic. This time the event is streamed from the binlog, so op is "c" (create) and snapshot is "false".
INSERT INTO menus (id, name) VALUES (3, 'katsudon');
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 2,
"timestamp": 1639280629030,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": null,
"after": {
"id": 3,
"name": "katsudon"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639280628026,
"snapshot": "false",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3264,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1639280628026,
"transaction": null
}
}
When a row is updated, the following message is sent to the topic.
UPDATE menus SET name='tendon' WHERE id=3;
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 4,
"timestamp": 1639530256773,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": {
"id": 3,
"name": "katsudon"
},
"after": {
"id": 3,
"name": "tendon"
},
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639530256000,
"snapshot": "false",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3486,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1639530256515,
"transaction": null
}
}
]
before holds the row as it was before the UPDATE, after holds the row after the UPDATE, and op is "u".
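Because the event carries both images, a consumer can work out which columns an UPDATE actually changed. Below is a minimal sketch. changed_columns is a hypothetical helper. It also handles a null before or after, as in inserts and deletes.

```python
from typing import Optional


def changed_columns(before: Optional[dict], after: Optional[dict]) -> dict[str, tuple]:
    """Return {column: (old, new)} for every column whose value differs."""
    before = before or {}
    after = after or {}
    columns = sorted(before.keys() | after.keys())
    return {
        col: (before.get(col), after.get(col))
        for col in columns
        if before.get(col) != after.get(col)
    }
```

Applied to the event above, it reports only the name column as changed, from "katsudon" to "tendon".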
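A DELETE produces the following message. The deleted row (id=4, "gyoza") had been inserted beforehand. before holds the deleted row, after is null, and op is "d".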
DELETE FROM menus WHERE id=4;
[
{
"topic": "dbserver1.opdb.menus",
"partition": 0,
"offset": 5,
"timestamp": 1639530687264,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"before": {
"id": 4,
"name": "gyoza"
},
"after": null,
"source": {
"version": "1.6.0.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1639530687000,
"snapshot": "false",
"db": "opdb",
"sequence": null,
"table": "menus",
"server_id": 1,
"gtid": null,
"file": "binlog.000002",
"pos": 3794,
"row": 0,
"thread": null,
"query": null
},
"op": "d",
"ts_ms": 1639530687221,
"transaction": null
}
}
]
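Together, the r, c, u and d events contain everything needed to maintain a copy of the table, which is essentially what a Sink Connector does. Below is a simplified in-memory sketch. apply_event is hypothetical and is not a Debezium or Kafka Connect API. Because menus has no primary key, the message key is null and rows can only be located by their full before image. If two rows are identical, there is no way to tell which copy changed. A primary key on the source table avoids this ambiguity.

```python
from typing import Optional


def apply_event(rows: list[dict], value: Optional[dict]) -> None:
    """Apply one Debezium change-event value to an in-memory copy of a table.

    Rows are matched on their full 'before' image because the table has no
    primary key (the message key is null).
    """
    if value is None:
        return  # tombstone record: nothing to apply
    op = value.get("op")
    before, after = value.get("before"), value.get("after")
    if op in ("r", "c"):
        rows.append(after)  # snapshot read or insert
    elif op in ("u", "d"):
        index = rows.index(before)  # raises ValueError if the row is unknown
        if op == "u":
            rows[index] = after
        else:
            del rows[index]
    else:
        raise ValueError(f"unsupported op {op!r}")
```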
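The dbhistory.opdb topic, specified by the connector's database.history.kafka.topic property, contains the following messages.
The connector uses this topic to store the schema history of the database, that is, the DDL statements it has captured. After a restart, it reads this history to rebuild the table structures and interpret binlog events correctly.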
[
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 0,
"timestamp": 1639280627942,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "",
"ddl": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 1,
"timestamp": 1639280627970,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "DROP TABLE IF EXISTS `opdb`.`menus`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 2,
"timestamp": 1639280627973,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "DROP DATABASE IF EXISTS `opdb`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 3,
"timestamp": 1639280627975,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "CREATE DATABASE `opdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 4,
"timestamp": 1639280627981,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "USE `opdb`",
"tableChanges": []
}
},
{
"topic": "dbhistory.opdb",
"partition": 0,
"offset": 5,
"timestamp": 1639280627998,
"timestampType": "CREATE_TIME",
"headers": [],
"key": null,
"value": {
"source": {
"server": "dbserver1"
},
"position": {
"ts_sec": 1639280627,
"file": "binlog.000002",
"pos": 3264,
"snapshot": true
},
"databaseName": "opdb",
"ddl": "CREATE TABLE `menus` (\n `id` int DEFAULT NULL,\n `name` varchar(100) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
"tableChanges": [
{
"type": "CREATE",
"id": "\"opdb\".\"menus\"",
"table": {
"defaultCharsetName": "utf8mb4",
"primaryKeyColumnNames": [],
"columns": [
{
"name": "id",
"jdbcType": 4,
"typeName": "INT",
"typeExpression": "INT",
"charsetName": null,
"position": 1,
"optional": true,
"autoIncremented": false,
"generated": false
},
{
"name": "name",
"jdbcType": 12,
"typeName": "VARCHAR",
"typeExpression": "VARCHAR",
"charsetName": "utf8mb4",
"length": 100,
"position": 2,
"optional": true,
"autoIncremented": false,
"generated": false
}
]
}
}
]
}
}
]
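The history topic is an ordered log of DDL statements plus the parsed table structures. You can therefore inspect it programmatically, for example to see which DDL the connector has recorded and which columns it currently assumes for a table.

Below is a minimal sketch over records exported as in the listing above. ddl_statements and latest_columns are hypothetical helpers. Table ids in tableChanges use the quoted form "opdb"."menus" seen above.

```python
from typing import Optional


def _ordered(records: list[dict]) -> list[dict]:
    # The history topic is expected to have one partition; sort defensively anyway.
    return sorted(records, key=lambda r: (r["partition"], r["offset"]))


def ddl_statements(records: list[dict], database: Optional[str] = None) -> list[str]:
    """DDL strings in log order, optionally limited to one databaseName."""
    return [
        r["value"]["ddl"]
        for r in _ordered(records)
        if database is None or r["value"].get("databaseName") == database
    ]


def latest_columns(records: list[dict], table_id: str) -> Optional[list[str]]:
    """Column names from the latest CREATE/ALTER of table_id; None if dropped or unknown."""
    columns = None
    for record in _ordered(records):
        for change in record["value"].get("tableChanges", []):
            if change["id"] != table_id:
                continue
            if change["type"] == "DROP":
                columns = None
            else:
                cols = sorted(change["table"]["columns"], key=lambda c: c["position"])
                columns = [c["name"] for c in cols]
    return columns
```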
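Conclusion
My original plan was to also configure a Sink Connector and replicate the data into another database. However, the OCP environment became unavailable for various reasons, so I plan to resume once it is back.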