1
2

ろうとるがPythonを扱う、、(その18:Pysharkを使ったGUIアプリケーション)

Posted at

Pysharkを使ってGUIアプリケーションをつくる

tsharkのPythonラッパーであるPysharkを使ってGUIアプリケーションを作ってみた。要するに、Wiresharkもどきである。ただ、レベルはかなり劣る。相変わらず、global使用などクオリティの高くないソースコードであるが、ご容赦いただきたい。

最初に見た目から

キャプチャI/Fを選択

image.png
Selectをクリックすると下記ダイアログボックスが表示される。
IF選択.png
キャプチャI/Fを選んで、”Set”をクリック

キャプチャ開始

初期画面で”Start”をクリックしてキャプチャを開始。
Eth&IPのコピー.png
上側Windowに、リアルタイムにキャプチャデータの概要を表示。上側Windowでマウスを動かし、クリックした行の詳細が下側Windowsに表示される。上記は、パケット番号8の例である。

ソースコード

例によって、キーとなる点をメインにコメント。

importとパラメーター

from scapy.all import *
import pyshark
import datetime
import threading
from tkinter import *
import tkinter.ttk as ttk

### Parameters ###
w_width = 900
ifList = []
btn_list = ['Start', 'Stop']
RawData = []
  • ネットワークI/F情報の取り扱いは、PysharkよりScapyの方が扱いやすかったため、それを利用。
  • リアルタイム表示のため、スレッドの利用
  • ifList:ネットワークI/F
  • RawData:キャプチャ生データ

メイン+α

##### Main Program #####
root = Tk()
root.geometry('900x600')
root.title('Netowkr Packet Capture')

# Init
interface_list()
IFidx = 0
doing = 0

...

# Loop
root.mainloop()
  • tkinter基本
  • ネットワークI/F情報取得(inteface_list():後述)
  • 変数初期化(IFidx:ネットワークI/Fインデックス、doing:キャプチャ中か否か)

フレーム(Window)作成(Main内)

# Making Frame
frm1 = Frame(root, width=w_width)
frm1.iflabel = Label(frm1, text='Network I/F: ')
frm1.iflabel.grid(row=0, column=0)
frm1.ifname = Label(frm1, text=ifList[0].name)
frm1.ifname.grid(row=0, column=1)
frm1.dummy1 = Label(frm1, text='\t\t')
frm1.dummy1.grid(row=0, column=2)
frm1.select_btn = ttk.Button(frm1, text='Select', style='W.TButton', command=lambda:select_clk(frm1))
frm1.select_btn.grid(row=0, column=3)
frm1.dummy2 = Label(frm1, text='\t')
frm1.dummy2.grid(row=0, column=4)
frm1.start_btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.start_btn.grid(row=0, column=5)

frm2 = Frame(root, width=w_width)
frm2.prt = Text(frm2, font=("Courier New", 12), width=130, height=15)
frm2.ysc = Scrollbar(frm2, orient=VERTICAL, command=frm2.prt.yview)
frm2.ysc.pack(side=RIGHT, fill="y")
frm2.prt["yscrollcommand"] = frm2.ysc.set
frm2.prt.config(state='disabled')
frm2.prt.pack()
frm2.prt.bindtags(('Text','post-class-bindings', '.', 'all'))
frm2.prt.bind_class("post-class-bindings", "<Button-1>", ShowDetail)

frm3 = Frame(root, width=w_width)
frm3.prt = Text(frm3, font=("Courier New", 12), width=130, height=20, wrap=NONE) # for Horizontal Scroll
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.ysc2 = Scrollbar(frm3, orient=HORIZONTAL, command=frm3.prt.xview)
frm3.ysc2.pack(side=BOTTOM, fill="x")
frm3.prt["xscrollcommand"] = frm3.ysc2.set
frm3.prt.config(state='disabled')
frm3.prt.pack()

frm1.pack()
frm2.pack()
frm3.pack()
  • Windowは3フレームから構成
  • 第1フレーム
    • ネットワークI/F選択ボタンおよびStart/Stopボタン
  • 第2フレーム(テキストボックス)
    • キャプチャデータ表示リアルタイム表示
    • クリックされた行番号を取得する関数(ShowDetail:後述)定義
  • 第3フレーム(テキストボックス)
    • 第2フレームで選択されたキャプチャデータ詳細表示
    • 横スクロール付き

