0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

ろうとるがPythonを扱う、、(その26:tkinter+matplotlibでRealtimeグラフ表示(その1))

Posted at

pingのRealtime表示(目的達せず)

tkinterで作成されるWindowに、pingの結果を、グラフ付きでリアルタイムに表示する。ただし、目的を達成できず、不具合もあり。コードも美しない。

参考URL

結果から記述

下記のように、数値で結果を表示するWindowとグラフ表示を行うWindowを設けるもの。
OKグラフ.png
再度「Start」をクリックしたら、グラフWindowを削除して再度グラフを作成したかったが、ネットで見つかる情報を色々試してもすぐには解決できず、とりあえず、Give Up。2回「Start」をクリックしたときの結果は下記となる。
ping6_OK2.png
さらに、上記WindowをすべてCloseすると、

RuntimeError: main thread is not in main loop

となり、不具合もある。(今回は深追いせず、、)

ソースコード

Library import

from tkinter import * 
import tkinter.ttk as ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 
from datetime import datetime
import time 
import logging
import logging.handlers
import threading 
import queue
from icmplib import ping

ここはノーコメントでおねがいします。

変数など

## For internal communication between threads
# thread_ping() puts True here when a run has finished; plotter() polls it
# to know when to stop redrawing.
q1 = queue.Queue() # queue used to pass data between threads

## For plotting (point coordinates used by the graph)
# start_ping() appends the elapsed time to xpos and the measured RTT to ypos.
xpos = []
ypos = []

## Window Size (size passed to the tkinter frames)
w_width = 800
w_height = 400

## ping parameters (labels and their default values)
# param holds the captions shown next to each input box; param_default holds
# the value start_clk() substitutes when the matching box is left empty.
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size ']
param_default = ['1.1.1.1', '10', '1', '3', '64']
# Order: Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width of each parameter input box
header = 'Count, Time, Address, Timeout, Payload, RTT(ms)/Down \t\t\t\t\t\t\t\t\t\t\t\t\t\t' # column header shown above the result text

メイン

ここの解説は小生の次の過去投稿を参照してほしいです。

#####  Main  #####
# Top-level window that hosts the parameter inputs and the textual ping output.
root = Tk()
root.title("Execution of Ping")
root.geometry("800x400")

## Making frames
# frm1: one caption + entry pair per ping parameter, then a spacer and the
# Start button, all laid out on grid row 0.
frm1 = Frame(root, width=w_width, height=w_height)
frm1.label = [Label(frm1, text=caption) for caption in param]
for slot, caption_widget in enumerate(frm1.label):
    caption_widget.grid(row=0, column=slot * 2)        # even columns: captions
frm1.text = [Entry(frm1, width=chars) for chars in p_width]
for slot, entry_widget in enumerate(frm1.text):
    entry_widget.grid(row=0, column=slot * 2 + 1)      # odd columns: input boxes
spacer_col = len(param) + len(p_width)
frm1.dummy = Label(frm1, text='    ')
frm1.dummy.grid(row=0, column=spacer_col)
# "ttk" is necessary for the styled button.  The lambda defers the lookup of
# start_clk until the button is actually clicked.
# Author's note: the 2nd click does not work
frm1.Btn = ttk.Button(
    frm1,
    text='Start',
    style='W.TButton',
    command=lambda: start_clk(),
)
frm1.Btn.grid(row=0, column=spacer_col + 1)

# frm2: a single label carrying the column header for the results.
frm2 = Frame(root, width=w_width, height=w_height)
frm2.label = Label(frm2, text=header)
frm2.label.grid(row=0, column=0)

# frm3: text area with a vertical scrollbar.  The text area is kept disabled
# so it cannot be edited by hand; show_result() unlocks it while writing.
frm3 = Frame(root, width=w_width, height=w_height)
frm3.prt = Text(frm3, font=("Courier New", 12), width=120, height=33)
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt.configure(yscrollcommand=frm3.ysc.set, state='disabled')
frm3.prt.pack()

for section in (frm1, frm2, frm3):
    section.pack()

## For log
# Every result line is also written to a time-stamped CSV file.
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
log_stamp = '{:%Y%m%d_%H%M%S}.csv'.format(datetime.now())
fh = logging.FileHandler('ping_' + log_stamp)
log.addHandler(fh)

