0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

EqualumのExactly Once & All動作検証(3)

Last updated at Posted at 2023-01-03

最終回は可視化ツールを・・・

前回までの検証で、EqualumのExactly Once処理機能を活用し、即時同期先のターゲットDBへの処理をAppend(追記)モードとする事で、オリジナル側で処理される全てのデータを「Exactly Once & All」で記録出来る事を確認出来ました。

具体的な応用範囲については、今後のアイディア次第という形になりますが、限られた動作環境上でも「不死身のDBもどき」が動きましたので、今後の「データ駆動型DX基盤」の可能性として是非各種の実装を検討して頂ければと思います。

今回は、一連の検証の最終回として「動作状況のモニターとして利用」したPythonベースのワードクラウドツールを簡単に紹介させて頂きます。

まずはコードから・・・

ツールとしては、引数で可視化するDBを指定する方式なので、それぞれのDBに対応するコンソール上でコマンドに引数を付与して実行する形になります。

特に「これ!」といった所も無いので(苦笑)取り急ぎ全部を纏めて公開させて頂きます。(過去作品からの多数コピペ作成になりますので、読み難い点等に関しては・・・・平にご容赦の程・・)

#
# Exactly Once & All検証の可視化ツール
#
from wordcloud import WordCloud
import matplotlib.pyplot as plt
import pymysql.cursors
import datetime
from argparse import ArgumentParser
#
# 日本語フォントの設定
from matplotlib import rcParams
rcParams['font.family'] = 'sans-serif'
rcParams['font.sans-serif'] = ['Hiragino Maru Gothic Pro', 'Yu Gothic', 'Meirio', 'Takao', 'IPAexGothic', 'IPAPGothic', 'VL PGothic', 'Noto Sans CJK JP']
#
# SQL文で使う情報(ここは適宜変更 現状はユーザー名を表示)
CDC_SQL = "SELECT User FROM DemoC_CDC_Table "
TGT_SQL = "SELECT User FROM DemoC_TGT_Table "
#
CDC_ROW_Status = "SELECT COUNT(HS) FROM DemoC_CDC_Table"
TGT_ROW_Status = "SELECT COUNT(HS) FROM DemoC_TGT_Table"
#
# 可視化に使用するCDC側の接続パラメータ
CDC_Host = "ZZZ.ZZZ.ZZZ.ZZZ"
CDC_Port = 3306
CDC_User = "ZZZZZ"
CDC_Pass = "ZZZZZ"
CDC_DB   = "ZZZZZ"
CDC_Char = "utf8mb4"

# 可視化に使用するTGT側の接続パラメータ
TGT_Host = "YYY.YYY.YYY.YYY"
TGT_Port = 3306
TGT_User = "YYYYY"
TGT_Pass = "YYYYY"
TGT_DB   = "YYYYY"
TGT_Char = "utf8mb4"
#
# コマンド引数の確認
#
def get_option():

    argparser = ArgumentParser()
    argparser.add_argument('-d', default='CDC', help='可視化するDBを指定します(デフォルトはCDC側)')

    return argparser.parse_args()
