1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ろうとるがPythonを扱う、、(その27:tkinter+matplotlibでRealtimeグラフ表示(その2))

Posted at

pingのRealtime表示(今度はOK?)

ろうとるがPythonを扱う、、(その26:tkinter+matplotlibでRealtimeグラフ表示(その1))」の続編。pingの結果表示もグラフ表示も、tkinterの1つのWindowsに収めることとした。

結果

ping7_OK.png
前回とは異なり、複数回「Start」を押しても大丈夫であった(グラフは毎回クリアされる)。

ソースコード

Library import

## Library
from tkinter import * 
import tkinter.ttk as ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 
from datetime import datetime
import time 
import logging
import logging.handlers
import threading 
from icmplib import ping

ここは解説なし。

変数など

## For plotting # グラフで使用する位置情報
xpos = []
ypos = []

## ping parameters  # pingパラメーター(+デフォルト値)
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size ']
param_default = ['1.1.1.1', '5', '1', '3', '64']
		# Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width for parameter input box
header = 'Count, Time, Address, Timeout, Payload, RTT(ms)/Down'

メイン

ここの解説は小生の次の過去投稿を参照してほしいです。

#####  Main  #####
root = Tk()
root.title("Execution of Ping")
root.geometry("800x650")

## Making frames
frm1 = Frame(root)
frm1.label = [Label(frm1, text=param[i]) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i]) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text='    ')
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text='Start', style='W.TButton', command=lambda:start_clk()) # "ttk" is necessary
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
frm2 = Frame(root)
frm2.label = Label(frm2, text=header+'\t\t\t\t\t', font=("Courier New", 10))
frm2.label.pack()
frm3 = Frame(root)
frm3.prt = Text(frm3, font=("Courier New", 11), height=10)
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
frm4 = Frame(root, bd = 10)
fig = Figure() 
ax = fig.add_subplot(111) 
ax.set_xlabel("Time (s)") 
ax.set_ylabel("Response Time (ms)") 
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()

frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()

## For log
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
fh = logging.FileHandler('ping_'+'{:%Y%m%d_%H%M%S}.csv'.format(datetime.now()))
log.addHandler(fh)
log.debug(header)

## Loop
root.mainloop() 

少々コメント。

  • Window内の各フレーム設定
    • パラメータ入力
    • ヘッダ
    • ping結果をリアルタイムで「文字表示」
    • ping結果をリアルタイムで「グラフ表示」
  • 「Start」ボタンクリック時の関数定義(start_clk())
  • 結果をログファイルに出力する初期設定

「Start」ボタンクリック時にCallされる関数

## Start of start_clk()
def start_clk(): 
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    num = 0
    elapsed = 0
    xpos.clear()
    ypos.clear()
    ax.cla()
    ax.grid() 
    input = [frm1.text[i].get() for i in range(len(param))]
    for i in range(len(param)):
        if input[i] == "":
            input[i] = param_default[i]
    try:
        dest = input[0]
        repetition = int(input[1])
        wait = float(input[2])
        timeout_val = float(input[3])
        payload = int(input[4])
    except:
        dest = param_default[0]
        repetition = int(param_default[1])
        wait = float(param_default[2])
        timeout_val = float(param_default[3])
        payload = int(param_default[4])
    frm1.Btn['text'] = 'Cannot'
    frm1.Btn['state'] = 'disabled'
    call_ping()
## End of start_clk()
  • 初期化(回数、経過時間、グラフ関連)
  • 入力値の取得(またはデフォルト値の利用)およびエラーチェック
  • 終了まで、再度クリックできないよう、ボタンを無効化

「global」を多数利用しているが、今回のプログラムでは、この方がわかりやすいと考えた。

ping呼び出し手続きおよび結果のグラフ化

## Start of call_ping()
def call_ping():
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    ax.set_xlabel("Time (s)") 
    ax.set_ylabel("Response Time (ms)") 
    ax.plot(xpos, ypos, color='C0', linestyle='-')
    graph.draw() 
    if num >= repetition:
        frm1.Btn['text'] = 'Start'
        frm1.Btn['state'] = 'normal'
        return
    threading.Thread(target=exec_ping).start()
    elapsed += wait
    num += 1
    root.after(int(wait*1000), call_ping) # argument of call_ping must not exist???
## End of call_ping()
  • ping実行スレッド(exec_ping)呼び出し
  • グラフへのプロット(ax.plot)およびDraw(graph.draw)
  • 規定回数を超えたら、ボタンを有効化およびReturn
  • wait後に再度本関数呼び出し(root.after)
    • できるだけwait時間が正確になるようafterの呼び出し場所に留意

ping実行および結果の文字表示

## Start of show_result()
def show_result(data):
    frm3.prt.config(state='normal')
    frm3.prt.insert(END, data)
    frm3.prt.config(state='disabled')
    frm3.prt.see('end')
