1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ろうとるがPythonを扱う、、(その34:pingのRealtimeグラフ表示(その3))

Posted at

pingのRealtime表示改良版

ろうとるがPythonを扱う、、(その27:tkinter+matplotlibでRealtimeグラフ表示(その2))」の改良版であり、下記が主な修正点である。

  1. Pythonのicmplibのpingが正常に動作しないケースあり
    • レスポンスがあるにも関わらず、応答時間が未表示(後述)
    • そこで、SubProcessにより、コマンドpingをCall
  2. Interval(送信間隔)周辺修正
    • Intervalを2秒としても、2秒以上の間隔で実行
  3. グラフのファイル保存
  4. Stopボタン追加
  5. Enterキーによる実行

なお、用いたOSはWindows。他のOSでは、pingの結果表示が異なるであろうことから、他のOSでは修正が必要。

icmplibのpingでの応答時間未表示

下記のような状況が発生。

>>> from icmplib import ping
>>> r=ping('192.168.50.1')
>>> print(r)
  192.168.50.1
------------------------------------------------------------
  Packets sent:     4
  Packets received: 4
  Packet loss:      0.0%
  Round-trip times: 0.0 ms / 0.0 ms / 0.0 ms
  Jitter:           0.0 ms
------------------------------------------------------------

応答があるにもかかわらず、応答時間が表示されていない。この状況を改善する。

結果

最初に修正後の結果を提示。

全体

Overall.png

フォルダ(作成されるファイル)

Folder.png

CSVファイル

CSV.png

保存されるグラフ

ping_yahoo.com_20250420_163149.png

方針(pingの実行結果を利用)

コマンドプロンプトでの結果表示を利用してコーディングしている。結果表示のサンプルを下記。

正常

通常

>ping 1.1.1.1 -n 1

1.1.1.1 に ping を送信しています 32 バイトのデータ:
1.1.1.1 からの応答: バイト数 =32 時間 =21ms TTL=57

上記の”21ms”を対象に、”=”と"ms"とをもとに”21”を取り出す。

例外

>ping 127.0.0.1 -n 1

127.0.0.1 に ping を送信しています 32 バイトのデータ:
127.0.0.1 からの応答: バイト数 =32 時間 <1ms TTL=128

上記の”1ms”を対象に、”<”と"ms"とをもとに”1”を取り出す。

エラー

>ping 5.5.5.5 -n 1

5.5.5.5 に ping を送信しています 32 バイトのデータ:
要求がタイムアウトしました。
>ping uiw0fw.com
ping 要求ではホスト uiw0fw.com が見つかりませんでした。ホスト名を確認してもう一度実行してください。
>ping 172.16.10.22 -n 1

172.16.10.22 に ping を送信しています 32 バイトのデータ:
ping: 転送に失敗しました。一般エラーです。

エラー時には、”ms”が含まれない。

ソースコード

例によって、ポイントとなると感じる点を記載。

Library Import

## Imported Library
import sys
import time 
from datetime import datetime
import logging
import logging.handlers
import threading 
import subprocess
from socket import inet_aton
import re
from tkinter import * 
import tkinter.ttk as ttk
import tkinter.messagebox as messagebox
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 

ここは特記事項なし。

変数など

## Version
version = 'RC2'

## Graph position
xpos = []
ypos = []

## ping parameters
param = [' Address ', ' Count ', ' Interval ', ' Timeout ']	# For Entry
param_default = ['1.1.1.1', '10', '1', '3']	# IP, N=10, 1s and 3s
p_width = [18, 6, 3, 3]				# width for Entry
header = 'Count, Time, Address, Response(ms)'	# For CSV

## Button
btn_list = ['Start', 'Stop']
  • グラフの位置用リスト(xpos、ypos)
  • pingパラメータ
    • ターゲットIPアドレス、送信回数、送信間隔、タイムアウト
    • それぞれのデフォルト
    • 入力エントリの幅
  • CSVヘッダ
  • ボタンリスト(Start、Stop)

tkinterメイン

4フレーム構成のWindowである。

###############  Main  ###############
## Window
root = Tk()
ver_str = 'pingGraph ' + version
root.title(ver_str)
root.geometry("800x650")
  • Windowおよびタイトル
## Flag for Start/Stop
doing = 0
  • ping実行中かどうかのフラグ