## Loop
root.mainloop()

少々コメント。

  • pingの結果をリアルタイムで「文字表示」する、Window内のフレーム設定
  • 「Start」ボタンクリック時の関数定義(start_clk())
  • 結果をログファイルに出力する初期設定

「Start」ボタンクリック時にCallされる関数

## Start of start_clk()
def start_clk():
    """Start-button callback: read the parameters and launch the workers.

    Each input box of ``frm1`` is read; an empty box falls back to the
    matching entry of ``param_default``.  The resulting list (address, count,
    interval, timeout, size) is handed to two new threads: ``thread_ping``
    runs the pings and prints the results, ``thread_graph`` shows the
    real-time graph.
    """
    settings = [
        box.get() or fallback
        for box, fallback in zip(frm1.text, param_default)
    ]
    # Ping worker first, then the graph worker.
    for worker in (thread_ping, thread_graph):
        threading.Thread(target=worker, args=(settings,)).start()
## End of start_clk()

ping実行および結果表示

## Start of show_result()
def show_result(data):
    """Append *data* to the result text area of the main window.

    The text widget is normally disabled (read-only), so it is unlocked for
    the insertion, locked again, and then scrolled to the last line.
    """
    pane = frm3.prt
    pane.configure(state='normal')      # unlock for writing
    pane.insert(END, data)
    pane.configure(state='disabled')    # back to read-only
    pane.see('end')                     # keep the newest line visible
## End of show_result()

## Start of start_ping()
def start_ping(i, arg, t):
    """Send one ping and record its outcome.

    Parameters:
        i:   zero-based sequence number of this ping (reported as ``i + 1``).
        arg: parameter list of strings; ``arg[0]`` is the address, ``arg[3]``
             the timeout in seconds and ``arg[4]`` the payload size in bytes.
        t:   elapsed time used as the x coordinate of the graph point.

    The result line is written to the ``log`` logger and shown in the main
    window via ``show_result``.  On a reply, the point ``(t, avg_rtt)`` is
    appended to ``xpos`` / ``ypos``.  Any failure while pinging or parsing
    the parameters is reported as the single word ``Error``.
    """
    start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")  # time stamp of this attempt
    try:
        result = ping(arg[0], count=1, interval=1,
                      timeout=int(arg[3]), payload_size=int(arg[4]))
        # Parameters this ping was executed with.
        output = ', '.join([str(i + 1), start_time, arg[0], arg[3], arg[4]]) + ', '
        if result.is_alive:
            output += str(result.avg_rtt)   # response time
            xpos.append(t)                  # x axis: elapsed time
            ypos.append(result.avg_rtt)     # y axis: response time
        else:
            output += 'Down'
            # TODO: decide how a lost reply should appear on the graph
            # (currently no point is added).
    except Exception:
        # Only ordinary errors are reported as 'Error'; interpreter-exit
        # signals (SystemExit, KeyboardInterrupt) are no longer swallowed.
        output = 'Error'
    log.debug(output)               # result goes to the log file ...
    show_result(output + '\n')      # ... and to the window
## End of start_ping()

## Start of thread_ping()
def thread_ping(arg):
    """Thread body: run the requested number of pings at a fixed pause.

    ``arg[1]`` is the repeat count and ``arg[2]`` the pause in seconds (both
    strings).  The shared point lists are emptied first; after the last ping
    (and its pause) ``True`` is put on ``q1`` as the end-of-run flag.
    """
    # Start every run with empty graph data.
    for series in (xpos, ypos):
        series.clear()
    total = int(arg[1])
    pause = float(arg[2])
    elapsed = 0
    for seq in range(total):
        elapsed += pause                    # nominal elapsed time of this ping
        start_ping(seq, arg, elapsed)
        time.sleep(pause)                   # wait before the next round
    q1.put(True)                            # end-of-run flag for the plotter
    print('End of thread_ping')
## End of thread_ping()

Realtimeグラフ

グラフ内の値のプロットについては、上述した下記URL参照。

