はじめに
※1コードの全体像は最後に載せています。前書きはいいからコードくれという方はこちら
※2本記事はNSメッセージって?章で解説している①についてです。
ICMPv6のNDPメッセージをPythonで実現したいなと思って調べたところ、
Cでの実装(本来ならrawソケット通信するならCのほうがいい)しか見つからず、
苦戦したので同じ状況の方の助けになればと思い共有します。
今回はNeighbor Solicitation(以下NS)メッセージの送信について扱います。
Neighbor Advertisementメッセージについても記事にできれば、、、
実装環境
VirtualBox(バージョン 7.0.10)上のKali Linux上で動かしています。
また、同一NATネットワーク(VirtualBoxのネットワーク設定の一つ)でdebainを起動し、debianとの通信確認を行っています。
NSメッセージって?
とりあえず小川晃通さんのプロフェッショナルIPv6 を読めばいいと思います。(ちなみに小川晃通さんはyoutubeでも解説動画をあげてらっしゃっていてとても分かりやすいのでぜひ。)
本記事で紹介するヘッダの構造が分からない場合はこの本を読めばわかるので適宜ご確認を。
簡単に説明すると、NSメッセージとは近隣探索プロトコル(NDP)の一種で、
①単に宛先ノードの場所を探す際、(今回はこっち!)
②自身に新しくアドレスを設定する際の重複確認
の際に送信するパケットで、IPv4でいうところのARP要求のようなものです
コード
今回拡張性を持たせるためにscapyといったライブラリを使用せず、
socket,struct,uuidのライブラリでコードを書いています。(uuidはなくても良い)
address_family = socket.AF_INET6
socket_type = socket.SOCK_RAW
protocol = socket.IPPROTO_RAW
soc = socket.socket ( address_family, socket_type, protocol)
まずはアドレスファミリー、ソケットタイプ、プロトコルを指定したソケットオブジェクトを作成します。
プロトコルはIPPROTO_RAWではなくIPPROTO_ICMPV6とするとIPヘッダは自動で作成され、ICMPv6ヘッダから自作していくことになります。
(詳しい挙動はsocketライブラリ参照)
def make_ip_header(src_addr, dst_addr):
ver = 6
traf = 0
flow = 0
payload =
next_header = 58
hop = 1
src_addr = socket.inet_pton(socket.AF_INET6, src_addr)
dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr)
ver_traf_flow = (ver << 28) + (traf <<20) + flow
ip_header = struct.pack("!IHBB16s16s", ver_traf_flow, payload, next_header, hop, src_addr, dst_addr)
return ip_header
説明が必要そうな箇所のみ説明します。
payloadはICMPv6以降のデータが今回32Byteなので32、
hopはNSメッセージはリンクローカル上でのみ(ルータを超えては届かない)ので1、
src_addrとdst_addrはともにアドレス表記のstringからbytesに直すためにptonを使用、
structライブラリのpackでは0.5Byte単位で受け取れないため、
ver,traf,flowの順で合わせて4Byteとして扱うためにビットシフト
それらをまとめてバイト列オブジェクトとして返しています。
(詳しい挙動はstructライブラリ参照 )
def make_icmp_header(tgt_addr,src_addr, dst_addr):
type = 135
code = 0
check = 0
resv = 0
tgt_addr = socket.inet_pton(socket.AF_INET6, tgt_addr)
opt_type = 1
opt_len = 1
opt_lladdr = get_mac_addr ()
checkは後でチェックサムを計算するので一旦0、
Source Link-Layer Addressオプションを用いるため
opt_typeは1、
オプション部分の合計(opt_type+opt_len+opt_lladdr)が8(=sum)Byteであり
opt_lenはsum/8であるため1、
opt_lladdrはNSメッセージを送信するポートのMACアドレスであり、後で紹介するget_mac_addrを用いて自動取得してもいいし、直接
opt_lladdr = b'\x○○\x○○\x○○\x○○\x○○\x○○'#〇に実際のMACアドレスを入れる
の形式で入力しても良いです。(この場合uuidはimportしなくてOK)
windowsならipconfig、linuxならifconfigやip addressで確認できるはず。
src_addr = socket.inet_pton(socket.AF_INET6, src_addr)
dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr)
upper_len = 32
padding = 0
next_header = 58
pad_next = (padding << 8) + next_header
チェックサムは疑似IPヘッダを含めて計算するので、必要になります。
フォーマットはRFC8200 8.1. Upper-Layer Checksums参照
ちなみにこの部分のコードはmake_icmp_header内に書いてあるのでそのつもりで。
src_addr,dst_addrは実際に送信に用いているIPヘッダと同じもの、
upper_lenは今回ICMPv6部分が32Byteであるため32、
structライブラリは3Byte単位で扱えないため、
paddingとnext_header合わせて4Byteとするためにビットシフト
check_packet = struct.pack("!16s16sIIBBHI16sBB6s", src_addr, dst_addr, upper_len, pad_next, type, code, check, resv, tgt_addr, opt_type, opt_len,opt_lladdr)
check = checksum(check_packet)
icmp_header = struct.pack("!BBHI16sBB6s", type, code, check, resv, tgt_addr, opt_type, opt_len, opt_lladdr)
return icmp_header
まずチェックサム用に疑似IPヘッダを含めたパケットを作成し、そのパケットでチェックサムを計算。
チェックサムフィールドに正しい値を挿入したうえで、ICMPv6部分を返す。
def get_mac_addr():
node = uuid.getnode()
mac = uuid.UUID(int=node)
#uuid型からバイナリーに
addr = mac.bytes
#MACアドレス部分のみ取り出す
addr = bytearray(addr)
addr = addr[-6:]
addr = bytes(addr)
return addr
def checksum(packet):
total = 0
num_words = len(packet)//2
for chunk in struct.unpack("!%sH" % num_words, packet[0:num_words*2]):
#2Bずつ加算
total += chunk
#合計が奇数バイトのとき
if(len(packet)%2):
total += ord(packet[-1] << 8)
#下位16bitとオーバーフローした分を足し合わせる
while(total>>16):
total = (total >> 16) + (total & 0xffff)
#1の補数に変換
total = 0xffff - total
return total
解説しようとしたがコメントアウトで説明してたので割愛。
def send(src_addr, dst_addr, tgt_addr):
ip_header = make_ip_header(src_addr, dst_addr)
icmp_header = make_icmp_header(tgt_addr ,src_addr, dst_addr)
packet = ip_header + icmp_header
soc.sendto(packet, (dst_addr, 0))
sendtoの詳しい挙動はsocketライブラリ参照 で
ソースコード
import socket
import struct
import uuid
#自身のインターフェースに使用されているMACアドレスをバイナリ型で取得
def get_mac_addr():
node = uuid.getnode()
mac = uuid.UUID(int=node)
#uuid型からバイナリーに
addr = mac.bytes
#MACアドレス部分のみ取り出す
addr = bytearray(addr)
addr = addr[-6:]
addr = bytes(addr)
return addr
def checksum(packet):
total = 0
num_words = len(packet)//2
for chunk in struct.unpack("!%sH" % num_words, packet[0:num_words*2]):
#2Bずつ加算
total += chunk
#合計が奇数バイトのとき
if(len(packet)%2):
total += ord(packet[-1] << 8)
#下位16bitとオーバーフローした分を足し合わせる
while(total>>16):
total = (total >> 16) + (total & 0xffff)
#1の補数に変換
total = 0xffff - total
return total
address_family = socket.AF_INET6
socket_type = socket.SOCK_RAW
protocol = socket.IPPROTO_RAW
soc = socket.socket ( address_family, socket_type, protocol)
#IPヘッダ作成
def make_ip_header(src_addr, dst_addr):
#IPv6では6
ver = 6 #0.5B
traf = 0 #1B
flow = 0 #2.5B
payload = 32 #2B
#ICMPv6では58
next_header = 58 #1B
#リンクローカル上なので1
hop = 1 #1B
src_addr = socket.inet_pton(socket.AF_INET6, src_addr) #16B
dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr) #16B
#3つをまとめて4Bに
ver_traf_flow = (ver << 28) + (traf <<20) + flow
ip_header = struct.pack("!IHBB16s16s", ver_traf_flow, payload, next_header, hop, src_addr, dst_addr)
return ip_header #40B
#ICMPヘッダ作成
def make_icmp_header(tgt_addr,src_addr, dst_addr):
#NSでは135
type = 135 #1B
code = 0 #1B
# check = 0 #2B
check = 0
resv = 0 #4B
tgt_addr = socket.inet_pton(socket.AF_INET6, tgt_addr) #16B
#Source Link-Layer Addressオプション
opt_type = 1 #1B
opt_len = 1 #1B
opt_lladdr = get_mac_addr ()#6B
src_addr = socket.inet_pton(socket.AF_INET6, src_addr) #16B
dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr) #16B
upper_len = 32 # 4B
padding = 0 #3B
next_header = 58 #1B
pad_next = (padding << 8) + next_header #4B
check_packet = struct.pack("!16s16sIIBBHI16sBB6s", src_addr, dst_addr, upper_len, pad_next, type, code, check, resv, tgt_addr, opt_type, opt_len,opt_lladdr)
check = checksum(check_packet)
icmp_header = struct.pack("!BBHI16sBB6s", type, code, check, resv, tgt_addr, opt_type, opt_len, opt_lladdr)
return icmp_header #32B
def send(src_addr, dst_addr, tgt_addr):
ip_header = make_ip_header(src_addr, dst_addr)
icmp_header = make_icmp_header(tgt_addr ,src_addr, dst_addr)
packet = ip_header + icmp_header
soc.sendto(packet, (dst_addr, 0))
src_addr = "fe80::1:2:3:4"
dst_addr = "ff02::1:ffac:cdef"
tgt_addr = "fe80::1234:5678:90ab:cdef"
send(src_addr, dst_addr, tgt_addr)
自身のインターフェースにリンクローカルアドレスがfe80::1:2:3:4で、
fe80:: 1234:5678:90ab:cdefのアドレスを持つノードを探索する状況を仮想してアドレスを設定しました。
(↑1234の前にspace入れないとこうなってしまう。。)
本来dst_addrはtgt_addrの下位24bitを用いたff02::1:ffxx:xxxxで表せる要請ノードマルチキャストアドレスなのでtgt_addrから求められるが直接入力で妥協してます。
実装の確認
本実装環境ではsrc_addrにkali linuxに設定されているIPv6アドレス、
dst_addrにdebianの内部ネットワークのインターフェースに設定されているIPv6アドレス、
tgt_addrにはdst_addrから求めた要請ノードマルチキャストアドレスを設定しました。
コードを実行する際はそのままだとpermission errorが出るはずなのでsudoで動かします。
wiresharkでicmpv6でフィルタリングしパケットを観測するとkali linuxからNSメッセージが送られ、それに対するNAメッセージがdebianから送られてきていることを確認できました。