1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

#9 Confluent for Kubernetes を使用して AKS 上に Confluent Platform を構成してみました - データストリーミング編

Last updated at Posted at 2022-08-18

概要

Confluent for Kubernetes (CFK)は、プライベートクラウド環境(今回は Azure Kubernetes Service(AKS))に Confluent をデプロイして管理するためのクラウドネイティブのコントロールプレーンです。宣言型 API で Confluent Platform をカスタマイズ、デプロイ、管理するための標準的で簡素なインターフェイスが備わっています。

この記事 にあるように、CFK を使用して AKS上に Confluent Platform(以下、CP) をデプロイし、ksqldb-cli を利用してKSQLの設定を施した以下の図の構成において、Producer側のCosmosDBにデータを流し込み、CP の CosmosDB Sink Connector から topic001 を経由して、KSQLのクエリ処理によりデータが抽出されたものが、topic002 にストリーミングされることを確認します。
image.png

  • データストリーミングの流れは以下となります。
    1. Pythonプログラムを利用して疑似データ生成し CosmosDB に書き込みます
    2. CosmosDBに書き込まれたデータは CosmosDB Sink Connector を経由して、CP の topic001 にストリーミングされます
    3. topic001 のストリーミングデータの必要なカラムデータのみ、クエリ処理のため stream001 に転送されます
    4. stream001 のデータをベースに stream002 として「カラム - section : 'C','E','F','W' のデータのみ」の抽出処理されます
    5. stream002 で抽出処理されたストリーミングデータを topic002 に転送します
    6. topic002 のデータを読み込むPythonプログラムを作成し、AKS上のPodで稼働させ、抽出されたデータを確認します

ローカル環境

  • macOS Monterey 12.3.1
  • python 3.8.12
  • Azure CLI 2.34.1
  • helm v3.6.3
  • kubectl v1.21.3

利用プログラム

Producer側のプログラム

上記「データストリーミングの流れ:1」で利用する Pythonプログラムは以下となります。
このプログラムの実行はローカルのMacから行います。

cosmosdb_IoTdummy.py
import time
from datetime import date, datetime
import random
import json
import argparse
import string
from faker.factory import Factory
from azure.cosmos import CosmosClient

endpoint = 'https://iturucosmosdb01.documents.azure.com:443/'
key = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
databaseName = 'CPDemoDB001'
containerName = 'container001'


# ダミーデータ作成のための Faker の使用
Faker = Factory.create
fake = Faker()
fake = Faker("ja_JP")

# ダミーセクション?(大文字アルファベットを定義)
section = string.ascii_uppercase


# JSONダミーデータの作成
def iot_json_data(count, proc):
    iot_items = json.dumps({
        'items': [{
            'id': str(datetime.now().timestamp()),  # id
            'ctid': i,                              # create id
            'proc': proc,                           # データ生成プロセス名
            'section': random.choice(section),      # IoT機器セクション
            'iot_num': fake.zipcode(),              # IoT機器番号
            'iot_state': fake.prefecture(),         # IoT設置場所
            'val_1': random.uniform(100, 200),      # IoT値−1
            'val_2': random.uniform(50, 90),        # IoT値−2
            'created_at': generate_time()           # データ生成時間
            } 
            for i in range(count)
        ]
    }, ensure_ascii=False).encode('utf-8')
    return iot_items


# IoT機器で計測されたダミーデータの生成時間
def generate_time():
    dt_time = datetime.now()
    gtime = json_trans_date(dt_time)
    return gtime


# date, datetimeの変換関数
def json_trans_date(obj):
    # 日付型を文字列に変換
    if isinstance(obj, (datetime, date)):
        return obj.isoformat()
    # 上記以外は対象外.
    raise TypeError ("Type %s not serializable" % type(obj))


# CosmosDBへの接続
def ConnectionToCosmosDB():
    try:
      client = CosmosClient(endpoint, key)
      db = client.get_database_client(databaseName)
      container = db.get_container_client(containerName)
      # print("\nConnection established\n")
      return 1, container
    except Exception as err:
      return 0, err


