1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

今回はデータ生成ツールの話・・

Last updated at Posted at 2021-03-14

以前の検証以降・・・・

EqualumSingleStore関連の投稿をしていると、データ生成部分だけ下さい!というお話を頂いたりする事が増え、また確かに良く考えると、有れば有ったで何かのお役に立てるかもしれない・・と思い立ち、データ生成部分に少し汎用性を持たせたバージョンを作成してみました。

まずは冒頭部分のカオスを修正します

大量に定義されていたメタデータ系情報をCSVファイルで作成し、それを別ファイルとして読み込む形にしたいと思います。

基本的には


import csv

with open('CSV_File', 'r', encoding='utf-8-sig') as f:
    reader = csv.reader(f)
    for row in reader:
        print(row)

の定番パターンをベースに実装します。
少し工夫が必要なのは、辞書形式にデータを準備する場合で、今回は以下のパターンをベースにしてみました。


import csv

Dict_Data = {}

with open(CSV_File, 'r', encoding='utf_8_sig') as f:
    reader = csv.reader(f)
    Dict_Data = {rows[0]:rows[1] for rows in reader}
    print(Dict_Data)

さて出来上がったのが・・・

CSVのカラムに合わせて修正して頂けば、ほぼ問題無く動くかと思います。また、今回のバージョンでは、基本的に以前の検証で使っていた、カテゴリ数5とかカテゴリ毎の商品数10個・・といった制限は外してあります(上の方の数字で調整してください)。また、これに伴って乱数のネタ元をnumpyに変更しています。


# coding: utf-8
#
# 日本語版BIG DATA Generator
#
# Python 3版
#

#  初期設定
import sys
stdout = sys.stdout
sys.stdout = stdout

import datetime
import pymysql.cursors
import re

import csv

import numpy as np
from numpy.random import *

# テーブル名
Table_Name = "BIGDATA_TEST0_Table"

# 書き込み用のカラム設定
DL0 = "ts, " # タイムスタンプ情報
DL1 = "Category, Product, Price, Units, Logistics, "  # ビジネス情報
DL2 = "Card, Number, Payment, Tax, "  # 支払い情報
DL3 = "User, Zip, Prefecture, Address, Area, Tel, Email, Point"  # 顧客情報

# 生成するデータの数
Generate_Data = 1000000

# 途中経過のチェックポイント設定
Check_Point = 10

# 途中経過で生成されたSQL文を表示するか否か(0:表示しない 1:表示する)
Display_SQL = 0

# CSVデータが5カラム毎のフォーマットなので5個飛びになります(必要に応じて変更)
ID_Gap = 5

# 商品データの行数を定義(50個定義の場合は0−49で定義します)
Data_Start = 0 # データのスタート行数(原則的に0を指定)
Data_End = 49 # 投入データの総数から1を引いた数を指定)

# 消費税率の設定(整数でパーセント表記)
Tax_Data = 10

# 辞書のID(参照辞書を作成する際に利用)
Area_List = 1
Logi_List = 2

#
# データをランダムに行番号基準で選択して必要情報を返す
#
# 読み込んだCSVデータ列からランダムに商品情報を選択してデータを返します。
# 今回の改修で、読み込むCSVデータの行情報に対して、ランダムに0から始まる
# データ番号でアクセスする形にしましたので、とりあえず選択対象を作成する!というノリで
# CSVファイルを用意されればOKかと。
#
def Pick_Up_Item(start, end, Data):
    
    RET_Data = []
    
    ID = randint(start,end) * ID_Gap
    
    RET_Data.append(Data[ID]) # カテゴリ名
    RET_Data.append(Data[ID + 1]) # 商品名
    RET_Data.append(Data[ID + 2]) # 価格
    RET_Data.append(Data[ID + 3]) # ポイント率
    RET_Data.append(Data[ID + 4]) # 販売調整数(リアルっぽく見せる為)

    return(RET_Data)

#
# メタデータをCSV形式で読み込んで準備を行う
#
# CSVのフォーマット      カテゴリー、 製品名、価格、 ポイント率、 販売調整係数
#
def CSV_Data_Set1(CSV_File):
    
    RET_Data = []
    
    with open(CSV_File, 'r', encoding='utf-8-sig') as f:
        
        reader = csv.reader(f)
        
        i = 0
        
        for row in reader:
            
            RET_Data.append(row[i]) #  カテゴリ名
            RET_Data.append(row[i + 1]) # 商品名
            RET_Data.append(row[i + 2]) # 価格
            RET_Data.append(row[i + 3]) # ポイント率
            RET_Data.append(row[i + 4]) # 販売調整数(リアルっぽく見せる為)
        
    return(RET_Data)

