前回は「道具編(1)」という事で、Pythonを使ったCSVファイル生成&転送ツールの前半戦を書かせて頂きました。今回は、その続き&締めとして後半部分を作成していきたいと思います。内容に関しては、いつものNDA(ノン・ダメ出し・アグリーメント)ベースで自由に書き換えてご活用ください。
データ生成のメイン部分
今回のツールでは、
(1)CSVファイル自体の生成総数
(2)各CSVファイルに生成されるなんちゃって物販データの数
を自由に設定出来るようにしてあります。
まずは。各CSVファイルに**「なんちゃって物販データ」**を指定個数書き出す処理を準備します。
#
# 仮想物販情報のDB処理
def Data_Gen(File_Name):
        # 処理に必要なパラメータを取得
        Data_Count = int(Gen_Count.get("1.0", "end"))
        EQ_Test    = int(combo_dict1[cb1.get()])
        Column_1st = int(combo_dict2[cb2.get()])
        Base_Path  = Base_DIR.get("1.0", "end").replace('\n','')
        # CSVファイルの作成準備
        f = open(Base_Path + File_Name, 'w')
        writer = csv.writer(f)
        # 1行目の処理を行う
        if (Column_1st == ON): 
            if (EQ_Test == OFF): writer.writerow(CSV_Column1)
            else:                writer.writerow(CSV_Column2)
            
        # 時間情報を生成する起点を確保
        dt_now = datetime.datetime.now()
        # ループカウンターの初期化
        Loop_Counter = 0
        # 使用するリスト領域を初期化
        CSV_List = []
        TMP_List = []
        # データの生成
        while Loop_Counter < Data_Count:
            # ts用のデータをアプリ側で生成
            Sec     = fakegen.random_digit()
            MIL_Sec = fakegen.random_digit()
            MIC_Sec = fakegen.random_digit()
            # 現実的なタイムスタンプ情報として利用します(秒単位でデータをズラしてBI等で使い易くする)
            ts_now  = dt_now + datetime.timedelta(seconds=Sec, milliseconds=MIL_Sec, microseconds=MIC_Sec)
            dt_now  = ts_now # 生成データを次回の起点に変更
            # ID, タイムスタンプ情報をリスト化
            CSV_List = [Loop_Counter + 1, str(ts_now)]
            TMP_List   = Data_Gen_1()  # 物販系情報の生成とリスト化
            List_Count = len(TMP_List) # 取得したリストのデータ数を抽出
            for i in range(0, List_Count): CSV_List.append(TMP_List[i])
            TMP_List   = Data_Gen_2()  # 決済系情報の生成とリスト化
            List_Count = len(TMP_List) # 取得したリストのデータ数を抽出
            for i in range(0, List_Count): CSV_List.append(TMP_List[i])
            TMP_List   = Data_Gen_3(Area_Dict, Logi_Dict)  # 顧客系情報の生成とリスト化
            List_Count = len(TMP_List)                     # 取得したリストのデータ数を抽出
            for i in range(0, List_Count): CSV_List.append(TMP_List[i])
            # CSVファイルに生成したデータを格納
            writer.writerow(CSV_List)
            # ループカウンタの更新
            Loop_Counter = Loop_Counter + 1
            # 処理ステータスの更新   オプションが選択されていればコンソールに生成データを表示
            if (Loop_Counter % 10) == 0: print("途中経過: " + str(Loop_Counter) + " 個目のデータ作成を終了。")
        # 作成したCSVファイルを閉じる
        f.close()
        # 作成したデータ数を戻す
        return(Loop_Counter)
生成されるCSVファイルは、後々Equalumのレプリケーション(O社の金門橋より性能が良いと言われている・・・(汗))機能の検証等で使えるように、ファイルの頭文字を共通化して後半部分を**「被らないように生成時間情報をタネにハッシュ情報を生成し、その情報を頭部分に繋げた形」**のファイル名としています。
ここは、現実的にはもっと短縮して記述出来るかと思いますが、解り易くするために1行ずつ区切って書きました。
#
# ファイル名をユニークにする為の文字列情報を生成
def Hash_Data():
        # この時点での日付・時間情報を文字列で取得
        Ini_Data = str(datetime.datetime.now())
        # ハッシュ値を生成
        Hash_Data = hashlib.md5(Ini_Data.encode()).hexdigest()
        # 作成したハッシュ値を戻す
        return (Hash_Data)
