以前の検証以降・・・・
EqualumやSingleStore関連の投稿をしていると、データ生成部分だけ下さい!というお話を頂いたりする事が増え、また確かに良く考えると、有れば有ったで何かのお役に立てるかもしれない・・と思い立ち、データ生成部分に少し汎用性を持たせたバージョンを作成してみました。
まずは冒頭部分のカオスを修正します
大量に定義されていたメタデータ系情報をCSVファイルで作成し、それを別ファイルとして読み込む形にしたいと思います。
基本的には
import csv
with open('CSV_File', 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
for row in reader:
print(row)
の定番パターンをベースに実装します。
少し工夫が必要なのは、辞書形式にデータを準備する場合で、今回は以下のパターンをベースにしてみました。
import csv
Dict_Data = {}
with open(CSV_File, 'r', encoding='utf_8_sig') as f:
reader = csv.reader(f)
Dict_Data = {rows[0]:rows[1] for rows in reader}
print(Dict_Data)
さて出来上がったのが・・・
CSVのカラムに合わせて修正して頂けば、ほぼ問題無く動くかと思います。また、今回のバージョンでは、基本的に以前の検証で使っていた、カテゴリ数5とかカテゴリ毎の商品数10個・・といった制限は外してあります(上の方の数字で調整してください)。また、これに伴って乱数のネタ元をnumpyに変更しています。
# coding: utf-8
#
# 日本語版BIG DATA Generator
#
# Python 3版
#
# 初期設定
import sys
stdout = sys.stdout
sys.stdout = stdout
import datetime
import pymysql.cursors
import re
import csv
import numpy as np
from numpy.random import *
# テーブル名
Table_Name = "BIGDATA_TEST0_Table"
# 書き込み用のカラム設定
DL0 = "ts, " # タイムスタンプ情報
DL1 = "Category, Product, Price, Units, Logistics, " # ビジネス情報
DL2 = "Card, Number, Payment, Tax, " # 支払い情報
DL3 = "User, Zip, Prefecture, Address, Area, Tel, Email, Point" # 顧客情報
# 生成するデータの数
Generate_Data = 1000000
# 途中経過のチェックポイント設定
Check_Point = 10
# 途中経過で生成されたSQL文を表示するか否か(0:表示しない 1:表示する)
Display_SQL = 0
# CSVデータが5カラム毎のフォーマットなので5個飛びになります(必要に応じて変更)
ID_Gap = 5
# 商品データの行数を定義(50個定義の場合は0−49で定義します)
Data_Start = 0 # データのスタート行数(原則的に0を指定)
Data_End = 49 # 投入データの総数から1を引いた数を指定)
# 消費税率の設定(整数でパーセント表記)
Tax_Data = 10
# 辞書のID(参照辞書を作成する際に利用)
Area_List = 1
Logi_List = 2
#
# データをランダムに行番号基準で選択して必要情報を返す
#
# 読み込んだCSVデータ列からランダムに商品情報を選択してデータを返します。
# 今回の改修で、読み込むCSVデータの行情報に対して、ランダムに0から始まる
# データ番号でアクセスする形にしましたので、とりあえず選択対象を作成する!というノリで
# CSVファイルを用意されればOKかと。
#
def Pick_Up_Item(start, end, Data):
RET_Data = []
ID = randint(start,end) * ID_Gap
RET_Data.append(Data[ID]) # カテゴリ名
RET_Data.append(Data[ID + 1]) # 商品名
RET_Data.append(Data[ID + 2]) # 価格
RET_Data.append(Data[ID + 3]) # ポイント率
RET_Data.append(Data[ID + 4]) # 販売調整数(リアルっぽく見せる為)
return(RET_Data)
#
# メタデータをCSV形式で読み込んで準備を行う
#
# CSVのフォーマット カテゴリー、 製品名、価格、 ポイント率、 販売調整係数
#
def CSV_Data_Set1(CSV_File):
RET_Data = []
with open(CSV_File, 'r', encoding='utf-8-sig') as f:
reader = csv.reader(f)
i = 0
for row in reader:
RET_Data.append(row[i]) # カテゴリ名
RET_Data.append(row[i + 1]) # 商品名
RET_Data.append(row[i + 2]) # 価格
RET_Data.append(row[i + 3]) # ポイント率
RET_Data.append(row[i + 4]) # 販売調整数(リアルっぽく見せる為)
return(RET_Data)
#
# 物流センターと地域名の辞書データをCSVファイルから作成する
#
# CSVのフォーマット 県名、 地域名、 物流センター名
#
def CSV_Data_Set2(CSV_File, List_ID):
RET_Data = {}
with open(CSV_File, 'r', encoding='utf_8_sig') as f:
reader = csv.reader(f)
if List_ID == Area_List:
RET_Data = {rows[0]:rows[1] for rows in reader} # 地域名辞書の作成
else:
RET_Data = {rows[0]:rows[2] for rows in reader} # 配送センター辞書の作成
return(RET_Data)
#
# 此処からメインの処理になります
#
try:
print("SingleStore上へのデータの生成を開始します。")
print ("処理の開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
# Fakerの初期化
from faker import Faker
fakegen = Faker('ja_JP')
Faker.seed(fakegen.random_digit())
# 商品データをCSVファイルから読み込む
Product_Data = CSV_Data_Set1('Product_Data.csv')
# 地域名・配送センター名辞書をCSVフィルから作成
Area_Data = CSV_Data_Set2('Logi_Data.csv', Area_List) # 地域名辞書
Logi_Data = CSV_Data_Set2('Logi_Data.csv', Logi_List) # 物流センター名辞書
# SingleStoreとの接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='xxxxxxxx',
password='xxxxxxx',
db='xxxxxxx',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
with db.cursor() as cursor:
# 時間情報を生成する起点を確保
dt_now = datetime.datetime.now()
# ループカウンターの初期化
Loop_Counter = 0
# 検証データの生成
while Loop_Counter < Generate_Data:
# ts用のデータを生成
Sec = fakegen.random_digit()
MIL_Sec = fakegen.random_digit()
MIC_Sec = fakegen.random_digit()
# 現実的なタイムスタンプ情報として利用します(秒単位でデータをズラしてBI等で使い易くする)
ts_now = dt_now + datetime.timedelta(seconds=Sec, milliseconds=MIL_Sec, microseconds=MIC_Sec)
dt_now = ts_now # 生成データを次回の起点に変更
# データをランダムに選択する(投入データ数は引数で定義可能)
Select_Data = Pick_Up_Item(Data_Start, Data_End, Product_Data)
# 所得した各情報を設定
Category = Select_Data[0] # カテゴリ名
Product = Select_Data[1] # 商品名
Price = Select_Data[2] # 価格
Point_Rate = Select_Data[3] # ポイント換算率
Real_Rate = Select_Data[4] # 販売数調整係数
# 購入数を設定(家電は原則的に1個単位(乾電池を除く))
if Real_Rate == '1':
Units = 1
else:
Units = randint(1, Real_Rate)
# ポイントの計算を行いデータを設定
Point = int(Price) * Units * int(Point_Rate) / 100
# 支払い情報の設定
if str(fakegen.pybool()) == "True":
Card = "現金"
else:
Card = fakegen.credit_card_provider()
Number = fakegen.credit_card_number()
if Card == "現金": Number = "N/A"
# 支払い総額と消費税の設定
Payment = Units * int(Price)
Tax = Payment * Tax_Data / 100
# 購入者情報の設定
User = fakegen.name()
Zip = fakegen.zipcode()
Address = fakegen.address()
Tel = fakegen.phone_number()
Email = fakegen.ascii_email()
# 都道府県情報の抽出
pattern = u"東京都|北海道|(?:京都|大阪)府|.{2,3}県"
m = re.match(pattern , Address)
if m:
Prefecture = m.group()
# 地域名と物流センター名を県名から設定
Area = Area_Data.get(Prefecture)
Logistics = Logi_Data.get(Prefecture)
# SQLで使用するデータ列の作成
DV0 = str(ts_now)+"','"
DV1 = Category + "','" + Product + "','" + str(Price) + "','" + str(Units) + "','" + Logistics + "','"
DV2 = Card + "','" + Number + "','" + str(Payment) + "','" + str(Tax) + "','"
DV3 = User + "','" + Zip + "','" + Prefecture + "','" + Address + "','" + Area + "','" + Tel + "','" + str(Email) + "','" + str(Point)
SQL = "INSERT INTO " + Table_Name +"(" + DL0 + DL1 + DL2 + DL3 + ") VALUES('" + DV0 + DV1 + DV2 + DV3 + "')"
# データベースへの書き込み
cursor.execute(SQL)
db.commit()
# コンソールに生成データを表示
if Display_SQL == 1: print (SQL)
# ループカウンタの更新
Loop_Counter = Loop_Counter + 1
# データの作成状況を表示
if (Loop_Counter % 10) == 0: print("途中経過: " + str(Loop_Counter) + " 個目のデータ作成を終了")
except KeyboardInterrupt:
print('!!!!! 割り込み発生 !!!!!')
finally:
# データベースコネクションを閉じる
db.close()
# 処理時間を表示して終了
print("今回生成したデータの総数 : " + str(Loop_Counter))
print("SingleStore上への指定されたデータ生成が終了しました。")
print ("処理の終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
因みに、データテーブルを作成する部分は以前のものと同じです。
# coding: utf-8
#
# 日本語版BIG DATA Generator
#
# Python 3版
#
#
# 初期設定
import sys
stdout = sys.stdout
sys.stdout = stdout
import pymysql.cursors
import datetime
# テーブル名
Table_Name = "BIGDATA_TEST0_Table"
# テーブル初期化
Table_Init = "DROP TABLE IF EXISTS " + Table_Name
# テーブル定義
# DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6) DEFAULT NOW(), "
DC0 = "id BIGINT AUTO_INCREMENT, ts TIMESTAMP(6), "
DC1 = "Category VARCHAR(20), Product VARCHAR(20), Price INT, Units INT, Logistics VARCHAR(20), "
DC2 = "Card VARCHAR(40), Number VARCHAR(30), Payment INT, Tax INT, "
DC3 = "User VARCHAR(20), Zip VARCHAR(10), Prefecture VARCHAR(10), Address VARCHAR(60), Area VARCHAR(10), Tel VARCHAR(15), Email VARCHAR(40), Point INT, "
DC4 = "SHARD KEY (Area), PRIMARY KEY(id, Area)"
try:
print("SingleStore上へのテーブル作成処理を開始")
print ("処理の開始 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
# SingleStoreに接続
db = pymysql.connect(host = 'xxx.xxx.xxx.xxx',
port=3306,
user='xxxxxxxx',
password='xxxxxxxx',
db='xxxxxxxx',
charset='utf8',
cursorclass=pymysql.cursors.DictCursor)
# デモ用のテーブルの作成
Table_Create = "CREATE TABLE IF NOT EXISTS " + Table_Name + "(" + DC0 + DC1 + DC2 + DC3 + DC4 + ")"
with db.cursor() as cursor:
# 既存テーブルの初期化
cursor.execute(Table_Init)
db.commit()
# 新規にテーブルを作成
cursor.execute(Table_Create)
db.commit()
except KeyboardInterrupt:
# 割り込み処理への対応
print('!!!!! 割り込み発生 !!!!!')
finally:
# データベースコネクションを閉じる
db.close()
# 処理時間を表示して終了
print("SingleStore上へのテーブル作成処理が終了")
print ("処理の終了 : " + datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S"))
CSVファイルの状況
CSVファイルをExcel等で適宜作成します。今回のカラム構造は
(A)商材カテゴリ
(B)商品名
(C)価格
(D)ポイント率(整数でパーセント数)
(E)販売調整係数(1回の最大販売数)
になります。
この部分はPython内のパラメータと辻褄が合えば、変更利用が可能なように(多分・・(汗))作り変えましたので、いろいろなパターンでお使い頂けるかと思います(それに合わせてSQL定義文も忘れずに変更しておいてください)
地域名とそのエリアを配送するセンター名の参照辞書用CSVも同じ様に作成します。
同様に今回のカラム構造は
(A)県名
(B)地域名
(C)物流センター名
になります。
県名をキーに辞書参照を行う形で処理を行います。
こちらも前述の商品系情報と同様に、柔軟な変更が可能ですので、随時必要な形に変更しPython側と辻褄を合わせてお使いください。
実行してみました・・・
これらのファイルを一つのディレクトリに格納して、いつものANACONDA環境で実行した結果が以下の通りです。
今回の纏め
今回は、今まで検証で使ってきたツールの整理整頓を行ってみました。出来るだけ横展開出来るように汎用性を持たせたつもりですが、引き続き改良していきたいと思います。
また特に、前回まで連続してご紹介してきた、MySQL互換で、マシン語レベルでSQL処理を実装している、”インメモリDBであるSingleStore環境での、動作検証やパフォーマンス確認等で使って頂ければ幸いです。