2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

MySQLのTable更新情報をKafka上でksql処理する

Last updated at Posted at 2021-11-11

ねらい

大仰なタイトルだがその実態は下記チュートリアルを体験してみようというもの。「Materialized Cache」というと何のことやらだが、要はTopic内の情報をksqlのtable/stream表現で扱うことらしい。このチュートリアルではDebeziumを用いてMySQLの更新ログをKafka Topicに流し込み、ksqlで処理する。DebeziumはいわゆるChange Data CaptureのためのConnectorであり、この一連の処理の流れは私の秘めたる最終目的とも合致している。

元ネタ:Materialized cache

関連Qiita記事

1.ksqlDB Quickstartを実施してみる
2.ksqlDB を使用したストリーミングクエリの作成(前半)
3.ksqlDB を使用したストリーミングクエリの作成(後半)
4.MySQLのTable更新情報をKafka上でksql処理する

チュートリアル実施

事前準備

その1 : Debeziumの入手

Confluent Hubというダウンロードツールを用いてDebeziumを入手する。(Confluent HubはCommunity Editionには同梱されていないようだったので別途導入した。)
connect-ditributed.propertiesを書き換えようとして失敗しているが、今回はdocker composeで環境を作る際の元ネタのlibararyがあればよいだけのはずなので気にせず進める。

Confluent-Hub実行結果
gen@LAPTOP-O8FG8ES2:~/docker_files$ confluent-hub install --component-dir confluent-hub-components --no-prompt debezium/debezium-connector-mysql:1.1.0
Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache 2.0
https://github.com/debezium/debezium/blob/master/LICENSE.txt
Implicit confirmation of the question: You are about to install 'debezium-connector-mysql' from Debezium Community, as published on Confluent Hub.
Downloading component Debezium MySQL CDC Connector 1.1.0, provided by Debezium Community from Confluent Hub and installing into confluent-hub-components
Adding installation directory to plugin path in the following files:
  /etc/kafka/connect-distributed.properties
Unable to update Worker's configuration file /etc/kafka/connect-distributed.properties
Unable to save to file /etc/kafka/connect-distributed.properties

gen@LAPTOP-O8FG8ES2:~/docker_files/confluent-hub-components/debezium-debezium-connector-mysql$ ls -l
total 16
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 assets
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 doc
drwxr-xr-x 2 gen gen 4096 Nov 11 15:35 lib
-rw-r--r-- 1 gen gen 2868 Nov 11 15:35 manifest.json

このライブラリはdocker-compose.ymlによってKSQL_CONNECT_PLUGIN_PATHで指定されたディレクトリとしてマウントされる。

docker-compose.yml抜粋
      ...
    volumes:
      - "./confluent-hub-components/:/usr/share/kafka/plugins/"
    environment:
      ...
      KSQL_CONNECT_PLUGIN_PATH: "/usr/share/kafka/plugins"

なおHow to use connector management に詳しいやり方が書いてあった。

その2: mysqlのconfig準備

docker composeで環境を作る際の元ネタになるmysqlのConfigを準備する。mysqlディレクトリを掘り、中に以下のファイルを配置しておく。どうも「バイナリログに行の変更を全て書き出せ」という指令のようだ。わーいx'99'ログだぁ(独白)。

custom-config.cnf
[mysqld]
server-id                = 223344 
log_bin                  = mysql-bin 
binlog_format            = ROW 
binlog_row_image         = FULL 
expire_logs_days         = 10
gtid_mode                = ON
enforce_gtid_consistency = ON
docker-compose.yml抜粋
    volumes:
      - "./mysql/custom-config.cnf:/etc/mysql/conf.d/custom-config.cnf"

Start the stack

困ったことに元ネタのdocker-compose.ymlのままだとSchema Registryのコンテナ起動に失敗したため、以下の変更を行っている。それにつけてもStackOverflow様様である。

docker-compose.ymlの変更
(元記述)
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
      
(変更後)
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://broker:9092"

Docker Composeで環境を立ち上げた後のステータス。