## Start of plotter()
def plotter(arg):
    """Thread body: redraw the graph every *arg* seconds until the run ends.

    Parameters:
        arg: redraw period in seconds (string or number, converted with
             ``float``).

    Reads the shared point lists ``xpos`` / ``ypos`` and draws them on the
    module-level ``ax`` / ``graph`` created by ``thread_graph``.  The loop
    stops once a true value is found on ``q1``.
    """
    wait = float(arg)
    # One-time axes setup.  The grid is switched on explicitly: calling
    # ax.grid() without arguments toggles it, which made the grid flip on and
    # off on alternate redraws.
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Response Time (ms)")
    ax.grid(True)
    # A single line artist is reused so that redraws do not pile up one new
    # line object per cycle.
    line, = ax.plot([], [], color='C0', linestyle='-')
    while True:
        # Snapshot the shared lists; the ping thread appends x and y
        # separately, so trim both to the common length before plotting.
        xs = list(xpos)
        ys = list(ypos)
        count = min(len(xs), len(ys))
        line.set_data(xs[:count], ys[:count])
        ax.relim()              # recompute data limits from the updated line
        ax.autoscale_view()     # and rescale the axes to them
        graph.draw()
        time.sleep(wait)
        if (not q1.empty()) and q1.get():   # True is queued when pinging has finished
            break
    print('End of plotter')
## End of plotter()

RealtimeグラフCanvas(Window)作成

Canvas作成については、ここでも下記URL参照。

## Start of thread_graph()
##  New window(canvas) must be made here for realtime display
def thread_graph(arg):
    """Thread body: open the graph window and start the periodic redraw.

    Creates a second ``Tk`` window with an embedded matplotlib canvas,
    publishes the axes and canvas through the globals ``ax`` and ``graph``,
    starts ``plotter`` in its own thread with the interval ``arg[2]``, and
    then runs this window's event loop until it is closed.

    NOTE(review): this builds and runs a Tk window in a worker thread (it is
    started via threading.Thread in start_clk); tkinter is normally driven
    from one thread only -- confirm whether this is related to the error
    mentioned below.
    """
    global ax, graph
    # Figure and axes that plotter() draws on.
    fig = Figure()
    ax = fig.add_subplot(111)

    # Separate window that hosts the canvas.
    root2 = Tk()
    root2.title("Graph")
    root2.geometry("800x400")

    graph = FigureCanvasTkAgg(fig, master=root2)
    canvas_widget = graph.get_tk_widget()
    canvas_widget.pack(side="top", fill='both', expand=True)

    # Redraw loop for the real-time display.
    redraw_worker = threading.Thread(target=plotter, args=(arg[2],))
    redraw_worker.start()
    root2.mainloop()
    # Author's note: this window may need to be closed first; otherwise
    # "RuntimeError: main thread is not in main loop" happens.
    print('End of thread_graph')
## End of thread_graph()

理由不明だが、リアルタイムにグラフ表示するには、Mainから別スレッド(thread_graph())を起動し、その中で、matplotlib用Canvasを作成し、さらにスレッド(上述のplotter())を起動して、値をプロットする必要があった。

全体

## Library
from tkinter import * 
import tkinter.ttk as ttk
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg 
from matplotlib.figure import Figure 
from datetime import datetime
import time 
import logging
import logging.handlers
import threading 
import queue
from icmplib import ping
 
## For internal communication between threads
# thread_ping() puts True here when a run has finished; plotter() polls it
# to know when to stop redrawing.
q1 = queue.Queue()

## For plotting
# start_ping() appends the elapsed time to xpos and the measured RTT to ypos.
xpos = []
ypos = []

## Window Size
# Passed as width/height to the frames of the main window.
w_width = 800
w_height = 400

## ping parameters
# param holds the captions shown next to each input box; param_default holds
# the value start_clk() substitutes when the matching box is left empty.
param = [' Address ', ' Count ', ' Interval ', ' Timeout ', ' Size ']
param_default = ['1.1.1.1', '10', '1', '3', '64']
# Order: Address, Count, Interval(s), Timeout(s), Payload size(byte)
p_width = [20, 8, 5, 5, 5] # width of each parameter input box
# Column header shown above the result text.
header = 'Count, Time, Address, Timeout, Payload, RTT(ms)/Down \t\t\t\t\t\t\t\t\t\t\t\t\t\t'

