0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

メッシュWifiへの不正侵入検知を試してみましたが、なかなか難しい

0
Posted at

 私の自宅は流行りのメッシュWifiで、Wifi 6子機が1つのWiFi 6環境です。Wifi 6親機は、ドコモのhome 5G HR02でを使用しています。
 Wifiの子機のひとつは、テレビ等につなぐための有線LANの口にもしています。これにより、LANケーブルをはいずりまわす距離が激減しました。
 さて、20年以上の心配事として、Wifiに不正ログインをされていても気が付きにくいものがあります。
 Wifiへの不正ログインの心配そのものが取り越し苦労な気はしますが、自分の性格的に、心配は消えません。DDoS攻撃の踏み台にされただけなのに、主犯と疑われたらどうしようなんて、思ってしまうこともあります。LSIへのハードウェアトロイによるセキュリティホールもないとは言い切れないですし。
 そこで、幾度かChatGPTとやり取りして、PingやARPやポートなどもたたいて、1分ごとにログを残すPythonスクリプトを作ってみました。
 この手のことをChatGPTに作らせているユーザーは多いものとみえ、要求を数回繰り返しただけですんなりできました。

課題1

 WiFi 6ということをプロンプトしていなかったので、5GHz/2.4GHzの切り替えにより親機Macアドレスが変わることを考慮していないコードになっていますが、親機IPアドレスは固定されているので、問題は解決できます。

課題2

 ログ取得の定期性が今ひとつ賢くないのですが、これもプロンプトしなかっただけであり、ChatGPTが考えてくれるでしょう。

課題3

 最も残念なのは、macアドレスやhost nameが得られないノードがあったり、macアドレスが変わるノードがあったりで、そのノードが不正かどうかがわからないことです。自宅であればiPhoneがそうです。
 この結果、集計しても、今ひとつな結果です。

課題4

 取り越し苦労かもしれないですが、このスクリプトを実行している本機そのものを、不正マシンとして判断されてしまう可能性も感じてます。Wifi親機/子機/ルータ類/その他各機の機能によってはあり得えそうですし。

 根本的な解決は、天から答えが降ってくるのを待つとして、Pythonスクリプトを掲載しておきます。
 使い方は、

python lan_logger.py --subnet 192.168.128.0/24 --log lan_nodes_log.csv
lan_logger.py
#!/usr/bin/env python3
import argparse
import csv
import ipaddress
import os
import platform
import re
import socket
import subprocess
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

OS_NAME = platform.system().lower()
IS_WINDOWS = "windows" in OS_NAME

def now_ts() -> str:
    # 年月日時分秒(秒まで)
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

# --- Ping(再試行) ---
def ping_once(ip: str, timeout_ms: int = 800) -> bool:
    if IS_WINDOWS:
        cmd = ["ping", "-n", "1", "-w", str(timeout_ms), ip]
    else:
        cmd = ["ping", "-c", "1", ip]
    r = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return r.returncode == 0

def ping(ip: str, tries: int = 2, timeout_ms: int = 800) -> bool:
    for _ in range(tries):
        if ping_once(ip, timeout_ms):
            return True
    return False

# --- TCP補助(Ping無視端末対策) ---
def tcp_probe(ip: str, ports: tuple[int, ...], timeout_s: float = 0.35) -> bool:
    for p in ports:
        try:
            with socket.create_connection((ip, p), timeout=timeout_s):
                return True
        except Exception:
            pass
    return False

def is_alive(ip: str, ping_tries: int, timeout_ms: int, tcp_ports: tuple[int, ...]) -> bool:
    return ping(ip, tries=ping_tries, timeout_ms=timeout_ms) or tcp_probe(ip, tcp_ports)

# --- ARP ---
def get_arp_table() -> dict[str, str]:
    table: dict[str, str] = {}
    try:
        if IS_WINDOWS:
            out = subprocess.check_output(["arp", "-a"], text=True, errors="ignore")
            for ip, mac in re.findall(r"(\d+\.\d+\.\d+\.\d+)\s+([0-9a-fA-F-]{17})", out):
                table[ip] = mac.replace("-", ":").lower()
        else:
            out = subprocess.check_output(["arp", "-a"], text=True, errors="ignore")
            for ip, mac in re.findall(r"\((\d+\.\d+\.\d+\.\d+)\)\s+at\s+([0-9a-fA-F:]{17})", out):
                table[ip] = mac.lower()
    except Exception:
        pass
    return table

def get_hostname(ip: str) -> str:
    try:
        return socket.gethostbyaddr(ip)[0]
    except Exception:
        return ""

def ensure_csv_header(path: str):
    exists = os.path.exists(path)
    if not exists:
        with open(path, "w", newline="", encoding="utf-8") as f:
            w = csv.writer(f)
            # 追記ログ(1行=1ノード=1観測)
            w.writerow(["timestamp", "subnet", "ip", "mac", "hostname"])

def scan_once(subnet: str, max_workers: int, ping_tries: int, timeout_ms: int, tcp_ports: tuple[int, ...]):
    net = ipaddress.ip_network(subnet, strict=False)

    alive: list[str] = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        futures = {ex.submit(is_alive, str(ip), ping_tries, timeout_ms, tcp_ports): str(ip) for ip in net.hosts()}
        for f in as_completed(futures):
            ip = futures[f]
            try:
                if f.result():
                    alive.append(ip)
            except Exception:
                pass

    alive.sort(key=lambda s: list(map(int, s.split("."))))

    arp = get_arp_table()
    rows = []
    for ip in alive:
        mac = arp.get(ip, "")
        host = get_hostname(ip)
        rows.append((ip, mac, host))
    return rows

def append_log(path: str, ts: str, subnet: str, rows: list[tuple[str, str, str]]):
    # ログは「追記」する
    with open(path, "a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        for ip, mac, host in rows:
            w.writerow([ts, subnet, ip, mac, host])

def sleep_to_next_minute():
    # ちょうど分境界に寄せる(秒が揺れにくい)
    now = time.time()
    next_min = (int(now) // 60 + 1) * 60
    time.sleep(max(0, next_min - now))

def main():
    ap = argparse.ArgumentParser(description="Log LAN nodes every minute (append CSV).")
    ap.add_argument("--subnet", required=True, help='e.g. "192.168.1.0/24"')
    ap.add_argument("--log", default="lan_nodes_log.csv", help="CSV log path (append)")
    ap.add_argument("--interval", type=int, default=60, help="seconds (default 60)")
    ap.add_argument("--max-workers", type=int, default=128)
    ap.add_argument("--ping-tries", type=int, default=2)
    ap.add_argument("--timeout-ms", type=int, default=800, help="ping timeout (Windows only)")
    ap.add_argument("--tcp-ports", default="443,80,22", help="comma ports used when ping fails")
    ap.add_argument("--runs", type=int, default=0, help="0=forever, else number of iterations")
    args = ap.parse_args()

    tcp_ports = tuple(int(x.strip()) for x in args.tcp_ports.split(",") if x.strip())

    ensure_csv_header(args.log)

    run_count = 0
    while True:
        ts = now_ts()
        rows = scan_once(args.subnet, args.max_workers, args.ping_tries, args.timeout_ms, tcp_ports)
        append_log(args.log, ts, args.subnet, rows)

        run_count += 1
        print(f"[{ts}] logged {len(rows)} nodes -> {args.log}")

        if args.runs and run_count >= args.runs:
            break

        # interval=60想定だが、指定秒で回す
        if args.interval == 60:
            sleep_to_next_minute()
        else:
            time.sleep(args.interval)

if __name__ == "__main__":
    main()
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?