インターフェースリスト作成

### interface_list: Listing usable I/F ###
def interface_list():
    for i in ifaces.data.keys():
        iface = ifaces.data[i]
        flg = int(iface.flags)
        if ((flg & 0x20) == 0) and len(iface.ip):	# 0x20: Disconnected
            ifList.append(iface)
  • Scapyフレームワークを利用して有効(接続中かつIPアドレスあり)なI/Fのみリスト化
    • インターフェースフラグのビット位置0x20は”Disconnect”を意味

キャプチャI/F選択

### set_clk: Getting I/F name for selected I/F (index) ###
def set_clk(dg, idx):
    global IFidx
    IFidx = idx
    frm1.ifname['text'] = ifList[IFidx].name
    dg.destroy()
    return

### select_clk: Dialog Window for Selection of Network I/F ###
def select_clk(frm):
    dlg = Toplevel(frm)
    dlg.title('Select Network I/F')
    dlg.geometry('300x200')
    dlg.grab_set()

    radio = []
    name = []
    selectedIF = IntVar()
    selectedIF.set(0)
    for i in range(len(ifList)):
        radio.append(Radiobutton(dlg, variable=selectedIF, value=i))
        name.append(Label(dlg, text=ifList[i].name))
    [radio[i].grid(row=i, column=0) for i in range(len(ifList))]
    [name[i].grid(row=i, column=1) for i in range(len(ifList))]
    dummy = Label(dlg, text=' ')
    dummy.grid(row=len(ifList), column=0)
    set_btn = ttk.Button(dlg, text='Set', style='W.TButton', command=lambda:set_clk(dlg, selectedIF.get()))
    set_btn.grid(row=len(ifList)+1, column=1)
  • select_clk
    • モーダルダイアログボックス作成
    • 有効なインターフェースリストをラジオボタン表示
    • 設定ボタン”Set”作成
  • set_clk
    • クリックされたI/Fのインデックスを変数IFidexに格納
    • キャプチャ時に利用するI/F名をインターフェースリストから取得
    • ダイアログボックスクローズ

キャプチャ開始(”Start”クリック)

### start_clk: Start of Capture ###
def start_clk():
    global doing
    global capture
    global IFidx
    if doing == 0:
        RawData.clear()
        doing = 1
        dt=datetime.datetime.now()
        file = str(dt.strftime('%Y%m%d_%H%M%S')) +'.pcap'
        capture = pyshark.LiveCapture(interface=ifList[IFidx].name, output_file=file) # Must be here.
        th_Monitor = threading.Thread(target=CaptureThread, daemon=True)	# Run Thread
        th_Monitor.start()
    else:
        doing = 0
    frm1.start_btn['text'] = btn_list[doing]
  • キャプチャ開始前(doing = 0)
    • フラグdoingを"1"にセット
    • キャプチャファイル作成
    • キャプチャスレッド呼び出し
  • キャプチャ中(doing = 1)
  • ボタン名を変更(Start↔Stop)

キャプチャスレッド

### CaptureThread: Thread for capturing network packet ###
def CaptureThread():
    global doing
    global capture
    frm3.prt.config(state='normal')
    frm3.prt.delete(0.0, END)
    frm3.prt.config(state='disabled')
    frm2.prt.config(state='normal')
    frm2.prt.delete(0.0, END)
    i = 1
    # https://stackoverflow.com/questions/57099396/continuously-capture-packets-in-pyshark
    for raw_packet in capture.sniff_continuously():
        RawData.append(raw_packet)
        try:	# IP
            out = str(i) + ' ' + str(raw_packet.sniff_time) + ' Src:' + raw_packet.ip.src \
                + ' Dst:' + raw_packet.ip.dst + ' Proto:' + raw_packet.ip.proto
        except:	# non IP -> Ether
            out = str(i) + ' ' + str(raw_packet.sniff_time) + ' Src:' + raw_packet.eth.src \
                + ' Dst:' + raw_packet.eth.dst + ' Proto:' + raw_packet.eth.type
        frm2.prt.insert(END, out + '\n')
        frm2.prt.see('end')
        i += 1
        if doing == 0:	# "Stop" clicked
            break
    frm2.prt.config(state='disabled')
  • テキストボックス(第2フレームおよび第3フレーム)クリア
  • 連続キャプチャ(sniff_continuously())
  • キャプチャデータを変数RawDataに随時追加
  • IPパケットとそれ以外とで出力内容(概要)を変化させる
  • テキストボックス(第2フレーム)に表示