# メイン : ターミナル出力用
def tm_main(count, proc, wait):
    print('ターミナル 出力\n')

    # ダミーデータ生成
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    # ターミナル出力
    for item in json_dict['items']:
        print(item)
        time.sleep(wait)


# メイン : Show Database
def show_db_main():
    print('CosmosDB への接続確認')
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
    else :
      print(conn)


# メイン : Write Database
def write_db_main(count, proc, wait):
    print('データベースへのデータ書き込み')

    # DBへの接続
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    # ダミーデータ生成
    iotjsondata = iot_json_data(count, proc)
    json_dict = json.loads(iotjsondata)

    ## プログレスバーで進捗状況を表示
    # データのインサート
    try:
      # for item in tqdm(json_dict['items']):
      for item in json_dict['items']:
        print(list(item.values()))
        conn.create_item(item)
        time.sleep(wait)
      print("\nInserted",count,"row(s) of data.")
      print("Done.")
    except Exception as err:
      print(err)
      return


# メイン : Read Database
def read_db_main():
    print('データベースからのデータ読み込み')

    # DBへの接続
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    # Read table data
    items = conn.read_all_items()

    # print all rows
    try:
      [print(f'{item}') for item in items]
      print("Done.")
    except Exception as err:
      print(err)


# メイン : Delete Database
def delete_db_main():
    print('データベースのデータ全削除')

    # DBへの接続
    code, conn = ConnectionToCosmosDB()

    if code == 0 :
      print(conn)
      return

    try:
      for item in conn.query_items(query='SELECT * FROM container001',enable_cross_partition_query=True):
          print(item['section'])
          conn.delete_item(item, partition_key=item['section'])
      print("Done.")
    except Exception as err:
      print(err)


# メイン
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='ダミーデータの自動生成からのDBへの書込み')
    parser.add_argument('--count', type=int, default=5, help='データ作成件数(デフォルト:5件)')
    parser.add_argument('--proc', type=str, default='111', help='データ作成プロセス名(デフォルト:111)')
    parser.add_argument('--mode', type=str, default='tm', help='tm(Terminal(デフォルト))/ db(db操作)')
    parser.add_argument('--wait', type=float, default=1, help='データ生成間隔(デフォルト:1.0秒)')
    parser.add_argument('--db', type=str, default='show', help='show(デフォルト) / write / read / delete')
    args = parser.parse_args()

    start = time.time()

    if (args.mode == 'tm'): 
      tm_main(args.count, args.proc, args.wait)
    elif (args.mode == 'db'):
      if (args.db == 'show'):
        show_db_main()
      elif (args.db == 'write'):
        write_db_main(args.count, args.proc, args.wait)
      elif (args.db == 'read'):
        read_db_main()
      elif (args.db == 'delete'):
        delete_db_main()
    else :
      print("パラメータ設定を確認ください --help")

    making_time = time.time() - start

    print("")
    print("処理時間:{0}".format(making_time) + " [sec]")
    print("")

Consumer側のプログラム

上記「データストリーミングの流れ:6」で利用する Pythonプログラムは以下となります。
このプログラムの実行は、別途AKS環境上に別途起動するPod内となります。

topic002_consumer.py
from kafka import KafkaConsumer

# データのターミナル出力
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, 
                                                    message.key, message.value.decode('utf-8')))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Kafka Topic からのデータ受取
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
    print(consumer)
    return consumer


if __name__ == '__main__':
    consumer = get_kafka_topic()
    topic_to_tm(consumer)

Consumer側の作業用Podの作成

Consumer側の Pythonプログラム実行する Pod(以下:Consumer-Pod)を作成します。

Pythonプログラムを Configmap への登録

Pythonプログラムファイルを指定して、Configmapを作成します。

## Configmapへの登録
$ kubectl create configmap python-consumer002-configmap --from-file=topic002_consumer.py
configmap/python-consumer002-configmap created


## Configmapの確認
$ kubectl get configmap python-consumer002-configmap                            
NAME                           DATA   AGE
python-consumer002-configmap   1      32s


