1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Prefectって何だ??・・(4)

Posted at

いよいよ本題に挑戦!

前回は、今回の検証で使用するデータ生成ツーールを紹介させて頂きましたが、今回はいよいよ以前のスクリプトをベースに雪だるま方式の作り込みを行い、幾つかのSQLクエリを一定時間毎に定期的ワークフロー処理してみる事にします。

因みにどうやってスケジュールを定義するのか・・・

この一連の検証を企画した際に、**どうやってスケジュールを設定するのか?**が大きな壁だと覚悟していましたが、実際作業を進めていくと・・

schedule = IntervalSchedule(interval=timedelta(seconds=Offset_Sec)) 

と書くだけでOK!だと解り、以前の叩き台を

with Flow(FLOW_NAME, schedule) as flow:

とすれば良いと知り・・・・ある意味で愕然としました・・・

また、彼らのホームページやGitHubには沢山のサンプルが有りますので、是非一読されることをお勧め致します。

今回使用するワークフローはこんな感じです・・・

今回は、以前のモノをベースに必要そうな処理を雪だるま式に追加し、なんとかそれっぽい系のモノに仕上げてみました。毎度おなじみのNDA(ノン・ダメ出し・アグリーメント)ベースで公開しますので、適宜に改造等を行って自由に使ってください。

#
# Prefectの検証:少し重たいSQLを振り回せ!版
#
# お約束の初期設定
import prefect
import datetime
from prefect import task, Flow
from datetime import timedelta
from prefect.schedules import IntervalSchedule
import pymysql.cursors
#
# 広域変数設定
Yes = 1
No  = 0
# 各種設定
Offset_Sec = 30       # 処理間隔の設定(この例では30秒間隔)
Column_Number = 20    # 今回の参照テーブルのカラム数(ここも実情に併せて変更する)
Data_Lines = 2              # 一部出力の際のデータ行数(ここも実情に併せて変更する)
Console_Out_ALL = Yes # コンソールに出力するパターンを設定(Yes:全ての情報 No:Data_Lines行だけ)
Aggregate = No        # 処理を集計型で行うかRawデータ表示を行うかを選択(ここも実情に併せて変更する)
#
# SigleStoreとの接続情報(適宜変更)
SS_Host = "zzz.zzz.zzz.zzz"
SS_Port = 3306
SS_User = "zzzzz"
SS_Pass = "zzzzz"
SS_DB = "zzzzz"
SS_Char = "utf8"
#
FLOW_NAME = "Prefect_Demo_FLOW"     # 処理フローの名前
TABLE_NAME = "Prefect_Demo_TABLE" # SingleStoreに作成するテーブルの名前
# ログ出力用メッセージ
LOG_MESSAGE1 = "SingleStoreと接続開始"
LOG_MESSAGE2 = "SingleStoreと接続完了"
LOG_MESSAGE3 = "SingleStore上のテーブル名をクエリします"
LOG_MESSAGE4 = "現在登録されているカラム情報は以下の通りです(全部表示)"
LOG_MESSAGE5 = "現在登録されているカラム情報は以下の通りです(一部抜粋)"
LOG_MESSAGE6 = "SingleStoreとの接続を終了"
LOG_MESSAGE7 = "SingleStore連携処理の終了"
# SQL関連メッセージ
SQL_MESSAGE1 = "カテゴリ別の期間売上"
SQL_MESSAGE2 = "カテゴリ別の総売上"
SQL_MESSAGE3 = "カテゴリ別の期間出荷"
SQL_MESSAGE4 = "カテゴリ別の総出荷"
SQL_MESSAGE5 = "配送センター別の期間出荷"
SQL_MESSAGE6 = "配送センター別の総出荷"
#
# SQLクエリで使うSQL文(全データ対象:固定)
#
# 全てのカラム情報を全部クエリする
SQL00 = "SELECT * FROM " + TABLE_NAME + " ORDER BY id"
# カテゴリ別の総売上クエリ
SQL01 = "SELECT SUM(Payment) as PP, Category FROM " + TABLE_NAME + " GROUP BY Category ORDER BY PP DESC"
# カテゴリ別の総出荷数クエリ
SQL02 = "SELECT SUM(Units) as UU, Category FROM " + TABLE_NAME + " GROUP BY Category ORDER BY UU DESC"
# 配送センター別の総取扱数クエリ
SQL03 = "SELECT SUM(Units) as LL, Logistics FROM " + TABLE_NAME + " GROUP BY Logistics ORDER BY LL DESC"
#
# SingleStoreに接続して処理に必要なポインタを返す
#
def Open_DB():

    # ターゲットのデータベースに接続してポインタを取得
    db = pymysql.connect(
        host     = SS_Host,
        port     = SS_Port,
        user     = SS_User,
        password = SS_Pass,
        db       = SS_DB,
        charset  = SS_Char,
        cursorclass = pymysql.cursors.DictCursor)
    
    return(db) # データベース処理に必要な情報を戻す