稼働コンテナ
gen@LAPTOP-O8FG8ES2:~$ docker ps
CONTAINER ID   IMAGE                                   COMMAND                  CREATED         STATUS         PORTS                                        NAMES
b6b396095bc6   confluentinc/ksqldb-cli:0.21.0          "/bin/sh"                8 minutes ago   Up 8 minutes                                                ksqldb-cli
f7078c17dbe0   confluentinc/ksqldb-server:0.21.0       "/usr/bin/docker/run"    8 minutes ago   Up 8 minutes   0.0.0.0:8088->8088/tcp                       ksqldb-server
27eafbdc49a4   confluentinc/cp-schema-registry:7.0.0   "/etc/confluent/dock…"   8 minutes ago   Up 8 minutes   0.0.0.0:8081->8081/tcp                       schema-registry
582265bdf311   confluentinc/cp-kafka:7.0.0             "/etc/confluent/dock…"   4 hours ago     Up 8 minutes   9092/tcp, 0.0.0.0:29092->29092/tcp           broker
2ad636a2d60b   confluentinc/cp-zookeeper:7.0.0         "/etc/confluent/dock…"   4 hours ago     Up 8 minutes   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp   zookeeper
909ab8e55de5   mysql:8.0.19                            "docker-entrypoint.s…"   4 hours ago     Up 8 minutes   0.0.0.0:3306->3306/tcp, 33060/tcp            mysql
docker_image
gen@LAPTOP-O8FG8ES2:~$ docker image ls
REPOSITORY                        TAG       IMAGE ID       CREATED         SIZE
confluentinc/cp-schema-registry   7.0.0     24c8939a7458   3 weeks ago     1.65GB
confluentinc/cp-kafka             7.0.0     42807c42b958   3 weeks ago     791MB
confluentinc/cp-zookeeper         7.0.0     3858ad02e9d1   3 weeks ago     791MB
confluentinc/ksqldb-cli           0.21.0    187badc13e5b   2 months ago    852MB
confluentinc/ksqldb-server        0.21.0    187badc13e5b   2 months ago    852MB
mysql                             8.0.19    0c27e8e5fcfa   18 months ago   546MB

Dockerで環境作れるのは楽だけど、数GBはダウンロードするのが嫌・・・自分で全く土地勘のないmysqlを構成するよりはいいか。

Configure MySQL for Debezium

mysqlのコンテナに入り、MySQL CLIからレプリケーションに必要なGRANTを実施する。

GRANT処理
mysql> GRANT ALL PRIVILEGES ON *.* TO 'example-user' WITH GRANT OPTION;
Query OK, 0 rows affected (0.02 sec)

mysql> ALTER USER 'example-user'@'%' IDENTIFIED WITH mysql_native_password BY 'example-pw';
Query OK, 0 rows affected (0.01 sec)

mysql> FLUSH PRIVILEGES;
Query OK, 0 rows affected (0.01 sec)

Create the calls table in MySQL

続けてcall-centerデータベース内にcallsテーブルを作る。コールセンター業務系の記録テーブルであるcallsには「コールした人の名前」、「コールの理由」、「コール時間(秒)」が含まれる。ところでderekよ一体何を買ったんだ?

DBおよびテーブル作成
(DB設定)
mysql> USE call-center;
Database changed

(テーブル作成)
mysql> CREATE TABLE calls (name TEXT, reason TEXT, duration_seconds INT);
Query OK, 0 rows affected (0.08 sec)

(行挿入)
...
mysql> INSERT INTO calls (name, reason, duration_seconds) VALUES ("derek", "refund", 325);
Query OK, 1 row affected (0.01 sec)

(テーブル確認)
mysql> select * from calls;
+---------+----------+------------------+
| name    | reason   | duration_seconds |
+---------+----------+------------------+
| michael | purchase |              540 |
| michael | help     |              224 |
| colin   | help     |              802 |
| derek   | purchase |            10204 |
| derek   | help     |              600 |
| colin   | refund   |              105 |
| michael | help     |             2030 |
| colin   | purchase |              800 |
| derek   | help     |             2514 |
| derek   | refund   |              325 |
+---------+----------+------------------+
10 rows in set (0.00 sec)

Start the Debezium connector

ksqldb-cliのコンテナからksqldbCLIを起動し、まずは常にtopicの先頭からメッセージを読む設定を行う。

環境変数設定
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.

以下のksqlでSource Connectorの使用と構成をkafkaに通知する。え、これだけでいいの?

Source_Connectorの開始
ksql> CREATE SOURCE CONNECTOR calls_reader WITH (
>    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector',
>    'database.hostname' = 'mysql',
>    'database.port' = '3306',
>    'database.user' = 'example-user',
>    'database.password' = 'example-pw',
>    'database.allowPublicKeyRetrieval' = 'true',
>    'database.server.id' = '184054',
>    'database.server.name' = 'call-center-db',
>    'database.whitelist' = 'call-center',
>    'database.history.kafka.bootstrap.servers' = 'broker:9092',
>    'database.history.kafka.topic' = 'call-center',
>    'table.whitelist' = 'call-center.calls',
>    'include.schema.changes' = 'false'
>);

 Message
--------------------------------
 Created connector CALLS_READER
--------------------------------

確かにcall-center-db.call-center.callというTopicが作られている。

SWOW_TOPICS応答
 Kafka Topic                      | Partitions | Partition Replicas
