0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Prefectって何だ??・・(3)

Last updated at Posted at 2022-02-01

忘れてました・・・検証で使用する道具はこれです。。

今回の検証で使用する「なんちゃって物販データ」生成用のツールはこれになります。毎度で恐縮ですが、NDA(ノン・ダメ出し・アグリーメント)ベースでご自由にお使いください。

#
# Prefect検証用・仮想物販データ生成ツール
#
# Python 3版
#
#  初期設定
import sys
stdout = sys.stdout
sys.stdout = stdout
import pymysql.cursors
import datetime
import time
import random
import re
#
# 広域変数
Yes = 1
No = 0
#
# 生成するデータの数
Generate_Data = 1000
#
# データ生成間隔の調整
Less_Than_1Sec = Yes # 1秒未満でデータ生成する場合
Min_Interval = 1     # 1秒以上でデータ生成する場合の最小値
Max_Interval = 3     # 1秒以上でデータ生成する場合の最大値
Wait_Time = 3        # テーブル初期化からデータ生成までの猶予時間(秒)
#
Table_Name = "Prefect_Demo_TABLE" # SingleStoreに作成するテーブルの名前
#
# SigleStoreとの接続情報(適宜変更)
SS_Host = "localhost"
SS_Port = 3306
SS_User = "root"
SS_Pass = "adminzoom"
SS_DB = "Demo"
SS_Char = "utf8"
#
# 物販情報で使うメタデータの定義
# カテゴリ名
Category_Name = ["酒類","家電","書籍","DVD/CD","雑貨"]
# 酒類の商品情報
Product_Name0 = ["日本酒","バーボン","ビール","芋焼酎","赤ワイン","白ワイン","スコッチ","ブランデー","泡盛","テキーラ"]
Product_Price0 = [1980, 2500, 490, 2000, 3000, 2500, 3500, 5000, 1980, 2000]    
# 家電の商品情報
Product_Name1 = ["テレビ","洗濯機","ラジオ","ステレオ","電子レンジ","パソコン","電池","エアコン","乾燥機","掃除機"]
Product_Price1 = [49800, 39800, 2980, 88000, 29800, 64800, 198, 64800, 35800, 24800]    
# 書籍の商品情報
Product_Name2 = ["週刊誌","歴史","写真集","漫画","参考書","フィクション","経済","自己啓発","月刊誌","新刊"]
Product_Price2 = [280, 1500, 2500, 570, 1480, 1400, 1800, 1540, 980, 1980]    
# DVD/CDの商品情報
Product_Name3 = ["洋楽","演歌","Jポップ","洋画","アイドル","クラッシック","邦画","連続ドラマ","企画","アニメ"]
Product_Price3 = [1980, 2200, 2500, 3500, 2980, 1980, 3800, 2690, 1980, 2400]
# 雑貨の商品情報
Product_Name4 = ["洗剤","電球","贈答品","医薬部外品","ペットフード","乾電池","文房具","男性用品","女性用品","季節用品"]
Product_Price4 = [498, 198, 1980, 398, 980, 248, 398, 2980, 3580, 1980]
#
#  地域名ルックアップ情報(キーは都道府県名)
Area_Data={'北海道':'北海道','青森県':'東北','岩手県':'東北','宮城県':'東北','秋田県':'東北','山形県':'東北','福島県':'東北',
           '茨城県':'関東','栃木県':'関東','群馬県':'関東','埼玉県':'関東','千葉県':'関東','東京都':'関東','神奈川県':'関東',
           '新潟県':'中部','富山県':'中部','石川県':'中部','福井県':'中部','山梨県':'中部','長野県':'中部','岐阜県':'中部','静岡県':'中部','愛知県':'中部',
           '三重県':'近畿','滋賀県':'近畿','京都府':'近畿','大阪府':'近畿','兵庫県':'近畿','奈良県':'近畿','和歌山県':'近畿',
           '鳥取県':'中国','島根県':'中国','岡山県':'中国','広島県':'中国','山口県':'中国',
           '徳島県':'四国','香川県':'四国','愛媛県':'四国','高知県':'四国',
           '福岡県':'九州・沖縄','佐賀県':'九州・沖縄','長崎県':'九州・沖縄','熊本県':'九州・沖縄','大分県':'九州・沖縄','宮崎県':'九州・沖縄','鹿児島県':'九州・沖縄','沖縄県':'九州・沖縄'}