この部分は、指定された個数分のCSVファイルを作成し、SFTPセッションで指定されたリモート側(今回はEqualumの上流側MySQL)に転送、その転送処理が終わった後に自動的にローカルに生成されたCSVファイルを掃除する仕組みを組み込みました。
#
# メインのCSVファイル作成処理
def CSV_Make():
        # 必要な情報を取得する(末尾の改行コードを削除しておく)
        CSV_File  = CSV_FIL.get("1.0", "end").replace('\n','')
        Base_Path = Base_DIR.get("1.0", "end").replace('\n','')
        EQ_Path   = EQ_DIR.get("1.0", "end").replace('\n','')
        # 生成するファイルの総数
        Loop_Count = int(File_No.get("1.0", "end"))
        # プログレスバーの最大値をファイル総数に設定
        progressbar1.configure(maximum=Loop_Count)
        # 処理回数カウンターの初期化
        Counter = 0
        #paramikoインスタンスを生成
        client = paramiko.SSHClient()
        client.load_system_host_keys()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(hostname=host, username=host_username, password=host_password, timeout=10, look_for_keys=False)
        # 指定された回数分のCSVファイルを作成しEQ側へ転送する
        while Counter < Loop_Count:
                # 生成ファイル固有のIDを生成時間をハッシュ化した文字列で作る
                File_ID = "_" + Hash_Data()
                # 作成するファイル名の作成(レプリケーション等で使いやすい様に固定文字列を含める)
                File_Name = CSV_File + File_ID + ".csv"
                # 物販系のデータを架空生成してCSVファイルを生成
                print(str(Counter + 1) + "番目のファイル " + File_Name + " に" + str(Data_Gen(File_Name)) + "個のデータを作成しました")
                # 必要なパス情報を作成
                Local_Path  = Base_Path + File_Name
                Remote_Path = EQ_Path   + File_Name
                # SFTPセッション開始
                sftp_connection = client.open_sftp()
 
                # ローカルPCからリモートサーバーへファイルを転送
                sftp_connection.put(Local_Path, Remote_Path)
                # 時間調整
                time.sleep(Wait_Time)
        
                print("リモート側にファイルの転送を行いました")
                # プログレスバーの処理
                progressbar1.after(Delay, var_start1(Counter+1))
                progressbar1.update()
                # 処理カウンターを進める
                Counter = Counter + 1
        # リモート転送処理を終了する
        client.close()
        # ローカル側の生成ファイルを整理する
        Remove_Files = Base_Path + "*.csv"
        for file in glob.glob(Remove_Files): os.remove(file)
        # ステータスバーに処理終了を表示
        statusbar["text"] = " " + GEN_Message
        return(Counter)
