ねらい
大仰なタイトルだがその実態は下記チュートリアルを体験してみようというもの。「Materialized Cache」というと何のことやらだが、要はTopic内の情報をksqlのtable/stream表現で扱うことらしい。このチュートリアルではDebeziumを用いてMySQLの更新ログをKafka Topicに流し込み、ksqlで処理する。DebeziumはいわゆるChange Data CaptureのためのConnectorであり、この一連の処理の流れは私の秘めたる最終目的とも合致している。
関連Qiita記事
1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する
チュートリアル実施
事前準備
その1 : Debeziumの入手
Confluent Hubというダウンロードツールを用いてDebeziumを入手する。(Confluent HubはCommunity Editionには同梱されていないようだったので別途導入した。)
connect-ditributed.propertiesを書き換えようとして失敗しているが、今回はdocker composeで環境を作る際の元ネタのlibararyがあればよいだけのはずなので気にせず進める。
gen@LAPTOP-O8FG8ES2:~/docker_files$ confluent-hub install --component-dir confluent-hub-components --no-prompt debezium/debezium-connector-mysql:1.1.0
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
Implicit confirmation of the question: You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub.
Downloading component Debezium MySQL CDC Connector 1.1.0, provided by Debezium Community from Confluent Hub and installing into confluent-hub-components
Adding installation directory to plugin path in the following files:
/etc/kafka/connect-distributed.properties
Unable to update Worker's configuration file /etc/kafka/connect-distributed.properties
Unable to save to file /etc/kafka/connect-distributed.properties
gen@LAPTOP-O8FG8ES2:~/docker_files/confluent-hub-components/debezium-debezium-connector-mysql$ ls -l
total 16
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 assets
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 doc
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 lib
-rw-r--r-- 1 gen gen 2868 Nov 11 15:35 manifest.json
このライブラリはdocker-compose.ymlによってKSQL_CONNECT_PLUGIN_PATHで指定されたディレクトリとしてマウントされる。
...
volumes:
- "./confluent-hub-components/:/usr/share/kafka/plugins/"
environment:
...
KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"
なおHow to use connector management に詳しいやり方が書いてあった。
その2: mysqlのconfig準備
docker composeで環境を作る際の元ネタになるmysqlのConfigを準備する。mysqlディレクトリを掘り、中に以下のファイルを配置しておく。どうも「バイナリログに行の変更を全て書き出せ」という指令のようだ。わーいx'99'ログだぁ(独白)。
[mysqld]
server-id = 223344
log_bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
expire_logs_days = 10
gtid_mode = ON
enforce_gtid_consistency = ON
volumes:
- "./mysql/custom-config.cnf:/etc/mysql/conf.d/custom-config.cnf"
Start the stack
困ったことに元ネタのdocker-compose.ymlのままだとSchema Registryのコンテナ起動に失敗したため、以下の変更を行っている。それにつけてもStackOverflow様様である。
(元記述)
SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
(変更後)
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://broker:9092"
Docker Composeで環境を立ち上げた後のステータス。
gen@LAPTOP-O8FG8ES2:~$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
b6b396095bc6 confluentinc/ksqldb-cli:0.21.0 "/bin/sh" 8 minutes ago Up 8 minutes ksqldb-cli
f7078c17dbe0 confluentinc/ksqldb-server:0.21.0 "/usr/bin/docker/run" 8 minutes ago Up 8 minutes 0.0.0.0:8088->8088/tcp ksqldb-server
27eafbdc49a4 confluentinc/cp-schema-registry:7.0.0 "/etc/confluent/dock…" 8 minutes ago Up 8 minutes 0.0.0.0:8081->8081/tcp schema-registry
582265bdf311 confluentinc/cp-kafka:7.0.0 "/etc/confluent/dock…" 4 hours ago Up 8 minutes 9092/tcp, 0.0.0.0:29092->29092/tcp broker
2ad636a2d60b confluentinc/cp-zookeeper:7.0.0 "/etc/confluent/dock…" 4 hours ago Up 8 minutes 2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp zookeeper
909ab8e55de5 mysql:8.0.19 "docker-entrypoint.s…" 4 hours ago Up 8 minutes 0.0.0.0:3306->3306/tcp, 33060/tcp mysql
gen@LAPTOP-O8FG8ES2:~$ docker image ls
REPOSITORY TAG IMAGE ID CREATED SIZE
confluentinc/cp-schema-registry 7.0.0 24c8939a7458 3 weeks ago 1.65GB
confluentinc/cp-kafka 7.0.0 42807c42b958 3 weeks ago 791MB
confluentinc/cp-zookeeper 7.0.0 3858ad02e9d1 3 weeks ago 791MB
confluentinc/ksqldb-cli 0.21.0 187badc13e5b 2 months ago 852MB
confluentinc/ksqldb-server 0.21.0 187badc13e5b 2 months ago 852MB
mysql 8.0.19 0c27e8e5fcfa 18 months ago 546MB
Dockerで環境作れるのは楽だけど、数GBはダウンロードするのが嫌・・・自分で全く土地勘のないmysqlを構成するよりはいいか。
Configure MySQL for Debezium
mysqlのコンテナに入り、MySQL CLIからレプリケーションに必要なGRANTを実施する。
mysql> GRANT ALL PRIVILEGES ON *.* TO 'example-user' WITH GRANT OPTION;
Query OK, 0 rows affected (0.02 sec)
mysql> ALTER USER 'example-user'@'%' IDENTIFIED WITH mysql_native_password BY 'example-pw';
Query OK, 0 rows affected (0.01 sec)
mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)
Create the calls table in MySQL
続けてcall-centerデータベース内にcallsテーブルを作る。コールセンター業務系の記録テーブルであるcallsには「コールした人の名前」、「コールの理由」、「コール時間(秒)」が含まれる。ところでderekよ一体何を買ったんだ?
(DB設定)
mysql> USE call-center;
Database changed
(テーブル作成)
mysql> CREATE TABLE calls (name TEXT, reason TEXT, duration_seconds INT);
Query OK, 0 rows affected (0.08 sec)
(行挿入)
...
mysql> INSERT INTO calls (name, reason, duration_seconds) VALUES ("derek", "refund", 325);
Query OK, 1 row affected (0.01 sec)
(テーブル確認)
mysql> select * from calls;
+---------+----------+------------------+
| name | reason | duration_seconds |
+---------+----------+------------------+
| michael | purchase | 540 |
| michael | help | 224 |
| colin | help | 802 |
| derek | purchase | 10204 |
| derek | help | 600 |
| colin | refund | 105 |
| michael | help | 2030 |
| colin | purchase | 800 |
| derek | help | 2514 |
| derek | refund | 325 |
+---------+----------+------------------+
10 rows in set (0.00 sec)
Start the Debezium connector
ksqldb-cliのコンテナからksqldbCLIを起動し、まずは常にtopicの先頭からメッセージを読む設定を行う。
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
以下のksqlでSource Connectorの使用と構成をkafkaに通知する。え、これだけでいいの?
ksql> CREATE SOURCE CONNECTOR calls_reader WITH (
> 'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
> 'database.hostname' = 'mysql',
> 'database.port' = '3306',
> 'database.user' = 'example-user',
> 'database.password' = 'example-pw',
> 'database.allowPublicKeyRetrieval' = 'true',
> 'database.server.id' = '184054',
> 'database.server.name' = 'call-center-db',
> 'database.whitelist' = 'call-center',
> 'database.history.kafka.bootstrap.servers' = 'broker:9092',
> 'database.history.kafka.topic' = 'call-center',
> 'table.whitelist' = 'call-center.calls',
> 'include.schema.changes' = 'false'
>);
Message
--------------------------------
Created connector CALLS_READER
--------------------------------
確かにcall-center-db.call-center.callというTopicが作られている。
Kafka Topic | Partitions | Partition Replicas
--------------------------------------------------------------------
_ksql-connect-configs | 1 | 1
_ksql-connect-offsets | 25 | 1
_ksql-connect-statuses | 5 | 1
call-center | 1 | 1
call-center-db.call-center.calls | 1 | 1
default_ksql_processing_log | 1 | 1
--------------------------------------------------------------------
Topicの中身は更新ログから読み出したINSERTの内容ですね。面白い。Key formatが万歳アスキーアートになってるが大丈夫か・・・?MySQLでCREATE TABLEした際にKey Columnを宣言しなかったため、Message KeyがNullになっているせいだと予想。
ksql> PRINT 'call-center-db.call-center.calls' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/11/11 11:05:32.288 Z, key: <null>, value: {"before": null, "after": {"name": "michael", "reason": "purchase", "duration_seconds": 540}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "call-center-db", "ts_ms": 0, "snapshot": "true", "db": "call-center", "table": "calls", "server_id": 0, "gtid": null, "file": "mysql-bin.000006", "pos": 195, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1636628731343, "transaction": null}, partition: 0
rowtime: 2021/11/11 11:05:32.291 Z, key: <null>, value: {"before": null, "after": {"name": "michael", "reason": "help", "duration_seconds": 224}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "call-center-db", "ts_ms": 0, "snapshot": "true", "db": "call-center", "table": "calls", "server_id": 0, "gtid": null, "file": "mysql-bin.000006", "pos": 195, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1636628731350, "transaction": null}, partition: 0
...
Debeziumが登録したAvro Schemaを調べてみると、行の「before image」、「after image」、「更新に関するMetadata」で構成されているようだ。Sink Connectorによる他DBMSへのApplyをガッツリ意識した内容だなぁ。
{
"type": "record",
"name": "Envelope",
"namespace": "call_center_db.call_center.calls",
"fields": [
{
"name": "before",
"type": [
"null",
{
"type": "record",
"name": "Value",
"fields": [
{
"name": "name",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "reason",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "duration_seconds",
"type": [
"null",
"int"
],
"default": null
}
],
"connect.name": "call_center_db.call_center.calls.Value"
}
],
"default": null
},
{
"name": "after",
"type": [
"null",
"Value"
],
"default": null
},
{
"name": "source",
"type": {
"type": "record",
"name": "Source",
"namespace": "io.debezium.connector.mysql",
"fields": [
{
"name": "version",
"type": "string"
},
{
"name": "connector",
"type": "string"
},
{
"name": "name",
"type": "string"
},
{
"name": "ts_ms",
"type": "long"
},
{
"name": "snapshot",
"type": [
{
"type": "string",
"connect.version": 1,
"connect.parameters": {
"allowed": "true,last,false"
},
"connect.default": "false",
"connect.name": "io.debezium.data.Enum"
},
"null"
],
"default": "false"
},
{
"name": "db",
"type": "string"
},
{
"name": "table",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "server_id",
"type": "long"
},
{
"name": "gtid",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "file",
"type": "string"
},
{
"name": "pos",
"type": "long"
},
{
"name": "row",
"type": "int"
},
{
"name": "thread",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "query",
"type": [
"null",
"string"
],
"default": null
}
],
"connect.name": "io.debezium.connector.mysql.Source"
}
},
{
"name": "op",
"type": "string"
},
{
"name": "ts_ms",
"type": [
"null",
"long"
],
"default": null
},
{
"name": "transaction",
"type": [
"null",
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "total_order",
"type": "long"
},
{
"name": "data_collection_order",
"type": "long"
}
]
}
],
"default": null
}
],
"connect.name": "call_center_db.call_center.calls.Envelope"
}
Create the ksqlDB calls stream
Streamを作る。Avro Schemaをかぶせておく。
ksql> CREATE STREAM calls WITH (
> kafka_topic = 'call-center-db.call-center.calls',
> value_format = 'avro'
>);
Message
----------------
Stream created
----------------
Create the materialized views
Materialized views、つまりksqlDBのTableを作る。->
を用いることでAvro SchemaにおけるAfter
Recordの中を指定している。このTableでは以下の関数を使用し、NAMEでGroup byしている。
-count_distinct
: 「コール理由の種類数」を数える
-latest_by_offset
: 「最後のコール理由」を取り出す
ksql> CREATE TABLE support_view AS
> SELECT after->name AS name,
> count_distinct(after->reason) AS distinct_reasons,
> latest_by_offset(after->reason) AS last_reason
> FROM calls
> GROUP BY after->name
> EMIT CHANGES;
Message
-------------------------------------------
Created query with ID CTAS_SUPPORT_VIEW_3
-------------------------------------------
もう一つ作る。このtableでは以下の関数を使用し、やはりNAMEでGroup byしている。
-count
: 「コール(理由)数」を数える
-(sum(after->duration_seconds) / 60)
: コール時間(秒)を集計し分に変換する
ksql> CREATE TABLE lifetime_view AS
> SELECT after->name AS name,
> count(after->reason) AS total_calls,
> (sum(after->duration_seconds) / 60) as minutes_engaged
> FROM calls
> GROUP BY after->name
> EMIT CHANGES;
Message
--------------------------------------------
Created query with ID CTAS_LIFETIME_VIEW_5
--------------------------------------------
さっそくksqlによる検索を実施してみる。
ksql> SELECT name, distinct_reasons, last_reason
>FROM support_view
>WHERE name = 'derek';
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|NAME |DISTINCT_REASONS |LAST_REASON |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|derek |3 |refund |
Query terminated
ksql> SELECT name, total_calls, minutes_engaged
>FROM lifetime_view
>WHERE name = 'derek';
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|NAME |TOTAL_CALLS |MINUTES_ENGAGED |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|derek |4 |227 |
Query terminated
上記二つのテーブルをJOINしたテーブルを作ってみる。
ksql> CREATE TABLE support_view_enriched AS
>SELECT support_view.name as name, distinct_reasons, last_reason, total_calls, minutes_engaged
> FROM support_view
> LEFT JOIN lifetime_view
> on support_view.name = lifetime_view.name
> EMIT CHANGES;
Message
-----------------------------------------------------
Created query with ID CTAS_SUPPORT_VIEW_ENRICHED_11
-----------------------------------------------------
ksql検索結果は下記の通り。JOIN成功。
ksql> SELECT name, distinct_reasons, last_reason, total_calls, minutes_engaged FROM support_view_enriched WHERE name='derek';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|NAME |DISTINCT_REASONS |LAST_REASON |TOTAL_CALLS |MINUTES_ENGAGED |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|derek |3 |refund |4 |227 |
Query terminated
新たなレコード追加
MySQL CLIからderekのコールに関する新レコードを挿入。彼はモンスタークレーマーなのだ!
mysql> INSERT INTO calls (name, reason, duration_seconds) VALUES ("derek", "complain", 1200);
Query OK, 1 row affected (0.07 sec)
ksql検索結果にちゃんと反映されている。debeziumがMySQLログから情報を引っ張ってTopicに書き込み、それをksqlDBがリアルタイム処理していることが分かる。
ksql> SELECT * FROM support_view_enriched WHERE name='derek';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|NAME |DISTINCT_REASONS |LAST_REASON |TOTAL_CALLS |MINUTES_ENGAGED |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|derek |4 |complain |5 |247 |
Query terminated
まとめ
- ソースが何であれ、Topic内に格納されているメッセージのAvro Schemaさえ押さえておけばksql処理は可能
- DB更新をソースとする場合、更新出現が完全に業務依存になるためksql処理設計の際は十分注意が必要。今回取り扱ったコールセンター業務のコールログのように単調に追加されるだけであれば扱いは楽だが、行の削除があるDBに対するksql表現はかなりややこしくなりそう。というかうまくサンプルが思いつかない。
- Debeziumはすごい。