--------------------------------------------------------------------
 _ksql-connect-configs            | 1          | 1
 _ksql-connect-offsets            | 25         | 1
 _ksql-connect-statuses           | 5          | 1
 call-center                      | 1          | 1
 call-center-db.call-center.calls | 1          | 1
 default_ksql_processing_log      | 1          | 1
--------------------------------------------------------------------

Topicの中身は更新ログから読み出したINSERTの内容ですね。面白い。Key formatが万歳アスキーアートになってるが大丈夫か・・・?MySQLでCREATE TABLEした際にKey Columnを宣言しなかったため、Message KeyがNullになっているせいだと予想。

TOPICに書き出された内容
ksql> PRINT 'call-center-db.call-center.calls' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2021/11/11 11:05:32.288 Z, key: <null>, value: {"before": null, "after": {"name": "michael", "reason": "purchase", "duration_seconds": 540}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "call-center-db", "ts_ms": 0, "snapshot": "true", "db": "call-center", "table": "calls", "server_id": 0, "gtid": null, "file": "mysql-bin.000006", "pos": 195, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1636628731343, "transaction": null}, partition: 0
rowtime: 2021/11/11 11:05:32.291 Z, key: <null>, value: {"before": null, "after": {"name": "michael", "reason": "help", "duration_seconds": 224}, "source": {"version": "1.1.0.Final", "connector": "mysql", "name": "call-center-db", "ts_ms": 0, "snapshot": "true", "db": "call-center", "table": "calls", "server_id": 0, "gtid": null, "file": "mysql-bin.000006", "pos": 195, "row": 0, "thread": null, "query": null}, "op": "c", "ts_ms": 1636628731350, "transaction": null}, partition: 0
...

Debeziumが登録したAvro Schemaを調べてみると、行の「before image」、「after image」、「更新に関するMetadata」で構成されているようだ。Sink Connectorによる他DBMSへのApplyをガッツリ意識した内容だなぁ。