## Configmapの詳細確認
$ kubectl describe configmap python-consumer002-configmap
Name:         python-consumer002-configmap
Namespace:    akscfk231
Labels:       <none>
Annotations:  <none>

Data
====
topic002_consumer.py:
----
from kafka import KafkaConsumer

# データのターミナル出力
def topic_to_tm(consumer):
    print('ターミナル 出力')

    # Read data from kafka
    try :
        for message in consumer:
            print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition, message.offset, 
                                                    message.key, message.value.decode('utf-8')))
    except KeyboardInterrupt :
        print('\r\n Output to Terminal - interrupted!')
        return

# Kafka Topic からのデータ受取
def get_kafka_topic():
    # Initialize consumer variable and set property for JSON decode
    consumer = KafkaConsumer ('topic002', bootstrap_servers = ['kafka.akscfk231.svc.cluster.local:9092'])
    print(consumer)
    return consumer


if __name__ == '__main__':
    consumer = get_kafka_topic()
    topic_to_tm(consumer)

Events:  <none>

Consumer-Pod の起動

Podのマニフェストファイルは以下となります。

python-client2.yaml
apiVersion: v1
kind: Pod
metadata:
  name: python-client2
  namespace: akscfk231
spec:
  containers:
  - name: python-client2
    image: python:3.7.13-slim
    command:
      - bash
      - -c
      - "exec tail -f /dev/null"
    volumeMounts:
      - mountPath: /pyturu
        name: python-consumer
  volumes:
  - name: python-consumer
    configMap:
      name: python-consumer002-configmap

Podを起動させ必要な設定をおこないます。

## Pod起動
$ kubectl apply -f python-client2.yaml                      
pod/python-client2 created


## Podへの接続
$ kubectl exec -it python-client2 -- /bin/bash
root@python-client2:/# 


## Pythonプログラムの確認
root@python-client2:/# cd /pyturu
root@python-client2:/pyturu# ls -l
total 0
lrwxrwxrwx 1 root root 27 Aug 18 08:53 topic002_consumer.py -> ..data/topic002_consumer.py


## 必要なPythonライブラリのインストール
root@python-client2:/# pip install kafka-python
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 246.5/246.5 KB 3.6 MB/s eta 0:00:00
Installing collected packages: kafka-python
Successfully installed kafka-python-2.0.2

プログラムの実行と結果

最初に、Consumer側のプログラムを実行しデータを受け取れる上にします。その後、Producer側からデータを生成し設定したデータ抽出条件でリアルタイムにデータ処理され、Consumer側での受信を確認します。

Consumer側

以下のようにプログラムを実行します。データ受信状態となります。(Consumer-Podに接続した状態で実行)

root@python-client2:/pyturu# python topic002_consumer.py 
<kafka.consumer.group.KafkaConsumer object at 0x7fe75aec91d0>
ターミナル 出力

Producer側

以下のようにプログラムを実行します。 0.5秒間隔で30件のデータを生成させます。