## End of show_result()

## Start of exec_ping()
def exec_ping():
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
    try:
        output = str(num+1)+', '+start_time+', '+dest+', '+str(timeout_val)+', '+str(payload)+', '
        result = ping(dest, count=1, interval=1, timeout=timeout_val, payload_size=payload)
        if result.is_alive:
            output += str(result.avg_rtt)
            xpos.append(elapsed)
            ypos.append(result.avg_rtt)
        else:
            output += 'Down'
            xpos.append(elapsed)
            ypos.append(0) # Todo: Should be considered
    except:
        output = 'Error'
    log.debug(output)
    output += '\n'
    show_result(output)
## End of exec_ping()
  • 現時刻取得
  • ping実行
  • 実行結果(result.avg_rtt)をグラフ用変数へ格納(ypos.append)
    • 経過時間も格納(xpos.append)
  • ログへの出力
  • 結果の文字表示(show_result)

全体

## Library
from tkinter import * 
import tkinter.ttk as ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 
from datetime import datetime
import time 
import logging
import logging.handlers
import threading 
from icmplib import ping

## For plotting
xpos = []
ypos = []

## ping parameters
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size ']
param_default = ['1.1.1.1', '5', '1', '3', '64']
		# Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width for parameter input box
header = 'Count, Time, Address, Timeout, Payload, RTT(ms)/Down'

## Start of show_result()
def show_result(data):
    frm3.prt.config(state='normal')
    frm3.prt.insert(END, data)
    frm3.prt.config(state='disabled')
    frm3.prt.see('end')
## End of show_result()

## Start of exec_ping()
def exec_ping():
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
    try:
        output = str(num+1)+', '+start_time+', '+dest+', '+str(timeout_val)+', '+str(payload)+', '
        result = ping(dest, count=1, interval=1, timeout=timeout_val, payload_size=payload)
        if result.is_alive:
            output += str(result.avg_rtt)
            xpos.append(elapsed)
            ypos.append(result.avg_rtt)
        else:
            output += 'Down'
            xpos.append(elapsed)
            ypos.append(0) # Todo: Should be considered
    except:
        output = 'Error'
    log.debug(output)
    output += '\n'
    show_result(output)
## End of exec_ping()

## Start of call_ping()
def call_ping():
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    ax.set_xlabel("Time (s)") 
    ax.set_ylabel("Response Time (ms)") 
    ax.plot(xpos, ypos, color='C0', linestyle='-')
    graph.draw() 
    if num >= repetition:
        frm1.Btn['text'] = 'Start'
        frm1.Btn['state'] = 'normal'
        return
    threading.Thread(target=exec_ping).start()
    elapsed += wait
    num += 1
    root.after(int(wait*1000), call_ping) # argument of call_ping must not exist???
## End of call_ping()

## Start of start_clk()
def start_clk(): 
    global num, elapsed, dest, repetition, wait, timeout_val, payload
    num = 0
    elapsed = 0
    xpos.clear()
    ypos.clear()
    ax.cla()
    ax.grid() 
    input = [frm1.text[i].get() for i in range(len(param))]
    for i in range(len(param)):
        if input[i] == "":
            input[i] = param_default[i]
    try:
        dest = input[0]
        repetition = int(input[1])
        wait = float(input[2])
        timeout_val = float(input[3])
        payload = int(input[4])
    except:
        dest = param_default[0]
        repetition = int(param_default[1])
        wait = float(param_default[2])
        timeout_val = float(param_default[3])
        payload = int(param_default[4])
    frm1.Btn['text'] = 'Cannot'
    frm1.Btn['state'] = 'disabled'
    call_ping()
## End of start_clk()

#####  Main  #####
root = Tk()
root.title("Execution of Ping")
root.geometry("800x650")

## Making frames
frm1 = Frame(root)
frm1.label = [Label(frm1, text=param[i]) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i]) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text='    ')
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text='Start', style='W.TButton', command=lambda:start_clk()) # "ttk" is necessary
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
frm2 = Frame(root)
frm2.label = Label(frm2, text=header+'\t\t\t\t\t', font=("Courier New", 10))
frm2.label.pack()
frm3 = Frame(root)
frm3.prt = Text(frm3, font=("Courier New", 11), height=10)
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
frm4 = Frame(root, bd = 10)
fig = Figure() 
ax = fig.add_subplot(111) 
ax.set_xlabel("Time (s)") 
ax.set_ylabel("Response Time (ms)") 
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()

frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()

## For log
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
fh = logging.FileHandler('ping_'+'{:%Y%m%d_%H%M%S}.csv'.format(datetime.now()))
log.addHandler(fh)
log.debug(header)

## Loop
root.mainloop() 

EOF

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?