# 物流センタールックアップ情報(キーは地域名)
Logi_Data = {'北海道':'道央物流センター','東北':'東北物流センター','関東':'関東中央物流センター',
             '中部':'甲州物流センター','近畿':'伊丹物流センター','中国':'広島臨港物流センター','四国':'讃岐物流センター','九州・沖縄':'平戸物流センター'}
# 購入ポイント情報(カテゴリ名の順番に設定
Point_Data = [0.02, 0.1, 0.03, 0.02, 0.05]
# 消費税率の設定
Tax_Data = 0.1
#
# SingleStore上に作成するテーブルのカラム定義
DC0 = "id BIGINT AUTO_INCREMENT, ts_SS TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), ts DATETIME(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, Logistics VARCHAR(20), "    
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "    
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Area VARCHAR(10), Tel VARCHAR(15), Email VARCHAR(40), Point INT, "
DC4 = "SHARD KEY (Logistics), PRIMARY KEY(id,Logistics)"
#
# 書き込み用のカラム設定
DL0 = "ts, "                                                     # タイムスタンプ情報
DL1 = "Category, Product, Price, Units, Logistics, "             # ビジネス情報
DL2 = "Card, Number, Payment, Tax, "                             # 支払い情報
DL3 = "User, Zip, Prefecture, Address, Area, Tel, Email, Point"  # 顧客情報
#
Table_Init   = "DROP TABLE IF EXISTS " + Table_Name # テーブル初期化SQL
Table_Create = "CREATE TABLE IF NOT EXISTS " + Table_Name + "(" + DC0 + DC1 + DC2 + DC3 + DC4 + ")" # テーブル作成用SQL
#
# データベースとの接続
#
def Open_DB():

    db = pymysql.connect(host     = SS_Host,
                         port     = SS_Port,
                         user     = SS_User,
                         password = SS_Pass,
                         db       = SS_DB,
                         charset  = SS_Char,
                         cursorclass=pymysql.cursors.DictCursor)

    return(db)