## Start of plotter()
def plotter(arg):
    """Thread body: redraw the graph every *arg* seconds until the run ends.

    Parameters:
        arg: redraw period in seconds (string or number, converted with
             ``float``).

    Reads the shared point lists ``xpos`` / ``ypos`` and draws them on the
    module-level ``ax`` / ``graph`` created by ``thread_graph``.  The loop
    stops once a true value is found on ``q1``.
    """
    wait = float(arg)
    # One-time axes setup.  The grid is switched on explicitly: calling
    # ax.grid() without arguments toggles it, which made the grid flip on and
    # off on alternate redraws.
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Response Time (ms)")
    ax.grid(True)
    # A single line artist is reused so that redraws do not pile up one new
    # line object per cycle.
    line, = ax.plot([], [], color='C0', linestyle='-')
    while True:
        # Snapshot the shared lists; the ping thread appends x and y
        # separately, so trim both to the common length before plotting.
        xs = list(xpos)
        ys = list(ypos)
        count = min(len(xs), len(ys))
        line.set_data(xs[:count], ys[:count])
        ax.relim()              # recompute data limits from the updated line
        ax.autoscale_view()     # and rescale the axes to them
        graph.draw()
        time.sleep(wait)
        if (not q1.empty()) and q1.get():   # True is queued when pinging has finished
            break
    print('End of plotter')
## End of plotter()

## Start of thread_graph()
##  New window(canvas) must be made here for realtime display
def thread_graph(arg):
    """Thread body: open the graph window and start the periodic redraw.

    Creates a second ``Tk`` window with an embedded matplotlib canvas,
    publishes the axes and canvas through the globals ``ax`` and ``graph``,
    starts ``plotter`` in its own thread with the interval ``arg[2]``, and
    then runs this window's event loop until it is closed.

    NOTE(review): this builds and runs a Tk window in a worker thread (it is
    started via threading.Thread in start_clk); tkinter is normally driven
    from one thread only -- confirm whether this is related to the error
    mentioned below.
    """
    global ax, graph
    # Figure and axes that plotter() draws on.
    fig = Figure()
    ax = fig.add_subplot(111)

    # Separate window that hosts the canvas.
    root2 = Tk()
    root2.title("Graph")
    root2.geometry("800x400")

    graph = FigureCanvasTkAgg(fig, master=root2)
    canvas_widget = graph.get_tk_widget()
    canvas_widget.pack(side="top", fill='both', expand=True)

    # Redraw loop for the real-time display.
    redraw_worker = threading.Thread(target=plotter, args=(arg[2],))
    redraw_worker.start()
    root2.mainloop()
    # Author's note: this window may need to be closed first; otherwise
    # "RuntimeError: main thread is not in main loop" happens.
    print('End of thread_graph')
## End of thread_graph()

## Start of show_result()
def show_result(data):
    """Append *data* to the result text area of the main window.

    The text widget is normally disabled (read-only), so it is unlocked for
    the insertion, locked again, and then scrolled to the last line.
    """
    pane = frm3.prt
    pane.configure(state='normal')      # unlock for writing
    pane.insert(END, data)
    pane.configure(state='disabled')    # back to read-only
    pane.see('end')                     # keep the newest line visible
## End of show_result()

## Start of start_ping()
def start_ping(i, arg, t):
    """Send one ping and record its outcome.

    Parameters:
        i:   zero-based sequence number of this ping (reported as ``i + 1``).
        arg: parameter list of strings; ``arg[0]`` is the address, ``arg[3]``
             the timeout in seconds and ``arg[4]`` the payload size in bytes.
        t:   elapsed time used as the x coordinate of the graph point.

    The result line is written to the ``log`` logger and shown in the main
    window via ``show_result``.  On a reply, the point ``(t, avg_rtt)`` is
    appended to ``xpos`` / ``ypos``.  Any failure while pinging or parsing
    the parameters is reported as the single word ``Error``.
    """
    start_time = datetime.now().strftime("%y%m%d_%H%M%S_%f")  # time stamp of this attempt
    try:
        result = ping(arg[0], count=1, interval=1,
                      timeout=int(arg[3]), payload_size=int(arg[4]))
        # Parameters this ping was executed with.
        output = ', '.join([str(i + 1), start_time, arg[0], arg[3], arg[4]]) + ', '
        if result.is_alive:
            output += str(result.avg_rtt)   # response time
            xpos.append(t)                  # x axis: elapsed time
            ypos.append(result.avg_rtt)     # y axis: response time
        else:
            output += 'Down'
            # TODO: decide how a lost reply should appear on the graph
            # (currently no point is added).
    except Exception:
        # Only ordinary errors are reported as 'Error'; interpreter-exit
        # signals (SystemExit, KeyboardInterrupt) are no longer swallowed.
        output = 'Error'
    log.debug(output)               # result goes to the log file ...
    show_result(output + '\n')      # ... and to the window