パケット詳細

### ShowDetail: Detail of Packet ###
def ShowDetail(event):
    pos = frm2.prt.index(INSERT)
    line = int(pos.split('.')[0])
    #print(line)
    #print(RawData[line-1])
    frm3.prt.config(state='normal')
    frm3.prt.delete(0.0, END)
    out = '<< Packet Number ' + str(line) + ' >>' + '\n\n'
    frm3.prt.insert(END, out)
    # Removal of special escape sequence
    out = str(RawData[line-1]).replace('\x1b\x5b\x30\x6d', '')
    out = out.replace('\x1b\x5b\x31\x6d', '')
    out = out.replace('\x1b\x5b\x33\x32\x6d', '')
    out = out.replace('\x1b\x5b\x33\x33\x6d', '')
    frm3.prt.insert(END, out)
    frm3.prt.config(state='disabled')
  • 第2フレームテキストボックスでクリックされた行の取得
  • 第3フレームテキストボックスの内容クリア
  • 生キャプチャデータに含まれるエスケープシーケンスの削除
  • データの表示

全体

from scapy.all import *
import pyshark
import datetime
import threading
from tkinter import *
import tkinter.ttk as ttk

### Parameters ###
w_width = 900
ifList = []
btn_list = ['Start', 'Stop']
RawData = []

### interface_list: Listing usable I/F ###
def interface_list():
    for i in ifaces.data.keys():
        iface = ifaces.data[i]
        flg = int(iface.flags)
        if ((flg & 0x20) == 0) and len(iface.ip):	# 0x20: Disconnected
            ifList.append(iface)

### set_clk: Getting I/F name for selected I/F (index) ###
def set_clk(dg, idx):
    global IFidx
    IFidx = idx
    frm1.ifname['text'] = ifList[IFidx].name
    dg.destroy()
    return

### select_clk: Dialog Window for Selection of Network I/F ###
def select_clk(frm):
    dlg = Toplevel(frm)
    dlg.title('Select Network I/F')
    dlg.geometry('300x200')
    dlg.grab_set()

    radio = []
    name = []
    selectedIF = IntVar()
    selectedIF.set(0)
    for i in range(len(ifList)):
        radio.append(Radiobutton(dlg, variable=selectedIF, value=i))
        name.append(Label(dlg, text=ifList[i].name))
    [radio[i].grid(row=i, column=0) for i in range(len(ifList))]
    [name[i].grid(row=i, column=1) for i in range(len(ifList))]
    dummy = Label(dlg, text=' ')
    dummy.grid(row=len(ifList), column=0)
    set_btn = ttk.Button(dlg, text='Set', style='W.TButton', command=lambda:set_clk(dlg, selectedIF.get()))
    set_btn.grid(row=len(ifList)+1, column=1)

### ShowDetail: Detail of Packet ###
def ShowDetail(event):
    pos = frm2.prt.index(INSERT)
    line = int(pos.split('.')[0])
    #print(line)
    #print(RawData[line-1])
    frm3.prt.config(state='normal')
    frm3.prt.delete(0.0, END)
    out = '<< Packet Number ' + str(line) + ' >>' + '\n\n'
    frm3.prt.insert(END, out)
    # Removal of special escape sequence
    out = str(RawData[line-1]).replace('\x1b\x5b\x30\x6d', '')
    out = out.replace('\x1b\x5b\x31\x6d', '')
    out = out.replace('\x1b\x5b\x33\x32\x6d', '')
    out = out.replace('\x1b\x5b\x33\x33\x6d', '')
    frm3.prt.insert(END, out)
    frm3.prt.config(state='disabled')