最後にGUI部分を記述します・・・
ここは、毎度同じみの必要分だけ順番に記述する・・・・しかし!ある意味で一番時間が掛かる(位置合わせ等で・・)作業になるかと思います。
書き方自体は特にトリッキーな書き方ではないかと思いますので、適宜書き換えてご活用頂ければと思います。
#
# GUI・メイン部分
#
# 生成するCSVファイルの個数
label1 = tk.Label(root, text = "生成するCSVファイル総数")
label1.place(x = 20, y = 20)
File_No = tk.Text(root, width = 15, height = 1)
File_No.place(x = 200, y = 20)
File_No.insert(tk.END,"5")
# 生成する時間形式の選択
label2 = tk.Label(root, text = "1ファイルのデータ数")
label2.place(x = 20, y = 50)
Gen_Count = tk.Text(root, width = 15, height = 1)
Gen_Count.place(x = 200, y = 50)
Gen_Count.insert(tk.END,"5")
# EqualumのFLOW連携(派生カラム作成)を選択
label3 = tk.Label(root, text = "Equalum連携の選択")
label3.place(x = 20, y = 80)
combo_dict1 = {"Equalumと連携する": "0", "Equalumと連携しない": "1"}
cb1 = ttk.Combobox(root, values = list(combo_dict1.keys()), state = "readonly")
cb1.place(x = 200, y = 80)
cb1.current(0)
# EqualumのFLOW連携(派生カラム作成)を選択
label4 = tk.Label(root, text = "カラム情報の取り扱い")
label4.place(x = 20, y = 110)
combo_dict2 = {"1行目に書き込む": "0", "カラム情報は不要": "1"}
cb2 = ttk.Combobox(root, values = list(combo_dict2.keys()), state = "readonly")
cb2.place(x = 200, y = 110)
cb2.current(0)
# 生成先の基準ディレクトリ
label5 = tk.Label(root, text = "生成先のディレクトリ")
label5.place(x = 20, y = 140)
Base_DIR = tk.Text(root, width = 25, height = 1)
Base_DIR.place(x = 200, y = 140)
Base_DIR.insert(tk.END,"/Users/apple/CSV/")
# CSVファイルの識別文字列
label6 = tk.Label(root, text = "CSVファイルの識別文字列")
label6.place(x = 20, y = 170)
CSV_FIL = tk.Text(root, width = 25, height = 1)
CSV_FIL.place(x = 200, y = 170)
CSV_FIL.insert(tk.END,"CSV")
# CSVファイルの転送先ディレクトリ
label7 = tk.Label(root, text = "CSVファイルの転送先")
label7.place(x = 20, y = 200)
EQ_DIR = tk.Text(root, width = 25, height = 1)
EQ_DIR.place(x = 200, y = 200)
EQ_DIR.insert(tk.END,"/var/csv/")
# ターゲットに生成するテーブル名
label8 = tk.Label(root, text = "ターゲットのテーブル名")
label8.place(x = 20, y = 230)
TBL_Name = tk.Text(root, width = 25, height = 1)
TBL_Name.place(x = 200, y = 230)
TBL_Name.insert(tk.END,"EQ_CSV_Demo_Table")
# ターゲット・データベース上に格納テーブルを作成
TGT_Button = tk.Button(root, text = "ターゲット側のテーブルを初期化・作成", command = TGT_Table)
TGT_Button.place(x = 20, y = 270)
# CSVファイルを生成・転送
CSV_Button = tk.Button(root, text = "CSVファイルを作成・転送", command = CSV_Make)
CSV_Button.place(x = 20, y = 310)
# 処理プログレスバーの設定
label16 = tk.Label(root, text = "CSVファイル作成状況 : ")
label16.place(x = 20, y = 360)
progressbar1 = ttk.Progressbar(root, orient="horizontal", length=200, mode="determinate", style="Orange.Horizontal.TProgressbar")
progressbar1.pack()
maximum_bar = 20
value_bar = 0
div_bar = 1
progressbar1.configure(maximum=maximum_bar, value=value_bar)
progressbar1.place(x = 170, y = 365)
# 終了ボタン設置
exit_button = tk.Button(root, text = "  閉じる  ", command = Exit_Tool)
exit_button.place(x = 270, y = 400)
# ステータスバー設置
statusbar = tk.Label(root, text =  " 作業準備完了! ", bd = 1, relief = tk.SUNKEN, anchor = tk.W)
statusbar.pack(side = tk.BOTTOM, fill = tk.X)
# イベントループを作成
root.mainloop() 
#
# 終了後の処理
#
print("------------------------------------------------")
print(datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") + " : 検証用のCSVファイルが出来ました")
print("******************* 処理終了 *******************")
print("------------------------------------------------")
#########################################################################################
今回のまとめ
今回は、この検証で使用するPythonベースの**「なんちゃって物販データCSVファイル生成ツール」**の残りの部分を紹介させて頂きました。
次回はいよいよEqualumを使ったファイルシステム機能活用のCSVファイルストリーミング処理を実際に検証してみます。