## Frames
# Input parameters & Start/Stop
frm1 = Frame(root, bd=1)
frm1.label = [Label(frm1, text=param[i], font=("Courier New", 11)) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i], font=("Courier New", 11)) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text=' ', font=("Courier New", 11))
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
[frm1.text[i].bind('<Return>', lambda event:start_clk()) for i in range(len(param))]
frm1.Btn.bind('<Return>', lambda event:start_clk())
  • フレーム1
    • pingパラメーター(IPアドレス、送信回数、送信間隔、タイムアウト)入力エントリ
    • Start/Stopボタンおよびボタンクリック時の関数(start_clk)
    • Enterキー入力時も同じ関数呼び出し
# Executed command & Graph_Save
frm2 = Frame(root)
frm2.label = Label(frm2, text='\t\t\t\t\t\t\t', font=("Courier New", 11))
frm2.label.grid(row=0, column=0)
frm2.dummy = Label(frm2, text=' ', font=("Courier New", 11))
frm2.dummy.grid(row=0, column=1)
frm2.Btn = ttk.Button(frm2, text='Graph Save', style='W.TButton', command=lambda:save_clk())
frm2.Btn.grid(row=0, column=2)
frm2.Btn.bind('<Return>', lambda event:save_clk())
  • フレーム2
    • 実行コマンド表示
    • Graph Saveボタンおよびボタンクリック時の関数(save_clk)
# Text
frm3 = Frame(root, bd=1)
frm3.prt = Text(frm3, font=("Courier New", 11), width=90, height=10, bg="White")
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
  • フレーム3
    • 結果表示エリア
    • 右端に縦スクロール
# Graph
frm4 = Frame(root)
fig = Figure() 
ax = fig.add_subplot(111)
fig.patch.set_facecolor('PapayaWhip')
ax.set_facecolor('lavender')
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()
draw_graph()
  • フレーム4
    • グラフ表示エリア
# Locate
frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()
  • フレームをWindowに配置
## Menu
menubar = Menu()
filemenu = Menu(menubar, tearoff=0, bg="lightgray", fg="black")
menubar.add_cascade(label="File", menu=filemenu)
filemenu.add_command(label="Exit", command=sys.exit)
root.config(menu=menubar)

## Loop
root.mainloop()

###### End of Program ######
  • メニュー(Exitのみ)
  • Windowループ

入力チェック関数

IPアドレス

###### Start of is_valid_ip() ######
def is_valid_ip(addr):
    try:
        inet_aton(addr)
        return True
    except:
        return False
###### End of is_valid_ip() ######
  • inet_aton(IPアドレス変換)関数によるIPアドレスフォーマットチェック

ドメイン

###### Start of is_valid_domain() ######
def is_valid_domain(domain):
    pattern = re.compile(
        r'^(?!\-)([A-Za-z0-9\-]{1,63}(?<!\-)\.)+[A-Za-z]{2,}$'
    )
    return bool(pattern.match(domain))
###### End of is_valid_domain() ######
  • re(正規表現)モジュールによるドメイン名チェック(ChatGPTの回答利用)

結果表示

テキスト

###### Start of show_result() ######
def show_result(data):
    frm3.prt.config(state='normal')
    frm3.prt.insert(END, data)
    frm3.prt.config(state='disabled')
    frm3.prt.see('end')
###### End of show_result() ######
  • テキスト表示エリアへ引数dataの表示

グラフ

###### Start of draw_graph() ######
def draw_graph():
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Response Time (ms)") 
    ax.plot(xpos, ypos, color='blue', linestyle='-')
    graph.draw() 
###### End of draw_graph() ######
  • グラフ表示エリアへデータプロット

ファイル保存(”Graph Save”クリック処理)

###### Start of save_clk() ('Graph Save' button) ######
def save_clk():
    if 'dest' in globals():
        dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
        fig.savefig('ping_' + dest + '_' + dt + '.png')
        messagebox.showinfo('Message', 'Save Complete') 
###### End of save_clk() ######
  • 日時およびターゲットIPアドレスをファイル名として、グラフをファイル化(保存)
  • 保存後のメッセージの表示

実行

Start/Stopクリック処理