### CaptureThread: Thread for capturing network packet ###
def CaptureThread():
    global doing
    global capture
    frm3.prt.config(state='normal')
    frm3.prt.delete(0.0, END)
    frm3.prt.config(state='disabled')
    frm2.prt.config(state='normal')
    frm2.prt.delete(0.0, END)
    i = 1
    for raw_packet in capture.sniff_continuously():
        RawData.append(raw_packet)
        try:	# IP
            out = str(i) + ' ' + str(raw_packet.sniff_time) + ' Src:' + raw_packet.ip.src \
                + ' Dst:' + raw_packet.ip.dst + ' Proto:' + raw_packet.ip.proto
        except:	# non IP -> Ether
            out = str(i) + ' ' + str(raw_packet.sniff_time) + ' Src:' + raw_packet.eth.src \
                + ' Dst:' + raw_packet.eth.dst + ' Proto:' + raw_packet.eth.type
        frm2.prt.insert(END, out + '\n')
        frm2.prt.see('end')
        i += 1
        if doing == 0:	# "Stop" clicked
            break
    frm2.prt.config(state='disabled')

### start_clk: Start of Capture ###
def start_clk():
    global doing
    global capture
    global IFidx
    if doing == 0:
        RawData.clear()
        doing = 1
        dt=datetime.datetime.now()
        file = str(dt.strftime('%Y%m%d_%H%M%S')) +'.pcap'
        capture = pyshark.LiveCapture(interface=ifList[IFidx].name, output_file=file) # Must be here.
        th_Monitor = threading.Thread(target=CaptureThread, daemon=True)	# Run Thread
        th_Monitor.start()
    else:
        doing = 0
    frm1.start_btn['text'] = btn_list[doing]

##### Main Program #####
root = Tk()
root.geometry('900x600')
root.title('Netowkr Packet Capture')

# Init
interface_list()
IFidx = 0
doing = 0

# Making Frame
frm1 = Frame(root, width=w_width)
frm1.iflabel = Label(frm1, text='Network I/F: ')
frm1.iflabel.grid(row=0, column=0)
frm1.ifname = Label(frm1, text=ifList[0].name)
frm1.ifname.grid(row=0, column=1)
frm1.dummy1 = Label(frm1, text='\t\t')
frm1.dummy1.grid(row=0, column=2)
frm1.select_btn = ttk.Button(frm1, text='Select', style='W.TButton', command=lambda:select_clk(frm1))
frm1.select_btn.grid(row=0, column=3)
frm1.dummy2 = Label(frm1, text='\t')
frm1.dummy2.grid(row=0, column=4)
frm1.start_btn = ttk.Button(frm1, text=btn_list[doing], style='W.TButton', command=lambda:start_clk())
frm1.start_btn.grid(row=0, column=5)

frm2 = Frame(root, width=w_width)
frm2.prt = Text(frm2, font=("Courier New", 12), width=130, height=15)
frm2.ysc = Scrollbar(frm2, orient=VERTICAL, command=frm2.prt.yview)
frm2.ysc.pack(side=RIGHT, fill="y")
frm2.prt["yscrollcommand"] = frm2.ysc.set
frm2.prt.config(state='disabled')
frm2.prt.pack()
frm2.prt.bindtags(('Text','post-class-bindings', '.', 'all'))
frm2.prt.bind_class("post-class-bindings", "<Button-1>", ShowDetail)

frm3 = Frame(root, width=w_width)
frm3.prt = Text(frm3, font=("Courier New", 12), width=130, height=20, wrap=NONE) # for Horizontal Scroll
frm3.ysc = Scrollbar(frm3, orient=VERTICAL, command=frm3.prt.yview)
frm3.ysc.pack(side=RIGHT, fill="y")
frm3.prt["yscrollcommand"] = frm3.ysc.set
frm3.ysc2 = Scrollbar(frm3, orient=HORIZONTAL, command=frm3.prt.xview)
frm3.ysc2.pack(side=BOTTOM, fill="x")
frm3.prt["xscrollcommand"] = frm3.ysc2.set
frm3.prt.config(state='disabled')
frm3.prt.pack()

frm1.pack()
frm2.pack()
frm3.pack()

# Loop
root.mainloop()

課題

遭遇した課題や問題など。

  • 第2フレームでクリックした行のハイライト化
  • 「RuntimeError('This event loop is already running')」が時々発生
    • スレッドとLiveCaptureとの関連と推測

EOF

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2