python版 Aterm検索ツール (スレッドセーフversion)
1.はじめに
前回のあらすじ:企業が配布しているツールがhtaでたまげた。→pythonが良さそうじゃない?→置き換えました。
拙作をご覧いただいた方々、ありがとうございました。
htaからの置き換えということでpythonとして変な部分があったりしないか、勉強し直しました。その成果も踏まえて、そもそもスレッドセーフじゃないとか、グローバル変数が跋扈していたりとか酷いものだったので、書き直しました。相変わらずGUI部分はごった返してますが、今回もWindows、Mac、Linuxでの動作を想定しています。変更点とポイントは後述します。
2.コード
開発環境:
Windows11 24H2
python 3.10.6
動作確認:
Windows11 24H2
_/_/_/_/コード折りたたみ(約400行)_/_/_/_/
# ネットワーク関連
import psutil
import socket
import netifaces
import ipaddress
# 非同期処理関連
import threading
import asyncio
import aiohttp
# スレッド間通信関連
import queue
import json
# UI関連
import tkinter as tk
from tkinter import font
from tkinter import ttk
from tkinter import messagebox
import webbrowser
# グローバル定数
VERSION = "2.1"
MODEARRAY = [
"ブリッジ", "PPPoEルータ", "ローカルルータ",
"無線LAN子機", "無線LAN中継機", "MAP-E",
"464XLAT", "DS-Lite", "固定IP1", "複数固定IP", "メッシュ中継機"
]
MAX_PREFIX = 24 # MAX_DEVICE_NUM代替
TIMEOUT = 5
GUI_UPDATE_TIME = 500
# スクロールバー付テーブルクラス
# appの子要素canvasがscrollbarとリンクし、canvasの子要素のtable内をスクロール領域とする
class AtermTable:
def __init__(self, app, row):
self.labels = []
self.style = ttk.Style()
self.style.configure("White.TFrame", background="#ffffff")
self.style.configure("White.TLabel", background="#ffffff")
self.canvas = tk.Canvas(app, bg="#ffffff")
self.canvas.grid(row=row, column=0, sticky="nsew")
self.scrollbar = ttk.Scrollbar(
app, orient="vertical", command=self.canvas.yview
)
self.scrollbar.grid(row=row, column=1, sticky="ns")
# canvasとscrollbarをリンク
self.canvas.configure(yscrollcommand=self.scrollbar.set)
self.table = ttk.Frame(self.canvas, style="White.TFrame")
self.table.bind(
"<Configure>",
lambda e: self.canvas.configure(
scrollregion=self.canvas.bbox("all")
)
)
self.canvas.create_window((0, 0), window=self.table, anchor="nw")
custom_font = font.Font(size=14, weight="bold")
ttk.Label(
self.table, text="機種名", style="White.TLabel",
font=custom_font, anchor="center"
).grid(row=0, column=0, padx=40)
ttk.Label(
self.table, text="動作モード", style="White.TLabel",
font=custom_font, anchor="center"
).grid(row=0, column=1, padx=40)
ttk.Label(
self.table, text="クイック設定Web", style="White.TLabel",
font=custom_font, anchor="center"
).grid(row=0, column=2, padx=40)
self.ttk_row_num = 1
# AddTable関数相当
def add_table(self, string1, string2, string3, link):
custom_font = font.Font(size=14)
name_label = ttk.Label(
self.table, text=string1, style="White.TLabel",
font=custom_font, anchor="center"
)
name_label.grid(row=self.ttk_row_num, column=0, padx=20)
self.labels.append(name_label)
mode_label = ttk.Label(
self.table, text=string2, style="White.TLabel",
font=custom_font, anchor="center"
)
mode_label.grid(row=self.ttk_row_num, column=1, padx=20)
self.labels.append(mode_label)
hyperlink_label = HyperlinkLabel(
self.table, text=string3, url=link,
style="White.TLabel", anchor="center"
)
hyperlink_label.grid(row=self.ttk_row_num, column=2, padx=20)
self.labels.append(hyperlink_label)
self.ttk_row_num += 1
# ClearTable関数相当
def clear_table(self):
for label in self.labels:
if label is not None:
label.grid_forget()
self.ttk_row_num = 1
# ハイパーリンク作成クラス
# ttk.Labelクラス継承
class HyperlinkLabel(ttk.Label):
def __init__(self, master=None, text="", url="", **kwargs):
super().__init__(master, text=text, **kwargs)
self.url = url
self.configure(
foreground="#176eb3", cursor="hand2",
font=("Helvetica", 14, "underline")
)
self.bind("<Button-1>", self.open_link)
def open_link(self, event):
webbrowser.open(self.url)
# ローカルネットワーク情報取得
# return:ipaddressオブジェクト network
def get_local_ip():
network = None
# デフォルトゲートウェイの取得
try:
gateways = netifaces.gateways()
default_gateway = gateways.get(netifaces.AF_INET)
if default_gateway:
gw = ipaddress.ip_address(default_gateway[0][0])
else:
raise AttributeError("Default gateway not found")
except AttributeError as e:
messagebox.showerror(
f"エラー:{e}",
"デフォルトゲートウェイ情報の取得に失敗しました。"
"本ツールを終了し、ネットワークに接続していることを確認後、再度実行してください。\n"
)
return
net_info = psutil.net_if_addrs()
net_stats = psutil.net_if_stats()
for interface, stats in net_stats.items():
if stats.isup: # インターフェースが有効であることを確認
for addr in net_info.get(interface, []):
if addr.family == socket.AF_INET:
network = ipaddress.ip_network(
f"{addr.address}/{addr.netmask}", strict=False
)
# デフォルトゲートウェイを含むプライベートネットワークのみを成功とする
if network.is_private and gw in network:
return network
# Atermの動作モード取得
async def get_sys_mode(session, ip):
url = f"http://{ip}/aterm_httpif.cgi/getparamcmd_no_auth"
data = "REQ_ID=SYS_MODE_GET"
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
async with session.post(
url=url, data=data, headers=headers, timeout=TIMEOUT
) as response:
if response.status == 200:
response_text = await response.text()
response_text = response_text.strip()
return response_text
# Atermの製品名取得
async def get_product_name(session, ip):
url = f"http://{ip}/aterm_httpif.cgi/getparamcmd_no_auth"
data = "REQ_ID=PRODUCT_NAME_GET"
headers = {'Content-Type': 'application/x-www-form-urlencoded'}
async with session.post(
url=url, data=data, headers=headers, timeout=TIMEOUT
) as response:
if response.status == 200:
response_text = await response.text()
response_text = response_text.strip()
return response_text
# 検索タスクの最小単位
async def search_task(session, ip, status_queue):
product_name = None
mode_name = None
response = await get_product_name(session, ip)
if response is not None:
# responseは適切な処理の場合"PRODUCT_NAME=製品名"を持つ
if response.split("=")[0] == "PRODUCT_NAME":
product_name = response.split("=")[1]
if product_name is not None:
response = await get_sys_mode(session, ip)
# modeの数字はAtermの機械依存なので想定しない数値が来た場合のケア
mode = response.split("=")[1]
try:
mode_name = MODEARRAY[int(mode)].strip()
except IndexError:
mode_name = "Unknown Mode"
if product_name is not None and mode_name is not None:
# 検索結果通信
data = {
"flag": "ADD_TABLE",
"productname": product_name,
"mode_name": mode_name,
"ip": ip
}
status_queue.put(json.dumps(data))
# 非同期処理スレッドの親
# 元コードの再帰的実行ではなく並行実行するので進捗表示を変更
async def aterm_search_index(network, status_queue):
async with aiohttp.ClientSession() as session:
tasks = []
for ip in network.hosts():
ip = str(ip) # IPv4アドレス型から文字列型に変換
tasks.append(search_task(session, ip, status_queue))
# 登録された非同期処理を並行実行し全て終わるまで待つ
await asyncio.gather(*tasks, return_exceptions=True)
# 検索終了処理通信
data = {
"flag": "COMPLETE",
"start": str(network.network_address + 1),
"end": str(network.broadcast_address - 1)
}
status_queue.put(json.dumps(data))
# MAX_PREFIXのプレフィクス長にネットワークの検索範囲を制限する
def set_max_device(network):
if network.prefixlen < MAX_PREFIX:
new_prefix = MAX_PREFIX
network = ipaddress.IPv4Network(
(int(network.network_address), new_prefix)
)
messagebox.showinfo(
"Aterm検索ツール",
"検索対象のネットワーク範囲が大きいため、検索できない場合があります。"
)
return network
# StartAtermSearch関数相当
def start_aterm_search(loop, status_queue):
# ネットワーク情報の取得
network = get_local_ip()
if network is not None:
network = set_max_device(network)
status_queue.put('{"flag":"CLEAR_TABLE"}')
status_queue.put('{"flag":"BUTTON_OFF"}')
data = {
"flag": "SEARCHING",
"start": str(network.network_address + 1),
"end": str(network.broadcast_address - 1)
}
status_queue.put(json.dumps(data))
# 非同期タスクの開始
asyncio.run_coroutine_threadsafe(
aterm_search_index(network, status_queue), loop
)
else:
messagebox.showerror(
"エラー",
"ネットワーク情報の取得に失敗しました。"
"本ツールを終了し、Atermに接続していることを確認後、再度実行してください。\n"
"再実行後も同様なエラーが表示される場合は、"
"ご利用のパソコンにIPアドレス、ネットマスクがアサインされているかご確認ください。\n"
)
def start_event_loop(loop):
asyncio.set_event_loop(loop)
loop.run_forever()
# スレッド間通信受信待ち
def update_aterm_search(
app, status_queue, search_button, status_str, search_table
):
try:
while not status_queue.empty():
status = status_queue.get_nowait()
data = json.loads(status)
if data["flag"] == "SEARCHING":
status_str.config(
text=f'{data["start"]}-{data["end"]} 検索中...'
)
elif data["flag"] == "COMPLETE":
status_str.config(
text=f'{data["start"]}-{data["end"]} 検索完了'
)
messagebox.showinfo("Aterm検索ツール", "検索完了")
search_button.config(state=tk.ACTIVE)
elif data["flag"] == "BUTTON_OFF":
search_button.config(state=tk.DISABLED)
elif data["flag"] == "ADD_TABLE":
search_table.add_table(
data["productname"], data["mode_name"],
data["ip"], f'http://{data["ip"]}'
)
elif data["flag"] == "CLEAR_TABLE":
search_table.clear_table()
except KeyError:
print("スレッド間通信エラー")
except queue.Empty:
pass
app.after(
GUI_UPDATE_TIME, update_aterm_search,
app, status_queue, search_button, status_str, search_table
)
if __name__ == "__main__":
status_queue = queue.Queue()
app = tk.Tk()
app.title("Aterm検索ツール")
app.geometry("800x640")
app.config(bg="#ffffff")
app.grid_columnconfigure(0, weight=1)
app.grid_rowconfigure(6, weight=1)
h1 = tk.Label(app, text="Aterm検索ツール", padx=20, pady=10, anchor="w")
h1.grid(row=0, columnspan=2, sticky="ew")
h1.configure(
bg="#176eb3", fg="#ffffff",
font=font.Font(size=16, weight="bold")
)
line = tk.Label(app, text=" ", bg="#5fb6f5", font=font.Font(size=1))
line.grid(row=1, columnspan=2, sticky="ew")
copy_r = tk.Label(
app, text="元ネタ NEC Platforms, Ltd. 2001-2020", anchor="e"
)
copy_r.grid(row=2, columnspan=2, sticky="ew")
copy_r.configure(bg="#ffffff", font=font.Font(size=12, weight="bold"))
ver_str = tk.Label(app, text="Ver."+VERSION, anchor="e")
ver_str.grid(row=3, columnspan=2, sticky="e")
ver_str.configure(bg="#ffffff", font=font.Font(size=12, weight="bold"))
search_button = tk.Button(
app, text="Aterm検索",
command=lambda: start_aterm_search(loop, status_queue),
padx=3, pady=3
)
search_button.grid(row=4, padx=20, sticky="w")
search_button.configure(font=font.Font(size=10, weight="bold"))
status_str = tk.Label(app, text="")
status_str.grid(row=5, padx=20, sticky="w")
status_str.configure(bg="#ffffff", font=font.Font(size=12, weight="bold"))
search_table = AtermTable(app=app, row=6)
# 非同期イベントループを別スレッドで開始
loop = asyncio.new_event_loop()
thread = threading.Thread(target=start_event_loop, args=(loop,))
thread.daemon = True
thread.start()
# キューからのUI更新を定期的に行う
app.after(
GUI_UPDATE_TIME, update_aterm_search,
app, status_queue, search_button, status_str, search_table
)
app.mainloop()
3.変更点とポイント
3.1.非同期処理スレッドの作成方法
前回もメインスレッドにappループ、サブスレッドにloopループという形でそれぞれに処理をしていましたが、今回はloopに対して、非同期処理を割り当てる際にasyncio.run_coroutine_threadsafe()を用いています。前回のコードでも動作はしていたのですが、スレッドセーフの文言が入っている通り、こちらのほうがスレッドセーフならしいです。
3.2.スレッド間通信
問題はメインスレッドのtkinterオブジェクトをサブスレッドから直接操作していた点です。これについてはメインスレッド側でupdate_aterm_search()がqueueを待ち構える形に変更しました。GUIの操作をしたい場合は、queueでjson形式のデータを送信し、update_aterm_search()でtkinterオブジェクトの操作をする事としました。queue以外にもスレッド間通信の実装方法はあるそうですが、スレッドセーフなのはqueueなそうです。余談ですが、python3.10以降ではswitch文が使えるらしいので、気になるならupdate_aterm_search()の中身は書き換えてもいいでしょう。
3.3.各関数内からのグローバル変数の駆除
pythonには自動テストのモジュールとしてunittestやpytestがあると知って、色々と触りました。しかし、グローバル変数が邪魔で書きにくいにもほどがありました。htaからの直訳的なものとはいえ横着はだめですね。テストの書きやすさという観点でもコードの品質は測れると言えるかもしれません。
4.あとがき
これで満足の行く完成版なのですが、GUI部分等をInternetExplorerにまかせていいhtaと違って、pythonでは結構な長さのコードになってしまいました。マイクロソフトはhtaでやっていたことを、現代の環境で簡単に置き換えできるようなものを作ってくれたらいいのにね。
それと今回始めてpython触りましたが、親しみやすく良い言語だと思いました。
なんか作ってまた投稿できたらいいなぁ。
以上です。