###### Start of start_clk() ('Start'/'Stop' button) ######
def start_clk(): 
    global doing, dest
    if doing == 0:	# Click 'Start'
        doing = 1
        frm1.Btn['text'] = btn_list[doing]
        # Get input parameters
        input = [frm1.text[i].get() for i in range(len(param))]
        for i in range(len(param)):
            if input[i] == "":
                input[i] = param_default[i]
        try:
            if (is_valid_ip(input[0]) == False) and (is_valid_domain(input[0]) == False):
                dest = param_default[0]
            else:
                dest = input[0]
            repetition = int(input[1])
            interval = int(input[2])
            timeout_val = int(input[3])
        except:
            dest = param_default[0]
            repetition = int(param_default[1])
            interval = int(param_default[2])
            timeout_val = int(param_default[3])
        # Make ping command
        cmd = 'ping ' + dest + ' -n 1' + ' -w ' + str(timeout_val*1000)
        frm2.label['text'] = '\tping to ' + dest + '  timeout:' + str(timeout_val)\
            + 's  interval:' + str(interval) + 's  n=' + str(repetition)
        # Create log
        log = logging.getLogger('ping')
        log.setLevel(logging.DEBUG)
        dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
        fh = logging.FileHandler('ping_' + dest + '_' + dt + '.csv')
        log.addHandler(fh)
        #
        log.debug(cmd)
        log.debug(header)
        show_result(header+'\n')
        # Clear graph
        xpos.clear()
        ypos.clear()
        ax.cla()
        ax.grid()
        # Execute ping
        call_ping(cmd, log, fh, repetition, interval)
    else:		# Click 'Stop'
        if 'proc' in globals():
            proc.terminate()	# Stop thread (exec_ping())
        doing = 0
        frm1.Btn['text'] = btn_list[doing]
###### End of start_clk() ######

スレッド呼び出し

###### Start of call_ping() ######
def call_ping(cmd, log, fh, repetition, interval):
    global num, elapsed
    # loop function
    def loop():
        global doing
        nonlocal next_time
        while True:
            now = time.time()
            if now >= next_time:
                exec_ping(cmd, log, interval)	# Execute ping
                # Blocked until 'exec_ping' ends. @@@
                # -> The next may be executed after more than 'internal'.
                next_time += interval
                draw_graph()
                if num >= repetition or doing == 0:
                    try:
                        jitter = max(ypos) - min(ypos)
                        str_j = 'Jitter: '+str(jitter)+' ms'
                        log.debug(str_j)
                        show_result(str_j+'\nEnd\n\n')
                    except:
                        pass
                    # Close log
                    fh.close()
                    log.removeHandler(fh)
                    doing = 0
                    frm1.Btn['text'] = btn_list[doing]
                    return			# Exit
            time.sleep(0.001)
        # End of while
        # End of loop
    #
    ### Main function ###
    num = 0
    elapsed = 0
    next_time = time.time()
    # Execute 'loop' until N is reached or 'Stop' is clicked
    threading.Thread(target=loop, daemon=True).start()
###### End of call_ping() ######

スレッド(ping)実行

###### Start of exec_ping() ######
def exec_ping(cmd, log, interval):
    global proc, num, elapsed
    num += 1
    start_time = datetime.now().strftime("%H%M%S")
    #start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
    output = str(num)+', '+start_time+', '+dest+', '
    # Execute ping
    #proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,\
                            creationflags=subprocess.CREATE_NO_WINDOW)
    out, err = proc.communicate()
    result = out.decode(encoding='shift-jis')	# depending on languages
    try:					# Look for 'YYms' in output
        end = result.find('ms')
        until_end = result[:end]
        eq_start = until_end.rfind('=')		# Look for 1st '='
        ineq_start = until_end.rfind('<')	# for "<1ms"
        if ineq_start == -1:
            val = int(until_end[eq_start+1:])	# Normal case like 12ms
        else:
            val = int(until_end[ineq_start+1:])	# for "<1ms"
        output += str(val)
        xpos.append(elapsed)
        ypos.append(val)
        flag = 0
    except:
        output = result
        flag = 1
    if flag == 0:
        log.debug(output)
        output += '\n'
    else:
        log.debug(output.rstrip('\r\n'))
    show_result(output)
    elapsed += interval
###### End of exec_ping() ######

コード全体

pingGraph_rc2.py
# -*- coding: utf-8 -*-
## Imported Library
import sys
import time 
from datetime import datetime
import logging
import logging.handlers
import threading 
import subprocess
from socket import inet_aton
import re
from tkinter import * 
import tkinter.ttk as ttk
import tkinter.messagebox as messagebox
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 

