Edited at

Python Scapyを使ったネットワークプログラミング

More than 1 year has passed since last update.

Pythonにおけるネットワークプログラミングの覚書。


Python Scapyのインストール

以下サイトを参考にインストール。環境はMAC OS X 10.11.6。

http://nigaky.hatenablog.com/entry/20110716/1310813250

ただし、scapyだけインストールしても使えないので、pcapyもインストールが必要。

以下サイトより、sourceをDLしてインストール。

https://pypi.python.org/pypi/pcapy

scapyの詳細については以下を参照。

http://scapy.readthedocs.io/


ARPパケットの生成、送信

特定のIPアドレスのMACアドレスを得るためのARPリクエストメッセージ送信するコード。


ARP.py

from scapy.all import *

target_ip="192.168.1.1"
frame = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(op=1, pdst=target_ip)
sendp(frame)


ICMPパケットの生成、送信

特定のIPアドレスに対してICMPパケットの送信するコード。


ICMP.py

from scapy.all import *

target_ip="192.168.1.1"
frame = Ether() / IP(dst=target_ip) / ICMP()
sendp(frame)


TCP SYNパケットの生成、送信

特定のIP, 特定のPortに対して、TCP SYNパケットを送信するコード。


ICMP.py

from scapy.all import *

target_ip="192.168.1.1"
dst_port = 5001
src_port = 5002
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
send(frame)


並列処理で受信パケットをSniff


SniffRecPkt.py

import threading

from scapy.all import *

class SniffRecPkt(threading.Thread):
def __init__(self,target_ip):
super(RecPingScan, self).__init__()
self.target_ip = target_ip
self.stop_event = threading.Event() #停止させるかのフラグ
self.thread = threading.Thread(target = self.run)
self.thread.start()

def run(self):
while not self.stop_event.is_set():
sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)

def stop(self):
"""スレッドを停止させる"""
self.stop_event.set()
self.thread.join() #スレッドが停止するのを待つ

def packet_show(packet):
if packet[TCP].flags==18: #SYN/ACKパケットだった時のみ
print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)

if __name__ == '__main__':
target_ip = "192.168.1.1"
Rec_thread=SniffRecPkt(target_ip)

target_ip="192.168.1.1"
dst_port = 5001
src_port = 5002
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
send(frame)

Rec_thread.stop()



ポートSCANプログラミング


port_scan_v1.0.py

# encoding: utf-8

from scapy.all import *
import netifaces
import threading
import time
import sys
import random

class RecPingScan(threading.Thread):
def __init__(self,target_ip):
super(RecPingScan, self).__init__()
self.target_ip = target_ip
self.stop_event = threading.Event() #停止させるかのフラグ
self.thread = threading.Thread(target = self.run)
self.thread.start()

def run(self):
while not self.stop_event.is_set():
sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)

def stop(self):
"""スレッドを停止させる"""
self.stop_event.set()
self.thread.join() #スレッドが停止するのを待つ

def packet_show(packet):
if packet[TCP].flags==18:
print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)

def send_tcpsyn(target_ip):
sport = random.randint(50000,51000)
for i in range(0,65535):
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=sport,dport=i)
send(frame)
send(frame)

if __name__ == '__main__':
target_ip = "192.168.1.1"

Rec_thread=RecPingScan(target_ip)
send_tcpsyn(target_ip)
time.sleep(2)
Rec_thread.stop()
sys.exit()