私の自宅は流行りのメッシュWifiで、Wifi 6子機が1つのWiFi 6環境です。Wifi 6親機は、ドコモのhome 5G HR02でを使用しています。
Wifiの子機のひとつは、テレビ等につなぐための有線LANの口にもしています。これにより、LANケーブルをはいずりまわす距離が激減しました。
さて、20年以上の心配事として、Wifiに不正ログインをされていても気が付きにくいものがあります。
Wifiへの不正ログインの心配そのものが取り越し苦労な気はしますが、自分の性格的に、心配は消えません。DDoS攻撃の踏み台にされただけなのに、主犯と疑われたらどうしようなんて、思ってしまうこともあります。LSIへのハードウェアトロイによるセキュリティホールもないとは言い切れないですし。
そこで、幾度かChatGPTとやり取りして、PingやARPやポートなどもたたいて、1分ごとにログを残すPythonスクリプトを作ってみました。
この手のことをChatGPTに作らせているユーザーは多いものとみえ、要求を数回繰り返しただけですんなりできました。
課題1
WiFi 6ということをプロンプトしていなかったので、5GHz/2.4GHzの切り替えにより親機Macアドレスが変わることを考慮していないコードになっていますが、親機IPアドレスは固定されているので、問題は解決できます。
課題2
ログ取得の定期性が今ひとつ賢くないのですが、これもプロンプトしなかっただけであり、ChatGPTが考えてくれるでしょう。
課題3
最も残念なのは、macアドレスやhost nameが得られないノードがあったり、macアドレスが変わるノードがあったりで、そのノードが不正かどうかがわからないことです。自宅であればiPhoneがそうです。
この結果、集計しても、今ひとつな結果です。
課題4
取り越し苦労かもしれないですが、このスクリプトを実行している本機そのものを、不正マシンとして判断されてしまう可能性も感じてます。Wifi親機/子機/ルータ類/その他各機の機能によってはあり得えそうですし。
根本的な解決は、天から答えが降ってくるのを待つとして、Pythonスクリプトを掲載しておきます。
使い方は、
python lan_logger.py --subnet 192.168.128.0/24 --log lan_nodes_log.csv
#!/usr/bin/env python3
import argparse
import csv
import ipaddress
import os
import platform
import re
import socket
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
OS_NAME = platform.system().lower()
IS_WINDOWS = "windows" in OS_NAME
def now_ts() -> str:
# 年月日時分秒(秒まで)
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# --- Ping(再試行) ---
def ping_once(ip: str, timeout_ms: int = 800) -> bool:
if IS_WINDOWS:
cmd = ["ping", "-n", "1", "-w", str(timeout_ms), ip]
else:
cmd = ["ping", "-c", "1", ip]
r = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
return r.returncode == 0
def ping(ip: str, tries: int = 2, timeout_ms: int = 800) -> bool:
for _ in range(tries):
if ping_once(ip, timeout_ms):
return True
return False
# --- TCP補助(Ping無視端末対策) ---
def tcp_probe(ip: str, ports: tuple[int, ...], timeout_s: float = 0.35) -> bool:
for p in ports:
try:
with socket.create_connection((ip, p), timeout=timeout_s):
return True
except Exception:
pass
return False
def is_alive(ip: str, ping_tries: int, timeout_ms: int, tcp_ports: tuple[int, ...]) -> bool:
return ping(ip, tries=ping_tries, timeout_ms=timeout_ms) or tcp_probe(ip, tcp_ports)
# --- ARP ---
def get_arp_table() -> dict[str, str]:
table: dict[str, str] = {}
try:
if IS_WINDOWS:
out = subprocess.check_output(["arp", "-a"], text=True, errors="ignore")
for ip, mac in re.findall(r"(\d+\.\d+\.\d+\.\d+)\s+([0-9a-fA-F-]{17})", out):
table[ip] = mac.replace("-", ":").lower()
else:
out = subprocess.check_output(["arp", "-a"], text=True, errors="ignore")
for ip, mac in re.findall(r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]{17})", out):
table[ip] = mac.lower()
except Exception:
pass
return table
def get_hostname(ip: str) -> str:
try:
return socket.gethostbyaddr(ip)[0]
except Exception:
return ""
def ensure_csv_header(path: str):
exists = os.path.exists(path)
if not exists:
with open(path, "w", newline="", encoding="utf-8") as f:
w = csv.writer(f)
# 追記ログ(1行=1ノード=1観測)
w.writerow(["timestamp", "subnet", "ip", "mac", "hostname"])
def scan_once(subnet: str, max_workers: int, ping_tries: int, timeout_ms: int, tcp_ports: tuple[int, ...]):
net = ipaddress.ip_network(subnet, strict=False)
alive: list[str] = []
with ThreadPoolExecutor(max_workers=max_workers) as ex:
futures = {ex.submit(is_alive, str(ip), ping_tries, timeout_ms, tcp_ports): str(ip) for ip in net.hosts()}
for f in as_completed(futures):
ip = futures[f]
try:
if f.result():
alive.append(ip)
except Exception:
pass
alive.sort(key=lambda s: list(map(int, s.split("."))))
arp = get_arp_table()
rows = []
for ip in alive:
mac = arp.get(ip, "")
host = get_hostname(ip)
rows.append((ip, mac, host))
return rows
def append_log(path: str, ts: str, subnet: str, rows: list[tuple[str, str, str]]):
# ログは「追記」する
with open(path, "a", newline="", encoding="utf-8") as f:
w = csv.writer(f)
for ip, mac, host in rows:
w.writerow([ts, subnet, ip, mac, host])
def sleep_to_next_minute():
# ちょうど分境界に寄せる(秒が揺れにくい)
now = time.time()
next_min = (int(now) // 60 + 1) * 60
time.sleep(max(0, next_min - now))
def main():
ap = argparse.ArgumentParser(description="Log LAN nodes every minute (append CSV).")
ap.add_argument("--subnet", required=True, help='e.g. "192.168.1.0/24"')
ap.add_argument("--log", default="lan_nodes_log.csv", help="CSV log path (append)")
ap.add_argument("--interval", type=int, default=60, help="seconds (default 60)")
ap.add_argument("--max-workers", type=int, default=128)
ap.add_argument("--ping-tries", type=int, default=2)
ap.add_argument("--timeout-ms", type=int, default=800, help="ping timeout (Windows only)")
ap.add_argument("--tcp-ports", default="443,80,22", help="comma ports used when ping fails")
ap.add_argument("--runs", type=int, default=0, help="0=forever, else number of iterations")
args = ap.parse_args()
tcp_ports = tuple(int(x.strip()) for x in args.tcp_ports.split(",") if x.strip())
ensure_csv_header(args.log)
run_count = 0
while True:
ts = now_ts()
rows = scan_once(args.subnet, args.max_workers, args.ping_tries, args.timeout_ms, tcp_ports)
append_log(args.log, ts, args.subnet, rows)
run_count += 1
print(f"[{ts}] logged {len(rows)} nodes -> {args.log}")
if args.runs and run_count >= args.runs:
break
# interval=60想定だが、指定秒で回す
if args.interval == 60:
sleep_to_next_minute()
else:
time.sleep(args.interval)
if __name__ == "__main__":
main()