$ python cosmosdb_IoTdummy.py --mode db --db write --wait 0.5 --count 30
データベースへのデータ書き込み
['1660834305.732015', 0, '111', 'O', '859-1514', '熊本県', 158.00226177654937, 50.97531387546896, '2022-08-18T23:51:45.732165']
['1660834305.732184', 1, '111', 'O', '075-8277', '茨城県', 170.42993612928407, 77.83170884785234, '2022-08-18T23:51:45.732221']
['1660834305.732227', 2, '111', 'E', '554-6753', '北海道', 148.92876929443668, 82.22632577434096, '2022-08-18T23:51:45.732255']
['1660834305.73226', 3, '111', 'B', '487-6201', '滋賀県', 112.52823513843735, 71.56402107831435, '2022-08-18T23:51:45.732286']
['1660834305.73229', 4, '111', 'N', '170-6136', '青森県', 119.9081080641646, 73.91280181768438, '2022-08-18T23:51:45.732314']
['1660834305.732319', 5, '111', 'H', '394-5687', '福井県', 185.08908349053524, 54.60720831400024, '2022-08-18T23:51:45.732343']
['1660834305.732347', 6, '111', 'O', '752-3195', '徳島県', 163.65979062146778, 68.80266444058988, '2022-08-18T23:51:45.732370']
['1660834305.732374', 7, '111', 'V', '313-9684', '鳥取県', 186.76982229974786, 71.43296034005864, '2022-08-18T23:51:45.732397']
['1660834305.732401', 8, '111', 'Q', '128-7149', '沖縄県', 110.07222152971069, 74.39862438101605, '2022-08-18T23:51:45.732425']
['1660834305.732429', 9, '111', 'R', '361-6022', '兵庫県', 166.9902488217956, 89.09971572382713, '2022-08-18T23:51:45.732452']
['1660834305.732456', 10, '111', 'Z', '098-6251', '三重県', 158.87197665118143, 81.01997557755357, '2022-08-18T23:51:45.732480']
['1660834305.732488', 11, '111', 'O', '743-4912', '愛知県', 168.45018805372104, 58.38400703727193, '2022-08-18T23:51:45.732513']
['1660834305.732517', 12, '111', 'N', '244-6470', '宮崎県', 140.53376931066012, 81.83117092131732, '2022-08-18T23:51:45.732540']
['1660834305.732544', 13, '111', 'U', '093-0428', '秋田県', 137.67206357275325, 82.49823923324556, '2022-08-18T23:51:45.732571']
['1660834305.732576', 14, '111', 'R', '918-1763', '鳥取県', 133.23228040733093, 79.4502829818707, '2022-08-18T23:51:45.732600']
['1660834305.732604', 15, '111', 'J', '845-5792', '栃木県', 190.33081017479677, 56.93608388242184, '2022-08-18T23:51:45.732628']
['1660834305.732632', 16, '111', 'H', '282-8691', '三重県', 135.48589527133828, 68.90330426502534, '2022-08-18T23:51:45.732655']
['1660834305.732659', 17, '111', 'R', '971-8072', '東京都', 128.7487058948935, 75.92087004159501, '2022-08-18T23:51:45.732682']
['1660834305.732686', 18, '111', 'C', '734-2744', '山梨県', 172.25052850462095, 53.00702358904964, '2022-08-18T23:51:45.732708']
['1660834305.732712', 19, '111', 'I', '006-3248', '鹿児島県', 131.41670420062965, 84.72459508340276, '2022-08-18T23:51:45.732734']
['1660834305.732738', 20, '111', 'W', '090-5446', '香川県', 190.06449809221584, 81.7200152031348, '2022-08-18T23:51:45.732761']
['1660834305.732765', 21, '111', 'G', '648-4463', '群馬県', 182.8725661666906, 76.47743809460934, '2022-08-18T23:51:45.732788']
['1660834305.732792', 22, '111', 'T', '911-9217', '兵庫県', 158.22348413535988, 64.60388246056716, '2022-08-18T23:51:45.732814']
['1660834305.732818', 23, '111', 'C', '891-9247', '北海道', 176.11284094147544, 75.70547358717785, '2022-08-18T23:51:45.732840']
['1660834305.732848', 24, '111', 'B', '060-1112', '北海道', 192.3259788508905, 79.94946793864301, '2022-08-18T23:51:45.732872']
['1660834305.732876', 25, '111', 'Z', '953-7789', '山梨県', 124.87882930311295, 72.23162082053496, '2022-08-18T23:51:45.732899']
['1660834305.732903', 26, '111', 'W', '364-3961', '福井県', 132.41009809155906, 65.1689165300587, '2022-08-18T23:51:45.732925']
['1660834305.732929', 27, '111', 'P', '785-6103', '福島県', 104.55147953912095, 66.21606634357327, '2022-08-18T23:51:45.732952']
['1660834305.732956', 28, '111', 'Y', '916-4663', '宮城県', 194.5279802822507, 64.50228129345219, '2022-08-18T23:51:45.732978']
['1660834305.732982', 29, '111', 'I', '904-0606', '新潟県', 179.6807544670632, 69.48480016692815, '2022-08-18T23:51:45.733005']

