An Old-Timer Takes On Python (Part 14: Writing ping Results to a CSV File)

Keeping an old-timer's brain active

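As the title says, this program runs ping from Python and writes the results to CSV files. It also shows the results in a tkinter window. The content is ordinary and, as usual, the program is not elegant, but I noticed a few new things along the way, so I am keeping this as a memo.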

First, the results

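The result of SinglePing (one run performs the specified number of pings) is shown below. It uses ping from Python's icmplib, described later, as-is, and shows not only the number of successes but also values such as the average RTT. The on-screen output is in CSV form as well.
Screenshot 2023-08-17 20.18.24.png
The result of RealtimePing (runs the specified number of pings and shows each result in real time) is shown below. It calls icmplib's ping the specified number of times.
Screenshot 2023-08-17 20.29.07.png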

Source code

Comments on the main points follow.

import

import logging
import logging.handlers
from datetime import datetime
import time
from tkinter import *
import tkinter.ttk as ttk
from icmplib import ping
import threading
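  • ping is taken from icmplib.
  • Reducing the tkinter-related imports to the minimum gives the list above; compared with my earlier articles, the amount of surrounding code is slightly smaller.
  • Threads are used so that the tkinter display updates in real time.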

Definitions

w_width = 800
w_height = 400

ping_type = ['SinglePing', 'RealtimePing']
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size '] # for input box
param_default = ['8.8.8.8', '4', '1', '3', '64'] # Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width for parameter input box
header = ['Time, Address, Count, Interval, Timeout, Payload, Avg_RTT(ms), Min_RTT(ms), \
Max_RTT(ms), Jitter(ms), Loss, Sent, Rcvd \t\t\t\t', \
          'Count, Time, Address, Timeout, Payload, RTT(ms)/Down \t\t\t\t\t\t\t\t\t\t\t\t\t\t'] # Should be improved
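  • Definitions of the parameters (address, repetition count, interval, timeout, payload size) and their default values.
  • Definitions of the headers used for the CSV files and for the display.
  • Tabs ("\t") are used heavily to adjust the layout under tkinter's grid, which is not elegant.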
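
The header strings can also be built from lists of column names, so that the text itself carries no layout padding. The following is a minimal sketch of that idea, not the program's actual code; SINGLE_COLUMNS, REALTIME_COLUMNS, and make_header are hypothetical names that do not appear in the original.

```python
# Column names kept as data, so the CSV header and the on-screen header
# come from one place. These names are illustrative, not from the article.
SINGLE_COLUMNS = [
    "Time", "Address", "Count", "Interval", "Timeout", "Payload",
    "Avg_RTT(ms)", "Min_RTT(ms)", "Max_RTT(ms)", "Jitter(ms)",
    "Loss", "Sent", "Rcvd",
]
REALTIME_COLUMNS = ["Count", "Time", "Address", "Timeout", "Payload", "RTT(ms)/Down"]


def make_header(columns):
    """Join column names with the same ', ' separator the data rows use."""
    return ", ".join(columns)


header = [make_header(SINGLE_COLUMNS), make_header(REALTIME_COLUMNS)]
print(header[1])
```

With this approach, left alignment of the label would have to come from the layout options rather than from trailing tabs; that part is not shown and is untested here.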

Main + ExecPing class

class ExecPing(ttk.Frame): # Only 'Frame' works.
    def __init__(self, master):
        super().__init__(master)
        self.create_widgets()
        self.pack()

    def create_widgets(self):
...()

        ## Start of create_widgets()
        # Log
        num = len(ping_type)
        log = [logging.getLogger(ping_type[i]) for i in range(num)]
        [log[i].setLevel(logging.DEBUG) for i in range(num)]
        fh = [logging.FileHandler(ping_type[i]+'_'+'{:%Y%m%d_%H%M%S}.csv'.format(datetime.now())) \
              for i in range(num)]
        [log[i].addHandler(fh[i]) for i in range(num)]
        title = [header[i] for i in range(num)]
        [log[i].debug(title[i]) for i in range(num)]
        # Tab
        note = ttk.Notebook(self) # "ttk" is necessary
        note.pack()
        note_child = [Frame(note, width=w_width, height=w_height) for i in range(num)]
        [note.add(note_child[i], text=ping_type[i]) for i in range(num)]
        [create_tab(note_child[i], i) for i in range(num)]
    ## End of create_widgets()
## End of ExecPing()

if __name__ == '__main__':
    master = Tk()
    master.title("Execution Ping v0.94")
    master.geometry("800x400")
    # Call main part
    ExecPing(master)
    master.mainloop()
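
The logging part of create_widgets() can be tried on its own, without tkinter. The sketch below assumes one logger per ping type, opens a timestamped CSV file through logging.FileHandler, and writes the header as the first line. The function name open_csv_logger and its directory argument are hypothetical, added only for this illustration.

```python
import logging
import os
import tempfile
from datetime import datetime


def open_csv_logger(name, header, directory="."):
    """Return (logger, path); the logger appends plain lines to a new CSV file."""
    filename = "{}_{:%Y%m%d_%H%M%S}.csv".format(name, datetime.now())
    path = os.path.join(directory, filename)
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    handler = logging.FileHandler(path)  # no formatter set: only the message is written
    logger.addHandler(handler)
    logger.debug(header)  # the first line of the file is the header
    return logger, path


with tempfile.TemporaryDirectory() as tmp:
    log, csv_path = open_csv_logger("SinglePing", "Time, Address, Count", tmp)
    log.debug("230817_201824_000000, 8.8.8.8, 4")
    for h in list(log.handlers):  # close the file before reading it back
        h.close()
        log.removeHandler(h)
    with open(csv_path) as f:
        lines = f.read().splitlines()
print(lines)
```

A plain for loop is used here instead of list comprehensions, since the calls are made only for their side effects.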

Creating the tkinter tabs

        def create_tab(frm, type):
...()

        ## Start of create_tab()
        ## addr, count(1:arg of ping(), 2:arg of 'for' repetition),
        ##     interval(1:arg of ping(), 2:arg of sleep in 'for' repetition), timeout, payload size
            frm1 = Frame(frm, width=w_width, height=w_height)
            frm1.label = [Label(frm1, text=param[i]) for i in range(len(param))]
            [frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
            frm1.text = [Entry(frm1, width=p_width[i]) for i in range(len(p_width))]
            [frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
            frm1.dummy = Label(frm1, text='    ')
            frm1.dummy.grid(row=0, column=len(param)+len(p_width))
            frm1.Btn = ttk.Button(frm1, text='Start', style='W.TButton', command=lambda:ok_clk(type)) # "ttk" is necessary
            # The above should be improved, in order not to click twice.
            frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)

            frm2 = Frame(frm, width=w_width, height=w_height)
            frm2.label = Label(frm2, text=header[type])
            frm2.label.grid(row = 0, column = 0)
            # NG frm2.label.place(x=0, y=0)

            frm3 = Frame(frm, width=w_width, height=w_height)
            frm3.prt = Text(frm3, font=("Courier New", 10), width=120, height=33)
            frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
            frm3.ysc.pack(side=RIGHT, fill="y")
            frm3.prt["yscrollcommand"] = frm3.ysc.set
            frm3.prt.config(state='disabled')
            frm3.prt.pack()

            frm1.pack()
            frm2.pack()
            frm3.pack()
        ## End of create_tab()
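  • Each tab contains three frames.
  • Frame 1: the labels and input entries for the ping parameters, plus the Start button.
  • Frame 2: the display header.
  • Frame 3: the text box that shows the results.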

Clicking the "Start" button

            ## Start of ok_clk()
            def ok_clk(flag):
                input = [frm1.text[i].get() for i in range(len(param))]
                for i in range(len(param)):
                    if input[i] == "":
                        input[i] = param_default[i]
                if flag == 0:
                    single_ping(input)
                else:
                    th = threading.Thread(target=thread_ping, args=(input, ))
                    th.start()
            ## End of ok_clk()
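  • Read the values (ping parameters) from the input entries.
  • If an entry is empty, substitute its default value. Invalid parameter input is not handled.
  • Call single_ping (for SinglePing) or thread_ping (for RealtimePing).
  • RealtimePing runs in a thread so that the results appear in the tkinter window in real time.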
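
Since invalid input is not handled, one possible extension is to convert and check the values before ping is called. The sketch below is only an assumption about how that could look; parse_params and its range checks are hypothetical, and it parses the timeout as a float, whereas the program above uses int().

```python
PARAM_DEFAULT = ["8.8.8.8", "4", "1", "3", "64"]  # same order as param_default


def parse_params(raw, defaults=PARAM_DEFAULT):
    """Fill empty entries with defaults, then convert and range-check each value.

    Returns (address, count, interval, timeout, size) or raises ValueError.
    """
    filled = [value.strip() or default for value, default in zip(raw, defaults)]
    address = filled[0]
    count = int(filled[1])        # ValueError if not an integer
    interval = float(filled[2])   # ValueError if not a number
    timeout = float(filled[3])
    size = int(filled[4])
    if count < 1:
        raise ValueError("Count must be at least 1")
    if interval < 0 or timeout <= 0:
        raise ValueError("Interval must be >= 0 and Timeout must be > 0")
    if size < 0:
        raise ValueError("Size must be >= 0")
    return address, count, interval, timeout, size


print(parse_params(["", "", "0.5", "", ""]))
```

In ok_clk(), a ValueError from such a function could be caught and shown in the result box instead of starting the ping.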

Displaying the results

            ## Start of show_result()
            def show_result(data):
                frm3.prt.config(state='normal')		# 'frm3' should not be used?
                frm3.prt.insert(END, data)
                frm3.prt.config(state='disabled')
                frm3.prt.see('end')
            ## End of show_result()
  • Show the result (data) in the result text box. (From a software design point of view, I think frm3 should be passed in as an argument, but...)
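
In RealtimePing, show_result() is called from worker threads. Updating tkinter widgets from threads other than the one running mainloop() is commonly advised against, so a frequently used alternative is to let the workers put text on a queue.Queue and have the GUI thread drain it periodically through the widget's after() method. The sketch below shows only that pattern. drain, poll, and FakeWidget are hypothetical names, and FakeWidget stands in for a tkinter widget so that the example runs without a display.

```python
import queue
import threading


def drain(q, append):
    """Move every line currently in the queue to the display callback."""
    moved = 0
    while True:
        try:
            line = q.get_nowait()
        except queue.Empty:
            return moved
        append(line)
        moved += 1


def poll(widget, q, append, period_ms=100):
    """Run drain() now and ask the widget to call poll() again later.

    With a real tkinter widget, after() runs the callback in the GUI thread.
    """
    drain(q, append)
    widget.after(period_ms, poll, widget, q, append, period_ms)


class FakeWidget:
    """Stand-in for a tkinter widget: records after() requests instead of scheduling."""

    def __init__(self):
        self.scheduled = []

    def after(self, ms, func, *args):
        self.scheduled.append((ms, func, args))


results = queue.Queue()
shown = []

# Worker threads only touch the queue, never the widget.
workers = [threading.Thread(target=results.put, args=("line %d\n" % i,)) for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()

widget = FakeWidget()
poll(widget, results, shown.append)
print(sorted(shown))
```

In the program above, append would correspond to show_result and widget to frm3.prt, with poll() started once when the tab is created; that wiring is an assumption and is not part of the original code.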

SinglePing (single_ping)

            ## Start of single_ping()
            def single_ping(arg):
                start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
                try:
                    result = ping(arg[0], count=int(arg[1]), interval=float(arg[2]), \
                                      timeout=int(arg[3]), payload_size=int(arg[4]))
                    output = start_time + ', ' + arg[0] + ', ' + arg[1] + ', ' + arg[2] + ', ' \
                        + arg[3] + ', ' + arg[4] + ', ' + str(result.avg_rtt) + ', ' \
                        + str(result.min_rtt) + ', ' + str(result.max_rtt) + ', ' \
                        + str(result.jitter) +', ' + str(result.packet_loss) + ', ' \
                        + str(result.packets_sent) + ', ' + str(result.packets_received)
                except Exception:  # bad input, name lookup failure, missing socket permission, etc.
                    output = 'Error'
                log[type].debug(output)
                output += '\n'
                show_result(output)
            ## End of single_ping()
  • Get the start date and time.
  • Run ping with the input parameters.
  • Log the result and show it on screen.
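
The row assembly can be separated from the ping call and written with the standard csv module, which also takes care of quoting. This is a minimal sketch under stated assumptions: FakeHost is a hypothetical stand-in that carries only the result attributes read by single_ping(), single_row is a hypothetical helper, and the separator is a bare comma rather than the ', ' used above.

```python
import csv
import io
from dataclasses import dataclass


@dataclass
class FakeHost:
    """Stand-in with the icmplib result attributes the article reads."""
    avg_rtt: float
    min_rtt: float
    max_rtt: float
    jitter: float
    packet_loss: float
    packets_sent: int
    packets_received: int


def single_row(start_time, arg, result):
    """Build one CSV line in the same column order as the SinglePing header."""
    fields = [start_time, *arg,
              result.avg_rtt, result.min_rtt, result.max_rtt, result.jitter,
              result.packet_loss, result.packets_sent, result.packets_received]
    buf = io.StringIO()
    csv.writer(buf, lineterminator="").writerow(fields)
    return buf.getvalue()


row = single_row("230817_201824_000000",
                 ["8.8.8.8", "4", "1", "3", "64"],
                 FakeHost(12.5, 11.0, 14.0, 1.2, 0.0, 4, 4))
print(row)
```

Keeping the formatting in its own function makes it possible to check the column order against the header without sending any packets.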

RealtimePing(thread_ping)

            ## Start of start_ping()
            def start_ping(i, arg):
                start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
                try:
                    result = ping(arg[0], count=1, interval=1, timeout=int(arg[3]), payload_size=int(arg[4]))
                    output = str(i+1) + ', ' + start_time + ', ' + arg[0] + ', ' + arg[3] + ', ' + arg[4] + ', '
                    if result.is_alive:
                        output += str(result.avg_rtt)
                    else:
                        output += 'Down'
                except Exception:  # bad input, name lookup failure, missing socket permission, etc.
                    output = 'Error'
                log[type].debug(output)
                output += '\n'
                show_result(output)
            ## End of start_ping()

            ## Start of thread_ping()
            def thread_ping(arg):
                repetition = int(arg[1])
                wait = float(arg[2])
                for i in range(repetition):
                    thrd = threading.Thread(target=start_ping, args=(i, arg))
                    thrd.start()
                    time.sleep(wait)
            ## End of thread_ping()
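  • thread_ping
    • Get the for-loop repetition count and the sleep time inside the loop from the input parameters.
    • Run each single ping (start_ping) in its own thread so that the results are shown in real time.
    • Repeat with a for loop.
  • start_ping
    • Get the start date and time.
    • Run ping.
    • Log the result and show it on screen.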
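
The division of work between thread_ping and start_ping can be observed without sending any packets. In the sketch below, fake_ping is a hypothetical stand-in for one ping call that simply sleeps, and run_repeated plays the role of thread_ping. Because each call runs in its own thread, a slow reply does not delay the next start, which also means that a result can be appended out of order when a reply takes longer than the interval.

```python
import threading
import time


def run_repeated(task, count, interval):
    """Start task(i) in a new thread every `interval` seconds; return the threads."""
    threads = []
    for i in range(count):
        t = threading.Thread(target=task, args=(i,))
        t.start()
        threads.append(t)
        time.sleep(interval)
    return threads


finished = []                 # order in which the fake pings complete
lock = threading.Lock()


def fake_ping(i):
    """Pretend call number 0 is slow (like a timeout) and the others are fast."""
    time.sleep(0.5 if i == 0 else 0.01)
    with lock:
        finished.append(i)


for t in run_repeated(fake_ping, count=3, interval=0.05):
    t.join()
print(finished)
```

Here the slow first call is expected to finish after the later ones. The Count column in the RealtimePing output is what allows such lines to be put back in order afterwards.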

Full listing

import logging
import logging.handlers
from datetime import datetime
import time
from tkinter import *
import tkinter.ttk as ttk
from icmplib import ping
import threading

w_width = 800
w_height = 400

ping_type = ['SinglePing', 'RealtimePing']
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size '] # for input box
param_default = ['8.8.8.8', '4', '1', '3', '64'] # Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width for parameter input box
header = ['Time, Address, Count, Interval, Timeout, Payload, Avg_RTT(ms), Min_RTT(ms), \
Max_RTT(ms), Jitter(ms), Loss, Sent, Rcvd \t\t\t\t', \
          'Count, Time, Address, Timeout, Payload, RTT(ms)/Down \t\t\t\t\t\t\t\t\t\t\t\t\t\t'] # Should be improved

class ExecPing(ttk.Frame): # Only 'Frame' works.
    def __init__(self, master):
        super().__init__(master)
        self.create_widgets()
        self.pack()

    def create_widgets(self):
        def create_tab(frm, type):
            ## Start of show_result()
            def show_result(data):
                frm3.prt.config(state='normal')		# 'frm3' should not be used?
                frm3.prt.insert(END, data)
                frm3.prt.config(state='disabled')
                frm3.prt.see('end')
            ## End of show_result()

            ## Start of single_ping()
            def single_ping(arg):
                start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
                try:
                    result = ping(arg[0], count=int(arg[1]), interval=float(arg[2]), \
                                      timeout=int(arg[3]), payload_size=int(arg[4]))
                    output = start_time + ', ' + arg[0] + ', ' + arg[1] + ', ' + arg[2] + ', ' \
                        + arg[3] + ', ' + arg[4] + ', ' + str(result.avg_rtt) + ', ' \
                        + str(result.min_rtt) + ', ' + str(result.max_rtt) + ', ' \
                        + str(result.jitter) +', ' + str(result.packet_loss) + ', ' \
                        + str(result.packets_sent) + ', ' + str(result.packets_received)
                except Exception:  # bad input, name lookup failure, missing socket permission, etc.
                    output = 'Error'
                log[type].debug(output)
                output += '\n'
                show_result(output)
            ## End of single_ping()

            ## Start of start_ping()
            def start_ping(i, arg):
                start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
                try:
                    result = ping(arg[0], count=1, interval=1, timeout=int(arg[3]), payload_size=int(arg[4]))
                    output = str(i+1) + ', ' + start_time + ', ' + arg[0] + ', ' + arg[3] + ', ' + arg[4] + ', '
                    if result.is_alive:
                        output += str(result.avg_rtt)
                    else:
                        output += 'Down'
                except Exception:  # bad input, name lookup failure, missing socket permission, etc.
                    output = 'Error'
                log[type].debug(output)
                output += '\n'
                show_result(output)
            ## End of start_ping()

            ## Start of thread_ping()
            def thread_ping(arg):
                repetition = int(arg[1])
                wait = float(arg[2])
                for i in range(repetition):
                    thrd = threading.Thread(target=start_ping, args=(i, arg))
                    thrd.start()
                    time.sleep(wait)
            ## End of thread_ping()

            ## Start of ok_clk()
            def ok_clk(flag):
                input = [frm1.text[i].get() for i in range(len(param))]
                for i in range(len(param)):
                    if input[i] == "":
                        input[i] = param_default[i]
                if flag == 0:
                    single_ping(input)
                else:
                    th = threading.Thread(target=thread_ping, args=(input, ))
                    th.start()
            ## End of ok_clk()

        ## Start of create_tab()
        ## addr, count(1:arg of ping(), 2:arg of 'for' repetition),
        ##     interval(1:arg of ping(), 2:arg of sleep in 'for' repetition), timeout, payload size
            frm1 = Frame(frm, width=w_width, height=w_height)
            frm1.label = [Label(frm1, text=param[i]) for i in range(len(param))]
            [frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
            frm1.text = [Entry(frm1, width=p_width[i]) for i in range(len(p_width))]
            [frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
            frm1.dummy = Label(frm1, text='    ')
            frm1.dummy.grid(row=0, column=len(param)+len(p_width))
            frm1.Btn = ttk.Button(frm1, text='Start', style='W.TButton', command=lambda:ok_clk(type)) # "ttk" is necessary
            # The above should be improved, in order not to click twice.
            frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)

            frm2 = Frame(frm, width=w_width, height=w_height)
            frm2.label = Label(frm2, text=header[type])
            frm2.label.grid(row = 0, column = 0)
            # NG frm2.label.place(x=0, y=0)

            frm3 = Frame(frm, width=w_width, height=w_height)
            frm3.prt = Text(frm3, font=("Courier New", 10), width=120, height=33)
            frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
            frm3.ysc.pack(side=RIGHT, fill="y")
            frm3.prt["yscrollcommand"] = frm3.ysc.set
            frm3.prt.config(state='disabled')
            frm3.prt.pack()

            frm1.pack()
            frm2.pack()
            frm3.pack()
        ## End of create_tab()

        ## Start of create_widgets()
        # Log
        num = len(ping_type)
        log = [logging.getLogger(ping_type[i]) for i in range(num)]
        [log[i].setLevel(logging.DEBUG) for i in range(num)]
        fh = [logging.FileHandler(ping_type[i]+'_'+'{:%Y%m%d_%H%M%S}.csv'.format(datetime.now())) \
              for i in range(num)]
        [log[i].addHandler(fh[i]) for i in range(num)]
        title = [header[i] for i in range(num)]
        [log[i].debug(title[i]) for i in range(num)]
        # Tab
        note = ttk.Notebook(self) # "ttk" is necessary
        note.pack()
        note_child = [Frame(note, width=w_width, height=w_height) for i in range(num)]
        [note.add(note_child[i], text=ping_type[i]) for i in range(num)]
        [create_tab(note_child[i], i) for i in range(num)]
    ## End of create_widgets()
## End of ExecPing()

if __name__ == '__main__':
    master = Tk()
    master.title("Execution Ping v0.94")
    master.geometry("800x400")
    # Call main part
    ExecPing(master)
    master.mainloop()

EOF
