0
1

More than 1 year has passed since last update.

[Pythonで]ICMPv6NDPパケット通信~NSメッセージ~

Posted at

はじめに

※1コードの全体像は最後に載せています。前書きはいいからコードくれという方はこちら
※2本記事はNSメッセージって?章で解説している①についてです。

ICMPv6のNDPメッセージをPythonで実現したいなと思って調べたところ、
Cでの実装(本来ならrawソケット通信するならCのほうがいい)しか見つからず、
苦戦したので同じ状況の方の助けになればと思い共有します。

今回はNeighbor Solicitation(以下NS)メッセージの送信について扱います。
Neighbor Advertisementメッセージについても記事にできれば、、、

実装環境

VirtualBox(バージョン 7.0.10)上のKali Linux上で動かしています。
また、同一NATネットワーク(VirtualBoxのネットワーク設定の一つ)でdebainを起動し、debianとの通信確認を行っています。

NSメッセージって?

とりあえず小川晃通さんのプロフェッショナルIPv6 を読めばいいと思います。(ちなみに小川晃通さんはyoutubeでも解説動画をあげてらっしゃっていてとても分かりやすいのでぜひ。)
本記事で紹介するヘッダの構造が分からない場合はこの本を読めばわかるので適宜ご確認を。

簡単に説明すると、NSメッセージとは近隣探索プロトコル(NDP)の一種で、
①単に宛先ノードの場所を探す際、(今回はこっち!)
②自身に新しくアドレスを設定する際の重複確認
の際に送信するパケットで、IPv4でいうところのARP要求のようなものです

コード

今回拡張性を持たせるためにscapyといったライブラリを使用せず、
socket,struct,uuidのライブラリでコードを書いています。(uuidはなくても良い)

ソケットオブジェクト作成
address_family = socket.AF_INET6
socket_type = socket.SOCK_RAW
protocol =  socket.IPPROTO_RAW
soc = socket.socket ( address_family, socket_type, protocol)

まずはアドレスファミリー、ソケットタイプ、プロトコルを指定したソケットオブジェクトを作成します。
プロトコルはIPPROTO_RAWではなくIPPROTO_ICMPV6とするとIPヘッダは自動で作成され、ICMPv6ヘッダから自作していくことになります。
(詳しい挙動はsocketライブラリ参照)

IPヘッダ作成
def make_ip_header(src_addr, dst_addr):
    ver = 6 
    traf = 0
    flow = 0 
    payload =
    next_header = 58 
    hop = 1 
    src_addr = socket.inet_pton(socket.AF_INET6, src_addr) 
    dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr) 
    ver_traf_flow = (ver << 28) + (traf <<20) + flow
    ip_header = struct.pack("!IHBB16s16s", ver_traf_flow, payload, next_header, hop, src_addr, dst_addr)
    return ip_header 

説明が必要そうな箇所のみ説明します。
payloadはICMPv6以降のデータが今回32Byteなので32、
hopはNSメッセージはリンクローカル上でのみ(ルータを超えては届かない)ので1、
src_addrとdst_addrはともにアドレス表記のstringからbytesに直すためにptonを使用、
structライブラリのpackでは0.5Byte単位で受け取れないため、
ver,traf,flowの順で合わせて4Byteとして扱うためにビットシフト
それらをまとめてバイト列オブジェクトとして返しています。
(詳しい挙動はstructライブラリ参照 )

ICMPv6ヘッダ作成の前半
def make_icmp_header(tgt_addr,src_addr, dst_addr):
    type = 135 
    code = 0
    check = 0
    resv = 0 
    tgt_addr = socket.inet_pton(socket.AF_INET6, tgt_addr)
    opt_type = 1
    opt_len = 1
    opt_lladdr = get_mac_addr ()

checkは後でチェックサムを計算するので一旦0、
Source Link-Layer Addressオプションを用いるため
opt_typeは1、
オプション部分の合計(opt_type+opt_len+opt_lladdr)が8(=sum)Byteであり
opt_lenはsum/8であるため1、
opt_lladdrはNSメッセージを送信するポートのMACアドレスであり、後で紹介するget_mac_addrを用いて自動取得してもいいし、直接

