いよいよ本題に挑戦!
前回は、今回の検証で使用するデータ生成ツーールを紹介させて頂きましたが、今回はいよいよ以前のスクリプトをベースに雪だるま方式の作り込みを行い、幾つかのSQLクエリを一定時間毎に定期的ワークフロー処理してみる事にします。
因みにどうやってスケジュールを定義するのか・・・
この一連の検証を企画した際に、**どうやってスケジュールを設定するのか?**が大きな壁だと覚悟していましたが、実際作業を進めていくと・・
schedule = IntervalSchedule(interval=timedelta(seconds=Offset_Sec))
と書くだけでOK!だと解り、以前の叩き台を
with Flow(FLOW_NAME, schedule) as flow:
とすれば良いと知り・・・・ある意味で愕然としました・・・
また、彼らのホームページやGitHubには沢山のサンプルが有りますので、是非一読されることをお勧め致します。
今回使用するワークフローはこんな感じです・・・
今回は、以前のモノをベースに必要そうな処理を雪だるま式に追加し、なんとかそれっぽい系のモノに仕上げてみました。毎度おなじみのNDA(ノン・ダメ出し・アグリーメント)ベースで公開しますので、適宜に改造等を行って自由に使ってください。
#
# Prefectの検証:少し重たいSQLを振り回せ!版
#
# お約束の初期設定
import prefect
import datetime
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule
import pymysql.cursors
#
# 広域変数設定
Yes = 1
No = 0
# 各種設定
Offset_Sec = 30 # 処理間隔の設定(この例では30秒間隔)
Column_Number = 20 # 今回の参照テーブルのカラム数(ここも実情に併せて変更する)
Data_Lines = 2 # 一部出力の際のデータ行数(ここも実情に併せて変更する)
Console_Out_ALL = Yes # コンソールに出力するパターンを設定(Yes:全ての情報 No:Data_Lines行だけ)
Aggregate = No # 処理を集計型で行うかRawデータ表示を行うかを選択(ここも実情に併せて変更する)
#
# SigleStoreとの接続情報(適宜変更)
SS_Host = "zzz.zzz.zzz.zzz"
SS_Port = 3306
SS_User = "zzzzz"
SS_Pass = "zzzzz"
SS_DB = "zzzzz"
SS_Char = "utf8"
#
FLOW_NAME = "Prefect_Demo_FLOW" # 処理フローの名前
TABLE_NAME = "Prefect_Demo_TABLE" # SingleStoreに作成するテーブルの名前
# ログ出力用メッセージ
LOG_MESSAGE1 = "SingleStoreと接続開始"
LOG_MESSAGE2 = "SingleStoreと接続完了"
LOG_MESSAGE3 = "SingleStore上のテーブル名をクエリします"
LOG_MESSAGE4 = "現在登録されているカラム情報は以下の通りです(全部表示)"
LOG_MESSAGE5 = "現在登録されているカラム情報は以下の通りです(一部抜粋)"
LOG_MESSAGE6 = "SingleStoreとの接続を終了"
LOG_MESSAGE7 = "SingleStore連携処理の終了"
# SQL関連メッセージ
SQL_MESSAGE1 = "カテゴリ別の期間売上"
SQL_MESSAGE2 = "カテゴリ別の総売上"
SQL_MESSAGE3 = "カテゴリ別の期間出荷"
SQL_MESSAGE4 = "カテゴリ別の総出荷"
SQL_MESSAGE5 = "配送センター別の期間出荷"
SQL_MESSAGE6 = "配送センター別の総出荷"
#
# SQLクエリで使うSQL文(全データ対象:固定)
#
# 全てのカラム情報を全部クエリする
SQL00 = "SELECT * FROM " + TABLE_NAME + " ORDER BY id"
# カテゴリ別の総売上クエリ
SQL01 = "SELECT SUM(Payment) as PP, Category FROM " + TABLE_NAME + " GROUP BY Category ORDER BY PP DESC"
# カテゴリ別の総出荷数クエリ
SQL02 = "SELECT SUM(Units) as UU, Category FROM " + TABLE_NAME + " GROUP BY Category ORDER BY UU DESC"
# 配送センター別の総取扱数クエリ
SQL03 = "SELECT SUM(Units) as LL, Logistics FROM " + TABLE_NAME + " GROUP BY Logistics ORDER BY LL DESC"
#
# SingleStoreに接続して処理に必要なポインタを返す
#
def Open_DB():
# ターゲットのデータベースに接続してポインタを取得
db = pymysql.connect(
host = SS_Host,
port = SS_Port,
user = SS_User,
password = SS_Pass,
db = SS_DB,
charset = SS_Char,
cursorclass = pymysql.cursors.DictCursor)
return(db) # データベース処理に必要な情報を戻す
#
# コンソールに1行単位で全カラム情報を表示する
def Print_Query_Data(Data, i, Type):
if i % Column_Number == 0:
print(Data)
if Type == Yes: print("=====================================")
else: print("/////////////////////////////////////")
return("")
#
# 指示されたSQLを実行して結果を戻す
def Print_SQL_Result(cursor, db, SQL_Data):
Tmp_Data = []
cursor.execute(SQL_Data)
db.commit()
# クエリ結果を取得して利用可能な状態にする
for Query_Data in cursor.fetchall():
for item in Query_Data.values(): Tmp_Data.append(item)
return(Tmp_Data)
#
# Prefectで使用するタスクを作成
#
@task
def Prefect_Batch_Task():
logger = prefect.context.get("logger") # ログ出力の準備
dt_now = datetime.datetime.now() # 現在の時刻を取得
dt_old = dt_now - datetime.timedelta(seconds=Offset_Sec) # 現在時間から設定時間分戻った時刻を生成
# 全てのカラム情報を期間設定してクエリする
SQL10 = "SELECT * FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' ORDER BY id"
# カテゴリ別の売上を期間設定集計する
SQL11 = "SELECT SUM(Payment) as PP, Category FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Category ORDER BY PP DESC"
# カテゴリ別の出荷数を期間設定集計する
SQL12 = "SELECT SUM(Units) as UU, Category FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Category ORDER BY UU DESC"
# 配送センター別の出荷数を期間設定集計する
SQL13 = "SELECT SUM(Units) as UU, Logistics FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Logistics ORDER BY UU DESC"
logger.info(LOG_MESSAGE1) # 処理の過程をログで出力
db = Open_DB() # SingleStoreと接続
logger.info(LOG_MESSAGE2) # 処理の過程をログで出力
with db.cursor() as cursor:
logger.info(LOG_MESSAGE3) # 処理の過程をログで出力
if Aggregate == Yes: # 集計系のSQL形式で出力
logger.info(SQL_MESSAGE1)
logger.info(Print_SQL_Result(cursor, db, SQL11))
logger.info(SQL_MESSAGE2)
logger.info(Print_SQL_Result(cursor, db, SQL01))
logger.info(SQL_MESSAGE3)
logger.info(Print_SQL_Result(cursor, db, SQL12))
logger.info(SQL_MESSAGE4)
logger.info(Print_SQL_Result(cursor, db, SQL02))
logger.info(SQL_MESSAGE5)
logger.info(Print_SQL_Result(cursor, db, SQL13))
logger.info(SQL_MESSAGE6)
logger.info(Print_SQL_Result(cursor, db, SQL03))
else: # Raw形式で出力
i = 1
DB_Data = ""
Tmp_Data = Print_SQL_Result(cursor, db, SQL10) # クエリ処理を行い結果を取得
if Console_Out_ALL == Yes: # 全部を表示
logger.info(LOG_MESSAGE4)
for loop in Tmp_Data:
DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
if i % Column_Number == 0: DB_Data = Print_Query_Data(DB_Data, i, Yes)
i = i + 1
else: # 指定された行数を表示
logger.info(LOG_MESSAGE5)
for loop in range(Column_Number * Data_Lines):
DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
if i % Column_Number == 0: DB_Data = Print_Query_Data(DB_Data, i, No)
i = i + 1
logger.info(LOG_MESSAGE6) # 処理の過程をログで出力
db.close() # SingleStoreとの接続を停止する
logger.info(LOG_MESSAGE7) # 処理の過程をログで出力
#
# メインの処理
#
try:
print("Prefect検証開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
schedule = IntervalSchedule(interval=timedelta(seconds=Offset_Sec)) # スケジュールの設定
with Flow(FLOW_NAME, schedule) as flow:
Prefect_Batch_Task()
flow.run()
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
print("Prefect検証終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
###################################################################################################
最初の検証を実行してみる
最初は、前回動作検証でも使用したRaw形式のクエリを、当該クエリ結果の先頭から指定行数と、当該結果全行の処理に分けて実行してみます。
各検証作業の前には、前回作成したなんちゃって物販データ生成ツールをコマンドラインで起動しておきます。
Raw形式で指定行数のクエリ結果を表示した例・・・
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 10:03:14
[2022-02-01 10:03:14+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:03:30+00:00
[2022-02-01 10:03:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:03:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
1 2022-02-01 10:03:07.620292 2022-02-01 10:03:10.606413 家電 エアコン 64800 1 関東中央物流センター Mastercard 5168403885421431 64800 6480 村上 陽一 266-4452 埼玉県 埼玉県中央区羽折町41丁目24番12号 関東 070-9568-3237 otasayuri@yahoo.com 6480
/////////////////////////////////////
2 2022-02-01 10:03:09.629209 2022-02-01 10:03:10.610422 雑貨 女性用品 3580 4 伊丹物流センター 現金 N/A 14320 1432 橋本 くみ子 689-3204 大阪府 大阪府横浜市青葉区木立41丁目21番15号 近畿 090-2569-5835 hashimotohideki@fujita.net 716
/////////////////////////////////////
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:03:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:03:30+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:04:00+00:00
[2022-02-01 10:04:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:04:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
12 2022-02-01 10:03:31.701516 2022-02-01 10:03:51.661462 雑貨 ペットフード 980 3 甲州物流センター 現金 N/A 2940 294 伊藤 洋介 097-1867 山梨県 山梨県横浜市旭区四区町5丁目22番6号 独鈷沢コーポ409 中部 090-5952-2300 morinaoto@hashimoto.jp 147
/////////////////////////////////////
13 2022-02-01 10:03:34.687769 2022-02-01 10:03:52.664464 家電 エアコン 64800 1 伊丹物流センター 現金 N/A 64800 6480 小林 亮介 199-5637 三重県 三重県調布市無栗屋15丁目7番12号 柿木沢新田コート001 近畿 080-4587-0462 wshimizu@nakamura.jp 6480
/////////////////////////////////////
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:04:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:04:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:04:30+00:00
無事に処理されている様なので、今度は対象データ全部を表示してみます。
Raw形式で対象全行数のクエリ結果を表示した例・・・
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 09:58:54
[2022-02-01 09:58:54+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T00:59:00+00:00
[2022-02-01 09:59:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 09:59:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(全部表示)
1 2022-02-01 09:58:54.182879 2022-02-01 09:59:00.157370 雑貨 女性用品 3580 4 平戸物流センター 現金 N/A 14320 1432 井上 春香 444-5180 佐賀県 佐賀県横浜市青葉区権現堂36丁目16番15号 コート細野801 九州・沖縄 09-7535-1393 kumikoyamada@suzuki.jp 716
=====================================
2 2022-02-01 09:58:55.186754 2022-02-01 09:59:08.164371 雑貨 女性用品 3580 4 東北物流センター JCB 15 digit 349894719659340 14320 1432 田中 健一 033-4260 秋田県 秋田県白井市油井10丁目2番3号 竜泉シティ833 東北 94-7751-5917 naokisato@yahoo.com 716
=====================================
3 2022-02-01 09:58:58.194997 2022-02-01 09:59:09.167376 書籍 フィクション 1400 3 関東中央物流センター Mastercard 4098910139916154 4200 420 佐藤 晃 199-3030 茨城県 茨城県八王子市幸手4丁目22番1号 コート箪笥町413 関東 070-4562-0870 wtanaka@aoki.jp 126
=====================================
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 09:59:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 09:59:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T00:59:30+00:00
[2022-02-01 09:59:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 09:59:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(全部表示)
4 2022-02-01 09:59:00.200775 2022-02-01 09:59:18.169379 酒類 ビール 490 3 道央物流センター 現金 N/A 1470 147 小林 直子 157-9184 北海道 北海道足立区方京22丁目24番2号 コート脚折375 北海道 090-5992-4661 mikakonakamura@ito.jp 29
=====================================
5 2022-02-01 09:59:01.206683 2022-02-01 09:59:24.178385 酒類 スコッチ 3500 10 讃岐物流センター 現金 N/A 35000 3500 村上 あすか 828-6801 徳島県 徳島県川崎市宮前区細竹9丁目1番13号 四国 50-3008-9131 hideki21@sasaki.jp 700
=====================================
6 2022-02-01 09:59:03.212448 2022-02-01 09:59:25.178389 書籍 歴史 1500 3 広島臨港物流センター VISA 16 digit 180012400034853 4500 450 福田 あすか 866-9954 島根県 島根県大島町池之端41丁目14番12号 コート押上694 中国 070-2245-5515 watanabesotaro@okamoto.org 135
=====================================
こちらも無事に動いている様ですね。
では、少し重たいSQLを動かしてみる・・・
今回のメイン検証になりますが、集計系のクエリを数パターン動かしてみたいと思います。
SQL的にはその時点の総数を対象にするクエリと、その時点の範囲対象に含まれるデータに対して実行されるクエリになります。
その時点までの総数を対象にするSQL
SELECT SUM(Payment) as PP, Category FROM Prefect_Table GROUP BY Category ORDER BY PP DESC;
SELECT SUM(Units) as UU, Category FROM Prefect_Table GROUP BY Category ORDER BY UU DESC;
SELECT SUM(Units) as LL, Logistics FROM Prefect_Table GROUP BY Logistics ORDER BY LL DESC;
その時点の対象データ範囲で集計を行うSQL
SELECT SUM(Payment) as PP, Category FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Category ORDER BY PP DESC;
SELECT SUM(Units) as UU, Category FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Category ORDER BY UU DESC;
SELECT SUM(Units) as LL, Logistics FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Logistics ORDER BY LL DESC;
では、実際にPrefectでFLOW実行させてみます。
集計系のクエリ結果を表示した例・・・
総数系のSQL文が「実行時総数」になる関係上微妙に異なる結果にはなりますが、最初の段階では、総数系と範囲系は想定通りにほぼ同じ結果を返してきます。SQL文を改良すればより厳密な検証が可能だと思いますが、先を急ぐ・・・・と言うことでこのまま作業を続行します。
(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 10:08:14
[2022-02-01 10:08:14+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:08:30+00:00
[2022-02-01 10:08:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:08:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間売上
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('137600'), '家電', Decimal('17432'), '雑貨', Decimal('17400'), 'DVD/CD', Decimal('7480'), '書籍']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総売上
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('137600'), '家電', Decimal('17432'), '雑貨', Decimal('17400'), 'DVD/CD', Decimal('7480'), '書籍']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('12'), '雑貨', Decimal('6'), 'DVD/CD', Decimal('4'), '書籍', Decimal('3'), '家電']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('12'), '雑貨', Decimal('8'), 'DVD/CD', Decimal('4'), '書籍', Decimal('3'), '家電']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の期間出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('9'), '讃岐物流センター', Decimal('7'), '伊丹物流センター', Decimal('3'), '平戸物流センター', Decimal('3'), '広島臨港物流センター', Decimal('1'), '道央物流センター', Decimal('1'), '関東中央物流センター', Decimal('1'), '東北物流センター']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の総出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('11'), '讃岐物流センター', Decimal('7'), '伊丹物流センター', Decimal('3'), '平戸物流センター', Decimal('3'), '広島臨港物流センター', Decimal('1'), '関東中央物流センター', Decimal('1'), '東北物流センター', Decimal('1'), '道央物流センター']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:08:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:08:30+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:09:00+00:00
暫く生成処理が進むと、総数側の処理結果と範囲指定の結果の幅が大きくなり始めますが、30秒間隔でデータの今を見る事が普通に出来る事が確認できました。
[2022-02-01 10:09:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:09:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間売上
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('130400'), '家電', Decimal('28460'), '酒類', Decimal('19920'), 'DVD/CD', Decimal('14280'), '書籍', Decimal('8912'), '雑貨']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総売上
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('268000'), '家電', Decimal('37320'), 'DVD/CD', Decimal('28460'), '酒類', Decimal('26344'), '雑貨', Decimal('21760'), '書籍']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('13'), '酒類', Decimal('9'), '書籍', Decimal('8'), '雑貨', Decimal('8'), 'DVD/CD', Decimal('3'), '家電']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('20'), '雑貨', Decimal('14'), 'DVD/CD', Decimal('13'), '書籍', Decimal('13'), '酒類', Decimal('6'), '家電']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の期間出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('10'), '関東中央物流センター', Decimal('9'), '伊丹物流センター', Decimal('8'), '讃岐物流センター', Decimal('8'), '平戸物流センター', Decimal('4'), '広島臨港物流センター', Decimal('2'), '甲州物流センター']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の総出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('17'), '讃岐物流センター', Decimal('16'), '伊丹物流センター', Decimal('11'), '関東中央物流センター', Decimal('11'), '平戸物流センター', Decimal('7'), '広島臨港物流センター', Decimal('2'), '甲州物流センター', Decimal('1'), '道央物流センター', Decimal('1'), '東北物流センター']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:09:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:09:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:09:30+00:00
簡単なPython処理をタスクに設定し、Prefectのワークフローで普通に活用出来そうですね。
今回のまとめ
今回は、以前のPythonスクリプトを雪だるま方式で改造し、少し複雑なSQLをSingleStoreに処理させるワークフローを想定・検証してみました。結果的には非常にシンプル且つ簡単にワークフローをPythonワールドで書く事が出来ると同時に、Prefect自体も確実に狙った操作をしてくれる事が解りました。
クエリ結果を取り出す事が出来るようになったので、次回は追加で解析系ライブラリとの連携ワークフローに挑戦してみたいと思います。