#今回は残りの幾つかを纏めて使ってみよう!
Equalum紹介の前半戦最後に、幾つかの処理要素を組み合わせた、**「本物っぽい処理FLOW」**を作成してみようと思います。
シナリオ的には、今まで検証に使用してきたPythonスクリプトを改造し、支店情報と取り扱い製品情報を参照型(今までは埋め込みで処理)に変更し、これらの基本情報をLookUpテーブル化して処理を行うようにします。
また、納品業者側から期間限定のキャンペーンが入り、ちょうど自分達でもポイント制を導入する事にしたので、その一連の処理もEqualumで自動化させて最終の処理テーブルとしてMemSQL上に展開する事とします。
##まずはルックアップテーブル処理・・・
以前より使用しているMySQL側のテーブル設定を実名の文字列型から参照IDのINT型に変更します。
# 初期設定
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
import pymysql.cursors
# 検証処理開始
print("MySQL側にオリジナル用の空テーブルを作成します")
print
try:
# 既存のテーブルを初期化
Table_Init = "DROP TABLE IF EXISTS Demo_Table400"
# カラムの設定
DC0 = "id INT AUTO_INCREMENT PRIMARY KEY, ts TIMESTAMP(6),"
# 今回はこの部分を変更します
DC1 = "dt DATETIME, Product_ID INT, Units INT, Shop_ID INT,"
DC2 = "User VARCHAR(20),Zip VARCHAR(10),Address VARCHAR(60),Tel VARCHAR(15),Email VARCHAR(40)"
# 検証用のテーブルの作成
Table_Create = "CREATE TABLE IF NOT EXISTS Demo_Table400("+DC0+DC1+DC2+")"
# MySQLとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='root',
password='zzzzzzzzzzz',
db='Demo',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
# 既存テーブルの初期化
cursor.execute(Table_Init)
db.commit()
# 新規にテーブルを作成
cursor.execute(Table_Create)
db.commit()
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
# データベースコネクションを閉じる
db.close()
print("処理が終了しました")
print
print("引き続きMemSQL側にルックアップテーブルを作成してください")
print
次にルックアップテーブルを定義します。この程度の数であれば直接SQL文を書いてもOKなのですが・・・以前の物を使いまわしている関係上、かなり無駄なステップを踏んでいます。その辺はご容赦の程・・・
# 初期設定
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
import pymysql.cursors
# 検証処理開始
print("MySQL側テーブルにルックアップテーブルを作ります")
print
try:
# 既存のテーブルを初期化
Table_Init1 = "DROP TABLE IF EXISTS Demo_Table410"
Table_Init2 = "DROP TABLE IF EXISTS Demo_Table420"
# ルックアップテーブルの定義
Table_Create1 = "CREATE TABLE IF NOT EXISTS Demo_Table410(Id_L INT PRIMARY KEY, Product_L VARCHAR(20),Price_L INT)"
Table_Create2 = "CREATE TABLE IF NOT EXISTS Demo_Table420(Id_L INT PRIMARY KEY, Shop_L VARCHAR(20))"
# ルックアップテーブル(1)
Product_Name = ["日本酒","バーボン","ビール","芋焼酎","赤ワイン","白ワイン","スコッチ","ブランデー","泡盛","テキーラ"]
Product_Price = [1980, 2500, 490, 2000, 3000, 2500, 3500, 5000, 1980, 2000]
# ルックアップテーブル(2)
Shop_Name = ["旭町","三丁目","本町","二丁目","西新町","一丁目","住吉町","佐島","五本木","古橋"]
# 生成するデータの総数
Loop_Count = 10
# その他の変数
Counter = 0
# MySQLとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='root',
password='zzzzzzzzzz',
db='Demo',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
# 既存テーブルの初期化
cursor.execute(Table_Init1)
db.commit()
cursor.execute(Table_Init2)
db.commit()
# 新規にテーブルを作成
cursor.execute(Table_Create1)
db.commit()
cursor.execute(Table_Create2)
db.commit()
# データの生成
while Counter < Loop_Count:
# ルックアップ情報の作成
Product = Product_Name[Counter]
Price = Product_Price[Counter]
Shop = Shop_Name[Counter]
# 此処から先を各データベースの規程テーブルへ書き込む
DV1 = str(Counter)+"','"+Product+"','"+str(Price)
DV2 = str(Counter)+"','"+Shop
sql_data1 = "INSERT INTO Demo_Table410(Id_L, Product_L, Price_L) VALUES('"+DV1+"')"
sql_data2 = "INSERT INTO Demo_Table420(Id_L, Shop_L) VALUES('"+DV2+"')"
# データベースへの書き込み
cursor.execute(sql_data1)
db.commit()
cursor.execute(sql_data2)
db.commit()
# ループカウンタの更新
Counter=Counter+1
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
# デバッグ用データ表示
print
print("生成したデータの総数 : " + str(Counter))
# データベースコネクションを閉じる
db.close()
print("MySQL側ルックアップテーブル作成終了")
最後に着地側のMemSQL用のテーブル作成を行います。
これも、単純にSQL文を直接書いてしまえばシンプルに終了する話なのですが、使いまわしの法則により(苦笑)・・・細々と無駄なステップを踏んでいます。
# 初期設定
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
import pymysql.cursors
# デモ処理開始
print("MemSQL側にターゲットのテーブルを作成します")
print
try:
# 既存のテーブルを初期化
Table_Init = "DROP TABLE IF EXISTS Demo_Table400"
# デモ用のテーブル定義
DC0 = "id INT AUTO_INCREMENT PRIMARY KEY, ts TIMESTAMP(6) DEFAULT NOW(),"
# ここの項目をIDから参照してきた文字列等に置き換える形にします
DC1 = "dt DATETIME, Product_L VARCHAR(20), Price_L INT, Units INT, Shop_L VARCHAR(20),"
DC2 = "User VARCHAR(20), Zip VARCHAR(10), Address VARCHAR(60), Tel VARCHAR(15), Email VARCHAR(40)"
# デモ用のテーブルの作成
sql_data = "CREATE TABLE IF NOT EXISTS Demo_Table400("+DC0+DC1+DC2+")"
# MySQLとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='eqdemo',
password='zzzzzzzzz',
db='Demo',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
# 既存テーブルの初期化
cursor.execute(Table_Init)
db.commit()
# デモ用テーブルの作成
cursor.execute(sql_data)
db.commit()
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
# データベースコネクションを閉じる
db.close()
print("処理が終了しました")
print
print("引き続きデータの自動生成を開始してください")
print
ここまでくれば後はEqualum側の設定作業になります。
基本的な手順は今までの設定作業と同じですが、ルックアップテーブル特有の項目が幾つか有りますので、その辺を中心に検証作業を行って行きます。
ちなみに、インメモリ上にテーブルを置きますので、Equalum社でも説明している様にあまり巨大なテーブルを作成しない事が重要です。もちろん今回程度のテーブルでは何の問題もありませんので、高速性重視でインメモリ展開戦略を取る事にします。
ルックアップテーブルを設定する際にDo Not Evolveを設定し、キーの選択などの必要作業を行います。
下の方までスクロールして必要事項を設定します(通常の場合は特に必要ない作業ですが・・・)
トピックを作成する際には・・・このAdvanced設定を忘れないようにします。
自動挿入されるメインのテーブル設定はいつも通りでOKです。
##処理部分の設定・・・・
FLOWSで新規のストリーミング処理を作る際の(今回のルックアップ操作)設定項目はシンプルで判り易いと思いますので、それぞれ適宜選択・設定を行っていきます。
#では、取り急ぎ動作検証!
今回のLookUp機能の検証用にカスタマイズしたスクリプトです・・・・といっても、単純に乱数を発生させてそれをIDとしてメインのテーブルに挿入しているだけですが・・・
# 初期設定
import sys
stdout = sys.stdout
reload(sys)
sys.setdefaultencoding('utf-8')
sys.stdout = stdout
import time
import pymysql.cursors
# 検証処理開始
print("MySQL側テーブルにデータの自動生成&挿入を開始します")
print
try:
# Pythonのデータ自動生成機能の設定
from faker.factory import Factory
Faker = Factory.create
fakegen = Faker()
fakegen.seed(0)
fakegen = Faker("ja_JP")
# 生成するデータの総数 -> ここは適宜変更
Loop_Count = 50
# 一定間隔の場合(システム時間で秒単位)
Sleep_Wait = 1
# その他の変数
Counter = 0
Work_Count = 1
# 書き込み用のデータカラム
DL1 = "dt, Product_ID, Units, Shop_ID," #ここを修正
DL2 = "User,Zip,Address,Tel,Email"
# MySQLとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='root',
password='zzzzzzzzz',
db='Demo',
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
# 検証データの生成
while Counter < Loop_Count:
# 受付日時の情報
from datetime import datetime
dt = datetime.now().strftime("%Y/%m/%d %H:%M:%S")
Product_ID = fakegen.random_digit()
Shop_ID = fakegen.random_digit()
Units = fakegen.random_digit() + 1
# 物流情報の生成
User = fakegen.name()
Zip = fakegen.zipcode()
Address = fakegen.address()
Tel = fakegen.phone_number()
Email = fakegen.ascii_email()
# 此処から先を各データベースの規程テーブルへ書き込む
DV1 = dt+"','"+str(Product_ID)+"','"+str(Units)+"','"+str(Shop_ID)+"','"
DV2 = User+"','"+Zip+"','"+Address+"','"+Tel+"','"+str(Email)
sql_data = "INSERT INTO Demo_Table400("+DL1+DL2+") VALUES('"+DV1+DV2+"')"
# データベースへの書き込み
cursor.execute(sql_data)
db.commit()
# コンソールに生成データを表示(不要な場合はコメントアウトする)
print sql_data
print
# 時間調整用(一定間隔)
time.sleep(Sleep_Wait)
# ループカウンタの更新
Counter=Counter+1
# デバッグ用データ表示
#print("生成済みデータ番号 : " + str(Counter))
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
# デバッグ用データ表示
print
print("生成したデータの総数 : " + str(Counter))
# データベースコネクションを閉じる
db.close()
print("MySQL側テーブルへのデータ挿入処理終了")
うまくテーブル参照して置き換えてくれました!!
#今回のまとめ
当初の予定では、今回でEqualumの基本的な検証を終了させる方向でしたが、記事が長くなる可能性が出てきた為に次回に総集編的な検証を行う事にしたいと思います。
Equalumを使う事で、高度なプログラミング作業を行う事無く高速・高効率なストリーミング処理を実現出来ます。
もちろん、総合処理的な内容はデータを括らなければ行えませんが、その過程をリアルタイムで処理しながらAIやBI、また今後出てくるデータ活用系のソリューションやアイディアを活用していく・・・・
###持続発展可能な高い投資対効果の為の速さ
これは、データを諦めずに**「変化に強いデータ・ドリブン環境」**を実現する上で非常に重要なポイントになる事を明確に示しているという事なのかもしれません。
#謝辞
本検証は、Equalum社の特別の許可を得て実施しています。この貴重な機会を設定して頂いたEqualum社に対して感謝の意を表すると共に、本内容とEqualum社の公式ホームページで公開されている内容等が異なる場合は、Equalum社の情報が優先する事をご了解ください。