#
# 物流センターと地域名の辞書データをCSVファイルから作成する
#
# CSVのフォーマット      県名、 地域名、 物流センター名
#
def CSV_Data_Set2(CSV_File, List_ID):
    
    RET_Data = {}
    
    with open(CSV_File, 'r', encoding='utf_8_sig') as f:

        reader = csv.reader(f)
        
        if List_ID == Area_List:
            RET_Data = {rows[0]:rows[1] for rows in reader} # 地域名辞書の作成
        else:
            RET_Data = {rows[0]:rows[2] for rows in reader} # 配送センター辞書の作成
    
    return(RET_Data)

#
# 此処からメインの処理になります
#

try:
   
    print("SingleStore上へのデータの生成を開始します。")
    print ("処理の開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
    
    # Fakerの初期化
    from faker import Faker
    fakegen = Faker('ja_JP')    
    Faker.seed(fakegen.random_digit())
    
    # 商品データをCSVファイルから読み込む
    Product_Data = CSV_Data_Set1('Product_Data.csv')
    
    # 地域名・配送センター名辞書をCSVフィルから作成
    Area_Data = CSV_Data_Set2('Logi_Data.csv', Area_List) # 地域名辞書
    Logi_Data = CSV_Data_Set2('Logi_Data.csv', Logi_List) # 物流センター名辞書
      
    # SingleStoreとの接続
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='xxxxxxxx',
                         password='xxxxxxx',
                         db='xxxxxxx',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor) 
   
    with db.cursor() as cursor:
        
        # 時間情報を生成する起点を確保
        dt_now = datetime.datetime.now()
        
        # ループカウンターの初期化
        Loop_Counter = 0
   
        # 検証データの生成
        while Loop_Counter < Generate_Data:
            
            # ts用のデータを生成
            Sec = fakegen.random_digit()
            MIL_Sec = fakegen.random_digit()
            MIC_Sec = fakegen.random_digit()
            
            # 現実的なタイムスタンプ情報として利用します(秒単位でデータをズラしてBI等で使い易くする)
            ts_now = dt_now + datetime.timedelta(seconds=Sec, milliseconds=MIL_Sec, microseconds=MIC_Sec)
            dt_now = ts_now # 生成データを次回の起点に変更
            
            # データをランダムに選択する(投入データ数は引数で定義可能)
            Select_Data = Pick_Up_Item(Data_Start, Data_End, Product_Data)  
            
            # 所得した各情報を設定
            Category = Select_Data[0] # カテゴリ名
            Product = Select_Data[1] # 商品名            
            Price = Select_Data[2] # 価格            
            Point_Rate = Select_Data[3] # ポイント換算率            
            Real_Rate = Select_Data[4] # 販売数調整係数
            
            # 購入数を設定(家電は原則的に1個単位(乾電池を除く))
            if Real_Rate == '1':
                Units = 1
            else:
                Units = randint(1, Real_Rate)
            
            # ポイントの計算を行いデータを設定
            Point = int(Price) * Units * int(Point_Rate)  /  100
               
            # 支払い情報の設定
            if str(fakegen.pybool()) == "True":
                Card = "現金"
            else:
                Card = fakegen.credit_card_provider()
   
            Number = fakegen.credit_card_number()              
            if Card == "現金":    Number = "N/A"
           
            # 支払い総額と消費税の設定
            Payment = Units * int(Price)
            Tax = Payment * Tax_Data / 100
             
            # 購入者情報の設定
            User = fakegen.name()
            Zip = fakegen.zipcode()
            Address = fakegen.address()
            Tel = fakegen.phone_number()
            Email = fakegen.ascii_email()
           
            # 都道府県情報の抽出
            pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"
            m = re.match(pattern , Address)
            if m:
                Prefecture = m.group()
            
            # 地域名と物流センター名を県名から設定 
            Area = Area_Data.get(Prefecture)
            Logistics = Logi_Data.get(Prefecture)
                   
            # SQLで使用するデータ列の作成
            DV0 = str(ts_now)+"','"
            DV1 = Category + "','" + Product + "','" + str(Price) + "','" + str(Units) + "','" + Logistics + "','"
            DV2 = Card + "','" + Number + "','" + str(Payment) + "','" + str(Tax) + "','"
            DV3 = User + "','" + Zip + "','" + Prefecture + "','" + Address + "','" + Area + "','" + Tel + "','" + str(Email) + "','" + str(Point)
       
            SQL = "INSERT INTO " + Table_Name +"(" + DL0 + DL1 + DL2 + DL3 + ") VALUES('" + DV0 + DV1 + DV2 + DV3 + "')"
                       
            # データベースへの書き込み
            cursor.execute(SQL)    
            db.commit()
            
            #  コンソールに生成データを表示
            if Display_SQL == 1:    print (SQL)
       
            # ループカウンタの更新
            Loop_Counter = Loop_Counter + 1
            
            # データの作成状況を表示
            if (Loop_Counter % 10) == 0:     print("途中経過: " + str(Loop_Counter) + " 個目のデータ作成を終了")