Debeziumが作ったAvroSchema
{
  "type": "record",
  "name": "Envelope",
  "namespace": "call_center_db.call_center.calls",
  "fields": [
    {
      "name": "before",
      "type": [
        "null",
        {
          "type": "record",
          "name": "Value",
          "fields": [
            {
              "name": "name",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "reason",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "duration_seconds",
              "type": [
                "null",
                "int"
              ],
              "default": null
            }
          ],
          "connect.name": "call_center_db.call_center.calls.Value"
        }
      ],
      "default": null
    },
    {
      "name": "after",
      "type": [
        "null",
        "Value"
      ],
      "default": null
    },
    {
      "name": "source",
      "type": {
        "type": "record",
        "name": "Source",
        "namespace": "io.debezium.connector.mysql",
        "fields": [
          {
            "name": "version",
            "type": "string"
          },
          {
            "name": "connector",
            "type": "string"
          },
          {
            "name": "name",
            "type": "string"
          },
          {
            "name": "ts_ms",
            "type": "long"
          },
          {
            "name": "snapshot",
            "type": [
              {
                "type": "string",
                "connect.version": 1,
                "connect.parameters": {
                  "allowed": "true,last,false"
                },
                "connect.default": "false",
                "connect.name": "io.debezium.data.Enum"
              },
              "null"
            ],
            "default": "false"
          },
          {
            "name": "db",
            "type": "string"
          },
          {
            "name": "table",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "server_id",
            "type": "long"
          },
          {
            "name": "gtid",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "file",
            "type": "string"
          },
          {
            "name": "pos",
            "type": "long"
          },
          {
            "name": "row",
            "type": "int"
          },
          {
            "name": "thread",
            "type": [
              "null",
              "long"
            ],
            "default": null
          },
          {
            "name": "query",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ],
        "connect.name": "io.debezium.connector.mysql.Source"
      }
    },
    {
      "name": "op",
      "type": "string"
    },
    {
      "name": "ts_ms",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "transaction",
      "type": [
        "null",
        {
          "type": "record",
          "name": "ConnectDefault",
          "namespace": "io.confluent.connect.avro",
          "fields": [
            {
              "name": "id",
              "type": "string"
            },
            {
              "name": "total_order",
              "type": "long"
            },
            {
              "name": "data_collection_order",
              "type": "long"
            }
          ]
        }
      ],
      "default": null
    }
  ],
  "connect.name": "call_center_db.call_center.calls.Envelope"
}

Create the ksqlDB calls stream

Streamを作る。Avro Schemaをかぶせておく。

CREATE_STREAM
ksql> CREATE STREAM calls WITH (
>    kafka_topic = 'call-center-db.call-center.calls',
>    value_format = 'avro'
>);

 Message
----------------
 Stream created
----------------

Create the materialized views

Materialized views、つまりksqlDBのTableを作る。->を用いることでAvro SchemaにおけるAfter Recordの中を指定している。このTableでは以下の関数を使用し、NAMEでGroup byしている。
-count_distinct : 「コール理由の種類数」を数える
-latest_by_offset: 「最後のコール理由」を取り出す

support_viewテーブル
ksql> CREATE TABLE support_view AS
>    SELECT after->name AS name,
>           count_distinct(after->reason) AS distinct_reasons,
>           latest_by_offset(after->reason) AS last_reason
>    FROM calls
>    GROUP BY after->name
>    EMIT CHANGES;

 Message
-------------------------------------------
 Created query with ID CTAS_SUPPORT_VIEW_3
-------------------------------------------

もう一つ作る。このtableでは以下の関数を使用し、やはりNAMEでGroup byしている。
-count : 「コール(理由)数」を数える
-(sum(after->duration_seconds) / 60) : コール時間(秒)を集計し分に変換する

lifetime_viewテーブル
ksql> CREATE TABLE lifetime_view AS
>    SELECT after->name AS name,
>           count(after->reason) AS total_calls,
>           (sum(after->duration_seconds) / 60) as minutes_engaged
>    FROM calls
>    GROUP BY after->name
>    EMIT CHANGES;

 Message
--------------------------------------------
 Created query with ID CTAS_LIFETIME_VIEW_5
--------------------------------------------

さっそくksqlによる検索を実施してみる。

support_viewのdrek情報
ksql> SELECT name, distinct_reasons, last_reason
>FROM support_view
>WHERE name = 'derek';
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|NAME                                                     |DISTINCT_REASONS                                         |LAST_REASON                                              |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|derek                                                    |3                                                        |refund                                                   |
Query terminated
lifetime_viewのderek情報
ksql> SELECT name, total_calls, minutes_engaged
>FROM lifetime_view
>WHERE name = 'derek';
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|NAME                                                     |TOTAL_CALLS                                              |MINUTES_ENGAGED                                          |
+---------------------------------------------------------+---------------------------------------------------------+---------------------------------------------------------+
|derek                                                    |4                                                        |227                                                      |
Query terminated

上記二つのテーブルをJOINしたテーブルを作ってみる。

2つのTableのJOIN
ksql> CREATE TABLE support_view_enriched AS
>SELECT support_view.name as name, distinct_reasons, last_reason, total_calls, minutes_engaged
> FROM support_view
> LEFT JOIN lifetime_view
>   on support_view.name = lifetime_view.name
> EMIT CHANGES;

 Message
-----------------------------------------------------
 Created query with ID CTAS_SUPPORT_VIEW_ENRICHED_11
-----------------------------------------------------

ksql検索結果は下記の通り。JOIN成功。

support_view_enrichedのderek情報
ksql> SELECT name, distinct_reasons, last_reason, total_calls, minutes_engaged FROM support_view_enriched WHERE name='derek';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|NAME                             |DISTINCT_REASONS                 |LAST_REASON                      |TOTAL_CALLS                      |MINUTES_ENGAGED                  |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|derek                            |3                                |refund                           |4                                |227                              |
Query terminated

新たなレコード追加

MySQL CLIからderekのコールに関する新レコードを挿入。彼はモンスタークレーマーなのだ!

MySQL_CLIから新レコード追加
mysql> INSERT INTO calls (name, reason, duration_seconds) VALUES ("derek", "complain", 1200);
Query OK, 1 row affected (0.07 sec)

ksql検索結果にちゃんと反映されている。debeziumがMySQLログから情報を引っ張ってTopicに書き込み、それをksqlDBがリアルタイム処理していることが分かる。

support_view_enrichedのderek情報
ksql> SELECT * FROM support_view_enriched WHERE name='derek';
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|NAME                             |DISTINCT_REASONS                 |LAST_REASON                      |TOTAL_CALLS                      |MINUTES_ENGAGED                  |
+---------------------------------+---------------------------------+---------------------------------+---------------------------------+---------------------------------+
|derek                            |4                                |complain                         |5                                |247                              |
Query terminated

まとめ

  • ソースが何であれ、Topic内に格納されているメッセージのAvro Schemaさえ押さえておけばksql処理は可能
  • DB更新をソースとする場合、更新出現が完全に業務依存になるためksql処理設計の際は十分注意が必要。今回取り扱ったコールセンター業務のコールログのように単調に追加されるだけであれば扱いは楽だが、行の削除があるDBに対するksql表現はかなりややこしくなりそう。というかうまくサンプルが思いつかない。
  • Debeziumはすごい。
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?