#
# コンソールに1行単位で全カラム情報を表示する
def Print_Query_Data(Data, i, Type):

    if i % Column_Number == 0:
        print(Data)
        if Type == Yes: print("=====================================")
        else:           print("/////////////////////////////////////")

    return("")
#
# 指示されたSQLを実行して結果を戻す
def Print_SQL_Result(cursor, db, SQL_Data):

    Tmp_Data = []

    cursor.execute(SQL_Data)
    db.commit()

    # クエリ結果を取得して利用可能な状態にする
    for Query_Data in cursor.fetchall():
        for item in Query_Data.values(): Tmp_Data.append(item)  

    return(Tmp_Data)
#
# Prefectで使用するタスクを作成
#
@task
def Prefect_Batch_Task():

    logger = prefect.context.get("logger") # ログ出力の準備
    
    dt_now = datetime.datetime.now() # 現在の時刻を取得
    
    dt_old  = dt_now - datetime.timedelta(seconds=Offset_Sec) # 現在時間から設定時間分戻った時刻を生成

    # 全てのカラム情報を期間設定してクエリする
    SQL10 = "SELECT * FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' ORDER BY id"
    # カテゴリ別の売上を期間設定集計する
    SQL11 = "SELECT SUM(Payment) as PP, Category FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Category ORDER BY PP DESC"
    # カテゴリ別の出荷数を期間設定集計する
    SQL12 = "SELECT SUM(Units) as UU, Category FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Category ORDER BY UU DESC"
    # 配送センター別の出荷数を期間設定集計する
    SQL13 = "SELECT SUM(Units) as UU, Logistics FROM " + TABLE_NAME + " WHERE ts_SS BETWEEN '" + str(dt_old) + "' AND '" + str(dt_now) + "' GROUP BY Logistics ORDER BY UU DESC"

    logger.info(LOG_MESSAGE1) # 処理の過程をログで出力
    
    db = Open_DB() # SingleStoreと接続

    logger.info(LOG_MESSAGE2) # 処理の過程をログで出力

    with db.cursor() as cursor:

        logger.info(LOG_MESSAGE3) # 処理の過程をログで出力
        
        if Aggregate == Yes: # 集計系のSQL形式で出力

            logger.info(SQL_MESSAGE1)
            logger.info(Print_SQL_Result(cursor, db, SQL11))
            logger.info(SQL_MESSAGE2)
            logger.info(Print_SQL_Result(cursor, db, SQL01))
            logger.info(SQL_MESSAGE3)
            logger.info(Print_SQL_Result(cursor, db, SQL12))
            logger.info(SQL_MESSAGE4)
            logger.info(Print_SQL_Result(cursor, db, SQL02))
            logger.info(SQL_MESSAGE5)
            logger.info(Print_SQL_Result(cursor, db, SQL13))
            logger.info(SQL_MESSAGE6)
            logger.info(Print_SQL_Result(cursor, db, SQL03))

        else: # Raw形式で出力

            i = 1
            DB_Data = ""

            Tmp_Data = Print_SQL_Result(cursor, db, SQL10) # クエリ処理を行い結果を取得

            if Console_Out_ALL == Yes: # 全部を表示
                
                logger.info(LOG_MESSAGE4)

                for loop in Tmp_Data:
                    DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
                    if i % Column_Number == 0: DB_Data = Print_Query_Data(DB_Data, i, Yes)
                    i = i + 1

            else: # 指定された行数を表示

                logger.info(LOG_MESSAGE5)

                for loop in range(Column_Number * Data_Lines):
                    DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
                    if i % Column_Number == 0: DB_Data = Print_Query_Data(DB_Data, i, No)
                    i = i + 1

    logger.info(LOG_MESSAGE6) # 処理の過程をログで出力

    db.close() # SingleStoreとの接続を停止する

    logger.info(LOG_MESSAGE7) # 処理の過程をログで出力
