pingのRealtime表示改良版
「ろうとるがPythonを扱う、、(その27:tkinter+matplotlibでRealtimeグラフ表示(その2))」の改良版であり、下記が主な修正点である。
- Pythonのicmplibのpingが正常に動作しないケースあり
- レスポンスがあるにも関わらず、応答時間が未表示(後述)
- そこで、SubProcessにより、コマンドpingをCall
- Interval(送信間隔)周辺修正
- Intervalを2秒としても、2秒以上の間隔で実行
- グラフのファイル保存
- Stopボタン追加
- Enterキーによる実行
なお、用いたOSはWindows。他のOSでは、pingの結果表示が異なるであろうことから、他のOSでは修正が必要。
icmplibのpingでの応答時間未表示
下記のような状況が発生。
>>> from icmplib import ping
>>> r=ping('192.168.50.1')
>>> print(r)
192.168.50.1
------------------------------------------------------------
Packets sent: 4
Packets received: 4
Packet loss: 0.0%
Round-trip times: 0.0 ms / 0.0 ms / 0.0 ms
Jitter: 0.0 ms
------------------------------------------------------------
応答があるにもかかわらず、応答時間が表示されていない。この状況を改善する。
結果
最初に修正後の結果を提示。
全体
フォルダ(作成されるファイル)
CSVファイル
保存されるグラフ
方針(pingの実行結果を利用)
コマンドプロンプトでの結果表示を利用してコーディングしている。結果表示のサンプルを下記。
正常
通常
>ping 1.1.1.1 -n 1
1.1.1.1 に ping を送信しています 32 バイトのデータ:
1.1.1.1 からの応答: バイト数 =32 時間 =21ms TTL=57
上記の”21ms”を対象に、”=”と"ms"とをもとに”21”を取り出す。
例外
>ping 127.0.0.1 -n 1
127.0.0.1 に ping を送信しています 32 バイトのデータ:
127.0.0.1 からの応答: バイト数 =32 時間 <1ms TTL=128
上記の”1ms”を対象に、”<”と"ms"とをもとに”1”を取り出す。
エラー
>ping 5.5.5.5 -n 1
5.5.5.5 に ping を送信しています 32 バイトのデータ:
要求がタイムアウトしました。
>ping uiw0fw.com
ping 要求ではホスト uiw0fw.com が見つかりませんでした。ホスト名を確認してもう一度実行してください。
>ping 172.16.10.22 -n 1
172.16.10.22 に ping を送信しています 32 バイトのデータ:
ping: 転送に失敗しました。一般エラーです。
エラー時には、”ms”が含まれない。
ソースコード
例によって、ポイントとなると感じる点を記載。
Library Import
## Imported Library
import sys
import time
from datetime import datetime
import logging
import logging.handlers
import threading
import subprocess
from socket import inet_aton
import re
from tkinter import *
import tkinter.ttk as ttk
import tkinter.messagebox as messagebox
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
ここは特記事項なし。
変数など
## Version
version = 'RC2'
## Graph position
xpos = []
ypos = []
## ping parameters
param = [' Address ', ' Count ', ' Interval ', ' Timeout '] # For Entry
param_default = ['1.1.1.1', '10', '1', '3'] # IP, N=10, 1s and 3s
p_width = [18, 6, 3, 3] # width for Entry
header = 'Count, Time, Address, Response(ms)' # For CSV
## Button
btn_list = ['Start', 'Stop']
- グラフの位置用リスト(xpos、ypos)
- pingパラメータ
- ターゲットIPアドレス、送信回数、送信間隔、タイムアウト
- それぞれのデフォルト
- 入力エントリの幅
- CSVヘッダ
- ボタンリスト(Start、Stop)
tkinterメイン
4フレーム構成のWindowである。
############### Main ###############
## Window
root = Tk()
ver_str = 'pingGraph ' + version
root.title(ver_str)
root.geometry("800x650")
- Windowおよびタイトル
## Flag for Start/Stop
doing = 0
- ping実行中かどうかのフラグ
## Frames
# Input parameters & Start/Stop
frm1 = Frame(root, bd=1)
frm1.label = [Label(frm1, text=param[i], font=("Courier New", 11)) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i], font=("Courier New", 11)) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text=' ', font=("Courier New", 11))
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
[frm1.text[i].bind('<Return>', lambda event:start_clk()) for i in range(len(param))]
frm1.Btn.bind('<Return>', lambda event:start_clk())
- フレーム1
- pingパラメーター(IPアドレス、送信回数、送信間隔、タイムアウト)入力エントリ
- Start/Stopボタンおよびボタンクリック時の関数(start_clk)
- Enterキー入力時も同じ関数呼び出し
# Executed command & Graph_Save
frm2 = Frame(root)
frm2.label = Label(frm2, text='\t\t\t\t\t\t\t', font=("Courier New", 11))
frm2.label.grid(row=0, column=0)
frm2.dummy = Label(frm2, text=' ', font=("Courier New", 11))
frm2.dummy.grid(row=0, column=1)
frm2.Btn = ttk.Button(frm2, text='Graph Save', style='W.TButton', command=lambda:save_clk())
frm2.Btn.grid(row=0, column=2)
frm2.Btn.bind('<Return>', lambda event:save_clk())
- フレーム2
- 実行コマンド表示
- Graph Saveボタンおよびボタンクリック時の関数(save_clk)
# Text
frm3 = Frame(root, bd=1)
frm3.prt = Text(frm3, font=("Courier New", 11), width=90, height=10, bg="White")
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
- フレーム3
- 結果表示エリア
- 右端に縦スクロール
# Graph
frm4 = Frame(root)
fig = Figure()
ax = fig.add_subplot(111)
fig.patch.set_facecolor('PapayaWhip')
ax.set_facecolor('lavender')
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()
draw_graph()
- フレーム4
- グラフ表示エリア
# Locate
frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()
- フレームをWindowに配置
## Menu
menubar = Menu()
filemenu = Menu(menubar, tearoff=0, bg="lightgray", fg="black")
menubar.add_cascade(label="File", menu=filemenu)
filemenu.add_command(label="Exit", command=sys.exit)
root.config(menu=menubar)
## Loop
root.mainloop()
###### End of Program ######
- メニュー(Exitのみ)
- Windowループ
入力チェック関数
IPアドレス
###### Start of is_valid_ip() ######
def is_valid_ip(addr):
try:
inet_aton(addr)
return True
except:
return False
###### End of is_valid_ip() ######
- inet_aton(IPアドレス変換)関数によるIPアドレスフォーマットチェック
ドメイン
###### Start of is_valid_domain() ######
def is_valid_domain(domain):
pattern = re.compile(
r'^(?!\-)([A-Za-z0-9\-]{1,63}(?<!\-)\.)+[A-Za-z]{2,}$'
)
return bool(pattern.match(domain))
###### End of is_valid_domain() ######
- re(正規表現)モジュールによるドメイン名チェック(ChatGPTの回答利用)
結果表示
テキスト
###### Start of show_result() ######
def show_result(data):
frm3.prt.config(state='normal')
frm3.prt.insert(END, data)
frm3.prt.config(state='disabled')
frm3.prt.see('end')
###### End of show_result() ######
- テキスト表示エリアへ引数dataの表示
グラフ
###### Start of draw_graph() ######
def draw_graph():
ax.set_xlabel("Time (s)")
ax.set_ylabel("Response Time (ms)")
ax.plot(xpos, ypos, color='blue', linestyle='-')
graph.draw()
###### End of draw_graph() ######
- グラフ表示エリアへデータプロット
ファイル保存(”Graph Save”クリック処理)
###### Start of save_clk() ('Graph Save' button) ######
def save_clk():
if 'dest' in globals():
dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
fig.savefig('ping_' + dest + '_' + dt + '.png')
messagebox.showinfo('Message', 'Save Complete')
###### End of save_clk() ######
- 日時およびターゲットIPアドレスをファイル名として、グラフをファイル化(保存)
- 保存後のメッセージの表示
実行
Start/Stopクリック処理
###### Start of start_clk() ('Start'/'Stop' button) ######
def start_clk():
global doing, dest
if doing == 0: # Click 'Start'
doing = 1
frm1.Btn['text'] = btn_list[doing]
# Get input parameters
input = [frm1.text[i].get() for i in range(len(param))]
for i in range(len(param)):
if input[i] == "":
input[i] = param_default[i]
try:
if (is_valid_ip(input[0]) == False) and (is_valid_domain(input[0]) == False):
dest = param_default[0]
else:
dest = input[0]
repetition = int(input[1])
interval = int(input[2])
timeout_val = int(input[3])
except:
dest = param_default[0]
repetition = int(param_default[1])
interval = int(param_default[2])
timeout_val = int(param_default[3])
# Make ping command
cmd = 'ping ' + dest + ' -n 1' + ' -w ' + str(timeout_val*1000)
frm2.label['text'] = '\tping to ' + dest + ' timeout:' + str(timeout_val)\
+ 's interval:' + str(interval) + 's n=' + str(repetition)
# Create log
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
fh = logging.FileHandler('ping_' + dest + '_' + dt + '.csv')
log.addHandler(fh)
#
log.debug(cmd)
log.debug(header)
show_result(header+'\n')
# Clear graph
xpos.clear()
ypos.clear()
ax.cla()
ax.grid()
# Execute ping
call_ping(cmd, log, fh, repetition, interval)
else: # Click 'Stop'
if 'proc' in globals():
proc.terminate() # Stop thread (exec_ping())
doing = 0
frm1.Btn['text'] = btn_list[doing]
###### End of start_clk() ######
スレッド呼び出し
###### Start of call_ping() ######
def call_ping(cmd, log, fh, repetition, interval):
global num, elapsed
# loop function
def loop():
global doing
nonlocal next_time
while True:
now = time.time()
if now >= next_time:
exec_ping(cmd, log, interval) # Execute ping
# Blocked until 'exec_ping' ends. @@@
# -> The next may be executed after more than 'internal'.
next_time += interval
draw_graph()
if num >= repetition or doing == 0:
try:
jitter = max(ypos) - min(ypos)
str_j = 'Jitter: '+str(jitter)+' ms'
log.debug(str_j)
show_result(str_j+'\nEnd\n\n')
except:
pass
# Close log
fh.close()
log.removeHandler(fh)
doing = 0
frm1.Btn['text'] = btn_list[doing]
return # Exit
time.sleep(0.001)
# End of while
# End of loop
#
### Main function ###
num = 0
elapsed = 0
next_time = time.time()
# Execute 'loop' until N is reached or 'Stop' is clicked
threading.Thread(target=loop, daemon=True).start()
###### End of call_ping() ######
スレッド(ping)実行
###### Start of exec_ping() ######
def exec_ping(cmd, log, interval):
global proc, num, elapsed
num += 1
start_time = datetime.now().strftime("%H%M%S")
#start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
output = str(num)+', '+start_time+', '+dest+', '
# Execute ping
#proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,\
creationflags=subprocess.CREATE_NO_WINDOW)
out, err = proc.communicate()
result = out.decode(encoding='shift-jis') # depending on languages
try: # Look for 'YYms' in output
end = result.find('ms')
until_end = result[:end]
eq_start = until_end.rfind('=') # Look for 1st '='
ineq_start = until_end.rfind('<') # for "<1ms"
if ineq_start == -1:
val = int(until_end[eq_start+1:]) # Normal case like 12ms
else:
val = int(until_end[ineq_start+1:]) # for "<1ms"
output += str(val)
xpos.append(elapsed)
ypos.append(val)
flag = 0
except:
output = result
flag = 1
if flag == 0:
log.debug(output)
output += '\n'
else:
log.debug(output.rstrip('\r\n'))
show_result(output)
elapsed += interval
###### End of exec_ping() ######
コード全体
pingGraph_rc2.py
# -*- coding: utf-8 -*-
## Imported Library
import sys
import time
from datetime import datetime
import logging
import logging.handlers
import threading
import subprocess
from socket import inet_aton
import re
from tkinter import *
import tkinter.ttk as ttk
import tkinter.messagebox as messagebox
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
from matplotlib.figure import Figure
## Version
version = 'RC2'
## Graph position
xpos = []
ypos = []
## ping parameters
param = [' Address ', ' Count ', ' Interval ', ' Timeout '] # For Entry
param_default = ['1.1.1.1', '10', '1', '3'] # IP, N=10, 1s and 3s
p_width = [18, 6, 3, 3] # width for Entry
header = 'Count, Time, Address, Response(ms)' # For CSV
## Button
btn_list = ['Start', 'Stop']
###### Start of show_result() ######
def show_result(data):
frm3.prt.config(state='normal')
frm3.prt.insert(END, data)
frm3.prt.config(state='disabled')
frm3.prt.see('end')
###### End of show_result() ######
###### Start of draw_graph() ######
def draw_graph():
ax.set_xlabel("Time (s)")
ax.set_ylabel("Response Time (ms)")
ax.plot(xpos, ypos, color='blue', linestyle='-')
graph.draw()
###### End of draw_graph() ######
###### Start of exec_ping() ######
def exec_ping(cmd, log, interval):
global proc, num, elapsed
num += 1
start_time = datetime.now().strftime("%H%M%S")
#start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
output = str(num)+', '+start_time+', '+dest+', '
# Execute ping
#proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,\
creationflags=subprocess.CREATE_NO_WINDOW)
out, err = proc.communicate()
result = out.decode(encoding='shift-jis') # depending on languages
try: # Look for 'YYms' in output
end = result.find('ms')
until_end = result[:end]
eq_start = until_end.rfind('=') # Look for 1st '='
ineq_start = until_end.rfind('<') # for "<1ms"
if ineq_start == -1:
val = int(until_end[eq_start+1:]) # Normal case like 12ms
else:
val = int(until_end[ineq_start+1:]) # for "<1ms"
output += str(val)
xpos.append(elapsed)
ypos.append(val)
flag = 0
except:
output = result
flag = 1
if flag == 0:
log.debug(output)
output += '\n'
else:
log.debug(output.rstrip('\r\n'))
show_result(output)
elapsed += interval
###### End of exec_ping() ######
###### Start of call_ping() ######
def call_ping(cmd, log, fh, repetition, interval):
global num, elapsed
# loop function
def loop():
global doing
nonlocal next_time
while True:
now = time.time()
if now >= next_time:
exec_ping(cmd, log, interval) # Execute ping
# Blocked until 'exec_ping' ends. @@@
# -> The next may be executed after more than 'internal'.
next_time += interval
draw_graph()
if num >= repetition or doing == 0:
try:
jitter = max(ypos) - min(ypos)
str_j = 'Jitter: '+str(jitter)+' ms'
log.debug(str_j)
show_result(str_j+'\nEnd\n\n')
except:
pass
# Close log
fh.close()
log.removeHandler(fh)
doing = 0
frm1.Btn['text'] = btn_list[doing]
return # Exit
time.sleep(0.001)
# End of while
# End of loop
#
### Main function ###
num = 0
elapsed = 0
next_time = time.time()
# Execute 'loop' until N is reached or 'Stop' is clicked
threading.Thread(target=loop, daemon=True).start()
###### End of call_ping() ######
###### Start of is_valid_ip() ######
def is_valid_ip(addr):
try:
inet_aton(addr)
return True
except:
return False
###### End of is_valid_ip() ######
###### Start of is_valid_domain() ######
def is_valid_domain(domain):
pattern = re.compile(
r'^(?!\-)([A-Za-z0-9\-]{1,63}(?<!\-)\.)+[A-Za-z]{2,}$'
)
return bool(pattern.match(domain))
###### End of is_valid_domain() ######
###### Start of start_clk() ('Start'/'Stop' button) ######
def start_clk():
global doing, dest
if doing == 0: # Click 'Start'
doing = 1
frm1.Btn['text'] = btn_list[doing]
# Get input parameters
input = [frm1.text[i].get() for i in range(len(param))]
for i in range(len(param)):
if input[i] == "":
input[i] = param_default[i]
try:
if (is_valid_ip(input[0]) == False) and (is_valid_domain(input[0]) == False):
dest = param_default[0]
else:
dest = input[0]
repetition = int(input[1])
interval = int(input[2])
timeout_val = int(input[3])
except:
dest = param_default[0]
repetition = int(param_default[1])
interval = int(param_default[2])
timeout_val = int(param_default[3])
# Make ping command
cmd = 'ping ' + dest + ' -n 1' + ' -w ' + str(timeout_val*1000)
frm2.label['text'] = '\tping to ' + dest + ' timeout:' + str(timeout_val)\
+ 's interval:' + str(interval) + 's n=' + str(repetition)
# Create log
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
fh = logging.FileHandler('ping_' + dest + '_' + dt + '.csv')
log.addHandler(fh)
#
log.debug(cmd)
log.debug(header)
show_result(header+'\n')
# Clear graph
xpos.clear()
ypos.clear()
ax.cla()
ax.grid()
# Execute ping
call_ping(cmd, log, fh, repetition, interval)
else: # Click 'Stop'
if 'proc' in globals():
proc.terminate() # Stop thread (exec_ping())
doing = 0
frm1.Btn['text'] = btn_list[doing]
###### End of start_clk() ######
###### Start of save_clk() ('Graph Save' button) ######
def save_clk():
if 'dest' in globals():
dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
fig.savefig('ping_' + dest + '_' + dt + '.png')
messagebox.showinfo('Message', 'Save Complete')
###### End of save_clk() ######
############### Main ###############
## Window
root = Tk()
ver_str = 'pingGraph ' + version
root.title(ver_str)
root.geometry("800x650")
## Flag for Start/Stop
doing = 0
## Frames
# Input parameters & Start/Stop
frm1 = Frame(root, bd=1)
frm1.label = [Label(frm1, text=param[i], font=("Courier New", 11)) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i], font=("Courier New", 11)) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text=' ', font=("Courier New", 11))
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
[frm1.text[i].bind('<Return>', lambda event:start_clk()) for i in range(len(param))]
frm1.Btn.bind('<Return>', lambda event:start_clk())
# Executed command & Graph_Save
frm2 = Frame(root)
frm2.label = Label(frm2, text='\t\t\t\t\t\t\t', font=("Courier New", 11))
frm2.label.grid(row=0, column=0)
frm2.dummy = Label(frm2, text=' ', font=("Courier New", 11))
frm2.dummy.grid(row=0, column=1)
frm2.Btn = ttk.Button(frm2, text='Graph Save', style='W.TButton', command=lambda:save_clk())
frm2.Btn.grid(row=0, column=2)
frm2.Btn.bind('<Return>', lambda event:save_clk())
# Text
frm3 = Frame(root, bd=1)
frm3.prt = Text(frm3, font=("Courier New", 11), width=90, height=10, bg="White")
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
# Graph
frm4 = Frame(root)
fig = Figure()
ax = fig.add_subplot(111)
fig.patch.set_facecolor('PapayaWhip')
ax.set_facecolor('lavender')
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()
draw_graph()
# Locate
frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()
## Menu
menubar = Menu()
filemenu = Menu(menubar, tearoff=0, bg="lightgray", fg="black")
menubar.add_cascade(label="File", menu=filemenu)
filemenu.add_command(label="Exit", command=sys.exit)
root.config(menu=menubar)
## Loop
root.mainloop()
###### End of Program ######
EOF