## Version
version = 'RC2'

## Graph position
xpos = []
ypos = []

## ping parameters
param = [' Address ', ' Count ', ' Interval ', ' Timeout ']	# For Entry
param_default = ['1.1.1.1', '10', '1', '3']	# IP, N=10, 1s and 3s
p_width = [18, 6, 3, 3]				# width for Entry
header = 'Count, Time, Address, Response(ms)'	# For CSV

## Button
btn_list = ['Start', 'Stop']

###### Start of show_result() ######
def show_result(data):
    frm3.prt.config(state='normal')
    frm3.prt.insert(END, data)
    frm3.prt.config(state='disabled')
    frm3.prt.see('end')
###### End of show_result() ######

###### Start of draw_graph() ######
def draw_graph():
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Response Time (ms)") 
    ax.plot(xpos, ypos, color='blue', linestyle='-')
    graph.draw() 
###### End of draw_graph() ######

###### Start of exec_ping() ######
def exec_ping(cmd, log, interval):
    global proc, num, elapsed
    num += 1
    start_time = datetime.now().strftime("%H%M%S")
    #start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")
    output = str(num)+', '+start_time+', '+dest+', '
    # Execute ping
    #proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT,\
                            creationflags=subprocess.CREATE_NO_WINDOW)
    out, err = proc.communicate()
    result = out.decode(encoding='shift-jis')	# depending on languages
    try:					# Look for 'YYms' in output
        end = result.find('ms')
        until_end = result[:end]
        eq_start = until_end.rfind('=')		# Look for 1st '='
        ineq_start = until_end.rfind('<')	# for "<1ms"
        if ineq_start == -1:
            val = int(until_end[eq_start+1:])	# Normal case like 12ms
        else:
            val = int(until_end[ineq_start+1:])	# for "<1ms"
        output += str(val)
        xpos.append(elapsed)
        ypos.append(val)
        flag = 0
    except:
        output = result
        flag = 1
    if flag == 0:
        log.debug(output)
        output += '\n'
    else:
        log.debug(output.rstrip('\r\n'))
    show_result(output)
    elapsed += interval
###### End of exec_ping() ######

###### Start of call_ping() ######
def call_ping(cmd, log, fh, repetition, interval):
    global num, elapsed
    # loop function
    def loop():
        global doing
        nonlocal next_time
        while True:
            now = time.time()
            if now >= next_time:
                exec_ping(cmd, log, interval)	# Execute ping
                # Blocked until 'exec_ping' ends. @@@
                # -> The next may be executed after more than 'internal'.
                next_time += interval
                draw_graph()
                if num >= repetition or doing == 0:
                    try:
                        jitter = max(ypos) - min(ypos)
                        str_j = 'Jitter: '+str(jitter)+' ms'
                        log.debug(str_j)
                        show_result(str_j+'\nEnd\n\n')
                    except:
                        pass
                    # Close log
                    fh.close()
                    log.removeHandler(fh)
                    doing = 0
                    frm1.Btn['text'] = btn_list[doing]
                    return			# Exit
            time.sleep(0.001)
        # End of while
        # End of loop
    #
    ### Main function ###
    num = 0
    elapsed = 0
    next_time = time.time()
    # Execute 'loop' until N is reached or 'Stop' is clicked
    threading.Thread(target=loop, daemon=True).start()
###### End of call_ping() ######

###### Start of is_valid_ip() ######
def is_valid_ip(addr):
    try:
        inet_aton(addr)
        return True
    except:
        return False
###### End of is_valid_ip() ######

###### Start of is_valid_domain() ######
def is_valid_domain(domain):
    pattern = re.compile(
        r'^(?!\-)([A-Za-z0-9\-]{1,63}(?<!\-)\.)+[A-Za-z]{2,}$'
    )
    return bool(pattern.match(domain))
###### End of is_valid_domain() ######

