概要
Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。
CFK を使用して AKS上にデプロイされている Confluent Platform に この記事 を参考にして下図のような構成を組みます
ソース側としてのCosmosDBに対してデータを書き込み、Confluent Platform 上の topic を経由して、シンク側のCosmosDBにそのままのデータが蓄積されることを確認します。KSQLによるデータ抽出等を実装する前の単純な確認作業となります。
ローカル環境
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
事前準備
- この記事 を参考にして、上図の構成が組まれていること
Sink CosmosDB 設定項目 | 値 |
---|---|
Endpoint | https://iturucosmosdb01.documents.azure.com:443/ |
CosmosDB アカウント名 | iturucosmosdb01 |
データベース名 | CPDemoDB002 |
コンテナ名 | container002 |
パーティション | section |
Source CosmosDB 設定項目 | 値 |
---|---|
Endpoint | https://iturucosmosdb01.documents.azure.com:443/ |
CosmosDB アカウント名 | iturucosmosdb01 |
データベース名 | CPDemoDB001 |
コンテナ名 | container001 |
パーティション | section |
Simpleテストの実行
ソース側 CosmosDB へのデータ書き込み
この記事 にあるデータ生成プログラムを利用して、ローカル環境からソース側 CosmosDB にデータを書き込みます(連続で8件のデータを生成)。
$ python cosmosdb_IoTdummy.py --mode db --db write --wait 0 --count 8
データベースへのデータ書き込み
['1657199670.821585', 0, '111', 'O', '432-2664', '福井県', 107.22296451311148, 69.81618009103482, '2022-07-07T22:14:30.821725']
['1657199670.821737', 1, '111', 'C', '118-3635', '沖縄県', 121.79679326570849, 77.41292379149846, '2022-07-07T22:14:30.821773']
['1657199670.82178', 2, '111', 'M', '342-8730', '埼玉県', 148.90352784331895, 59.66836612610244, '2022-07-07T22:14:30.821807']
['1657199670.821812', 3, '111', 'Z', '065-7284', '香川県', 160.81864815360737, 87.7771508916959, '2022-07-07T22:14:30.821838']
['1657199670.821843', 4, '111', 'X', '267-3435', '新潟県', 140.15235764972246, 71.62379403858466, '2022-07-07T22:14:30.821868']
['1657199670.821873', 5, '111', 'D', '406-8826', '島根県', 144.91235299507284, 51.99074105853973, '2022-07-07T22:14:30.821897']
['1657199670.821901', 6, '111', 'A', '133-0863', '佐賀県', 150.37960537206877, 53.52995764628445, '2022-07-07T22:14:30.821926']
['1657199670.82193', 7, '111', 'Z', '121-8609', '大分県', 136.44769557485782, 58.31648238948629, '2022-07-07T22:14:30.821954']
Inserted 8 row(s) of data.
Done.
処理時間:1.1323838233947754 [sec]
Simpleテストの結果
Confluent Control Center 上での確認
Confluent Control Center にログインします。
$ kubectl confluent dashboard controlcenter
http://localhost:9021
該当トピック「topic002」のメッセージを確認します。結果は想定の通りとなっています。
合わせて、Schema情報(value / key)を確認します。結果は想定の通りとなっています。
Azure Portal 上での確認
Azure Portalにサインインし、該当のCosmosDBリソースにアクセスします。
ソース側のCosmosDB「CPDemo001/container001」の「items」を確認します。結果は想定の通りとなっています。
シンク側のCosmosDB「CPDemo002/container002」の「items」を確認します。結果は想定の通り上記と同じデータが同じ順番で蓄積されています。
まとめ
Confluent Platform 上に構成された 「Source Connector」→「topic」→「Sink Connector」 の経路を通って、データが問題なく流れていることを確認できました。これで、KSQLの実装に進むことができます。