try:

    # Fakerの初期化
    from faker import Faker
    fakegen = Faker('ja_JP')    
    Faker.seed(fakegen.random_digit())
   
    print("検証用テーブル作成開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    db = Open_DB() # SingleStoreに接続
                           
    with db.cursor() as cursor:
       
        # 検証作業用テーブルの初期化
        cursor.execute(Table_Init)
        db.commit()
       
        # 新規にテーブルを作成
        cursor.execute(Table_Create)    
        db.commit()

        print("検証用テーブル作成終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

        print(str(Wait_Time) + "秒後にSingleStore上へのデータの生成を開始します。")
        time.sleep(Wait_Time)

        print("検証用データ生成開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

        dt_now = datetime.datetime.now() # 時間情報を生成する起点を確保
        
        Loop_Counter = 0 # ループカウンターの初期化
   
        while Loop_Counter < Generate_Data: # 検証データの生成
            
            # ts用のデータを生成
            Sec     = fakegen.random_digit()
            MIL_Sec = fakegen.random_digit()
            MIC_Sec = fakegen.random_digit()
            
            # 現実的なタイムスタンプ情報として利用します(秒単位でデータをズラしてBI等で使い易くする)
            ts_now = dt_now + datetime.timedelta(seconds=Sec, milliseconds=MIL_Sec, microseconds=MIC_Sec)
            dt_now = ts_now # 生成データを次回の起点に変更

            # ランダムに書き込む商材の種類と商品IDを選択
            Category_ID = fakegen.random_digit()
            if Category_ID > 4: Category_ID = Category_ID - 5

            Product_ID = fakegen.random_digit() # 割り当てられたカテゴリ内の商材IDを選択 
           
            Category = Category_Name[Category_ID] # カテゴリ名の設定  
            
            # 物販情報の生成
            if Category_ID == 0: # 酒類
                Product = Product_Name0[Product_ID]
                Price = Product_Price0[Product_ID]
                Units = fakegen.random_digit() + 1  # リアルっぽく調整しています
               
            elif Category_ID == 1: # 家電
                Product = Product_Name1[Product_ID]
                Price = Product_Price1[Product_ID]
                Units = 1  # リアルっぽく調整しています
               
            elif Category_ID == 2: # 書籍
                Product = Product_Name2[Product_ID]
                Price = Product_Price2[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >3: Units = 3  # リアルっぽく調整しています  
                   
            elif Category_ID == 3: # DVD/CD
                Product = Product_Name3[Product_ID]
                Price = Product_Price3[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >2: Units = 2  # リアルっぽく調整しています
                   
            else: # 雑貨
                Product = Product_Name4[Product_ID]
                Price = Product_Price4[Product_ID]
                Units = fakegen.random_digit() + 1
                if Units >4: Units = 4  # リアルっぽく調整しています

            # 支払い総額と消費税とポイントの計算
            Payment = Units * Price
            Tax     = int(Payment * Tax_Data)
            Point   = int(Payment * Point_Data[Category_ID])
               
            # 支払い情報の設定
            if str(fakegen.pybool()) == "True": Card = "現金"
            else:                               Card = fakegen.credit_card_provider()
   
            Number = fakegen.credit_card_number()          
            if Card == "現金": Number = "N/A" # 現金の場合はデータをN/Aに固定
                        
            # 購入者情報の生成
            User    = fakegen.name()
            Zip     = fakegen.zipcode()
            Address = fakegen.address()
            Tel     = fakegen.phone_number()
            Email   = fakegen.ascii_email()
           
            # 都道府県情報の抽出
            pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"
            m = re.match(pattern , Address)
            if m: Prefecture = m.group()
            
            # 地域名と物流センター名を取得           
            Area = Area_Data.get(Prefecture)
            Logistics = Logi_Data.get(Area)

            # SQLで使用するデータ列の作成
            DV0 = str(ts_now)+"','"
            DV1 = Category + "','" + Product + "','" + str(Price) + "','" + str(Units) + "','" + Logistics + "','"
            DV2 = Card + "','" + Number + "','" + str(Payment) + "','" + str(Tax) + "','"
            DV3 = User + "','" + Zip + "','" + Prefecture + "','" + Address + "','" + Area + "','" + Tel + "','" + str(Email) + "','" + str(Point)

            SQL = "INSERT INTO " + Table_Name +"(" + DL0 + DL1 + DL2 + DL3 + ") VALUES('" + DV0 + DV1 + DV2 + DV3 + "')" # クエリ用のSQLを作成
                       
            # SingleStoreへの書き込み
            cursor.execute(SQL)    
            db.commit()

            # デバッグ用
            #print(SQL)

            if Less_Than_1Sec == Yes: time.sleep(random.random())                           # 0-1で乱数を生成
            else:                     time.sleep(random.randint(Min_Interval,Max_Interval)) # Min-Max間の乱数で次のデータ生成を待機する(適宜調整可)

            Loop_Counter = Loop_Counter + 1  # ループカウンタの更新

except KeyboardInterrupt:
   
    print('!!!!! 割り込み発生 !!!!!')
           
finally:
   
    db.close() # データベースコネクションを閉じる
   
    print("生成したデータの総数 : " + str(Loop_Counter))    
    print("検証用データ作成終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
###################################################################################################

今回の検証では、このツールをコマンドラインで起動して使います。
スクリーンショット 2022-02-01 9.38.41.png

今回のまとめ

今回は、検証に使用するツールを「超シンプル」に紹介させて頂きました。
次回は、前回のPrefect検証スクリプトを使って「雪だるま式開発手法」にて少し込み入ったSQLを複数投げるワークフローに挑戦してみたいと思います。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?