概要
Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。
この記事 にあるように、CFK を使用して AKS上に Confluent Platform(以下、CP) をデプロイし、ksqldb-cli を利用してKSQLの設定を施した以下の図の構成において、Producer側のCosmosDBにデータを流し込み、CP の CosmosDB Sink Connector から topic001 を経由して、KSQLのクエリ処理によりデータが抽出されたものが、topic002 にストリーミングされることを確認します。
- データストリーミングの流れは以下となります。
- Pythonプログラムを利用して疑似データ生成し CosmosDB に書き込みます
- CosmosDBに書き込まれたデータは CosmosDB Sink Connector を経由して、CP の topic001 にストリーミングされます
- topic001 のストリーミングデータの必要なカラムデータのみ、クエリ処理のため stream001 に転送されます
- stream001 のデータをベースに stream002 として「カラム - section : 'C','E','F','W' のデータのみ」の抽出処理されます
- stream002 で抽出処理されたストリーミングデータを topic002 に転送します
- topic002 のデータを読み込むPythonプログラムを作成し、AKS上のPodで稼働させ、抽出されたデータを確認します
ローカル環境
- macOS Monterey 12.3.1
- python 3.8.12
- Azure CLI 2.34.1
- helm v3.6.3
- kubectl v1.21.3
利用プログラム
Producer側のプログラム
上記「データストリーミングの流れ:1」で利用する Pythonプログラムは以下となります。
このプログラムの実行はローカルのMacから行います。
import time
from datetime import date, datetime
import random
import json
import argparse
import string
from faker.factory import Factory
from azure.cosmos import CosmosClient
endpoint = 'https://iturucosmosdb01.documents.azure.com:443/'
key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
databaseName = 'CPDemoDB001'
containerName = 'container001'
# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")
# ダミーセクション?(大文字アルファベットを定義)
section = string.ascii_uppercase
# JSONダミーデータの作成
def iot_json_data(count, proc):
iot_items = json.dumps({
'items': [{
'id': str(datetime.now().timestamp()), # id
'ctid': i, # create id
'proc': proc, # データ生成プロセス名
'section': random.choice(section), # IoT機器セクション
'iot_num': fake.zipcode(), # IoT機器番号
'iot_state': fake.prefecture(), # IoT設置場所
'val_1': random.uniform(100, 200), # IoT値−1
'val_2': random.uniform(50, 90), # IoT値−2
'created_at': generate_time() # データ生成時間
}
for i in range(count)
]
}, ensure_ascii=False).encode('utf-8')
return iot_items
# IoT機器で計測されたダミーデータの生成時間
def generate_time():
dt_time = datetime.now()
gtime = json_trans_date(dt_time)
return gtime
# date, datetimeの変換関数
def json_trans_date(obj):
# 日付型を文字列に変換
if isinstance(obj, (datetime, date)):
return obj.isoformat()
# 上記以外は対象外.
raise TypeError ("Type %s not serializable" % type(obj))
# CosmosDBへの接続
def ConnectionToCosmosDB():
try:
client = CosmosClient(endpoint, key)
db = client.get_database_client(databaseName)
container = db.get_container_client(containerName)
# print("\nConnection established\n")
return 1, container
except Exception as err:
return 0, err
# メイン : ターミナル出力用
def tm_main(count, proc, wait):
print('ターミナル 出力\n')
# ダミーデータ生成
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
# ターミナル出力
for item in json_dict['items']:
print(item)
time.sleep(wait)
# メイン : Show Database
def show_db_main():
print('CosmosDB への接続確認')
code, conn = ConnectionToCosmosDB()
if code == 0 :
print(conn)
else :
print(conn)
# メイン : Write Database
def write_db_main(count, proc, wait):
print('データベースへのデータ書き込み')
# DBへの接続
code, conn = ConnectionToCosmosDB()
if code == 0 :
print(conn)
return
# ダミーデータ生成
iotjsondata = iot_json_data(count, proc)
json_dict = json.loads(iotjsondata)
## プログレスバーで進捗状況を表示
# データのインサート
try:
# for item in tqdm(json_dict['items']):
for item in json_dict['items']:
print(list(item.values()))
conn.create_item(item)
time.sleep(wait)
print("\nInserted",count,"row(s) of data.")
print("Done.")
except Exception as err:
print(err)
return
# メイン : Read Database
def read_db_main():
print('データベースからのデータ読み込み')
# DBへの接続
code, conn = ConnectionToCosmosDB()
if code == 0 :
print(conn)
return
# Read table data
items = conn.read_all_items()
# print all rows
try:
[print(f'{item}') for item in items]
print("Done.")
except Exception as err:
print(err)
# メイン : Delete Database
def delete_db_main():
print('データベースのデータ全削除')
# DBへの接続
code, conn = ConnectionToCosmosDB()
if code == 0 :
print(conn)
return
try:
for item in conn.query_items(query='SELECT * FROM container001',enable_cross_partition_query=True):
print(item['section'])
conn.delete_item(item, partition_key=item['section'])
print("Done.")
except Exception as err:
print(err)
# メイン
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='ダミーデータの自動生成からのDBへの書込み')
parser.add_argument('--count', type=int, default=5, help='データ作成件数(デフォルト:5件)')
parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名(デフォルト:111)')
parser.add_argument('--mode', type=str, default='tm', help='tm(Terminal(デフォルト))/ db(db操作)')
parser.add_argument('--wait', type=float, default=1, help='データ生成間隔(デフォルト:1.0秒)')
parser.add_argument('--db', type=str, default='show', help='show(デフォルト) / write / read / delete')
args = parser.parse_args()
start = time.time()
if (args.mode == 'tm'):
tm_main(args.count, args.proc, args.wait)
elif (args.mode == 'db'):
if (args.db == 'show'):
show_db_main()
elif (args.db == 'write'):
write_db_main(args.count, args.proc, args.wait)
elif (args.db == 'read'):
read_db_main()
elif (args.db == 'delete'):
delete_db_main()
else :
print("パラメータ設定を確認ください --help")
making_time = time.time() - start
print("")
print("処理時間:{0}".format(making_time) + " [sec]")
print("")
Consumer側のプログラム
上記「データストリーミングの流れ:6」で利用する Pythonプログラムは以下となります。
このプログラムの実行は、別途AKS環境上に別途起動するPod内となります。
from kafka import KafkaConsumer
# データのターミナル出力
def topic_to_tm(consumer):
print('ターミナル 出力')
# Read data from kafka
try :
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset,
message.key, message.value.decode('utf-8')))
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
print(consumer)
return consumer
if __name__ == '__main__':
consumer = get_kafka_topic()
topic_to_tm(consumer)
Consumer側の作業用Podの作成
Consumer側の Pythonプログラム実行する Pod(以下:Consumer-Pod)を作成します。
Pythonプログラムを Configmap への登録
Pythonプログラムファイルを指定して、Configmapを作成します。
## Configmapへの登録
$ kubectl create configmap python-consumer002-configmap --from-file=topic002_consumer.py
configmap/python-consumer002-configmap created
## Configmapの確認
$ kubectl get configmap python-consumer002-configmap
NAME DATA AGE
python-consumer002-configmap 1 32s
## Configmapの詳細確認
$ kubectl describe configmap python-consumer002-configmap
Name: python-consumer002-configmap
Namespace: akscfk231
Labels: <none>
Annotations: <none>
Data
====
topic002_consumer.py:
----
from kafka import KafkaConsumer
# データのターミナル出力
def topic_to_tm(consumer):
print('ターミナル 出力')
# Read data from kafka
try :
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset,
message.key, message.value.decode('utf-8')))
except KeyboardInterrupt :
print('\r\n Output to Terminal - interrupted!')
return
# Kafka Topic からのデータ受取
def get_kafka_topic():
# Initialize consumer variable and set property for JSON decode
consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
print(consumer)
return consumer
if __name__ == '__main__':
consumer = get_kafka_topic()
topic_to_tm(consumer)
Events: <none>
Consumer-Pod の起動
Podのマニフェストファイルは以下となります。
apiVersion: v1
kind: Pod
metadata:
name: python-client2
namespace: akscfk231
spec:
containers:
- name: python-client2
image: python:3.7.13-slim
command:
- bash
- -c
- "exec tail -f /dev/null"
volumeMounts:
- mountPath: /pyturu
name: python-consumer
volumes:
- name: python-consumer
configMap:
name: python-consumer002-configmap
Podを起動させ必要な設定をおこないます。
## Pod起動
$ kubectl apply -f python-client2.yaml
pod/python-client2 created
## Podへの接続
$ kubectl exec -it python-client2 -- /bin/bash
root@python-client2:/#
## Pythonプログラムの確認
root@python-client2:/# cd /pyturu
root@python-client2:/pyturu# ls -l
total 0
lrwxrwxrwx 1 root root 27 Aug 18 08:53 topic002_consumer.py -> ..data/topic002_consumer.py
## 必要なPythonライブラリのインストール
root@python-client2:/# pip install kafka-python
Collecting kafka-python
Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 3.6 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2
プログラムの実行と結果
最初に、Consumer側のプログラムを実行しデータを受け取れる上にします。その後、Producer側からデータを生成し設定したデータ抽出条件でリアルタイムにデータ処理され、Consumer側での受信を確認します。
Consumer側
以下のようにプログラムを実行します。データ受信状態となります。(Consumer-Podに接続した状態で実行)
root@python-client2:/pyturu# python topic002_consumer.py
<kafka.consumer.group.KafkaConsumer object at 0x7fe75aec91d0>
ターミナル 出力
Producer側
以下のようにプログラムを実行します。 0.5秒間隔で30件のデータを生成させます。
$ python cosmosdb_IoTdummy.py --mode db --db write --wait 0.5 --count 30
データベースへのデータ書き込み
['1660834305.732015', 0, '111', 'O', '859-1514', '熊本県', 158.00226177654937, 50.97531387546896, '2022-08-18T23:51:45.732165']
['1660834305.732184', 1, '111', 'O', '075-8277', '茨城県', 170.42993612928407, 77.83170884785234, '2022-08-18T23:51:45.732221']
['1660834305.732227', 2, '111', 'E', '554-6753', '北海道', 148.92876929443668, 82.22632577434096, '2022-08-18T23:51:45.732255']
['1660834305.73226', 3, '111', 'B', '487-6201', '滋賀県', 112.52823513843735, 71.56402107831435, '2022-08-18T23:51:45.732286']
['1660834305.73229', 4, '111', 'N', '170-6136', '青森県', 119.9081080641646, 73.91280181768438, '2022-08-18T23:51:45.732314']
['1660834305.732319', 5, '111', 'H', '394-5687', '福井県', 185.08908349053524, 54.60720831400024, '2022-08-18T23:51:45.732343']
['1660834305.732347', 6, '111', 'O', '752-3195', '徳島県', 163.65979062146778, 68.80266444058988, '2022-08-18T23:51:45.732370']
['1660834305.732374', 7, '111', 'V', '313-9684', '鳥取県', 186.76982229974786, 71.43296034005864, '2022-08-18T23:51:45.732397']
['1660834305.732401', 8, '111', 'Q', '128-7149', '沖縄県', 110.07222152971069, 74.39862438101605, '2022-08-18T23:51:45.732425']
['1660834305.732429', 9, '111', 'R', '361-6022', '兵庫県', 166.9902488217956, 89.09971572382713, '2022-08-18T23:51:45.732452']
['1660834305.732456', 10, '111', 'Z', '098-6251', '三重県', 158.87197665118143, 81.01997557755357, '2022-08-18T23:51:45.732480']
['1660834305.732488', 11, '111', 'O', '743-4912', '愛知県', 168.45018805372104, 58.38400703727193, '2022-08-18T23:51:45.732513']
['1660834305.732517', 12, '111', 'N', '244-6470', '宮崎県', 140.53376931066012, 81.83117092131732, '2022-08-18T23:51:45.732540']
['1660834305.732544', 13, '111', 'U', '093-0428', '秋田県', 137.67206357275325, 82.49823923324556, '2022-08-18T23:51:45.732571']
['1660834305.732576', 14, '111', 'R', '918-1763', '鳥取県', 133.23228040733093, 79.4502829818707, '2022-08-18T23:51:45.732600']
['1660834305.732604', 15, '111', 'J', '845-5792', '栃木県', 190.33081017479677, 56.93608388242184, '2022-08-18T23:51:45.732628']
['1660834305.732632', 16, '111', 'H', '282-8691', '三重県', 135.48589527133828, 68.90330426502534, '2022-08-18T23:51:45.732655']
['1660834305.732659', 17, '111', 'R', '971-8072', '東京都', 128.7487058948935, 75.92087004159501, '2022-08-18T23:51:45.732682']
['1660834305.732686', 18, '111', 'C', '734-2744', '山梨県', 172.25052850462095, 53.00702358904964, '2022-08-18T23:51:45.732708']
['1660834305.732712', 19, '111', 'I', '006-3248', '鹿児島県', 131.41670420062965, 84.72459508340276, '2022-08-18T23:51:45.732734']
['1660834305.732738', 20, '111', 'W', '090-5446', '香川県', 190.06449809221584, 81.7200152031348, '2022-08-18T23:51:45.732761']
['1660834305.732765', 21, '111', 'G', '648-4463', '群馬県', 182.8725661666906, 76.47743809460934, '2022-08-18T23:51:45.732788']
['1660834305.732792', 22, '111', 'T', '911-9217', '兵庫県', 158.22348413535988, 64.60388246056716, '2022-08-18T23:51:45.732814']
['1660834305.732818', 23, '111', 'C', '891-9247', '北海道', 176.11284094147544, 75.70547358717785, '2022-08-18T23:51:45.732840']
['1660834305.732848', 24, '111', 'B', '060-1112', '北海道', 192.3259788508905, 79.94946793864301, '2022-08-18T23:51:45.732872']
['1660834305.732876', 25, '111', 'Z', '953-7789', '山梨県', 124.87882930311295, 72.23162082053496, '2022-08-18T23:51:45.732899']
['1660834305.732903', 26, '111', 'W', '364-3961', '福井県', 132.41009809155906, 65.1689165300587, '2022-08-18T23:51:45.732925']
['1660834305.732929', 27, '111', 'P', '785-6103', '福島県', 104.55147953912095, 66.21606634357327, '2022-08-18T23:51:45.732952']
['1660834305.732956', 28, '111', 'Y', '916-4663', '宮城県', 194.5279802822507, 64.50228129345219, '2022-08-18T23:51:45.732978']
['1660834305.732982', 29, '111', 'I', '904-0606', '新潟県', 179.6807544670632, 69.48480016692815, '2022-08-18T23:51:45.733005']
Inserted 30 row(s) of data.
Done.
処理時間:16.733893156051636 [sec]
Consumer側のデータの確認
上記のProducer側からの30件のデータにおいて、4つ目の項目(section)が「C, E, F, W」である5つのデータが抽出され、Consumer側に転送されてきました。
root@python-client2:/pyturu# python topic002_consumer.py
<kafka.consumer.group.KafkaConsumer object at 0x7f6d50f9e210>
ターミナル 出力
topic002:0:0: key=None value={"ID":"1660834305.732227","CTID":2,"SECTION":"E","IOT_STATE":"北海道","VAL_1":148.92876929443668,"VAL_2":82.22632577434096,"CREATED_AT":"2022-08-18T23:51:45.732255"}
topic002:0:1: key=None value={"ID":"1660834305.732686","CTID":18,"SECTION":"C","IOT_STATE":"山梨県","VAL_1":172.25052850462095,"VAL_2":53.00702358904964,"CREATED_AT":"2022-08-18T23:51:45.732708"}
topic002:0:2: key=None value={"ID":"1660834305.732738","CTID":20,"SECTION":"W","IOT_STATE":"香川県","VAL_1":190.06449809221584,"VAL_2":81.7200152031348,"CREATED_AT":"2022-08-18T23:51:45.732761"}
topic002:0:3: key=None value={"ID":"1660834305.732818","CTID":23,"SECTION":"C","IOT_STATE":"北海道","VAL_1":176.11284094147544,"VAL_2":75.70547358717785,"CREATED_AT":"2022-08-18T23:51:45.732840"}
topic002:0:4: key=None value={"ID":"1660834305.732903","CTID":26,"SECTION":"W","IOT_STATE":"福井県","VAL_1":132.41009809155906,"VAL_2":65.1689165300587,"CREATED_AT":"2022-08-18T23:51:45.732925"}
その他の確認
stream情報の確認
2つの stream の状況を確認してみます。上記のデータ以外に追加で30件のデータをProducer側から流し込んでいます。
## ksql-client2 の Podに接続します
$ kubectl exec -it ksql-client2 -- /bin/bash
[appuser@ksql-client2 ~]$
## stream001 の状況
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM001
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : AVRO
Kafka topic : topic001 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');
Field | Type
------------------------------
ID | VARCHAR(STRING)
CTID | BIGINT
SECTION | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VAL_1 | DOUBLE
VAL_2 | DOUBLE
CREATED_AT | VARCHAR(STRING)
------------------------------
Sources that have a DROP constraint on this source
--------------------------------------------------
STREAM002
Queries that read from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Runtime statistics by host
-------------------------
Host | Metric | Value | Last Message
----------------------------------------------------------------------------------------------------------------------
ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-messages-per-sec | 0 | 2022-08-18T15:15:46.583Z
ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-bytes | 15781 | 2022-08-18T15:15:46.583Z
ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-messages | 60 | 2022-08-18T15:15:46.583Z
----------------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic001)
## stream002 の状況
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Name : STREAM002
Type : STREAM
Timestamp field : Not set - using <ROWTIME>
Key format : KAFKA
Value format : JSON_SR
Kafka topic : topic002 (partitions: 1, replication: 3)
Statement : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;
Field | Type
------------------------------
ID | VARCHAR(STRING)
CTID | BIGINT
SECTION | VARCHAR(STRING)
IOT_STATE | VARCHAR(STRING)
VAL_1 | DOUBLE
VAL_2 | DOUBLE
CREATED_AT | VARCHAR(STRING)
------------------------------
Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;
For query topology and execution plan please run: EXPLAIN <QueryId>
Runtime statistics by host
-------------------------
Host | Metric | Value | Last Message
-------------------------------------------------------------------------------------------------------------
ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | messages-per-sec | 0 | 2022-08-18T15:15:46.577Z
ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | total-messages | 10 | 2022-08-18T15:15:46.577Z
-------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic002)
Consumer Groups summary:
Consumer Group : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1
Kafka topic : topic001
Max lag : 0
Partition | Start Offset | End Offset | Offset | Lag
------------------------------------------------------
0 | 0 | 65 | 65 | 0
------------------------------------------------------
Schme情報の確認
2つの topic で自動生成される Schema情報は以下となっています(Confluent Control Center からのダウンロード)
自動生成される topic001 の Schema情報
{
"connect.name": "inferred_name__1785624689",
"fields": [
{
"name": "id",
"type": "string"
},
{
"name": "ctid",
"type": "long"
},
{
"name": "proc",
"type": "string"
},
{
"name": "section",
"type": "string"
},
{
"name": "iot_num",
"type": "string"
},
{
"name": "iot_state",
"type": "string"
},
{
"name": "val_1",
"type": "double"
},
{
"name": "val_2",
"type": "double"
},
{
"name": "created_at",
"type": "string"
},
{
"name": "_rid",
"type": "string"
},
{
"name": "_self",
"type": "string"
},
{
"name": "_etag",
"type": "string"
},
{
"name": "_attachments",
"type": "string"
},
{
"name": "_ts",
"type": "long"
},
{
"name": "_lsn",
"type": "long"
}
],
"name": "inferred_name__1785624689",
"type": "record"
}
自動生成される topic002 の Schema情報
{
"properties": {
"CREATED_AT": {
"connect.index": 6,
"oneOf": [
{
"type": "null"
},
{
"type": "string"
}
]
},
"CTID": {
"connect.index": 1,
"oneOf": [
{
"type": "null"
},
{
"connect.type": "int64",
"type": "integer"
}
]
},
"ID": {
"connect.index": 0,
"oneOf": [
{
"type": "null"
},
{
"type": "string"
}
]
},
"IOT_STATE": {
"connect.index": 3,
"oneOf": [
{
"type": "null"
},
{
"type": "string"
}
]
},
"SECTION": {
"connect.index": 2,
"oneOf": [
{
"type": "null"
},
{
"type": "string"
}
]
},
"VAL_1": {
"connect.index": 4,
"oneOf": [
{
"type": "null"
},
{
"connect.type": "float64",
"type": "number"
}
]
},
"VAL_2": {
"connect.index": 5,
"oneOf": [
{
"type": "null"
},
{
"connect.type": "float64",
"type": "number"
}
]
}
},
"type": "object"
}
まとめ
Confluent for Kubernetes (CFK)を利用して Azure Kubernetes Service(AKS)上に Confluent Platform をデプロイし、データストリーミングを全9回に渡り確認することができました。
- 第1回:#1 CP on AKS with CFK - 準備編
- 第2回:#2 CP on AKS with CFK - CCC認証編
- 第3回:#3 CP on AKS with CFK - Connector Plugin編
- 第4回:#4 CP on AKS with CFK - Topic作成編
- 第5回:#5 CP on AKS with CFK - Sink Connector作成編
- 第6回:#6 CP on AKS with CFK - Source Connector作成編
- 第7回:#7 CP on AKS with CFK - Simpleテスト編
- 第8回:#8 CP on AKS with CFK - KSQL Stream 作成編
- 第9回:#9 CP on AKS with CFK - データストリーミング編
CFKを利用することにより、Confluent Platform で DevOps(IaC化)することが可能となります。 時間があればトライしてみたいと思います。
参考情報
以下の情報を参考にさせていただきました。感謝申し上げます。
キーコンバーターと値コンバーターの構成
Using Kafka Connect with Schema Registry
Avro,SchemaRegistryことはじめ
Schema Registry について書いていく その1: Confluent Schema Registry
Confluent Platform メモ - (3)Schema Registry簡易テスト