Python
TCP
pcap
ARP
scapy

Python Scapyを使ったネットワークプログラミング

More than 1 year has passed since last update.

Pythonにおけるネットワークプログラミングの覚書。

Python Scapyのインストール

以下サイトを参考にインストール。環境はMAC OS X 10.11.6。
http://nigaky.hatenablog.com/entry/20110716/1310813250

ただし、scapyだけインストールしても使えないので、pcapyもインストールが必要。
以下サイトより、sourceをDLしてインストール。
https://pypi.python.org/pypi/pcapy

scapyの詳細については以下を参照。
http://scapy.readthedocs.io/

ARPパケットの生成、送信

特定のIPアドレスのMACアドレスを得るためのARPリクエストメッセージ送信するコード。

ARP.py
from scapy.all import *
target_ip="192.168.1.1"
frame = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(op=1, pdst=target_ip)
sendp(frame)

ICMPパケットの生成、送信

特定のIPアドレスに対してICMPパケットの送信するコード。

ICMP.py
from scapy.all import *
target_ip="192.168.1.1"
frame = Ether() / IP(dst=target_ip) / ICMP()
sendp(frame)

TCP SYNパケットの生成、送信

特定のIP, 特定のPortに対して、TCP SYNパケットを送信するコード。

ICMP.py
from scapy.all import *
target_ip="192.168.1.1"
dst_port = 5001
src_port = 5002
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
send(frame)

並列処理で受信パケットをSniff

SniffRecPkt.py
import threading
from scapy.all import *

class SniffRecPkt(threading.Thread):
    def __init__(self,target_ip):
        super(RecPingScan, self).__init__()
        self.target_ip = target_ip
        self.stop_event = threading.Event() #停止させるかのフラグ
        self.thread = threading.Thread(target = self.run)
        self.thread.start()

    def run(self):
        while not self.stop_event.is_set():
            sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)

    def stop(self):
        """スレッドを停止させる"""
        self.stop_event.set()
        self.thread.join()    #スレッドが停止するのを待つ


def packet_show(packet):
    if packet[TCP].flags==18: #SYN/ACKパケットだった時のみ
        print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)

if __name__ == '__main__':
    target_ip = "192.168.1.1"
    Rec_thread=SniffRecPkt(target_ip)

    target_ip="192.168.1.1"
    dst_port = 5001
    src_port = 5002
    frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
    send(frame)

    Rec_thread.stop()

ポートSCANプログラミング

port_scan_v1.0.py
# encoding: utf-8
from scapy.all import *
import netifaces
import threading
import time
import sys
import random

class RecPingScan(threading.Thread):
    def __init__(self,target_ip):
        super(RecPingScan, self).__init__()
        self.target_ip = target_ip
        self.stop_event = threading.Event() #停止させるかのフラグ
        self.thread = threading.Thread(target = self.run)
        self.thread.start()

    def run(self):
        while not self.stop_event.is_set():
            sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)

    def stop(self):
        """スレッドを停止させる"""
        self.stop_event.set()
        self.thread.join()    #スレッドが停止するのを待つ


def packet_show(packet):
    if packet[TCP].flags==18:
        print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)


def send_tcpsyn(target_ip):
    sport = random.randint(50000,51000)
    for i in range(0,65535):
        frame = IP(dst=target_ip)/TCP(flags = 'S',sport=sport,dport=i)
        send(frame)
        send(frame)

if __name__ == '__main__':
    target_ip = "192.168.1.1"

    Rec_thread=RecPingScan(target_ip)
    send_tcpsyn(target_ip)
    time.sleep(2)
    Rec_thread.stop()
    sys.exit()