#今回は・・・
###前回は、Pythonワールドでワークフローの仕組みが書けそうな・・・Prefectについての環境導入を行いましたので、今回からは少し深掘り検証モードに入って行きたいと思います。
#シンプルにデータベースを転がせるか?・・・・・
最初の検証は、指定したデータベースの状況を定期的に処理する、超簡単系ワークフローを作ってみたいと思います。
まずは、彼らのホームページから検証の叩き台をお借りして、少しカスタマイズを行います。
なお、コードに関しては・・・NDA(ノン・ダメ出し・アグリーメント)を前提にさせて頂きますので、自由にカスタマイズして使って頂いて結構です。
#
# Prefectの検証たたき台(彼らのホームページより)
#
import prefect
from prefect import task, Flow
FLOW_NAME = "Prefect_Test01"
LOG_MESSAGE = "Hello world!"
@task
def test01_task():
logger = prefect.context.get("logger")
logger.info(LOG_MESSAGE)
with Flow(FLOW_NAME) as flow:
test01_task()
flow.run()
サクッと!実行してみます。
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect01.py
[2022-01-30 09:54:19+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test01'
[2022-01-30 09:54:19+0900] INFO - prefect.TaskRunner | Task 'test01_task': Starting task run...
[2022-01-30 09:54:19+0900] INFO - prefect.test01_task | Hello world!
[2022-01-30 09:54:19+0900] INFO - prefect.TaskRunner | Task 'test01_task': Finished task run for task with final state: 'Success'
[2022-01-30 09:54:19+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect %
無事に動いている様なので、以前に作成したSingleStore向けMySQLのフリして繋げる・・を追加してみます。
SingleStoreに関しては、検証用に導入済みのMBPのローカルDocker版を使っています。
現在も引き続き条件付きフリー使用が継続している様なので、ご興味があれば是非メアドを登録して使ってみてください。
また、使用するテーブルも以前の検証で作成した「なんちゃって物販データ」の既存テーブルを転用してアクセスしてみます。
# Prefectの検証たたき台
#
# お約束の初期設定
import prefect
from prefect import task, Flow
#
# MySQL接続を利用する準備
import pymysql.cursors
#
# 広域変数設定
Yes = 1
No = 0
#
Console_Out_ALL = No # コンソールに出力するパターンを設定(Yes:全ての情報 No:3行だけ)
Column_Number = 21 # 今回の参照テーブルのカラム数
Data_Lines = 2 # 一部出力の際のデータ行数
#
# 処理フローの名前
FLOW_NAME = "Prefect_Test02"
#
# ログ出力用メッセージ
LOG_MESSAGE1 = "SingleStoreと接続開始"
LOG_MESSAGE2 = "SingleStoreと接続完了"
LOG_MESSAGE3 = "SingleStore上のテーブル名をクエリします"
LOG_MESSAGE4 = "現在登録されているカラム情報は以下の通りです(全部表示)"
LOG_MESSAGE5 = "現在登録されているカラム情報は以下の通りです(一部抜粋)"
LOG_MESSAGE6 = "SingleStoreとの接続を終了"
LOG_MESSAGE7 = "SingleStore連携処理の終了"
#
# SigleStoreとの接続情報(適宜変更)
SS_Host = "zzz.zzz.zzz.zzz"
SS_Port = 3306
SS_User = "zzzzz"
SS_Pass = "zzzzz"
SS_DB = "zzzzz"
SS_Char = "utf8"
#
# SQLクエリで使うSQL文
SQL = "SELECT * FROM RTS_Demo_Table ORDER BY id_SS"
#
# SingleStoreに接続して処理に必要な情報を返す
#
def Open_DB():
db = pymysql.connect(
host = SS_Host,
port = SS_Port,
user = SS_User,
password = SS_Pass,
db = SS_DB,
charset = SS_Char,
cursorclass = pymysql.cursors.DictCursor)
return(db) # データベース処理に必要な情報を戻す
#
# コンソールに1行単位で表示する
#
def Print_Query_Data(Data, i, Type):
if i % Column_Number == 0:
print(Data)
if Type == Yes: print("=====================================")
else: print("/////////////////////////////////////")
return("")
#
# Prefectで使用するタスクを作成
#
@task
def test02_task():
logger = prefect.context.get("logger") # ログ出力の準備
Tmp_Data=[]
logger.info(LOG_MESSAGE1) # 処理の過程をログで出力
db = Open_DB() # SingleStoreと接続
logger.info(LOG_MESSAGE2) # 処理の過程をログで出力
with db.cursor() as cursor:
# 処理の過程をログで出力
logger.info(LOG_MESSAGE3)
# SingleStoreにSQLクエリを発行する
cursor.execute(SQL)
db.commit()
# クエリ結果を取得して利用可能な状態にする
for Query_Data in cursor.fetchall():
for item in Query_Data.values():
Tmp_Data.append(item)
# クエリ結果の表示
i = 1
DB_Data = ""
if Console_Out_ALL == Yes: # 全部を表示
logger.info(LOG_MESSAGE4)
for loop in Tmp_Data:
DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
if i % Column_Number == 0:
DB_Data = Print_Query_Data(DB_Data, i, Yes)
i = i + 1
else: # 指定された行数を表示
logger.info(LOG_MESSAGE5)
for loop in range(Column_Number * Data_Lines):
DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
if i % Column_Number == 0:
DB_Data = Print_Query_Data(DB_Data, i, No)
i = i + 1
logger.info(LOG_MESSAGE6) # 処理の過程をログで出力
db.close() # SingleStoreとの接続を停止する
logger.info(LOG_MESSAGE7) # 処理の過程をログで出力
#
# メインの処理
#
with Flow(FLOW_NAME) as flow:
test02_task()
flow.run()
################################################
サクッと!動かしてみます。
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect02.py
[2022-01-30 09:59:44+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test02'
[2022-01-30 09:59:44+0900] INFO - prefect.TaskRunner | Task 'test02_task': Starting task run...
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreと接続開始
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreと接続完了
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStore上のテーブル名をクエリします
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
1 2021-11-19 10:24:09.441558 2021-11-19 10:24:16.423722 雑貨 贈答品 1980 3 297 5940 594 American Express 180091589062760 高橋 篤司 113-2169 JP-38 愛媛県 愛媛県袖ケ浦市芝公園7丁目10番7号 アーバン高田馬場436 090-5030-6067 itoyui@aoki.com 四国 讃岐物流センター
/////////////////////////////////////
2 2021-11-19 10:24:10.339402 2021-11-19 10:24:16.424731 DVD/CD 洋画 3500 2 140 7000 700 Mastercard 4819783616319228 佐藤 英樹 699-9934 JP-18 福井県 福井県神津島村土呂部8丁目23番13号 080-2980-5899 shota95@yamada.jp 中部 甲州物流センター
/////////////////////////////////////
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreとの接続を終了
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStore連携処理の終了
[2022-01-30 09:59:44+0900] INFO - prefect.TaskRunner | Task 'test02_task': Finished task run for task with final state: 'Success'
[2022-01-30 09:59:44+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect %
表示の関係で最初から2行分を出力していますが、無事に通常のPython系MySQLアクセスが出来る事を確認できました。
因みに100行全部出力すると・・・・
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect02.py
[2022-01-30 10:15:39+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test02'
[2022-01-30 10:15:39+0900] INFO - prefect.TaskRunner | Task 'test02_task': Starting task run...
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreと接続開始
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreと接続完了
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStore上のテーブル名をクエリします
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | 現在登録されているカラム情報は以下の通りです(全部表示)
1 2021-11-19 10:24:09.441558 2021-11-19 10:24:16.423722 雑貨 贈答品 1980 3 297 5940 594 American Express 180091589062760 高橋 篤司 113-2169 JP-38 愛媛県 愛媛県袖ケ浦市芝公園7丁目10番7号 アーバン高田馬場436 090-5030-6067 itoyui@aoki.com 四国 讃岐物流センター
=====================================
2 2021-11-19 10:24:10.339402 2021-11-19 10:24:16.424731 DVD/CD 洋画 3500 2 140 7000 700 Mastercard 4819783616319228 佐藤 英樹 699-9934 JP-18 福井県 福井県神津島村土呂部8丁目23番13号 080-2980-5899 shota95@yamada.jp 中部 甲州物流センター
=====================================
3 2021-11-19 10:24:10.974893 2021-11-19 10:24:25.424737 雑貨 乾電池 248 1 12 248 24 現金 N/A 佐々木 千代 579-1085 JP-42 長崎県 長崎県長生郡一宮町松浦町26丁目17番12号 月島コーポ240 070-2421-9200 ytanaka@yamamoto.com 九州・沖縄 平戸物流センター
=====================================
4 2021-11-19 10:24:11.704003 2021-11-19 10:24:29.426744 DVD/CD アイドル 2980 2 119 5960 596 JCB 16 digit 180011672869244 中村 あすか 283-8664 JP-12 千葉県 千葉県横浜市西区細竹11丁目25番8号 070-3277-9886 sayurisasaki@ota.com 関東 関東中央物流センター
=====================================
----- 途中省略 -------
95 2021-11-19 10:25:15.206949 2021-11-19 10:30:05.831141 雑貨 医薬部外品 398 4 79 1592 159 Diners Club / Carte Blanche 30280001215195 藤井 加奈 451-8413 JP-26 京都府 京都府杉並区南赤田40丁目15番9号 クレスト松が谷520 090-3696-8259 xfujita@hayashi.jp 近畿 伊丹物流センター
=====================================
96 2021-11-19 10:25:15.712581 2021-11-19 10:30:10.837144 家電 テレビ 49800 1 4980 49800 4980 VISA 13 digit 4583175589485926 坂本 涼平 173-7986 JP-15 新潟県 新潟県印旛郡本埜村平須賀21丁目10番17号 権現堂クレスト469 070-4714-1060 reitakahashi@gmail.com 中部 甲州物流センター
=====================================
97 2021-11-19 10:25:16.605000 2021-11-19 10:30:14.840148 書籍 漫画 570 3 51 1710 171 現金 N/A 斎藤 結衣 426-0651 JP-31 鳥取県 鳥取県印旛郡栄町虎ノ門虎ノ門ヒルズ森タワー28丁目19番6号 080-0512-0117 gfukuda@hotmail.com 中国 広島臨港物流センター
=====================================
98 2021-11-19 10:25:17.408374 2021-11-19 10:30:23.847156 書籍 新刊 1980 3 178 5940 594 Discover 3568815083887561 小川 あすか 052-3746 JP-40 福岡県 福岡県八丈島八丈町隼町8丁目24番13号 湯宮アーバン379 83-8825-5209 kobayashitomoya@hotmail.com 九州・沖縄 平戸物流センター
=====================================
99 2021-11-19 10:25:18.221668 2021-11-19 10:30:31.853157 DVD/CD Jポップ 2500 2 100 5000 500 現金 N/A 福田 陽子 328-2222 JP-04 宮城県 宮城県長生郡長柄町細竹26丁目16番17号 幸手コーポ025 070-1979-9296 khasegawa@maeda.jp 東北 東北物流センター
=====================================
100 2021-11-19 10:25:18.771961 2021-11-19 10:30:32.860161 雑貨 電球 198 4 39 792 79 JCB 15 digit 6569962741550363 佐藤 浩 703-9742 JP-38 愛媛県 愛媛県川崎市宮前区勝どき27丁目26番12号 070-0402-6939 nishimurajun@hotmail.com 四国 讃岐物流センター
=====================================
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreとの接続を終了
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStore連携処理の終了
[2022-01-30 10:15:39+0900] INFO - prefect.TaskRunner | Task 'test02_task': Finished task run for task with final state: 'Success'
[2022-01-30 10:15:39+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect %
となります。Docker利用とはいえ、SingleStoreのインメモリ性能が高いので、今後の「少し複雑系SQLアクセス」でも非常に期待が持てるかと思います。
余談ですが、国内のSingleStore性能評価で**「十数億行のデータに対して抽出・集計系のSQLを仕掛けて、3秒を切る性能」**が出ているとの事です。
#今回のまとめ
今回は、初めてのPrefect第2弾として「Python経験のMySQL接続でSingleStoreが接続出来るか?」を検証してみました。結果的には、実にサクッと!使える事が確認でき、次回以降の展開が非常に期待出来る状況になってきました。
次回は、今回の「雪だるま方式開発手法」(勝手に命名。。)で作成・検証を行ったコードに、新たに「スケジュール機能」を組み込んでみたいと思います。また、その検証としてSingleStoreにランダムにデータを生成・挿入する仕組みを併用し、「初歩の短サイクルバッチ処理」的な作業を行いたいと思います。