0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AKS 上の Confluent Platform と Azure Synapse Analytics を連携させてデータ分析基盤を構築してみました

Last updated at Posted at 2022-05-31

概要

AKS 上の Confluent Platform に topic / connector / stream の設定を施し、Kafka Connector を利用して、データソース側として Azure Database for MySQL と接続し、データシンク側として Azure Synapse Analytics のデータ分析基盤と接続させる手順をまとめました。

MySQLにデータを書き込み、Confluent Platform の Stream でリアルタイムにデータが処理され、Synapse Analytics にデータが蓄積されることも確認しました。

構成イメージは以下となります。
image.png


ローカル環境

  • macOS Monterey 12.3.1
  • python 3.8.12
  • Azure CLI 2.34.1
  • helm v3.6.3
  • mssql-cli v1.0.0

事前準備

  1. この記事 を実行して、AKS上でConfluent Platform が稼働していること
  2. この記事 を実行して、Azure Database for MySQL を Single構成で稼働していること
  3. この記事 を実行して、Azure Synapse Analytics が稼働していること

各種設定

MySQLサーバの追加設定(確認)

AKSで稼働している Confluent Platform から Azure Database for MySQL にアクセスを許可するために、Azure Portal を利用して、そのリソースの 「接続のセキュリティ」 の設定における 「Azure サービスへのアクセスを許可」 を 「はい」 に変更(確認)ください

Azure Database for MySQL サーバーのファイアウォール規則を作成する を参照ください

また、「サーバパラメータ」 の項目の値が以下のように変更(確認)ください

  • time_zone : SYSTEM → +9:00
  • binlog_row_image : MINIMAL → FULL

Pod の確認

## AKSクラスタ接続のためのための認証情報の取得
$ az aks get-credentials --resource-group rg_ituru_aks01 --name aks_ituru_cp01

## namespace のデフォルトとしての指定
$ kubectl config set-context --current --namespace akscp610

## Pod の確認
$ kubectl get pod -o wide
NAME                                       READY   STATUS    RESTARTS        AGE     IP          NODE                               NOMINATED NODE   READINESS GATES
cp610-cp-control-center-f46bc647d-nb42w    1/1     Running   2 (2m52s ago)   3m33s   10.0.1.36   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-kafka-0                           2/2     Running   1 (113s ago)    3m33s   10.0.1.16   aks-cpdemo01-26269800-vmss000000   <none>           <none>
cp610-cp-kafka-1                           2/2     Running   0               2m39s   10.0.1.47   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-kafka-2                           2/2     Running   0               2m7s    10.0.1.79   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-kafka-connect-7b6cd684b5-6s5gf    2/2     Running   2 (111s ago)    3m33s   10.0.1.11   aks-cpdemo01-26269800-vmss000000   <none>           <none>
cp610-cp-ksql-server-656b866794-kw76z      2/2     Running   0               3m33s   10.0.1.68   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-schema-registry-d8466d9dd-msm5p   2/2     Running   2 (2m43s ago)   3m33s   10.0.1.67   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-zookeeper-0                       2/2     Running   0               3m33s   10.0.1.44   aks-cpdemo01-26269800-vmss000001   <none>           <none>
cp610-cp-zookeeper-1                       2/2     Running   0               2m39s   10.0.1.73   aks-cpdemo01-26269800-vmss000002   <none>           <none>
cp610-cp-zookeeper-2                       2/2     Running   0               115s    10.0.1.12   aks-cpdemo01-26269800-vmss000000   <none>           <none>
kafka-client                               1/1     Running   0               6m15s   10.0.1.69   aks-cpdemo01-26269800-vmss000002   <none>           <none>
ksql-client                                1/1     Running   0               6m2s    10.0.1.14   aks-cpdemo01-26269800-vmss000000   <none>           <none>