## End of start_ping()

## Start of thread_ping()
def thread_ping(arg):
    """Thread body: run the requested number of pings at a fixed pause.

    ``arg[1]`` is the repeat count and ``arg[2]`` the pause in seconds (both
    strings).  The shared point lists are emptied first; after the last ping
    (and its pause) ``True`` is put on ``q1`` as the end-of-run flag.
    """
    # Start every run with empty graph data.
    for series in (xpos, ypos):
        series.clear()
    total = int(arg[1])
    pause = float(arg[2])
    elapsed = 0
    for seq in range(total):
        elapsed += pause                    # nominal elapsed time of this ping
        start_ping(seq, arg, elapsed)
        time.sleep(pause)                   # wait before the next round
    q1.put(True)                            # end-of-run flag for the plotter
    print('End of thread_ping')
## End of thread_ping()

## Start of start_clk()
def start_clk():
    """Start-button callback: read the parameters and launch the workers.

    Each input box of ``frm1`` is read; an empty box falls back to the
    matching entry of ``param_default``.  The resulting list (address, count,
    interval, timeout, size) is handed to two new threads: ``thread_ping``
    runs the pings and prints the results, ``thread_graph`` shows the
    real-time graph.
    """
    settings = [
        box.get() or fallback
        for box, fallback in zip(frm1.text, param_default)
    ]
    # Ping worker first, then the graph worker.
    for worker in (thread_ping, thread_graph):
        threading.Thread(target=worker, args=(settings,)).start()
## End of start_clk()

#####  Main  #####
# Top-level window that hosts the parameter inputs and the textual ping output.
root = Tk()
root.title("Execution of Ping")
root.geometry("800x400")

## Making frames
# frm1: one caption + entry pair per ping parameter, then a spacer and the
# Start button, all laid out on grid row 0.
frm1 = Frame(root, width=w_width, height=w_height)
frm1.label = [Label(frm1, text=caption) for caption in param]
for slot, caption_widget in enumerate(frm1.label):
    caption_widget.grid(row=0, column=slot * 2)        # even columns: captions
frm1.text = [Entry(frm1, width=chars) for chars in p_width]
for slot, entry_widget in enumerate(frm1.text):
    entry_widget.grid(row=0, column=slot * 2 + 1)      # odd columns: input boxes
spacer_col = len(param) + len(p_width)
frm1.dummy = Label(frm1, text='    ')
frm1.dummy.grid(row=0, column=spacer_col)
# "ttk" is necessary for the styled button.  The lambda defers the lookup of
# start_clk until the button is actually clicked.
# Author's note: the 2nd click does not work
frm1.Btn = ttk.Button(
    frm1,
    text='Start',
    style='W.TButton',
    command=lambda: start_clk(),
)
frm1.Btn.grid(row=0, column=spacer_col + 1)

# frm2: a single label carrying the column header for the results.
frm2 = Frame(root, width=w_width, height=w_height)
frm2.label = Label(frm2, text=header)
frm2.label.grid(row=0, column=0)

# frm3: text area with a vertical scrollbar.  The text area is kept disabled
# so it cannot be edited by hand; show_result() unlocks it while writing.
frm3 = Frame(root, width=w_width, height=w_height)
frm3.prt = Text(frm3, font=("Courier New", 12), width=120, height=33)
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt.configure(yscrollcommand=frm3.ysc.set, state='disabled')
frm3.prt.pack()

for section in (frm1, frm2, frm3):
    section.pack()

## For log
# Every result line is also written to a time-stamped CSV file.
log = logging.getLogger('ping')
log.setLevel(logging.DEBUG)
log_stamp = '{:%Y%m%d_%H%M%S}.csv'.format(datetime.now())
fh = logging.FileHandler('ping_' + log_stamp)
log.addHandler(fh)

## Loop
root.mainloop()

最後に

その2があります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?