opt_lladdr = b'\x○○\x○○\x○○\x○○\x○○\x○○'#〇に実際のMACアドレスを入れる

の形式で入力しても良いです。(この場合uuidはimportしなくてOK)
windowsならipconfig、linuxならifconfigやip addressで確認できるはず。

疑似IPヘッダ(make_icmp_header内)
    src_addr = socket.inet_pton(socket.AF_INET6, src_addr)
    dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr)
    upper_len = 32
    padding = 0 
    next_header = 58
    pad_next = (padding << 8) + next_header

チェックサムは疑似IPヘッダを含めて計算するので、必要になります。
フォーマットはRFC8200 8.1. Upper-Layer Checksums参照
ちなみにこの部分のコードはmake_icmp_header内に書いてあるのでそのつもりで。

src_addr,dst_addrは実際に送信に用いているIPヘッダと同じもの、
upper_lenは今回ICMPv6部分が32Byteであるため32、
structライブラリは3Byte単位で扱えないため、
paddingとnext_header合わせて4Byteとするためにビットシフト

ICMPv6ヘッダ作成後半
    check_packet = struct.pack("!16s16sIIBBHI16sBB6s", src_addr, dst_addr, upper_len, pad_next, type, code, check, resv, tgt_addr, opt_type, opt_len,opt_lladdr)
    check = checksum(check_packet)
    icmp_header = struct.pack("!BBHI16sBB6s", type, code, check, resv, tgt_addr, opt_type, opt_len, opt_lladdr)
    return icmp_header 

まずチェックサム用に疑似IPヘッダを含めたパケットを作成し、そのパケットでチェックサムを計算。
チェックサムフィールドに正しい値を挿入したうえで、ICMPv6部分を返す。

#自身のインターフェースに使用されているMACアドレスをバイナリ型で取得
def get_mac_addr():
    node = uuid.getnode()
    mac = uuid.UUID(int=node)
    #uuid型からバイナリーに
    addr = mac.bytes
    #MACアドレス部分のみ取り出す
    addr = bytearray(addr)
    addr = addr[-6:]
    addr = bytes(addr)
    return addr
チェックサム計算
def checksum(packet):
    total = 0
    num_words = len(packet)//2
    for chunk in struct.unpack("!%sH" % num_words, packet[0:num_words*2]):
        #2Bずつ加算
        total += chunk       
    #合計が奇数バイトのとき
    if(len(packet)%2):
        total += ord(packet[-1] << 8)
    #下位16bitとオーバーフローした分を足し合わせる
    while(total>>16):
        total = (total >> 16) + (total & 0xffff)
    #1の補数に変換 
    total = 0xffff - total
    return total

解説しようとしたがコメントアウトで説明してたので割愛。

パケット送信
def send(src_addr, dst_addr, tgt_addr):
    ip_header = make_ip_header(src_addr, dst_addr)
    icmp_header = make_icmp_header(tgt_addr ,src_addr, dst_addr)
    packet = ip_header + icmp_header
    soc.sendto(packet, (dst_addr, 0))

sendtoの詳しい挙動はsocketライブラリ参照 で

ソースコード

import socket
import struct
import uuid


#自身のインターフェースに使用されているMACアドレスをバイナリ型で取得
def get_mac_addr():
    node = uuid.getnode()
    mac = uuid.UUID(int=node)
    #uuid型からバイナリーに
    addr = mac.bytes
    #MACアドレス部分のみ取り出す
    addr = bytearray(addr)
    addr = addr[-6:]
    addr = bytes(addr)
    return addr


def checksum(packet):
    total = 0
    num_words = len(packet)//2
    for chunk in struct.unpack("!%sH" % num_words, packet[0:num_words*2]):
        #2Bずつ加算
        total += chunk       
    #合計が奇数バイトのとき
    if(len(packet)%2):
        total += ord(packet[-1] << 8)
    #下位16bitとオーバーフローした分を足し合わせる
    while(total>>16):
        total = (total >> 16) + (total & 0xffff)
    #1の補数に変換 
    total = 0xffff - total
    return total


