ひきつづき『サイバーセキュリティプログラミング第2版』の読書メモです。
3.4 ICMP のパース より
P63 Scanner クラスの sniff メソッドを抜粋。
def sniff(self):
hosts_up = set([f'{str(self.host)} *'])
try:
while True:
# パケットの読み込み
raw_buffer = self.socket.recvfrom(65535)[0]
# バッファーの最初の20バイトからIP構造体を作成
ip_header = IP(raw_buffer[0:20])
# ICMPであればそれを処理
if ip_header.protocol == "ICMP":
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + 8]
icmp_header = ICMP(buf)
# コードとタイプが3であるかチェック
if icmp_header.code == 3 and icmp_header.type == 3:
if ipaddress.ip_address(ip_header.src_address) in ipaddress.IPv4Network(SUBNET):
# マジック文字列を含むか確認
if raw_buffer[len(raw_buffer) - len(MESSAGE): ] == bytes(MESSAGE, 'utf8'):
tgt = str(ip_header.src_address)
if tgt != self.host and tgt not in hosts_up:
hosts_up.add(str(ip_header.src_address))
print(f'Host Up: {tgt}')
# CTRL-Cが押された際の処理を定義
except KeyboardInterrupt:
if os.name == 'nt':
self.socket.ioctl(socket.SIO_RCVALL, socket.RCVALL_OFF)
print('\nUser interrupted.')
if hosts_up:
print(f'\n\nSummary: Hosts up on {SUBNET}')
for host in sorted(hosts_up):
print(f'{host}')
print('')
sys.exit()
UDPパケットを読み込んで、「存在が確認されたホスト」を登録していくメソッドです。
登録条件は、読み込んだパケットが特定のヘッダーを持つ ICMP パケットでであり、かつ、特定のマジック文字列を持ち、かつ、送信元アドレスが自身のアドレスではなく、かつ、未登録のホストであるということです。
無限ループを抜けるには CTRL-C を入力します。
発見したホストを標準出力に表示し、最後に sys.exit() でプログラム全体が終了します。
さて、このメソッドは読み込んだパケットデータから IPヘッダの切り出し、IPヘッダの操作、ICMPパケットの切り出し、ICMP ヘッダの操作 の方法を学ぶことができます。
普通のアプリケーションではほとんどみかけることのない、なかなかマニアックなコードです。
それにしても try および while ループの上に if の制御構造が 5 段もあり、階層が深すぎて読みにくいですね。
あんまりなので少し手直ししてみました。
from typing import Final
BUFSIZE: Final[int] = 65535
(略)
def sniff(self):
hosts_up = set([f'{str(self.host)} *'])
try:
while True:
# パケットの読み込み
raw_buffer = self.socket.recvfrom(BUFSIZE)[0]
# バッファーの最初の20バイトからIP構造体を作成
ip_header = IP(raw_buffer[0:20])
# 送信元アドレスがスキャン対象サブネットに含まれないときはスキップ
if ipaddress.ip_address(ip_header.src_address) not in ipaddress.IPv4Network(SUBNET):
continue
# 送信元アドレスが自身のアドレスか、すでに登録済みの場合はスキップ
tgt = str(ip_header.src_address)
if tgt == self.host or tgt in hosts_up:
continue
# ICMPのパケットでなければスキップ
if ip_header.protocol != "ICMP":
continue
# ICMP ヘッダの切り出し
offset = ip_header.ihl * 4
buf = raw_buffer[offset:offset + 8]
icmp_header = ICMP(buf)
# コードとタイプがいずれも 3 でないときはスキップ
if icmp_header.code != 3 or icmp_header.type != 3:
continue
# マジック文字列を含まないときはスキップ
if raw_buffer[len(raw_buffer) - len(MESSAGE): ] != bytes(MESSAGE, 'utf8'):
continue
# 送信元アドレスを登録
hosts_up.add(tgt)
print(f'Host Up: {tgt}')
(以下略)
行数は増えましたが、階層がフラットになりより読みやすくなったのではないでしょうか。
送信元アドレスのチェックはもっと早くに行った方が無駄な処理が減って効果的なのではないかと考え、ICMP パケット関連の処理の前に移動しました。
総じてこの著者は少ないコード量でやりたいことを完結に実装するという主義の方のようです。
非常に優秀な方なのでしょう。
ところで最初の set 型の変数 hosts_up の初期化のところ、これは何のおまじないだろう?
hosts_up = set([f'{str(self.host)} *'])
これをやると例えば self.host が '192.168.1.2' の場合には
>>> hosts_up
{'192.168.1.2 *'}
となります。
ちょっと考えてみましたが結局わかりませんでした。