###### Start of start_clk() ('Start'/'Stop' button) ######
def start_clk(): 
    global doing, dest
    if doing == 0:	# Click 'Start'
        doing = 1
        frm1.Btn['text'] = btn_list[doing]
        # Get input parameters
        input = [frm1.text[i].get() for i in range(len(param))]
        for i in range(len(param)):
            if input[i] == "":
                input[i] = param_default[i]
        try:
            if (is_valid_ip(input[0]) == False) and (is_valid_domain(input[0]) == False):
                dest = param_default[0]
            else:
                dest = input[0]
            repetition = int(input[1])
            interval = int(input[2])
            timeout_val = int(input[3])
        except:
            dest = param_default[0]
            repetition = int(param_default[1])
            interval = int(param_default[2])
            timeout_val = int(param_default[3])
        # Make ping command
        cmd = 'ping ' + dest + ' -n 1' + ' -w ' + str(timeout_val*1000)
        frm2.label['text'] = '\tping to ' + dest + '  timeout:' + str(timeout_val)\
            + 's  interval:' + str(interval) + 's  n=' + str(repetition)
        # Create log
        log = logging.getLogger('ping')
        log.setLevel(logging.DEBUG)
        dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
        fh = logging.FileHandler('ping_' + dest + '_' + dt + '.csv')
        log.addHandler(fh)
        #
        log.debug(cmd)
        log.debug(header)
        show_result(header+'\n')
        # Clear graph
        xpos.clear()
        ypos.clear()
        ax.cla()
        ax.grid()
        # Execute ping
        call_ping(cmd, log, fh, repetition, interval)
    else:		# Click 'Stop'
        if 'proc' in globals():
            proc.terminate()	# Stop thread (exec_ping())
        doing = 0
        frm1.Btn['text'] = btn_list[doing]
###### End of start_clk() ######

###### Start of save_clk() ('Graph Save' button) ######
def save_clk():
    if 'dest' in globals():
        dt = '{:%Y%m%d_%H%M%S}'.format(datetime.now())
        fig.savefig('ping_' + dest + '_' + dt + '.png')
        messagebox.showinfo('Message', 'Save Complete') 
###### End of save_clk() ######

###############  Main  ###############
## Window
root = Tk()
ver_str = 'pingGraph ' + version
root.title(ver_str)
root.geometry("800x650")

## Flag for Start/Stop
doing = 0

## Frames
# Input parameters & Start/Stop
frm1 = Frame(root, bd=1)
frm1.label = [Label(frm1, text=param[i], font=("Courier New", 11)) for i in range(len(param))]
[frm1.label[i].grid(row=0, column=i*2) for i in range(len(param))]
frm1.text = [Entry(frm1, width=p_width[i], font=("Courier New", 11)) for i in range(len(p_width))]
[frm1.text[i].grid(row=0, column=(i*2+1)) for i in range(len(p_width))]
frm1.dummy = Label(frm1, text=' ', font=("Courier New", 11))
frm1.dummy.grid(row=0, column=len(param)+len(p_width))
frm1.Btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.Btn.grid(row=0, column=len(param)+len(p_width)+1)
[frm1.text[i].bind('<Return>', lambda event:start_clk()) for i in range(len(param))]
frm1.Btn.bind('<Return>', lambda event:start_clk())
# Executed command & Graph_Save
frm2 = Frame(root)
frm2.label = Label(frm2, text='\t\t\t\t\t\t\t', font=("Courier New", 11))
frm2.label.grid(row=0, column=0)
frm2.dummy = Label(frm2, text=' ', font=("Courier New", 11))
frm2.dummy.grid(row=0, column=1)
frm2.Btn = ttk.Button(frm2, text='Graph Save', style='W.TButton', command=lambda:save_clk())
frm2.Btn.grid(row=0, column=2)
frm2.Btn.bind('<Return>', lambda event:save_clk())
# Text
frm3 = Frame(root, bd=1)
frm3.prt = Text(frm3, font=("Courier New", 11), width=90, height=10, bg="White")
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.prt.config(state='disabled')
frm3.prt.pack()
# Graph
frm4 = Frame(root)
fig = Figure() 
ax = fig.add_subplot(111)
fig.patch.set_facecolor('PapayaWhip')
ax.set_facecolor('lavender')
graph = FigureCanvasTkAgg(fig, master=frm4)
graph.get_tk_widget().pack()
draw_graph()
# Locate
frm1.pack()
frm2.pack()
frm3.pack()
frm4.pack()

## Menu
menubar = Menu()
filemenu = Menu(menubar, tearoff=0, bg="lightgray", fg="black")
menubar.add_cascade(label="File", menu=filemenu)
filemenu.add_command(label="Exit", command=sys.exit)
root.config(menu=menubar)

## Loop
root.mainloop()

###### End of Program ######

EOF

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?