Inserted 30 row(s) of data.
Done.

処理時間:16.733893156051636 [sec]

Consumer側のデータの確認

上記のProducer側からの30件のデータにおいて、4つ目の項目(section)が「C, E, F, W」である5つのデータが抽出され、Consumer側に転送されてきました。

root@python-client2:/pyturu# python topic002_consumer.py
<kafka.consumer.group.KafkaConsumer object at 0x7f6d50f9e210>
ターミナル 出力
topic002:0:0: key=None value={"ID":"1660834305.732227","CTID":2,"SECTION":"E","IOT_STATE":"北海道","VAL_1":148.92876929443668,"VAL_2":82.22632577434096,"CREATED_AT":"2022-08-18T23:51:45.732255"}
topic002:0:1: key=None value={"ID":"1660834305.732686","CTID":18,"SECTION":"C","IOT_STATE":"山梨県","VAL_1":172.25052850462095,"VAL_2":53.00702358904964,"CREATED_AT":"2022-08-18T23:51:45.732708"}
topic002:0:2: key=None value={"ID":"1660834305.732738","CTID":20,"SECTION":"W","IOT_STATE":"香川県","VAL_1":190.06449809221584,"VAL_2":81.7200152031348,"CREATED_AT":"2022-08-18T23:51:45.732761"}
topic002:0:3: key=None value={"ID":"1660834305.732818","CTID":23,"SECTION":"C","IOT_STATE":"北海道","VAL_1":176.11284094147544,"VAL_2":75.70547358717785,"CREATED_AT":"2022-08-18T23:51:45.732840"}
topic002:0:4: key=None value={"ID":"1660834305.732903","CTID":26,"SECTION":"W","IOT_STATE":"福井県","VAL_1":132.41009809155906,"VAL_2":65.1689165300587,"CREATED_AT":"2022-08-18T23:51:45.732925"}

その他の確認

stream情報の確認

2つの stream の状況を確認してみます。上記のデータ以外に追加で30件のデータをProducer側から流し込んでいます。

## ksql-client2 の Podに接続します
$ kubectl exec -it ksql-client2 -- /bin/bash
[appuser@ksql-client2 ~]$ 


## stream001 の状況
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream001 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM001
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : AVRO
Kafka topic          : topic001 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM001 (ID STRING, CTID BIGINT, SECTION STRING, IOT_STATE STRING, VAL_1 DOUBLE, VAL_2 DOUBLE, CREATED_AT STRING) WITH (KAFKA_TOPIC='topic001', KEY_FORMAT='KAFKA', VALUE_FORMAT='AVRO');

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Sources that have a DROP constraint on this source
--------------------------------------------------
STREAM002

Queries that read from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Runtime statistics by host
-------------------------
 Host                                             | Metric                    | Value      | Last Message             
----------------------------------------------------------------------------------------------------------------------
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-messages-per-sec |          0 | 2022-08-18T15:15:46.583Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-bytes      |      15781 | 2022-08-18T15:15:46.583Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | consumer-total-messages   |         60 | 2022-08-18T15:15:46.583Z 
----------------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic001)


## stream002 の状況
[appuser@ksql-client2 ~]$ ksql --execute "DESCRIBE stream002 EXTENDED;" -- http://ksqldb.akscfk231.svc.cluster.local:8088
OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.

Name                 : STREAM002
Type                 : STREAM
Timestamp field      : Not set - using <ROWTIME>
Key format           : KAFKA
Value format         : JSON_SR
Kafka topic          : topic002 (partitions: 1, replication: 3)
Statement            : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT *
FROM STREAM001 STREAM001
WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W'))
EMIT CHANGES;

 Field      | Type            
------------------------------
 ID         | VARCHAR(STRING) 
 CTID       | BIGINT          
 SECTION    | VARCHAR(STRING) 
 IOT_STATE  | VARCHAR(STRING) 
 VAL_1      | DOUBLE          
 VAL_2      | DOUBLE          
 CREATED_AT | VARCHAR(STRING) 
------------------------------