## 起動サービスの確認
$ kubectl get svc
NAME                          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
cp610-cp-control-center       ClusterIP   10.1.0.73    <none>        9021/TCP            4m16s
cp610-cp-kafka                ClusterIP   10.1.0.63    <none>        9092/TCP,5556/TCP   4m16s
cp610-cp-kafka-connect        ClusterIP   10.1.0.26    <none>        8083/TCP,5556/TCP   4m16s
cp610-cp-kafka-headless       ClusterIP   None         <none>        9092/TCP            4m16s
cp610-cp-ksql-server          ClusterIP   10.1.0.86    <none>        8088/TCP,5556/TCP   4m16s
cp610-cp-schema-registry      ClusterIP   10.1.0.157   <none>        8081/TCP,5556/TCP   4m16s
cp610-cp-zookeeper            ClusterIP   10.1.0.119   <none>        2181/TCP,5556/TCP   4m16s
cp610-cp-zookeeper-headless   ClusterIP   None         <none>        2888/TCP,3888/TCP   4m16s

Kafka topic の作成

## Kafka-client への接続
$ kubectl exec -it kafka-client -- /bin/bash

## topic_002 の作成
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --create --topic topic002 --partitions 1 --replication-factor 1
Created topic topic002.

## topic の確認
[appuser@kafka-client ~]$ kafka-topics --zookeeper cp610-cp-zookeeper:2181 --list

Connector の定義ファイル

MySqlConnector の定義ファイル

Debeziumドキュメント-入門-チュートリアル を参照ください

MySqlSourceConnector.json
{
  "name": "MySqlSourceConnector_1",
  "config": {
    "connectionTimeZone": "JST",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "iturumysql01.mysql.database.azure.com",
    "database.port": "3306",
    "database.user": "adminadmin@iturumysql01",
    "database.password": "HogeHogeHoge!",
    "database.server.name": "iturumysql01",
    "database.ssl.mode": "preferred",
    "database.history.kafka.bootstrap.servers": "cp610-cp-kafka:9092",
    "database.history.kafka.topic": "history.cpdemo.inventory",
    "database.include.list": "iotdummydb"
  }
}

SqlDwSinkConnector の定義ファイル

Azure Synapse Analytics Sink Connector for Confluent Platform を参照ください

SqlDwSinkConnector.json
{
  "name": "SqlDwSinkConnector_1",
  "config": {
    "connector.class": "io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector",
    "tasks.max": "1",
    "topics": "topic002",
    "azure.sql.dw.url": "jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;",
    "azure.sql.dw.user": "adminadmin",
    "azure.sql.dw.password": "HogeHogeHoge1!",
    "azure.sql.dw.database.name": "cpdemosqldb",
    "auto.create": "true",
    "auto.evolve": "true",
    "table.name.format": "kafka_${topic}"
  }
}

Connector の設定

kafka Connect の接続ポート(8083)をローカル環境にリダイレクトします

$ kubectl port-forward --address localhost svc/cp610-cp-kafka-connect 8083:8083
Forwarding from 127.0.0.1:8083 -> 8083
Forwarding from [::1]:8083 -> 8083

※ CTRL+C で終了できます

別のターミナルを立ち上げ、以下の Kafka Connector を作成します

source - MySqlConnector の作成

  • 以下の3つのトピックが自動生成されています
    • iturumysql01 :CREATE,DROP / Database,Table 等の操作ログ+Schema情報が保存される
    • iturumysql01.iotdummydb.inventory :データが保存される(MySQLの対象DBにデータがあるとき)
    • history.cpdemo.inventory :Connectorがデータベースのスキーマ履歴を保管するために利用
## MySQLへの接続確認がとれないとエラーとなります
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @MySqlSourceConnector.json

HTTP/1.1 201 Created
Date: Mon, 23 May 2022 23:43:16 GMT
Location: http://localhost:8083/connectors/MySqlSourceConnector_1
Content-Type: application/json
Content-Length: 600
Server: Jetty(9.4.33.v20201020)

