LoginSignup
0
2

More than 3 years have passed since last update.

Equalumにkafkaを繋げてみる(準備編)

Last updated at Posted at 2021-04-11

以前に書いたEqualum検証の続き・・・・

前に投稿したEqualumの検証で、kafkaを上流側データソースとして使えないのか?・・・なるご質問を頂くケースが結構有りましたので、今回は簡単なkafka環境との接続検証を行ってみたいと思います。

まずはkafka環境の構築

ハードウエアを正式に用意して、保守本流・王道のkafka環境を構築する・・という方向性も有るのですが、今回は普段使いのMBPの上にサラッとkafka環境を構築して、その環境に対してPythonで作ったスクリプトでメッセージを投げ、そのメッセージをEqualumが受け取って必要な処理(今回はメッセージの分解とカラム化を行います)をFLOWで作成し、ターゲットとして同じMBP上で動いているDocker環境で稼働しているMySQLに着地させてみることにします。

事前の言い訳・・・m(_ _)m(苦笑)

kafka周辺の技術情報は、既に多くの諸先輩方がQiita、その他の記事として公開されていますので、それらの記事を是非!参考にして見てください。今回の検証は、あくまでも動くかどうかを確認する・・という部分で戦略的妥協をしております(汗)。。。

また、今回は紙面の都合上・・kafka周りの準備までになります(Equalumとの連携は次回を予定)ので、その辺は何卒ご理解頂きたく・・・

まず最初にbrew環境の導入(導入済みの場合不要です)

Mac環境へhomebrewをインストールします。

kafkaのインストール(zookeeperも一緒にインストールされます)

% brew install kafka 

kafkaを起動してみます

無事にインストールが出来れば、環境を立ち上げてみましょう。

% brew services start zookeeper
% brew services start kafka 

zookeeper は localhost:2181, kafka は localhost:9092 で起動します(標準設定)
また、環境設定のファイルは/usr/local/etc/kafka/server.propertiesに有ると思いますので、取り急ぎトピックを削除出来るように設定を変更しておきます(最初、コマンドで試験的に作った幾つかのトピックを、初期化&リソースを解放しようと削除していたのですが、コマンドを使って既存リストを見ると永久不滅のトピック状態で、あれ?・・・という事で探し当てた設定になります・・・ご参考まで)

delete.topic.enable=true

この設定をファイルに加えてから、環境を再起動します。起動中の環境を止める場合は

% brew services stop kafka 
% brew services stop zookeeper

を行います。

本格的にkafkaを弄ってみる

まずは、検証に使うトピックを作成します。

% kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic XXXXX 



中パラメータは全て必要みたいなので、このままコピペで使って最後のトピック名だけ適宜変更します

因みに・・・・
* --create でトピックの作成
* --zookeeper localhost:2181 で zookeeper の LISTEN アドレスの指定
* --replication-factor 1 でレプリカを 1 つのみ生成
* --partitions 1 でパーティションを 1 つのみ生成
* --topic でトピック名を指定 
です

今回は、MBPの資源保護の観点から最小構成で設定しました。

トピック周りの処理について

環境が起動して、取り急ぎのトピックを作成したらリストで表示してみます。

% kafka-topics --list --zookeeper localhost:2181

また、初期化等を行う場合は、kafkaの作法に従って(?)トピック自体を削除・初期化する必要が有りますので、以下のコマンドで処理を行います。(前述の設定をお忘れなく・・・)

% kafka-topics --delete --zookeeper localhost:2181 --topic XXXXXXX

今回の検証では使いませんが、コマンド処理で弄る場合の参考として・・・

プロデューサーを起動する場合は、以下のコマンドで処理を行います。

% kafka-console-producer --broker-list localhost:9092 --topic XXXXXXX

同様にコンシューマの起動は、少し長めになりますが

% kafka-console-consumer --bootstrap-server localhost:9092 --topic XXXXXXX --from-beginning

でメッセージ待受状態になりますので、双方が起動している状態で適当な文字列を入力して、リターンキーを押す事にコンシューマ側のコンソールに入力された文字列がメッセージとして表示されると思います。

では、検証用のPythonを書きます

今回の検証では、Pythonのモジュールを使って(pyautogui)画面上のマウス座標を取得し、その情報と識別情報としてタイムスタンプ的な情報を組み合わせ、JSON形式でEqualumを通過させたいと思います。ネット上には諸先輩方の歴戦の苦労の賜物が沢山公開されておりますので、それらを参考にして頂けば、さほど難しくなく「取り敢えず動く=動きを作る(動作する)」版は作れるかと思います。今回作成して検証に使ったスクリプトは以下の通りです。

#
#   Equalumのkafka接続検証
#
#   プロデューサ側の処理(JSON形式でメッセージを送信)
#

from kafka import KafkaProducer
import pyautogui
import time
import json


try:

    # 変数の初期化
    Counter = 1
    TimeOut = 60
    Sleep = 1

    # プロデューサーとの接続処理
    producer = KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda m: json.dumps(m).encode('utf-8'))
  
    #安易ですが、無限ループで処理します。
    while True:

        # 日付情報と時間情報の取得
        from datetime import datetime
        messageId = datetime.now().strftime("%Y/%m/%d-%H:%M:%S:%f")

        # JSON形式にメッセージを作成
        kafka_msg = {"id" : messageId, "x" : pyautogui.position().x, "y" : pyautogui.position().y}

        print(kafka_msg)

        # プロデューサーにメッセージを送る       
        result = producer.send('JSON', kafka_msg).get(timeout=TimeOut)

        #print(result)

        time.sleep(Sleep)

        Counter = Counter + 1

except KeyboardInterrupt:

    print('!!!!! 割り込み発生 !!!!!')