#
# メインの処理
#
try:

    print("Prefect検証開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    schedule = IntervalSchedule(interval=timedelta(seconds=Offset_Sec)) # スケジュールの設定

    with Flow(FLOW_NAME, schedule) as flow:

        Prefect_Batch_Task()

    flow.run()

except KeyboardInterrupt:
   
    print('!!!!! 割り込み発生 !!!!!')
           
finally:
   
    print("Prefect検証終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
###################################################################################################

最初の検証を実行してみる

最初は、前回動作検証でも使用したRaw形式のクエリを、当該クエリ結果の先頭から指定行数と、当該結果全行の処理に分けて実行してみます。

各検証作業の前には、前回作成したなんちゃって物販データ生成ツールをコマンドラインで起動しておきます。

Raw形式で指定行数のクエリ結果を表示した例・・・

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 10:03:14
[2022-02-01 10:03:14+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:03:30+00:00
[2022-02-01 10:03:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:03:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
 1 2022-02-01 10:03:07.620292 2022-02-01 10:03:10.606413 家電 エアコン 64800 1 関東中央物流センター Mastercard 5168403885421431 64800 6480 村上 陽一 266-4452 埼玉県 埼玉県中央区羽折町41丁目24番12号 関東 070-9568-3237 otasayuri@yahoo.com 6480
/////////////////////////////////////
 2 2022-02-01 10:03:09.629209 2022-02-01 10:03:10.610422 雑貨 女性用品 3580 4 伊丹物流センター 現金 N/A 14320 1432 橋本 くみ子 689-3204 大阪府 大阪府横浜市青葉区木立41丁目21番15号 近畿 090-2569-5835 hashimotohideki@fujita.net 716
/////////////////////////////////////
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:03:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:03:30+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:03:30+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:04:00+00:00
[2022-02-01 10:04:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:04:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
 12 2022-02-01 10:03:31.701516 2022-02-01 10:03:51.661462 雑貨 ペットフード 980 3 甲州物流センター 現金 N/A 2940 294 伊藤 洋介 097-1867 山梨県 山梨県横浜市旭区四区町5丁目22番6号 独鈷沢コーポ409 中部 090-5952-2300 morinaoto@hashimoto.jp 147
/////////////////////////////////////
 13 2022-02-01 10:03:34.687769 2022-02-01 10:03:52.664464 家電 エアコン 64800 1 伊丹物流センター 現金 N/A 64800 6480 小林 亮介 199-5637 三重県 三重県調布市無栗屋15丁目7番12号 柿木沢新田コート001 近畿 080-4587-0462 wshimizu@nakamura.jp 6480
/////////////////////////////////////
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:04:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:04:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:04:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:04:30+00:00

無事に処理されている様なので、今度は対象データ全部を表示してみます。

Raw形式で対象全行数のクエリ結果を表示した例・・・

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 09:58:54
[2022-02-01 09:58:54+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T00:59:00+00:00
[2022-02-01 09:59:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 09:59:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(全部表示)
 1 2022-02-01 09:58:54.182879 2022-02-01 09:59:00.157370 雑貨 女性用品 3580 4 平戸物流センター 現金 N/A 14320 1432 井上 春香 444-5180 佐賀県 佐賀県横浜市青葉区権現堂36丁目16番15号 コート細野801 九州・沖縄 09-7535-1393 kumikoyamada@suzuki.jp 716
=====================================
 2 2022-02-01 09:58:55.186754 2022-02-01 09:59:08.164371 雑貨 女性用品 3580 4 東北物流センター JCB 15 digit 349894719659340 14320 1432 田中 健一 033-4260 秋田県 秋田県白井市油井10丁目2番3号 竜泉シティ833 東北 94-7751-5917 naokisato@yahoo.com 716
=====================================
 3 2022-02-01 09:58:58.194997 2022-02-01 09:59:09.167376 書籍 フィクション 1400 3 関東中央物流センター Mastercard 4098910139916154 4200 420 佐藤 晃 199-3030 茨城県 茨城県八王子市幸手4丁目22番1号 コート箪笥町413 関東 070-4562-0870 wtanaka@aoki.jp 126
=====================================
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 09:59:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 09:59:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 09:59:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T00:59:30+00:00
[2022-02-01 09:59:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 09:59:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 09:59:30+0900] INFO - prefect.Prefect_Batch_Task | 現在登録されているカラム情報は以下の通りです(全部表示)
 4 2022-02-01 09:59:00.200775 2022-02-01 09:59:18.169379 酒類 ビール 490 3 道央物流センター 現金 N/A 1470 147 小林 直子 157-9184 北海道 北海道足立区方京22丁目24番2号 コート脚折375 北海道 090-5992-4661 mikakonakamura@ito.jp 29
=====================================
 5 2022-02-01 09:59:01.206683 2022-02-01 09:59:24.178385 酒類 スコッチ 3500 10 讃岐物流センター 現金 N/A 35000 3500 村上 あすか 828-6801 徳島県 徳島県川崎市宮前区細竹9丁目1番13号 四国 50-3008-9131 hideki21@sasaki.jp 700
=====================================
 6 2022-02-01 09:59:03.212448 2022-02-01 09:59:25.178389 書籍 歴史 1500 3 広島臨港物流センター VISA 16 digit 180012400034853 4500 450 福田 あすか 866-9954 島根県 島根県大島町池之端41丁目14番12号 コート押上694 中国 070-2245-5515 watanabesotaro@okamoto.org 135
=====================================

こちらも無事に動いている様ですね。

では、少し重たいSQLを動かしてみる・・・

今回のメイン検証になりますが、集計系のクエリを数パターン動かしてみたいと思います。

SQL的にはその時点の総数を対象にするクエリと、その時点の範囲対象に含まれるデータに対して実行されるクエリになります。

その時点までの総数を対象にするSQL

SELECT SUM(Payment) as PP, Category FROM Prefect_Table GROUP BY Category ORDER BY PP DESC;
SELECT SUM(Units) as UU, Category FROM Prefect_Table GROUP BY Category ORDER BY UU DESC;
SELECT SUM(Units) as LL, Logistics FROM Prefect_Table GROUP BY Logistics ORDER BY LL DESC;

その時点の対象データ範囲で集計を行うSQL

SELECT SUM(Payment) as PP, Category FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Category ORDER BY PP DESC;
SELECT SUM(Units) as UU, Category FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Category ORDER BY UU DESC;
SELECT SUM(Units) as LL, Logistics FROM Prefect_Table WHERE ts_SS BETWEEN '此処から' AND '此処まで' GROUP BY Logistics ORDER BY LL DESC;

では、実際にPrefectでFLOW実行させてみます。

集計系のクエリ結果を表示した例・・・

総数系のSQL文が「実行時総数」になる関係上微妙に異なる結果にはなりますが、最初の段階では、総数系と範囲系は想定通りにほぼ同じ結果を返してきます。SQL文を改良すればより厳密な検証が可能だと思いますが、先を急ぐ・・・・と言うことでこのまま作業を続行します。

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/P01.py
Prefect検証開始 : 2022/02/01 10:08:14
[2022-02-01 10:08:14+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:08:30+00:00
[2022-02-01 10:08:30+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:08:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間売上
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('137600'), '家電', Decimal('17432'), '雑貨', Decimal('17400'), 'DVD/CD', Decimal('7480'), '書籍']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総売上
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('137600'), '家電', Decimal('17432'), '雑貨', Decimal('17400'), 'DVD/CD', Decimal('7480'), '書籍']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('12'), '雑貨', Decimal('6'), 'DVD/CD', Decimal('4'), '書籍', Decimal('3'), '家電']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('12'), '雑貨', Decimal('8'), 'DVD/CD', Decimal('4'), '書籍', Decimal('3'), '家電']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の期間出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('9'), '讃岐物流センター', Decimal('7'), '伊丹物流センター', Decimal('3'), '平戸物流センター', Decimal('3'), '広島臨港物流センター', Decimal('1'), '道央物流センター', Decimal('1'), '関東中央物流センター', Decimal('1'), '東北物流センター']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の総出荷
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('11'), '讃岐物流センター', Decimal('7'), '伊丹物流センター', Decimal('3'), '平戸物流センター', Decimal('3'), '広島臨港物流センター', Decimal('1'), '関東中央物流センター', Decimal('1'), '東北物流センター', Decimal('1'), '道央物流センター']
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:08:30+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:08:30+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:08:30+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:09:00+00:00

暫く生成処理が進むと、総数側の処理結果と範囲指定の結果の幅が大きくなり始めますが、30秒間隔でデータの今を見る事が普通に出来る事が確認できました。


[2022-02-01 10:09:00+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Demo_FLOW'
[2022-02-01 10:09:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Starting task run...
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続開始
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreと接続完了
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore上のテーブル名をクエリします
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間売上
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('130400'), '家電', Decimal('28460'), '酒類', Decimal('19920'), 'DVD/CD', Decimal('14280'), '書籍', Decimal('8912'), '雑貨']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総売上
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('268000'), '家電', Decimal('37320'), 'DVD/CD', Decimal('28460'), '酒類', Decimal('26344'), '雑貨', Decimal('21760'), '書籍']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の期間出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('13'), '酒類', Decimal('9'), '書籍', Decimal('8'), '雑貨', Decimal('8'), 'DVD/CD', Decimal('3'), '家電']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | カテゴリ別の総出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('20'), '雑貨', Decimal('14'), 'DVD/CD', Decimal('13'), '書籍', Decimal('13'), '酒類', Decimal('6'), '家電']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の期間出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('10'), '関東中央物流センター', Decimal('9'), '伊丹物流センター', Decimal('8'), '讃岐物流センター', Decimal('8'), '平戸物流センター', Decimal('4'), '広島臨港物流センター', Decimal('2'), '甲州物流センター']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | 配送センター別の総出荷
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | [Decimal('17'), '讃岐物流センター', Decimal('16'), '伊丹物流センター', Decimal('11'), '関東中央物流センター', Decimal('11'), '平戸物流センター', Decimal('7'), '広島臨港物流センター', Decimal('2'), '甲州物流センター', Decimal('1'), '道央物流センター', Decimal('1'), '東北物流センター']
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStoreとの接続を終了
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Batch_Task | SingleStore連携処理の終了
[2022-02-01 10:09:00+0900] INFO - prefect.TaskRunner | Task 'Prefect_Batch_Task': Finished task run for task with final state: 'Success'
[2022-02-01 10:09:00+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
[2022-02-01 10:09:00+0900] INFO - prefect.Prefect_Demo_FLOW | Waiting for next scheduled run at 2022-02-01T01:09:30+00:00

簡単なPython処理をタスクに設定し、Prefectのワークフローで普通に活用出来そうですね。

今回のまとめ

今回は、以前のPythonスクリプトを雪だるま方式で改造し、少し複雑なSQLをSingleStoreに処理させるワークフローを想定・検証してみました。結果的には非常にシンプル且つ簡単にワークフローをPythonワールドで書く事が出来ると同時に、Prefect自体も確実に狙った操作をしてくれる事が解りました。

クエリ結果を取り出す事が出来るようになったので、次回は追加で解析系ライブラリとの連携ワークフローに挑戦してみたいと思います。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?