{"name":"MySqlSourceConnector_1","config":{"connectionTimeZone":"JST","connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"iturumysql01.mysql.database.azure.com","database.port":"3306","database.user":"mysqladmin@iturumysql01","database.password":"NetworldPsg2!","database.server.name":"iturumysql01","database.ssl.mode":"preferred","database.history.kafka.bootstrap.servers":"cp610-cp-kafka:9092","database.history.kafka.topic":"history.cpdemo.inventory","database.include.list":"iotdummydb","name":"MySqlSourceConnector_1"},"tasks":[],"type":"source"}

ーーー ちなみに ーーー
## 削除の場合
$ curl -X DELETE http://localhost:8083/connectors/MySqlSourceConnector_1

Sink - SqlDwSinkConnector の作成

## Synapse への接続確認がとれないとエラーとなります
$ curl -i -X POST http://localhost:8083/connectors -H 'Content-Type: application/json' --data @SqlDwSinkConnector.json

HTTP/1.1 201 Created
Date: Thu, 26 May 2022 05:44:34 GMT
Location: http://localhost:8083/connectors/SqlDwSinkConnector_1
Content-Type: application/json
Content-Length: 493
Server: Jetty(9.4.33.v20201020)

{"name":"SqlDwSinkConnector_1","config":{"connector.class":"io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector","tasks.max":"1","topics":"topic002","azure.sql.dw.url":"jdbc:sqlserver://cpworkspace01.sql.azuresynapse.net:1433;","azure.sql.dw.user":"synapseadmin","azure.sql.dw.password":"NetworldPsg2!","azure.sql.dw.database.name":"CPDemoDataWarehouse","auto.create":"true","auto.evolve":"true","table.name.format":"kafka_${topic}","name":"SqlDwSinkConnector_1"},"tasks":[],"type":"sink"}

ーーー ちなみに ーーー
## 削除の場合
$ curl -X DELETE http://localhost:8083/connectors/SqlDwSinkConnector_1

Connector の確認

## ワーカーのアクティブなコネクターのリストの表示
$ curl localhost:8083/connectors
["MySqlSourceConnector_1","SqlDwSinkConnector_1"]


## MySqlSourceConnector の状態確認
$ curl localhost:8083/connectors/MySqlSourceConnector_1/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   176  100   176    0     0   3365      0 --:--:-- --:--:-- --:--:--  3384
{
  "name": "MySqlSourceConnector_1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.1.11:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.1.11:8083"
    }
  ],
  "type": "source"
}


## SqlDwSinkConnector の状態確認
$ curl localhost:8083/connectors/SqlDwSinkConnector_1/status | jq
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   172  100   172    0     0   2487      0 --:--:-- --:--:-- --:--:--  2492
{
  "name": "SqlDwSinkConnector_1",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.0.1.11:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.0.1.11:8083"
    }
  ],
  "type": "sink"
}


## まとめて確認する場合
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
          jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
          column -s : -t| sed 's/\"//g'| sort

sink    |  SqlDwSinkConnector_1    |  RUNNING  |  RUNNING  |  io.confluent.connect.azuresqldw.AzureSqlDwSinkConnector
source  |  MySqlSourceConnector_1  |  RUNNING  |  RUNNING  |  io.debezium.connector.mysql.MySqlConnector

KSQL の設定

KSQL への接続

## Ksql-client への接続
$ kubectl exec -it ksql-client -- /bin/bash

## KsqlDB への接続
[appuser@ksql-client ~]$ ksql http://cp610-cp-ksql-server:8088

OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
                  
                  ===========================================
                  =       _              _ ____  ____       =
                  =      | | _____  __ _| |  _ \| __ )      =
                  =      | |/ / __|/ _` | | | | |  _ \      =
                  =      |   <\__ \ (_| | | |_| | |_) |     =
                  =      |_|\_\___/\__, |_|____/|____/      =
                  =                   |_|                   =
                  =  Event Streaming Database purpose-built =
                  =        for stream processing apps       =
                  ===========================================

Copyright 2017-2020 Confluent Inc.

CLI v6.1.0, Server v6.1.0 located at http://cp610-cp-ksql-server:8088
Server Status: RUNNING

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!


## topic の確認
ksql> list topics;

 Kafka Topic                       | Partitions | Partition Replicas 
---------------------------------------------------------------------
 cp610-cp-kafka-connect-config     | 1          | 3                  
 cp610-cp-kafka-connect-offset     | 25         | 3                  
 cp610-cp-kafka-connect-status     | 5          | 3                  
 history.cpdemo.inventory          | 1          | 1                  
 iturumysql01                      | 1          | 1                  
 iturumysql01.iotdummydb.inventory | 1          | 1                  
 topic002                          | 1          | 1                  
---------------------------------------------------------------------

Avro Schemaを紐付けて Stream の作成

## MySQL Source Connector 用の stream001 を作成
ksql> CREATE STREAM stream001 WITH (kafka_topic = 'iturumysql01.iotdummydb.inventory', value_format = 'avro');

 Message        
----------------
 Stream created 
----------------


## Synapse Sink Connector 用の stream002 を作成し topic002 に紐付ける
ksql> CREATE STREAM stream002 WITH (KAFKA_TOPIC='topic002', VALUE_FORMAT='AVRO') AS 
        SELECT  s001.after->id as id,
                s001.after->section as section, 
                s001.after->iot_state as prefecture,
                s001.after->val_1 as val_1,
                s001.after->val_2 as val_2
        FROM stream001 s001; 

 Message                                
----------------------------------------
 Created query with ID CSAS_STREAM002_1 
----------------------------------------


## Streamの確認
ksql> show streams;

 Stream Name | Kafka Topic                       | Key Format | Value Format | Windowed 
----------------------------------------------------------------------------------------
 STREAM001   | iturumysql01.iotdummydb.inventory | KAFKA      | AVRO         | false    
 STREAM002   | topic002                          | KAFKA      | AVRO         | false    
----------------------------------------------------------------------------------------

これですべての環境が整いました


データストリーミングの確認

Stream の確認

## データストリーミング状況を確認します
ksql> SELECT * FROM stream002 emit changes;

Press CTRL-C to interrupt

アプリケーションの実行

この記事 にある「mysql_IoTdummy.py」の Python プログラムをローカルターミナルから 実行します。1秒間隔で5件のデータを生成させました。

$ python mysql_IoTdummy.py --mode db  --wait 1--count 10

データベース・テーブルへのデータ書き込み

Connection established

['1653059784.916943', 0, '111', 'V', '786-4201', '徳島県', 158.33322633209377, 84.49057325157214, 1653059784.91714]
['1653059784.917148', 1, '111', 'L', '575-1674', '山口県', 135.8599096871533, 75.064630664443, 1653059784.917203]
['1653059784.917208', 2, '111', 'C', '607-0145', '佐賀県', 156.0637293462609, 78.21672571181065, 1653059784.917251]
['1653059784.917256', 3, '111', 'A', '749-7168', '北海道', 123.96884840139384, 80.95952082333669, 1653059784.917296]
['1653059784.917301', 4, '111', 'L', '843-4658', '香川県', 199.07957653525887, 53.37983182659535, 1653059784.917351]
Inserted 5 row(s) of data.
Done.

処理時間:6.554551839828491 [sec]

リアルタイムで「stream002」にてストリーミングを確認できます

ksql> SELECT * FROM stream002 emit changes;

+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|ID                     |SECTION                |PREFECTURE             |VAL_1                  |VAL_2                  |
+-----------------------+-----------------------+-----------------------+-----------------------+-----------------------+
|1653059784.916943      |V                      |徳島県                    |158.33322143554688     |84.49057006835938      |
|1653059784.917148      |L                      |山口県                    |135.8599090576172      |75.06462860107422      |
|1653059784.917208      |C                      |佐賀県                    |156.06373596191406     |78.21672821044922      |
|1653059784.917256      |A                      |北海道                    |123.9688491821289      |80.95951843261719      |
|1653059784.917301      |L                      |香川県                    |199.07957458496094     |53.37983322143555      |

Press CTRL-C to interrupt

トピック「topic002」でもデータがストリーミングされているか確認します

ksql> PRINT 'iturumysql01.iotdummydb.inventory' FROM BEGINNING;

ksql> PRINT 'topic002' FROM BEGINNING;

Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO
rowtime: 2022/05/20 15:16:25.700 Z, key: <null>, value: {"ID": "1653059784.916943", "SECTION": "V", "PREFECTURE": "徳島県", "VAL_1": 158.33322143554688, "VAL_2": 84.49057006835938}
rowtime: 2022/05/20 15:16:26.703 Z, key: <null>, value: {"ID": "1653059784.917148", "SECTION": "L", "PREFECTURE": "山口県", "VAL_1": 135.8599090576172, "VAL_2": 75.06462860107422}
rowtime: 2022/05/20 15:16:27.705 Z, key: <null>, value: {"ID": "1653059784.917208", "SECTION": "C", "PREFECTURE": "佐賀県", "VAL_1": 156.06373596191406, "VAL_2": 78.21672821044922}
rowtime: 2022/05/20 15:16:29.209 Z, key: <null>, value: {"ID": "1653059784.917256", "SECTION": "A", "PREFECTURE": "北海道", "VAL_1": 123.9688491821289, "VAL_2": 80.95951843261719}
rowtime: 2022/05/20 15:16:30.211 Z, key: <null>, value: {"ID": "1653059784.917301", "SECTION": "L", "PREFECTURE": "香川県", "VAL_1": 199.07957458496094, "VAL_2": 53.37983322143555}

Press CTRL-C to interrupt

Azure Synapse Analytics でのデータチェック

「mssql-cli」を使用することにより、Synapse の SQLプールでデータベースに対しクエリを実行します。Confluent Platform から Kafka Connector を経由してデータが蓄積されていることを確認します。

## 引数として SQLプール(データベース)名、ユーザー名、パスワードを指定して、Synapse の workspace に接続
$ mssql-cli -S cpworkspace01.sql.azuresynapse.net -U synapseadmin -P NetworldPsg2! -d cpdemosqldb

## テーブルでクエリを実行して、そのコンテンツを表示
cpdemosqldb> select * from kafka_topic002;
+------------------+-------------------+-----------+------------------+--------------+
| VAL_1            | ID                | SECTION   | VAL_2            | PREFECTURE   |
|------------------+-------------------+-----------+------------------+--------------|
| 111.479415893555 | 1653984067.079385 | B         | 66.560661315918  | 岡山県       |
| 189.111267089844 | 1653984229.080214 | S         | 68.9491729736328 | 長野県       |
| 191.615036010742 | 1653984100.803468 | Q         | 53.9585876464844 | 福岡県       |
| 199.33251953125  | 1653984229.080527 | F         | 61.3692970275879 | 福岡県       |
| 151.738159179688 | 1653983738.752873 | B         | 87.595329284668  | 大阪府       |
| 194.590621948242 | 1653983783.798177 | X         | 87.0414962768555 | 栃木県       |
| 114.615188598633 | 1653983738.752919 | D         | 89.7937088012695 | 宮城県       |
| 124.387306213379 | 1653984441.154836 | S         | 74.9007797241211 | 長崎県       |
| 164.049896240234 | 1653983783.798038 | I         | 72.0705490112305 | 岐阜県       |
| 198.455078125    | 1653984100.803527 | T         | 65.5769958496094 | 福井県       |
      :                    :            :                 :             :

後処理

Pod / namespace のアンインストール方法

## Pod : Confluent Platform
$ helm delete cp610             
W0407 17:38:18.658767   39907 warnings.go:70] policy/v1beta1 PodDisruptionBudget is deprecated in v1.21+, unavailable in v1.25+; use policy/v1 PodDisruptionBudget
release "cp610" uninstalled

## Pod : kafka-client / ksql-client
$ kubectl delete -f cp-helm-charts/examples/kafka-client.yaml
$ kubectl delete -f cp-helm-charts/examples/ksql-client.yaml

## namespace の削除方法(namespace配下のPodも削除される)
$ kubectl delete namespace akscp610
namespace "akscp610" deleted

AKSクラスターの停止・起動

$ az aks stop -g rg_ituru_aks01 -n aks_ituru_cp01
$ az aks start -g rg_ituru_aks01 -n aks_ituru_cp01

まとめ

これで、Confluent Platform をコアにしたデータストリーミング+データ分析基盤をAzure上に手軽に構築することができました。Kafka Connect はたくさんのリソースへの接続をサポートしているので、データの集約も可能となり、かつ、データ分析基盤への接続も容易であることも確認できました。

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。

また、以下の情報も参考にしました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?