Connecting Confluent Platform and MySQL on OpenShift with the Debezium Connector

Posted at 2021-12-27

Introduction

This article links Confluent Platform and MySQL, both deployed on OpenShift, using the Debezium MySQL Source Connector.

Prerequisites

  • OpenShift Container Platform (OCP) 4.7 is deployed on IBM Cloud® Virtual Private Cloud (VPC)
  • Confluent Platform 6.0.0 is deployed on OCP (using Confluent Operator 1.7.0)
  • MySQL 8.0 is deployed on OCP

References

Debezium MySQL connector configuration example
Debezium MySQL Source Connector Configuration Properties - Database History Parameters
Processing MySQL table updates on Kafka with ksql

Verifying the setup

Confirm that Confluent Platform has started normally.

>oc get all -n confluent-namespace
I0912 23:39:24.059883   37555 request.go:621] Throttling request took 1.090165589s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/packages.operators.coreos.com/v1?timeout=32s
NAME                               READY   STATUS    RESTARTS   AGE
pod/cc-operator-6ffb5f5489-b6l6r   1/1     Running   0          13h
pod/connectors-0                   1/1     Running   0          13h
pod/connectors-1                   1/1     Running   0          13h
pod/controlcenter-0                1/1     Running   0          13h
pod/kafka-0                        1/1     Running   0          13h
pod/kafka-1                        1/1     Running   0          13h
pod/kafka-2                        1/1     Running   0          13h
pod/ksql-0                         1/1     Running   0          13h
pod/ksql-1                         1/1     Running   0          13h
pod/replicator-0                   1/1     Running   0          13h
pod/replicator-1                   1/1     Running   0          13h
pod/schemaregistry-0               1/1     Running   0          13h
pod/zookeeper-0                    1/1     Running   0          13h
pod/zookeeper-1                    1/1     Running   0          13h
pod/zookeeper-2                    1/1     Running   0          13h
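(remainder omitted)

Note: Set the auto.create.topics.enable property on the Kafka brokers to true so that the topics the connector writes to are created automatically.

Confirm that MySQL is running normally.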

>oc get all -n shoheim

I0912 23:02:28.265544   36803 request.go:621] Throttling request took 1.148324247s, request: GET:https://c100-e.jp-tok.containers.cloud.ibm.com:31330/apis/monitoring.operator.ibm.com/v1alpha1?timeout=32s
NAME                 READY   STATUS      RESTARTS   AGE
pod/mysql-1-deploy   0/1     Completed   0          45s
pod/mysql-1-z54sl    1/1     Running     0          38s
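(remainder omitted)

Note: Configure the user privileges and the log-related (binlog) settings beforehand, following the document below.
Debezium connector for MySQL - Set up

Create the following database and table in MySQL.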

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| opdb               |
+--------------------+

mysql> SHOW TABLES;
+----------------+
| Tables_in_opdb |
+----------------+
| menus          |
+----------------+
1 row in set (0.00 sec)

mysql> SELECT * from menus;
+------+-----------+
| id   | name      |
+------+-----------+
|    1 | curry     |
|    2 | hamburger |
+------+-----------+
2 rows in set (0.00 sec)

Configuring the MySQL Source Connector

1. Installing the MySQL Connector plugin

Use the Confluent Hub client (the confluent-hub command) to install the Debezium MySQL Connector plugin into the Connectors pod.

>oc exec -it connectors-0 -- /bin/bash
bash-4.4$ confluent-hub install debezium/debezium-connector-mysql:1.6.0
The component can be installed in any of the following Confluent Platform installations: 
  1. / (installed rpm/deb package) 
  2. / (where this tool is installed) 
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/share/confluent-hub-components? (yN) y

 
Component's license: 
Apache 2.0 
https://github.com/debezium/debezium/blob/master/LICENSE.txt 
I agree to the software license agreement (yN) y

You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub. 
Do you want to continue? (yN) y

Downloading component Debezium MySQL CDC Connector 1.6.0, provided by Debezium Community from Confluent Hub and installing into /usr/share/confluent-hub-components 
Detected Worker's configs: 
  1. Standard: /etc/kafka/connect-distributed.properties 
  2. Standard: /etc/kafka/connect-standalone.properties 
  3. Standard: /etc/schema-registry/connect-avro-distributed.properties 
  4. Standard: /etc/schema-registry/connect-avro-standalone.properties 
  5. Used by Connect process with PID : /opt/confluent/etc/connect/connect.properties 
Do you want to update all detected configs? (yN) y

Adding installation directory to plugin path in the following files: 
  /etc/kafka/connect-distributed.properties 
  /etc/kafka/connect-standalone.properties 
  /etc/schema-registry/connect-avro-distributed.properties 
  /etc/schema-registry/connect-avro-standalone.properties 
  /opt/confluent/etc/connect/connect.properties 
 
Completed 

Note: Here the plugin is installed into /usr/share/confluent-hub-components. For that reason, a Persistent Volume with this directory as its mount point was configured on the Connectors pods beforehand.

Check what was installed.

bash-4.4$ cd /usr/share/confluent-hub-components
bash-4.4$ ls -ltr
total 0
drwxrwxrwx. 5 1001 root 63 Oct 25 02:26 debezium-debezium-connector-mysql
bash-4.4$ ls -l debezium-debezium-connector-mysql
total 8
drwxrwxrwx. 2 1001 root   59 Oct 25 02:26 assets
drwxrwxrwx. 2 1001 root  175 Oct 25 02:26 doc
drwxrwxrwx. 2 1001 root 4096 Oct 25 02:26 lib
-rw-rw-rw-. 1 1001 root 2868 Oct 25 02:26 manifest.json
bash-4.4$ ls -l debezium-debezium-connector-mysql/lib
total 9624
-rw-rw-rw-. 1 1001 root  337864 Oct 25 02:26 antlr4-runtime-4.8.jar
-rw-rw-rw-. 1 1001 root   20743 Oct 25 02:26 debezium-api-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root  376353 Oct 25 02:26 debezium-connector-mysql-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root  866648 Oct 25 02:26 debezium-core-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root 2777855 Oct 25 02:26 debezium-ddl-parser-1.6.0.Final.jar
-rw-rw-rw-. 1 1001 root    4617 Oct 25 02:26 failureaccess-1.0.1.jar
-rw-rw-rw-. 1 1001 root 2858426 Oct 25 02:26 guava-30.0-jre.jar
-rw-rw-rw-. 1 1001 root  192762 Oct 25 02:26 mysql-binlog-connector-java-0.25.1.jar
-rw-rw-rw-. 1 1001 root 2397321 Oct 25 02:26 mysql-connector-java-8.0.21.jar

Restart the pods, then check the plugin over REST from the local machine.

>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connector-plugins -k |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   597  100   597    0     0    711      0 --:--:-- --:--:-- --:--:--   710
[
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.6.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "6.1.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "6.1.0-ce"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

io.debezium.connector.mysql.MySqlConnector is listed.
(The other plugins were included by default.)
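
The check above can also be scripted. The following is a minimal sketch, assuming the body returned by /connector-plugins has already been fetched as a string; the helper name find_plugin is hypothetical, and the sample is trimmed from the response shown above.

```python
import json


def find_plugin(plugins, class_name):
    """Return the plugin entry whose "class" matches class_name, or None."""
    for plugin in plugins:
        if plugin.get("class") == class_name:
            return plugin
    return None


# Sample trimmed from the /connector-plugins response shown above.
response_body = """
[
  {"class": "io.debezium.connector.mysql.MySqlConnector",
   "type": "source", "version": "1.6.0.Final"},
  {"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
   "type": "sink", "version": "6.1.0-ce"}
]
"""

plugins = json.loads(response_body)
debezium = find_plugin(plugins, "io.debezium.connector.mysql.MySqlConnector")
if debezium is None:
    print("Debezium MySQL plugin is not installed")
else:
    print("Debezium MySQL plugin version:", debezium["version"])
```

If the plugin is missing from the list, the usual suspects are the plugin path and whether the pod was restarted after the installation.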

The plugin is also visible from Confluent Control Center (C3).
[Screenshot: Connector_2021-10-26-11-06-24.png]

2. Configuring the MySQL Source Connector

The connector can also be configured from C3, but here it is configured over REST from the local machine.

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" -k https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/ -d '{ "name": "mysql-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "172.21.17.38", "database.port": "3306", "database.user": "user", "database.password": "pass", "database.server.id": "184055", "database.server.name": "dbserver1", "database.include.list": "opdb", "database.history.kafka.bootstrap.servers": "kafka:9071", "database.history.kafka.topic": "dbhistory.opdb", "table.include.list": "opdb.menus", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "PLAIN", "database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "PLAIN", "database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";", "include.schema.changes": "true" } }'

The data portion, pretty-printed:

{
	"name": "mysql-connector",
	"config": {
		"connector.class": "io.debezium.connector.mysql.MySqlConnector",
		"tasks.max": "1",
		"database.hostname": "172.21.17.38",
		"database.port": "3306",
		"database.user": "user",
		"database.password": "pass",
		"database.server.id": "184055",
		"database.server.name": "dbserver1",
		"database.include.list": "opdb",
		"database.history.kafka.bootstrap.servers": "kafka:9071",
		"database.history.kafka.topic": "dbhistory.opdb",
		"table.include.list": "opdb.menus",
		"database.history.consumer.security.protocol": "SASL_SSL",
		"database.history.consumer.sasl.mechanism": "PLAIN",
		"database.history.consumer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
		"database.history.producer.security.protocol": "SASL_SSL",
		"database.history.producer.sasl.mechanism": "PLAIN",
		"database.history.producer.sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"user\" password=\"password\";",
		"include.schema.changes": "true"
	}
}

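See the following for the properties.

MySQL connector configuration example
Note: Properties can change between connector versions, so refer to the documentation for the version you are using. Version 1.6 is used here.

The basic properties are covered at the link above. In addition, properties for connecting to the Kafka brokers with SASL/PLAIN (database.history.producer.xxx and database.history.consumer.xxx) have been added.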

Debezium MySQL Source Connector Configuration Properties - Database History Parameters
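
Because the consumer and producer settings are identical apart from their prefix, the request body can be generated rather than written by hand. This is only a sketch under assumptions: the function names sasl_plain_client_props and build_connector_request are hypothetical, and all values are copied from the request shown above (including the placeholder credentials).

```python
import json


def sasl_plain_client_props(prefix, username, password):
    """Build the three SASL/PLAIN client properties under the given prefix."""
    jaas = (
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="{username}" password="{password}";'
    )
    return {
        f"{prefix}.security.protocol": "SASL_SSL",
        f"{prefix}.sasl.mechanism": "PLAIN",
        f"{prefix}.sasl.jaas.config": jaas,
    }


def build_connector_request(kafka_user, kafka_password):
    """Assemble the body for POST /connectors (values taken from the article)."""
    config = {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "tasks.max": "1",
        "database.hostname": "172.21.17.38",
        "database.port": "3306",
        "database.user": "user",
        "database.password": "pass",
        "database.server.id": "184055",
        "database.server.name": "dbserver1",
        "database.include.list": "opdb",
        "database.history.kafka.bootstrap.servers": "kafka:9071",
        "database.history.kafka.topic": "dbhistory.opdb",
        "table.include.list": "opdb.menus",
        "include.schema.changes": "true",
    }
    # The history consumer and producer each need their own copy of the settings.
    for role in ("consumer", "producer"):
        config.update(
            sasl_plain_client_props(
                f"database.history.{role}", kafka_user, kafka_password
            )
        )
    return {"name": "mysql-connector", "config": config}


request_body = build_connector_request("user", "password")
print(json.dumps(request_body, indent=2))
```

The printed JSON can be passed to curl with -d in the same way as the one-liner above.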

Confirm that the Source Connector is running normally.

>curl -H "Accept:application/json" https://connectors.test-ocp-f83035ec98caa9e1c443f354e8208d2f-0000.jp-tok.containers.appdomain.cloud:443/connectors/mysql-connector/status -k |jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   245  100   245    0     0    641      0 --:--:-- --:--:-- --:--:--   641
{
  "name": "mysql-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083"
    }
  ],
  "type": "source"
}
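
The same status check can be automated. The sketch below assumes the JSON from /connectors/mysql-connector/status has already been parsed into a dict; the function name connector_is_healthy is hypothetical, and treating "no tasks" as unhealthy is my own choice rather than anything Kafka Connect prescribes.

```python
def connector_is_healthy(status):
    """Return True when the connector and every task report state RUNNING."""
    if status.get("connector", {}).get("state") != "RUNNING":
        return False
    tasks = status.get("tasks", [])
    # A connector without any task is not moving data, so treat it as unhealthy.
    return bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)


# Status taken from the response shown above.
status = {
    "name": "mysql-connector",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083",
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connectors-1.connectors.confluent-namespace.svc:9083",
        }
    ],
    "type": "source",
}

print("healthy" if connector_is_healthy(status) else "not healthy")
```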

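Checking the data on the topics

Confirm that the connector has sent the MySQL data to Kafka topics.

First, check the topics created on Kafka.

The following three topics have been created.
[Screenshot: Connector_2021-12-12-12-58-36.png]

The following data has been sent to the dbserver1.opdb.menus topic.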

[
	{
		"topic": "dbserver1.opdb.menus",
		"partition": 0,
		"offset": 0,
		"timestamp": 1639280629029,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"before": null,
			"after": {
				"id": 1,
				"name": "curry"
			},
			"source": {
				"version": "1.6.0.Final",
				"connector": "mysql",
				"name": "dbserver1",
				"ts_ms": 1639280628022,
				"snapshot": "true",
				"db": "opdb",
				"sequence": null,
				"table": "menus",
				"server_id": 0,
				"gtid": null,
				"file": "binlog.000002",
				"pos": 3264,
				"row": 0,
				"thread": null,
				"query": null
			},
			"op": "r",
			"ts_ms": 1639280628024,
			"transaction": null
		}
	},
	{
		"topic": "dbserver1.opdb.menus",
		"partition": 0,
		"offset": 1,
		"timestamp": 1639280629030,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"before": null,
			"after": {
				"id": 2,
				"name": "hamburger"
			},
			"source": {
				"version": "1.6.0.Final",
				"connector": "mysql",
				"name": "dbserver1",
				"ts_ms": 1639280628026,
				"snapshot": "true",
				"db": "opdb",
				"sequence": null,
				"table": "menus",
				"server_id": 0,
				"gtid": null,
				"file": "binlog.000002",
				"pos": 3264,
				"row": 0,
				"thread": null,
				"query": null
			},
			"op": "r",
			"ts_ms": 1639280628026,
			"transaction": null
		}
	}
]

Offset 0 holds the first row of the menus table, and offset 1 holds the second row.
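
To make the mapping between table rows and topic records explicit, here is a small sketch. It assumes the records have already been exported as a list of dicts in the shape shown above (trimmed here to the fields that are used); the helper names data_topic and snapshot_rows are hypothetical. The topic name follows the pattern seen in this article: database.server.name, database name, and table name joined with dots.

```python
def data_topic(server_name, database, table):
    """Compose the data topic name as <server name>.<database>.<table>."""
    return f"{server_name}.{database}.{table}"


def snapshot_rows(records):
    """Map offset -> row for records produced by the initial snapshot (op "r")."""
    return {
        record["offset"]: record["value"]["after"]
        for record in records
        if record["value"]["op"] == "r"
    }


# Records trimmed from the topic contents shown above.
records = [
    {"topic": "dbserver1.opdb.menus", "offset": 0,
     "value": {"before": None, "after": {"id": 1, "name": "curry"}, "op": "r"}},
    {"topic": "dbserver1.opdb.menus", "offset": 1,
     "value": {"before": None, "after": {"id": 2, "name": "hamburger"}, "op": "r"}},
]

print(data_topic("dbserver1", "opdb", "menus"))
for offset, row in sorted(snapshot_rows(records).items()):
    print(offset, row)
```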

When a row is added to the menus table, a new message is sent to the topic as well.

INSERT INTO menus (id, name) VALUES (3, 'katsudon');
	{
		"topic": "dbserver1.opdb.menus",
		"partition": 0,
		"offset": 2,
		"timestamp": 1639280629030,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"before": null,
			"after": {
				"id": 3,
				"name": "katsudon"
			},
			"source": {
				"version": "1.6.0.Final",
				"connector": "mysql",
				"name": "dbserver1",
				"ts_ms": 1639280628026,
				"snapshot": "true",
				"db": "opdb",
				"sequence": null,
				"table": "menus",
				"server_id": 0,
				"gtid": null,
				"file": "binlog.000002",
				"pos": 3264,
				"row": 0,
				"thread": null,
				"query": null
			},
			"op": "r",
			"ts_ms": 1639280628026,
			"transaction": null
		}
	}

When a row in the table is changed with UPDATE, the following message is sent to the topic.

UPDATE menus SET name='tendon' WHERE id=3;
[
	{
		"topic": "dbserver1.opdb.menus",
		"partition": 0,
		"offset": 4,
		"timestamp": 1639530256773,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"before": {
				"id": 3,
				"name": "katsudon"
			},
			"after": {
				"id": 3,
				"name": "tendon"
			},
			"source": {
				"version": "1.6.0.Final",
				"connector": "mysql",
				"name": "dbserver1",
				"ts_ms": 1639530256000,
				"snapshot": "false",
				"db": "opdb",
				"sequence": null,
				"table": "menus",
				"server_id": 1,
				"gtid": null,
				"file": "binlog.000002",
				"pos": 3486,
				"row": 0,
				"thread": null,
				"query": null
			},
			"op": "u",
			"ts_ms": 1639530256515,
			"transaction": null
		}
	}
]

before holds the row as it was before the UPDATE, and after holds the row as it is after the UPDATE.
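
Comparing before and after is enough to see which columns an UPDATE touched. A minimal sketch, assuming the message value is available as a dict; changed_columns is a hypothetical helper, not part of Debezium.

```python
def changed_columns(event_value):
    """Return {column: (old, new)} for columns that differ between before and after."""
    before = event_value.get("before") or {}
    after = event_value.get("after") or {}
    return {
        column: (before.get(column), after.get(column))
        for column in sorted(set(before) | set(after))
        if before.get(column) != after.get(column)
    }


# Value of the UPDATE message shown above, trimmed to the relevant fields.
update_value = {
    "before": {"id": 3, "name": "katsudon"},
    "after": {"id": 3, "name": "tendon"},
    "op": "u",
}

for column, (old, new) in changed_columns(update_value).items():
    print(f"{column}: {old!r} -> {new!r}")
```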

For DELETE, the message looks like this.

DELETE FROM menus WHERE id=4;
[
	{
		"topic": "dbserver1.opdb.menus",
		"partition": 0,
		"offset": 5,
		"timestamp": 1639530687264,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"before": {
				"id": 4,
				"name": "gyoza"
			},
			"after": null,
			"source": {
				"version": "1.6.0.Final",
				"connector": "mysql",
				"name": "dbserver1",
				"ts_ms": 1639530687000,
				"snapshot": "false",
				"db": "opdb",
				"sequence": null,
				"table": "menus",
				"server_id": 1,
				"gtid": null,
				"file": "binlog.000002",
				"pos": 3794,
				"row": 0,
				"thread": null,
				"query": null
			},
			"op": "d",
			"ts_ms": 1639530687221,
			"transaction": null
		}
	}
]
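
Taken together, the op codes ("r" for a snapshot read, "c" for create, "u" for update, "d" for delete) are enough to rebuild the current table contents from the topic. The following is a sketch under assumptions: apply_event is a hypothetical helper, rows are keyed by the id column because the message keys are null here, and the event list is a simplified stand-in modelled on the messages above. The insert of the gyoza row is not shown in the article and is inferred from the before image of the DELETE message.

```python
def apply_event(table, value):
    """Apply one Debezium change event value to a dict keyed by the row id."""
    if value is None:
        # Skip records without a value (for example tombstones).
        return table
    op = value["op"]
    if op in ("r", "c", "u"):
        row = value["after"]
        table[row["id"]] = row
    elif op == "d":
        table.pop(value["before"]["id"], None)
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return table


# Simplified events modelled on the messages in this article (assumed sequence).
events = [
    {"op": "r", "before": None, "after": {"id": 1, "name": "curry"}},
    {"op": "r", "before": None, "after": {"id": 2, "name": "hamburger"}},
    {"op": "c", "before": None, "after": {"id": 3, "name": "katsudon"}},
    {"op": "c", "before": None, "after": {"id": 4, "name": "gyoza"}},
    {"op": "u", "before": {"id": 3, "name": "katsudon"},
     "after": {"id": 3, "name": "tendon"}},
    {"op": "d", "before": {"id": 4, "name": "gyoza"}, "after": None},
]

menus = {}
for event in events:
    apply_event(menus, event)

for row in menus.values():
    print(row)
```

Keying by id is only safe if the column is unique. The menus table has no primary key, so a real consumer would need another way to identify rows.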

The dbhistory.opdb topic, specified with the connector's database.history.kafka.topic property, holds the following messages.
The connector uses this topic to store the schema history of the database.

[
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 0,
		"timestamp": 1639280627942,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "",
			"ddl": "SET character_set_server=utf8mb4, collation_server=utf8mb4_0900_ai_ci",
			"tableChanges": []
		}
	},
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 1,
		"timestamp": 1639280627970,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "opdb",
			"ddl": "DROP TABLE IF EXISTS `opdb`.`menus`",
			"tableChanges": []
		}
	},
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 2,
		"timestamp": 1639280627973,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "opdb",
			"ddl": "DROP DATABASE IF EXISTS `opdb`",
			"tableChanges": []
		}
	},
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 3,
		"timestamp": 1639280627975,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "opdb",
			"ddl": "CREATE DATABASE `opdb` CHARSET utf8mb4 COLLATE utf8mb4_0900_ai_ci",
			"tableChanges": []
		}
	},
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 4,
		"timestamp": 1639280627981,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "opdb",
			"ddl": "USE `opdb`",
			"tableChanges": []
		}
	},
	{
		"topic": "dbhistory.opdb",
		"partition": 0,
		"offset": 5,
		"timestamp": 1639280627998,
		"timestampType": "CREATE_TIME",
		"headers": [],
		"key": null,
		"value": {
			"source": {
				"server": "dbserver1"
			},
			"position": {
				"ts_sec": 1639280627,
				"file": "binlog.000002",
				"pos": 3264,
				"snapshot": true
			},
			"databaseName": "opdb",
			"ddl": "CREATE TABLE `menus` (\n  `id` int DEFAULT NULL,\n  `name` varchar(100) DEFAULT NULL\n) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci",
			"tableChanges": [
				{
					"type": "CREATE",
					"id": "\"opdb\".\"menus\"",
					"table": {
						"defaultCharsetName": "utf8mb4",
						"primaryKeyColumnNames": [],
						"columns": [
							{
								"name": "id",
								"jdbcType": 4,
								"typeName": "INT",
								"typeExpression": "INT",
								"charsetName": null,
								"position": 1,
								"optional": true,
								"autoIncremented": false,
								"generated": false
							},
							{
								"name": "name",
								"jdbcType": 12,
								"typeName": "VARCHAR",
								"typeExpression": "VARCHAR",
								"charsetName": "utf8mb4",
								"length": 100,
								"position": 2,
								"optional": true,
								"autoIncremented": false,
								"generated": false
							}
						]
					}
				}
			]
		}
	}
]
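
Since each history record carries one DDL statement, the schema history can be read back as a plain list. This is a sketch only, assuming the records have been exported as dicts in the shape shown above; ddl_statements and table_changes are hypothetical helper names, and the sample is abbreviated from two of the records above.

```python
def ddl_statements(history_records):
    """Return the DDL statements in offset order."""
    ordered = sorted(history_records, key=lambda record: record["offset"])
    return [record["value"]["ddl"] for record in ordered]


def table_changes(history_records):
    """Return (change type, table id) pairs found in the tableChanges arrays."""
    changes = []
    for record in sorted(history_records, key=lambda record: record["offset"]):
        for change in record["value"].get("tableChanges", []):
            changes.append((change["type"], change["id"]))
    return changes


# Abbreviated from the records at offsets 4 and 5 shown above.
history_records = [
    {"offset": 5, "value": {
        "databaseName": "opdb",
        "ddl": "CREATE TABLE `menus` (...)",
        "tableChanges": [{"type": "CREATE", "id": "\"opdb\".\"menus\""}],
    }},
    {"offset": 4, "value": {
        "databaseName": "opdb",
        "ddl": "USE `opdb`",
        "tableChanges": [],
    }},
]

for statement in ddl_statements(history_records):
    print(statement)
print(table_changes(history_records))
```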

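Conclusion

I had originally planned to configure a Sink Connector as well and replicate the data to another database, but the OCP environment became unavailable for various reasons. I plan to pick this up again once the OCP environment is back.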