#
# 可視化ツールのメイン
#
try:

    print("------------------------------------------------")
    print("******************* 処理開始 *******************")
    print(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") + " : " + "検証ツールを起動")
    print("------------------------------------------------")

    # 引数をチェックする(引数参照はargs.dで行う)
    args = get_option()

    # 使用変数の初期化
    Counter = 0
    Time_Wait = 1.0
    
    # 描画領域を取得
    fig, ax = plt.subplots(1, 1)

    # MySQLとの接続と必要な諸設定を行う
    if args.d == "CDC":
        db = pymysql.connect(host = CDC_Host, port = CDC_Port, user = CDC_User, password = CDC_Pass, db = CDC_DB, charset = CDC_Char, cursorclass = pymysql.cursors.DictCursor)
        ROW_Status = CDC_ROW_Status 
        SQL = CDC_SQL
        Tool_Title = "CDC側に登録されているユーザー名"
    else:
        db = pymysql.connect(host = TGT_Host, port = TGT_Port, user = TGT_User, password = TGT_Pass, db = TGT_DB, charset = TGT_Char, cursorclass = pymysql.cursors.DictCursor)
        ROW_Status = TGT_ROW_Status
        SQL = TGT_SQL
        Tool_Title = "TGT側に登録されているユーザー名"

    # 無限ループで対応(停止はCtr-Cで行う)
    while True:
        
        with db.cursor() as cursor:

            # テーブルの状態を確認
            cursor.execute(ROW_Status)
            db.commit()

            Tmp_Data = []

            for Query_Data in cursor.fetchall(): 
                    for item in Query_Data.values(): Tmp_Data.append(item)
        
            # データが登録されていれば可視化処理を行う
            if Tmp_Data[0] != 0:
            
                # クエリ関連の初期化設定
                cursor.arraysize = 1000
                Tmp_Data = []
                data = []

                # クエリ用のSQLを送信してコミット
                cursor.execute(SQL)                  
                db.commit()

                # クエリの結果を取得
                rows = cursor.fetchall()
            
                # クエリ結果を取り出す
                for Query_Data in rows:                
                    for item in Query_Data.values():                  
                        Tmp_Data.append(item)

                # クエリ結果を文字列に変換する
                data = ' '.join(Tmp_Data)
            
                # ワードクラウドの生成
                wordcloud = WordCloud(width=640, 
                                           height=480, 
                                           background_color='white', 
                                           colormap='magma', 
                                           font_path='/System/Library/Fonts/ヒラギノ丸ゴ ProN W4.ttc'
                                           ).generate(data)
            
                plt.imshow(wordcloud, interpolation='bilinear')
                plt.axis("off")
            
                # タイトルの表示
                ax.set_title(Tool_Title, fontsize = 16)    
      
                # 画面更新までの待ち時間(適宜調整)    
                plt.pause(Time_Wait)
        
                # 表示を初期化    
                plt.cla()
        
            Counter = Counter + 1

except KeyboardInterrupt:
    
    # データベースを切り離す
    db.close()
   
    print('!!!!! 割り込み発生 !!!!!')
    
finally:
   
    print ("処理の終了")    
    print(str(Counter) + "回の処理を実行しました")

    print("------------------------------------------------")
    print("******************* 処理終了 *******************")
    print(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") + " : " + "全ての処理が終了しました。")
    print("------------------------------------------------")
###################################################################################################

動作は、天下無双の無限ループ方式ですので、止める場合は起動したコンソール上でCtr-Cの割り込みを実行します。
また、引数が無い場合はオリジナル側のDB状況を可視化する設定ですので、その辺は随時書き換えてください。

% python Viz.py

動作している状況

画面録画 2022-12-31 時刻 10.41.15.gif

同様に引数を与えてターゲット側を可視化する場合は以下の通りです。

% python Viz.py -d TGT

動作している状況

画面録画 2022-12-31 時刻 10.43.24.gif

総集編

では、今回の総集編としてURL公開した処理一連の連続動画を・・・・

GUIツールの上から順番にボタンを選択し、DBの初期化から最初の検証データ生成&Equalum即時同期、オリジナル側のテーブルに対するアップデート処理&ターゲット側のAppend処理状況、最後にターゲット側の正常情報を使ったオリジナル側復活・・の順番で連続処理を行なっている状況が確認出来るかと思います。

今回のまとめ

3回に渡り、Equalumの特徴であるExactly Onceでのデータ同期処理機能と、即時同期先のテーブル指定時にAppend(追記)モードを設定する事により、オリジナル側で発生する全てのデータ処理をターゲット側に「Exactly Once & All」で全て記録しながら、コンピューティング処理でロジカル(個人的には学習系AIが良くフィットする気がしています)にオリジナル側の状況を制御する検証を行なってきました。

検証環境の処理能力の関係上、設定シナリオとしては非常にシンプルな形での実証となりましたが、物理的・論理的に異なるターゲット側に即時同期される全てのデータの利活用に関する、別の一面からのアプローチとしては非常に面白い結果が得られたかと思います。
また、オリジナル側を「記録台帳」の役割に特化させ、集計系等の各種バッチ処理を「即時同期先のDB上で常時全量に対して実施」する事が可能になりますので(このバッチ負荷は現場と切り離されて行われますので、就業時間後の深夜・・という謎の縛りは必要無くなります)、最近取り組みを始められている「データ駆動型DX」とか「データ・ドリブンxx」等に関する「データ都合の制限事項」は現実的に取り払う事が可能になります。

遠慮会釈無く利活用できるデータが、常に使える状態で使い易い環境に存在している・・・という事の可能性は、特に最近注目されているデータ駆動型DX基盤として「非常に重要な要素」であると言えるでしょう。

シンプルに・・・・
間に合わないデータは使えない
使えないデータに意味は無い
意味の無いデータに価値は無い

という事であり、今までは「個別設計の専用開発で、実現させるためのコストが非常に高い、崇高な導入・構築前提の作業」とされてきた現場と利活用側が即時同期でデータの価値を高めていく環境が、現在では単純に「仕組みの導入とノーコードの構成」で普通にユーザレベルで多岐に渡り利用出来るステージに入ってきた・・という事が非常に重要なポイントになります。

タイム・イズ・マネーをサクッ!と実現し、色々な意味で境界の無い時代におけるデータ駆動型DXに新たな進化をもたらし、さらには最新のAI技術等と連携させた「タイミング・イズ・バリュー」を可能にできる・・・かもしれないという状況が、既に現実の通常環境になっているという事実を前提に各種データ戦略を遂行しなければならない・・のかもしれません。

データは企業・団体における再生可能エネルギーであり、
データを活用してより良いデータ(結果)を創出する・・・
発想の転換による現状打破と持続発展可能型のデータ空間創造・・・・

引き続き、色々と発掘しながら紹介させて頂ければと考えておりますので、本年も宜しくお願い致します。
近々、面白い検証を実施する予定です・・・(前フリ)

謝辞

本検証は、Equalum社の全面バックアップにより実施しています。この貴重な機会を提供して頂いたEqualum社に対して感謝の意を表すると共に、本内容とEqualum社の公式ホームページで公開されている内容等が異なる場合は、Equalum社の情報が優先する事をご了解ください。

EqualumのExactly Once & All動作検証(1)はこちら
EqualumのExactly Once & All動作検証(2)はこちら

[適材適所・常時全量のデータ処理を時系列透過で実現するコンセプト]
役割分担型データ・システムというアプローチはこちら

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?