address_family = socket.AF_INET6
socket_type = socket.SOCK_RAW
protocol =  socket.IPPROTO_RAW
soc = socket.socket ( address_family, socket_type, protocol)


#IPヘッダ作成
def make_ip_header(src_addr, dst_addr):
    #IPv6では6
    ver = 6 #0.5B
    traf = 0 #1B
    flow = 0 #2.5B
    payload = 32 #2B
    #ICMPv6では58
    next_header = 58 #1B
    #リンクローカル上なので1
    hop = 1 #1B
    src_addr = socket.inet_pton(socket.AF_INET6, src_addr) #16B
    dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr) #16B
    #3つをまとめて4Bに
    ver_traf_flow = (ver << 28) + (traf <<20) + flow
    ip_header = struct.pack("!IHBB16s16s", ver_traf_flow, payload, next_header, hop, src_addr, dst_addr)
    return ip_header #40B


#ICMPヘッダ作成
def make_icmp_header(tgt_addr,src_addr, dst_addr):
    #NSでは135
    type = 135 #1B
    code = 0 #1B
    # check = 0 #2B
    check = 0
    resv = 0 #4B
    tgt_addr = socket.inet_pton(socket.AF_INET6, tgt_addr) #16B
    #Source Link-Layer Addressオプション
    opt_type = 1 #1B
    opt_len = 1 #1B
    opt_lladdr = get_mac_addr ()#6B
    src_addr = socket.inet_pton(socket.AF_INET6, src_addr) #16B
    dst_addr = socket.inet_pton(socket.AF_INET6, dst_addr) #16B
    upper_len = 32 # 4B
    padding = 0 #3B
    next_header = 58 #1B
    pad_next = (padding << 8) + next_header #4B
    check_packet = struct.pack("!16s16sIIBBHI16sBB6s", src_addr, dst_addr, upper_len, pad_next, type, code, check, resv, tgt_addr, opt_type, opt_len,opt_lladdr)
    check = checksum(check_packet)
    icmp_header = struct.pack("!BBHI16sBB6s", type, code, check, resv, tgt_addr, opt_type, opt_len, opt_lladdr)
    return icmp_header #32B


def send(src_addr, dst_addr, tgt_addr):
    ip_header = make_ip_header(src_addr, dst_addr)
    icmp_header = make_icmp_header(tgt_addr ,src_addr, dst_addr)
    packet = ip_header + icmp_header
    soc.sendto(packet, (dst_addr, 0))


src_addr = "fe80::1:2:3:4"
dst_addr = "ff02::1:ffac:cdef"
tgt_addr = "fe80::1234:5678:90ab:cdef"
send(src_addr, dst_addr, tgt_addr)


自身のインターフェースにリンクローカルアドレスがfe80::1:2:3:4で、
fe80:: 1234:5678:90ab:cdefのアドレスを持つノードを探索する状況を仮想してアドレスを設定しました。
(↑1234の前にspace入れないと:1234:こうなってしまう。。)
本来dst_addrはtgt_addrの下位24bitを用いたff02::1:ffxx:xxxxで表せる要請ノードマルチキャストアドレスなのでtgt_addrから求められるが直接入力で妥協してます。

実装の確認

本実装環境ではsrc_addrにkali linuxに設定されているIPv6アドレス、
dst_addrにdebianの内部ネットワークのインターフェースに設定されているIPv6アドレス、
tgt_addrにはdst_addrから求めた要請ノードマルチキャストアドレスを設定しました。

コードを実行する際はそのままだとpermission errorが出るはずなのでsudoで動かします。

wiresharkでicmpv6でフィルタリングしパケットを観測するとkali linuxからNSメッセージが送られ、それに対するNAメッセージがdebianから送られてきていることを確認できました。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1