0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Prefectって何だ??・・(2)

Posted at

#今回は・・・
###前回は、Pythonワールドでワークフローの仕組みが書けそうな・・・Prefectについての環境導入を行いましたので、今回からは少し深掘り検証モードに入って行きたいと思います。

#シンプルにデータベースを転がせるか?・・・・・
最初の検証は、指定したデータベースの状況を定期的に処理する、超簡単系ワークフローを作ってみたいと思います。

まずは、彼らのホームページから検証の叩き台をお借りして、少しカスタマイズを行います。
なお、コードに関しては・・・NDA(ノン・ダメ出し・アグリーメント)を前提にさせて頂きますので、自由にカスタマイズして使って頂いて結構です。

#
# Prefectの検証たたき台(彼らのホームページより)
#
import prefect
from prefect import task, Flow

FLOW_NAME = "Prefect_Test01"
LOG_MESSAGE = "Hello world!"

@task
def test01_task():
    logger = prefect.context.get("logger")
    logger.info(LOG_MESSAGE)

with Flow(FLOW_NAME) as flow:
    test01_task()

flow.run()

サクッと!実行してみます。

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect01.py
[2022-01-30 09:54:19+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test01'
[2022-01-30 09:54:19+0900] INFO - prefect.TaskRunner | Task 'test01_task': Starting task run...
[2022-01-30 09:54:19+0900] INFO - prefect.test01_task | Hello world!
[2022-01-30 09:54:19+0900] INFO - prefect.TaskRunner | Task 'test01_task': Finished task run for task with final state: 'Success'
[2022-01-30 09:54:19+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect % 

無事に動いている様なので、以前に作成したSingleStore向けMySQLのフリして繋げる・・を追加してみます。
SingleStoreに関しては、検証用に導入済みのMBPのローカルDocker版を使っています。
現在も引き続き条件付きフリー使用が継続している様なので、ご興味があれば是非メアドを登録して使ってみてください。

また、使用するテーブルも以前の検証で作成した「なんちゃって物販データ」の既存テーブルを転用してアクセスしてみます。

# Prefectの検証たたき台
#
# お約束の初期設定
import prefect
from prefect import task, Flow
#
# MySQL接続を利用する準備
import pymysql.cursors
#
# 広域変数設定
Yes = 1
No = 0
#
Console_Out_ALL = No # コンソールに出力するパターンを設定(Yes:全ての情報 No:3行だけ)
Column_Number = 21   # 今回の参照テーブルのカラム数
Data_Lines = 2       # 一部出力の際のデータ行数
#
# 処理フローの名前
FLOW_NAME = "Prefect_Test02"
#
# ログ出力用メッセージ
LOG_MESSAGE1 = "SingleStoreと接続開始"
LOG_MESSAGE2 = "SingleStoreと接続完了"
LOG_MESSAGE3 = "SingleStore上のテーブル名をクエリします"
LOG_MESSAGE4 = "現在登録されているカラム情報は以下の通りです(全部表示)"
LOG_MESSAGE5 = "現在登録されているカラム情報は以下の通りです(一部抜粋)"
LOG_MESSAGE6 = "SingleStoreとの接続を終了"
LOG_MESSAGE7 = "SingleStore連携処理の終了"
#
# SigleStoreとの接続情報(適宜変更)
SS_Host = "zzz.zzz.zzz.zzz"
SS_Port = 3306
SS_User = "zzzzz"
SS_Pass = "zzzzz"
SS_DB = "zzzzz"
SS_Char = "utf8"
#
# SQLクエリで使うSQL文
SQL = "SELECT * FROM RTS_Demo_Table ORDER BY id_SS" 
#
# SingleStoreに接続して処理に必要な情報を返す
#
def Open_DB():

    db = pymysql.connect(
        host = SS_Host, 
        port = SS_Port, 
        user = SS_User, 
        password = SS_Pass, 
        db = SS_DB, 
        charset = SS_Char, 
        cursorclass = pymysql.cursors.DictCursor)

    
    return(db) # データベース処理に必要な情報を戻す
#
# コンソールに1行単位で表示する
#
def Print_Query_Data(Data, i, Type):

    if i % Column_Number == 0:
        print(Data)
        if Type == Yes: print("=====================================")
        else:           print("/////////////////////////////////////")

    return("")
#
# Prefectで使用するタスクを作成
#
@task
def test02_task():
    
    logger = prefect.context.get("logger") # ログ出力の準備

    Tmp_Data=[]

    logger.info(LOG_MESSAGE1) # 処理の過程をログで出力

    db = Open_DB() # SingleStoreと接続

    logger.info(LOG_MESSAGE2) # 処理の過程をログで出力

    with db.cursor() as cursor:

        # 処理の過程をログで出力
        logger.info(LOG_MESSAGE3)

        # SingleStoreにSQLクエリを発行する
        cursor.execute(SQL)
        db.commit()

        # クエリ結果を取得して利用可能な状態にする
        for Query_Data in cursor.fetchall():
            for item in Query_Data.values():
                Tmp_Data.append(item)   

        # クエリ結果の表示
        i = 1
        DB_Data = ""

        if Console_Out_ALL == Yes: # 全部を表示

            logger.info(LOG_MESSAGE4)

            for loop in Tmp_Data:
                DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
                if i % Column_Number == 0:
                    DB_Data = Print_Query_Data(DB_Data, i, Yes)
                i = i + 1
        
        else: # 指定された行数を表示

            logger.info(LOG_MESSAGE5)

            for loop in range(Column_Number * Data_Lines):
                DB_Data = DB_Data + " " + str(Tmp_Data[i-1])
                if i % Column_Number == 0:
                    DB_Data = Print_Query_Data(DB_Data, i, No)
                i = i + 1

    logger.info(LOG_MESSAGE6) # 処理の過程をログで出力

    db.close() # SingleStoreとの接続を停止する

    logger.info(LOG_MESSAGE7) # 処理の過程をログで出力
#
# メインの処理
#
with Flow(FLOW_NAME) as flow:

    test02_task()

flow.run()
################################################

サクッと!動かしてみます。

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect02.py
[2022-01-30 09:59:44+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test02'
[2022-01-30 09:59:44+0900] INFO - prefect.TaskRunner | Task 'test02_task': Starting task run...
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreと接続開始
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreと接続完了
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStore上のテーブル名をクエリします
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | 現在登録されているカラム情報は以下の通りです(一部抜粋)
 1 2021-11-19 10:24:09.441558 2021-11-19 10:24:16.423722 雑貨 贈答品 1980 3 297 5940 594 American Express 180091589062760 高橋 篤司 113-2169 JP-38 愛媛県 愛媛県袖ケ浦市芝公園7丁目10番7号 アーバン高田馬場436 090-5030-6067 itoyui@aoki.com 四国 讃岐物流センター
/////////////////////////////////////
 2 2021-11-19 10:24:10.339402 2021-11-19 10:24:16.424731 DVD/CD 洋画 3500 2 140 7000 700 Mastercard 4819783616319228 佐藤 英樹 699-9934 JP-18 福井県 福井県神津島村土呂部8丁目23番13号 080-2980-5899 shota95@yamada.jp 中部 甲州物流センター
/////////////////////////////////////
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStoreとの接続を終了
[2022-01-30 09:59:44+0900] INFO - prefect.test02_task | SingleStore連携処理の終了
[2022-01-30 09:59:44+0900] INFO - prefect.TaskRunner | Task 'test02_task': Finished task run for task with final state: 'Success'
[2022-01-30 09:59:44+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect % 

表示の関係で最初から2行分を出力していますが、無事に通常のPython系MySQLアクセスが出来る事を確認できました。

因みに100行全部出力すると・・・・

(base) apple@appurunoMacBook-Pro Prefect % /Users/apple/opt/anaconda3/bin/python /Users/apple/Desktop/Prefect/prefect02.py
[2022-01-30 10:15:39+0900] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect_Test02'
[2022-01-30 10:15:39+0900] INFO - prefect.TaskRunner | Task 'test02_task': Starting task run...
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreと接続開始
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreと接続完了
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStore上のテーブル名をクエリします
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | 現在登録されているカラム情報は以下の通りです(全部表示)
 1 2021-11-19 10:24:09.441558 2021-11-19 10:24:16.423722 雑貨 贈答品 1980 3 297 5940 594 American Express 180091589062760 高橋 篤司 113-2169 JP-38 愛媛県 愛媛県袖ケ浦市芝公園7丁目10番7号 アーバン高田馬場436 090-5030-6067 itoyui@aoki.com 四国 讃岐物流センター
=====================================
 2 2021-11-19 10:24:10.339402 2021-11-19 10:24:16.424731 DVD/CD 洋画 3500 2 140 7000 700 Mastercard 4819783616319228 佐藤 英樹 699-9934 JP-18 福井県 福井県神津島村土呂部8丁目23番13号 080-2980-5899 shota95@yamada.jp 中部 甲州物流センター
=====================================
 3 2021-11-19 10:24:10.974893 2021-11-19 10:24:25.424737 雑貨 乾電池 248 1 12 248 24 現金 N/A 佐々木 千代 579-1085 JP-42 長崎県 長崎県長生郡一宮町松浦町26丁目17番12号 月島コーポ240 070-2421-9200 ytanaka@yamamoto.com 九州・沖縄 平戸物流センター
=====================================
 4 2021-11-19 10:24:11.704003 2021-11-19 10:24:29.426744 DVD/CD アイドル 2980 2 119 5960 596 JCB 16 digit 180011672869244 中村 あすか 283-8664 JP-12 千葉県 千葉県横浜市西区細竹11丁目25番8号 070-3277-9886 sayurisasaki@ota.com 関東 関東中央物流センター
=====================================

----- 途中省略 -------

 95 2021-11-19 10:25:15.206949 2021-11-19 10:30:05.831141 雑貨 医薬部外品 398 4 79 1592 159 Diners Club / Carte Blanche 30280001215195 藤井 加奈 451-8413 JP-26 京都府 京都府杉並区南赤田40丁目15番9号 クレスト松が谷520 090-3696-8259 xfujita@hayashi.jp 近畿 伊丹物流センター
=====================================
 96 2021-11-19 10:25:15.712581 2021-11-19 10:30:10.837144 家電 テレビ 49800 1 4980 49800 4980 VISA 13 digit 4583175589485926 坂本 涼平 173-7986 JP-15 新潟県 新潟県印旛郡本埜村平須賀21丁目10番17号 権現堂クレスト469 070-4714-1060 reitakahashi@gmail.com 中部 甲州物流センター
=====================================
 97 2021-11-19 10:25:16.605000 2021-11-19 10:30:14.840148 書籍 漫画 570 3 51 1710 171 現金 N/A 斎藤 結衣 426-0651 JP-31 鳥取県 鳥取県印旛郡栄町虎ノ門虎ノ門ヒルズ森タワー28丁目19番6号 080-0512-0117 gfukuda@hotmail.com 中国 広島臨港物流センター
=====================================
 98 2021-11-19 10:25:17.408374 2021-11-19 10:30:23.847156 書籍 新刊 1980 3 178 5940 594 Discover 3568815083887561 小川 あすか 052-3746 JP-40 福岡県 福岡県八丈島八丈町隼町8丁目24番13号 湯宮アーバン379 83-8825-5209 kobayashitomoya@hotmail.com 九州・沖縄 平戸物流センター
=====================================
 99 2021-11-19 10:25:18.221668 2021-11-19 10:30:31.853157 DVD/CD Jポップ 2500 2 100 5000 500 現金 N/A 福田 陽子 328-2222 JP-04 宮城県 宮城県長生郡長柄町細竹26丁目16番17号 幸手コーポ025 070-1979-9296 khasegawa@maeda.jp 東北 東北物流センター
=====================================
 100 2021-11-19 10:25:18.771961 2021-11-19 10:30:32.860161 雑貨 電球 198 4 39 792 79 JCB 15 digit 6569962741550363 佐藤 浩 703-9742 JP-38 愛媛県 愛媛県川崎市宮前区勝どき27丁目26番12号 070-0402-6939 nishimurajun@hotmail.com 四国 讃岐物流センター
=====================================
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStoreとの接続を終了
[2022-01-30 10:15:39+0900] INFO - prefect.test02_task | SingleStore連携処理の終了
[2022-01-30 10:15:39+0900] INFO - prefect.TaskRunner | Task 'test02_task': Finished task run for task with final state: 'Success'
[2022-01-30 10:15:39+0900] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded
(base) apple@appurunoMacBook-Pro Prefect % 

となります。Docker利用とはいえ、SingleStoreのインメモリ性能が高いので、今後の「少し複雑系SQLアクセス」でも非常に期待が持てるかと思います。

余談ですが、国内のSingleStore性能評価で**「十数億行のデータに対して抽出・集計系のSQLを仕掛けて、3秒を切る性能」**が出ているとの事です。

#今回のまとめ
今回は、初めてのPrefect第2弾として「Python経験のMySQL接続でSingleStoreが接続出来るか?」を検証してみました。結果的には、実にサクッと!使える事が確認でき、次回以降の展開が非常に期待出来る状況になってきました。

次回は、今回の「雪だるま方式開発手法」(勝手に命名。。)で作成・検証を行ったコードに、新たに「スケジュール機能」を組み込んでみたいと思います。また、その検証としてSingleStoreにランダムにデータを生成・挿入する仕組みを併用し、「初歩の短サイクルバッチ処理」的な作業を行いたいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?