except KeyboardInterrupt:
   
    print('!!!!! 割り込み発生 !!!!!')
    
finally:
   
    # データベースコネクションを閉じる
    db.close()

    # 処理時間を表示して終了 
    print("今回生成したデータの総数 : " + str(Loop_Counter))    
    print("SingleStore上への指定されたデータ生成が終了しました。")
    print ("処理の終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
  

因みに、データテーブルを作成する部分は以前のものと同じです。

# coding: utf-8
#
# 日本語版BIG DATA Generator
#
# Python 3版
#
#

#  初期設定
import sys
stdout = sys.stdout
sys.stdout = stdout

import pymysql.cursors
import datetime

# テーブル名
Table_Name = "BIGDATA_TEST0_Table"

# テーブル初期化
Table_Init = "DROP TABLE IF EXISTS " + Table_Name
   
# テーブル定義
# DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6) DEFAULT NOW(), "
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, Logistics VARCHAR(20), "    
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "    
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Area VARCHAR(10), Tel VARCHAR(15), Email VARCHAR(40), Point INT, "
DC4 = "SHARD KEY (Area), PRIMARY KEY(id,  Area)"

try:
   
    print("SingleStore上へのテーブル作成処理を開始")
    print ("処理の開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

    # SingleStoreに接続
    db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
                         port=3306,
                         user='xxxxxxxx',
                         password='xxxxxxxx',
                         db='xxxxxxxx',
                         charset='utf8',
                         cursorclass=pymysql.cursors.DictCursor)
        
   
    # デモ用のテーブルの作成  
    Table_Create = "CREATE TABLE IF NOT EXISTS " + Table_Name + "(" + DC0 + DC1 + DC2 + DC3 + DC4 + ")"  
                           
    with db.cursor() as cursor:
       
        # 既存テーブルの初期化
        cursor.execute(Table_Init)
        db.commit()
       
        # 新規にテーブルを作成
        cursor.execute(Table_Create)    
        db.commit()

except KeyboardInterrupt:

    # 割り込み処理への対応   
    print('!!!!! 割り込み発生 !!!!!')
           
finally:
   
    # データベースコネクションを閉じる
    db.close()

    # 処理時間を表示して終了  
    print("SingleStore上へのテーブル作成処理が終了")
    print ("処理の終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))

CSVファイルの状況

CSVファイルをExcel等で適宜作成します。今回のカラム構造は
(A)商材カテゴリ
(B)商品名
(C)価格
(D)ポイント率(整数でパーセント数)
(E)販売調整係数(1回の最大販売数)
になります。

この部分はPython内のパラメータと辻褄が合えば、変更利用が可能なように(多分・・(汗))作り変えましたので、いろいろなパターンでお使い頂けるかと思います(それに合わせてSQL定義文も忘れずに変更しておいてください)

スクリーンショット 2021-03-14 12.44.56.png

地域名とそのエリアを配送するセンター名の参照辞書用CSVも同じ様に作成します。
同様に今回のカラム構造は
(A)県名
(B)地域名
(C)物流センター名
になります。
県名をキーに辞書参照を行う形で処理を行います。

こちらも前述の商品系情報と同様に、柔軟な変更が可能ですので、随時必要な形に変更しPython側と辻褄を合わせてお使いください。

スクリーンショット 2021-03-14 14.07.58.png

実行してみました・・・

これらのファイルを一つのディレクトリに格納して、いつものANACONDA環境で実行した結果が以下の通りです。

スクリーンショット 2021-03-14 13.14.25.png

今回の纏め

今回は、今まで検証で使ってきたツールの整理整頓を行ってみました。出来るだけ横展開出来るように汎用性を持たせたつもりですが、引き続き改良していきたいと思います。
また特に、前回まで連続してご紹介してきた、MySQL互換で、マシン語レベルでSQL処理を実装している、”インメモリDBであるSingleStore環境での、動作検証やパフォーマンス確認等で使って頂ければ幸いです。

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?