finally:

    print ("処理の終了")    
    print(str(Counter) + "回の処理を実行しました")

Equalumに接続する前に、取り急ぎMBP上で立ち上げたkafka環境を通してみたいと思います。
トピック名をJSONとして先程のコマンドで作成しておきます。

データが届いているかを確認するために、コンシューマ側の仕組みを用意する

今回は、取り急ぎの動作検証ですので、以下のスクリプトで受信状況を確認します。

#
#   Equalumのkafka接続検証
#
#   コンシューマ側の処理
#

from kafka import KafkaConsumer
import sys
import json

try:

    # 変数の初期化
    Counter = 1

    # コンシューマに接続する
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092', auto_offset_reset='latest',value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(['JSON'])

    for message in consumer:

        print("コンシューマ側に送られてきたJSONメッセージ : " + message)

        print("\n読み出したデータ\n")
        print("Data ID:",message[6]['id'])
        print("Data X:",message[6]['x'])
        print("Data Y:",message[6]['y'])

        Counter = Counter + 1


    # Terminate the script
    sys.exit()

except KeyboardInterrupt:

    print('!!!!! 割り込み発生 !!!!!')

finally:

    print ("処理の終了")    
    print(str(Counter) + "回の処理を実行しました")

準備が出来たら動作確認します

両方のスクリプトを起動して、画面上でマウスを「心のままに」動かすと、1秒間隔でデータを取り込んでくれます。動作状況は双方のコンソールに出力されるようになっていますので、その結果を確認して問題が無い様で有れば、いよいよEqualumを経由させてMySQLに着地させる事になります。

コンシューマ側の出力

コンシューマ側に送られてきたメッセージ : 
ConsumerRecord(topic='JSON', partition=0, offset=137, timestamp=1618055496288, timestamp_type=0, key=None, value={'id': '2021/04/10-20:51:36:287301', 'x': 910, 'y': 454}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=56, serialized_header_size=-1)

読み出したデータ

Data ID: 2021/04/10-20:51:36:287301
Data X: 910
Data Y: 454

////// 途中省略 //////////

コンシューマ側に送られてきたメッセージ : 
ConsumerRecord(topic='JSON', partition=0, offset=146, timestamp=1618055505336, timestamp_type=0, key=None, value={'id': '2021/04/10-20:51:45:335406', 'x': 701, 'y': 253}, headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=56, serialized_header_size=-1)

読み出したデータ

Data ID: 2021/04/10-20:51:45:335406
Data X: 701
Data Y: 253
!!!!! 割り込み発生 !!!!!
処理の終了
11回の処理を実行しました

プロデューサ側の入力

{'id': '2021/04/10-20:51:36:287301', 'x': 910, 'y': 454}
{'id': '2021/04/10-20:51:37:293747', 'x': 905, 'y': 458}
//////// 途中省略 /////////
{'id': '2021/04/10-20:51:44:327730', 'x': 605, 'y': 254}
{'id': '2021/04/10-20:51:45:335406', 'x': 701, 'y': 253}

今回のまとめ

今回は、Equalumの上流側データソースとして、kafkaを繋げるための前準備を行いました。Equalum社の製品・開発のVP、CTOとやりとりをさせてもらった際に、RDB系のCDC版Exactly Onceだけではなくて、kafkaを上流に設定した場合も、Equalum以降では、Exactly Onceを同じ様にサポート出来る・・との事でした。また、幾つかのパターンに分けてこの可能性を考えてみると、

技術的なユースケースの観点から:

Kafka-> Equalum-> RDBMS
このパターンは通常、アプリケーション統合等に使用されるパターンで、アプリケーションの特定のイベントがKafkaへの書き込みをトリガーし、Equalumがこのイベントを読み取って必要な変換/エンリッチメントなどを行い、そのデータを2番目のデータベースに速やか且つ確実プッシュする事が出来ます。

Kafka-> Equalum-> Kafka
このパターンも前述同様に、かなりの頻度でアプリケーション統合に使用されており、アプリケーションの特定のイベントがKafkaへの書き込みをトリガーし、Equalumはこのイベントを読み取って、前述同様に必要な変換/エンリッチメントなどを実行(この部分は、EqualumのGUIで行いますので、皆様が既に暗記されている「Noコード」の恩恵を受ける事が可能です)し、変換されたイベントを別のKafkaトピックにプッシュして、他のアプリケーションでさらに処理を続行する事になります。(戦略的にkafkaの途中処理に柔軟な介入を可能とする事で、新しいkafkaの可能性が出てくる使い方になるかと)

RDBMS-> Equalum-> Kafka
このパターンは、「イベント生成」または「イベントソーシング」と呼ばれる使い方で、EqualumはデータベースからCDCイベントを読み取り、必要に応じてデータを変換し、誰でも簡単に利用できるように前準備を提供してから、当該イベントをKafkaトピックに公開し、処理を継続していきます。既存のデータベース情報を速やか&確実にkafkaのパイプに提供出来ますので(FLOWの部分は前述同様の「Noコード」で)面白い使い方が可能になるかと思います。

等が出てきますので、最近良く話を聞く「xxOps」的なシステム活用を「データ流通」の側面で大きくサポート出来る可能性が有ると思います。

次回は・・・

次回は、今回の環境を活用していよいよEqualumを間に入れて、ターゲット・データソースとしてMySQLに「Exactly Once」で着地させてみたいと思います。

謝辞

本検証は、Equalum社の最新公式バージョン(V2.24)を利用して実施しています。この貴重な機会を提供して頂いたEqualum社に対して感謝の意を表すると共に、本内容とEqualum社の公式ホームページで公開されている内容等が異なる場合は、Equalum社の情報が優先する事をご了解ください。

0
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
2