Queries that write from this STREAM
-----------------------------------
CSAS_STREAM002_1 (RUNNING) : CREATE STREAM STREAM002 WITH (KAFKA_TOPIC='topic002', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='JSON_SR') AS SELECT * FROM STREAM001 STREAM001 WHERE ((((STREAM001.SECTION = 'C') OR (STREAM001.SECTION = 'E')) OR (STREAM001.SECTION = 'F')) OR (STREAM001.SECTION = 'W')) EMIT CHANGES;

For query topology and execution plan please run: EXPLAIN <QueryId>

Runtime statistics by host
-------------------------
 Host                                             | Metric           | Value      | Last Message             
-------------------------------------------------------------------------------------------------------------
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | messages-per-sec |          0 | 2022-08-18T15:15:46.577Z 
 ksqldb-0.ksqldb.akscfk231.svc.cluster.local:8088 | total-messages   |         10 | 2022-08-18T15:15:46.577Z 
-------------------------------------------------------------------------------------------------------------
(Statistics of the local KSQL server interaction with the Kafka topic topic002)

Consumer Groups summary:

Consumer Group       : _confluent-ksql-akscfk231.ksqldb_query_CSAS_STREAM002_1

Kafka topic          : topic001
Max lag              : 0

 Partition | Start Offset | End Offset | Offset | Lag 
------------------------------------------------------
 0         | 0            | 65         | 65     | 0   
------------------------------------------------------

Schme情報の確認

2つの topic で自動生成される Schema情報は以下となっています(Confluent Control Center からのダウンロード)

自動生成される topic001 の Schema情報

schema-topic001-value-v1.avsc
{
  "connect.name": "inferred_name__1785624689",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "ctid",
      "type": "long"
    },
    {
      "name": "proc",
      "type": "string"
    },
    {
      "name": "section",
      "type": "string"
    },
    {
      "name": "iot_num",
      "type": "string"
    },
    {
      "name": "iot_state",
      "type": "string"
    },
    {
      "name": "val_1",
      "type": "double"
    },
    {
      "name": "val_2",
      "type": "double"
    },
    {
      "name": "created_at",
      "type": "string"
    },
    {
      "name": "_rid",
      "type": "string"
    },
    {
      "name": "_self",
      "type": "string"
    },
    {
      "name": "_etag",
      "type": "string"
    },
    {
      "name": "_attachments",
      "type": "string"
    },
    {
      "name": "_ts",
      "type": "long"
    },
    {
      "name": "_lsn",
      "type": "long"
    }
  ],
  "name": "inferred_name__1785624689",
  "type": "record"
}

自動生成される topic002 の Schema情報

schema-topic002-value-v1.json
{
  "properties": {
    "CREATED_AT": {
      "connect.index": 6,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "CTID": {
      "connect.index": 1,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "int64",
          "type": "integer"
        }
      ]
    },
    "ID": {
      "connect.index": 0,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "IOT_STATE": {
      "connect.index": 3,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "SECTION": {
      "connect.index": 2,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "type": "string"
        }
      ]
    },
    "VAL_1": {
      "connect.index": 4,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "float64",
          "type": "number"
        }
      ]
    },
    "VAL_2": {
      "connect.index": 5,
      "oneOf": [
        {
          "type": "null"
        },
        {
          "connect.type": "float64",
          "type": "number"
        }
      ]
    }
  },
  "type": "object"
}

まとめ

Confluent for Kubernetes (CFK)を利用して Azure Kubernetes Service(AKS)上に Confluent Platform をデプロイし、データストリーミングを全9回に渡り確認することができました。

CFKを利用することにより、Confluent Platform で DevOps(IaC化)することが可能となります。 時間があればトライしてみたいと思います。

参考情報

以下の情報を参考にさせていただきました。感謝申し上げます。

キーコンバーターと値コンバーターの構成
Using Kafka Connect with Schema Registry
Avro,SchemaRegistryことはじめ
Schema Registry について書いていく その1: Confluent Schema Registry
Confluent Platform メモ - (3)Schema Registry簡易テスト

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?