#今回はいよいよ可視化に挑戦!
前回は、3個のテーブルをCDC設定されたMySQL上の作成し、そのテーブルをPythonで自動更新しながらEqualumのレプリケーション機能を検証してみました。結果としては、非常にシンプルで簡単な設定でCDCストリーミング・レプリケーションが実現出来る事が確認できましたので、今回の検証では続編としてターゲット側のSingleStore側更新状況を簡単なPythonツールを作成して可視化してみたいと思います。
#まずは可視化ツールの作成
この辺は、既に多くの諸先輩方がご苦労の末に作られたコードが、ネット上に数多く公開されていますので、取り急ぎそれらを参考にしてサクッと作ってみます。
Anaconda+Jupyter Notebook環境に必要なモジュール類を導入して以下のコードを書きます(内容は適宜書き換えてお使いください)
SQLで使うテーブル名は、前回の検証で使ったものと同じ名称にしておきます。
# coding: utf-8
#
# レプリケーション状況の即時可視化表示(4個のグラフを一括表示)
#
import pymysql.cursors
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
%matplotlib
# 日本語フォントの設定
from matplotlib import rcParams
rcParams['font.family'] = 'sans-serif'
rcParams['font.sans-serif'] = ['Hiragino Maru Gothic Pro', 'Yu Gothic', 'Meirio', 'Takao', 'IPAexGothic', 'IPAPGothic', 'VL PGothic', 'Noto Sans CJK JP']
# SQL文で使う情報設定
# Bizデータからの可視化
SQL0 = "SELECT SUM(Units) as UU, Category FROM Rep_Demo_Biz_Table GROUP BY Category ORDER BY UU DESC "
SQL1 = "SELECT SUM(Units) as LL , Logistics FROM Rep_Demo_Biz_Table GROUP BY Logistics ORDER BY LL DESC "
# Payデータからの可視化
SQL2= "SELECT SUM(Payment) as PP, COUNT(Card) as CC, Card FROM Rep_Demo_Pay_Table GROUP BY Card ORDER BY PP DESC LIMIT 0,5"
# Usrデータからの可視化
SQL3 = "SELECT SUM(Point) as PT, COUNT(Prefecture) as CC, Prefecture FROM Rep_Demo_Usr_Table GROUP BY Prefecture ORDER BY PT DESC LIMIT 0,10"
# 共通関数の定義
# クエリ結果からデータを取り出して結果を返す
def query2data(rows):
Tmp_Data = []
for Query_Data in rows:
for item in Query_Data.values():
Tmp_Data.append(item)
return(Tmp_Data)
# 自動的にラベルを表示する
def autolabel(rects,ax):
for rect in rects:
height = rect.get_height()
ax.annotate('{}'.format(height),
xy=(rect.get_x() + rect.get_width() / 2, height),
xytext=(0, 3), # 縦方向に3ポイントずらして表示
textcoords="offset points",
ha='center', va='bottom')
try:
# 使用変数の初期化
Counter = 0 # 処理回数のカウント用
Time_Wait = 0.5 # 描画間隔の調整用
Size1 = 2 # カテゴリ・配送センター別のデータ抽出用オフセット
Size2 = 3 # 商品・県別TOP10データ抽出用オフセット
# 可視化処理関連の準備
fig = plt.figure(figsize=(14,8))
gs = gridspec.GridSpec(3,2)
fig.subplots_adjust(hspace=0.6, wspace=0.4)
# add_subplot()でグラフを描画する領域を追加する.引数は行,列,場所
ax1 = fig.add_subplot(gs[0,0])
ax2 = fig.add_subplot(gs[0,1])
ax3 = fig.add_subplot(gs[1,:])
ax4 = fig.add_subplot(gs[2,:])
# SingleStoreとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='xxxxxxxx',
password='zzzzzzzz',
db='xxxxxxxx',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# 無限ループで対応(停止はSTOPボタン(■)で行う)
while True:
with db.cursor() as cursor:
# クエリ関連の初期化設定
cursor.arraysize = 1000
# 描画処理用作業バッファの初期化
Tmp_Data1 = [] #カテゴリ別のクエリ処理用
Tmp_Data2 = [] #配送センター別のクエリ処理用
Tmp_Data3 = [] #支払い別のクエリ処理用
Tmp_Data4 = [] #県別ポイントのクエリ処理用
# カテゴリ別のクエリSQLを送信してコミット
cursor.execute(SQL0)
db.commit()
Tmp_Data1 = query2data(cursor.fetchall())
# 配送センター別のクエリSQLを送信してコミット
cursor.execute(SQL1)
db.commit()
Tmp_Data2 = query2data(cursor.fetchall())
# 商品別のクエリSQLを送信してコミット
cursor.execute(SQL2)
db.commit()
Tmp_Data3 = query2data(cursor.fetchall())
# 県別のクエリSQLを送信してコミット
cursor.execute(SQL3)
db.commit()
Tmp_Data4 = query2data(cursor.fetchall())
# クエリ結果からカラム毎のデータを抽出してデータフレームを作成
# カテゴリ別の処理
i = 0
Category_Label = []
Category_Data = []
for start in range(0, len(Tmp_Data1), Size1):
Category_Data.append(Tmp_Data1[i])
Category_Label.append(Tmp_Data1[i + 1])
i = i + Size1
# 物流センター別の処理
i = 0
Logi_Label = []
Logi_Data = []
for start in range(0, len(Tmp_Data2), Size1):
Logi_Data.append(Tmp_Data2[i])
Logi_Label.append(Tmp_Data2[i + 1])
i = i + Size1
# 支払い別の処理
i = 0
Card_Label = []
Card_Data_Payment = []
Card_Data_Count = []
for start in range(0, len(Tmp_Data3), Size2):
Card_Data_Payment.append(Tmp_Data3[i]/10000) # 数字が大きいので調整
Card_Data_Count.append(Tmp_Data3[i + 1])
Card_Label.append(Tmp_Data3[i + 2])
i = i + Size2
# ユーザポイント情報の処理
i = 0
Prefecture_Label = []
Prefecture_Data_Point = []
Prefecture_Data_Count = []
for start in range(0, len(Tmp_Data4), Size2):
Prefecture_Data_Point.append(Tmp_Data4[i]/1000) # 数字が大きいので調整
Prefecture_Data_Count.append(Tmp_Data4[i + 1])
Prefecture_Label.append(Tmp_Data4[i + 2])
i = i +Size2
# 実際の描画処理
#
# 商材カテゴリ別グラフの表示
y_pos1 = np.arange(len(Category_Label))
ax1.barh(y_pos1, Category_Data, color = 'green')
ax1.set_yticks(y_pos1)
ax1.set_yticklabels(Category_Label)
ax1.invert_yaxis() # labels read top-to-bottom
ax1.set_xlabel('受注数', fontsize = 12)
ax1.set_title('商材カテゴリ別の受注状況(販売管理DB)', fontsize = 16)
# 配送センター別グラフの表示
y_pos2 = np.arange(len(Logi_Label))
ax2.barh(y_pos2, Logi_Data, color = 'red')
ax2.set_yticks(y_pos2)
ax2.set_yticklabels(Logi_Label)
ax2.invert_yaxis() # labels read top-to-bottom
ax2.set_xlabel('出荷数', fontsize = 12)
ax2.set_title('配送センター別の出荷状況(販売管理DB)', fontsize = 16)
x1 = np.arange(len(Card_Label)) # 商品別ラベルのX座標
x2 = np.arange(len(Prefecture_Label)) # 県別ベルのX座標
width = 0.35 # 棒グラフの幅
# ラベルを設定する
ax3.set_ylabel('売り上げ額(万円)', fontsize = 12)
ax3.set_title('利用支払い種別TOP5(会計管理DB)', fontsize = 16)
ax4.set_ylabel('総ポイント(x1000)', fontsize = 12)
ax4.set_title('県別ユーザ活性動向TOP10(顧客管理DB)', fontsize = 16)
# 決済種別グラフの表示
if len(Card_Data_Payment) != 0:
rects11 = ax3.bar(x1 - width/2, Card_Data_Payment, width, color = 'orange')
rects12 = ax3.bar(x1 + width/2, Card_Data_Count, width, color = 'blue')
autolabel(rects11,ax3)
autolabel(rects12,ax3)
ax3.set_xticks(x1)
ax3.set_xticklabels(Card_Label)
# 県別ポイント付与状況グラフの表示
if len(Prefecture_Data_Point) != 0:
rects21 = ax4.bar(x2 - width/2, Prefecture_Data_Point, width, color = 'limegreen')
rects22 = ax4.bar(x2 + width/2, Prefecture_Data_Count, width, color = 'gold')
autolabel(rects21,ax4)
autolabel(rects22,ax4)
ax4.set_xticks(x2)
ax4.set_xticklabels(Prefecture_Label)
fig.tight_layout()
# 画面更新までの待ち時間(適宜調整)
plt.pause(Time_Wait)
# 表示を初期化
ax1.cla()
ax2.cla()
ax3.cla()
ax4.cla()
Counter = Counter + 1
except KeyboardInterrupt:
db.close()
print('!!!!! 割り込み発生 !!!!!')
finally:
print('処理の終了')
print(str(Counter) + "回の処理を実行しました")
因みに、無事に起動するとターゲットのテーブルにデータが有る場合、この様な感じで可視化されます。(Pythonって凄いですね・・)
#では!再度検証を行ってみます!!
前回の環境を起動し、今回の可視化ツールを起動します。ツール自体は無限ループで回っていますので、前回ベースのデータが可視化された事を確認して再度自動挿入のコードを実行してみます。(停止はNotebookの■ボタンを押します)
無事に上流側のデータ変化をCDCストリーミングでレプリケーションしている状況が確認出来ました。今回はインメモリRDBのSingleStoreを使いましたが、電子帳簿側の既存DBにOracleを使っていて、BCP的に即時同期をCDCストリーミングでコスパ良く!という事であれば、OSS系のPostgreSQLやMySQLを適宜配備し、そのターゲットに向けてレプリケーションを仕掛けても良いかと思います。
もちろん、今回のSingleStoreをターゲットに選択した場合は、取り敢えず現場の状況を会議室に持ってきて利活用するという荒技も可能かと。(何処かの映画で有名な主人公のセリフを真逆にした感じ(苦笑)・・・・・利活用は!会議室で起きてるんだ!でしょうか(汗)・・・)
#今回のまとめ
今回は、前回に入りきらなかった「レプリケーション側を可視化してみる!」を行ってみました。現実的には間に必要な事前処理を入れて、必要十分な情報をストリーミングで準備した方が効率良いと思いますが、BCP等の観点からデータベースの構造を遠隔地に常時複製しておく・・・といった用途にも、EqualumのCDCストリーミングが十分役に立つ!という事が解りました。もちろん、ネットワーク環境の整備は必要ですが、通常のモダントランザクション系(後付けで急成長を続ける、既存フロント側電子帳簿側から「トランザクション泥棒!」と加害者扱いされるSQL処理・・)の利活用支援に加えて、すぐ横でシンプルにレプリケーション環境も構築・運用できる!という点では、十分検討に値するソリューションであると言えるでしょう。
#謝辞
本検証は、Equalum社の公式バージョン(V2.23)を利用して実施しています。
この貴重な機会を提供して頂いたEqualum社に対して感謝の意を表すると共に、本内容とEqualum社の公式ホームページで公開されている内容等が異なる場合は、Equalum社の情報が優先する事をご了解ください。