Pythonにおけるネットワークプログラミングの覚書。
Python Scapyのインストール
以下サイトを参考にインストール。環境はMAC OS X 10.11.6。
http://nigaky.hatenablog.com/entry/20110716/1310813250
ただし、scapyだけインストールしても使えないので、pcapyもインストールが必要。
以下サイトより、sourceをDLしてインストール。
https://pypi.python.org/pypi/pcapy
scapyの詳細については以下を参照。
http://scapy.readthedocs.io/
ARPパケットの生成、送信
特定のIPアドレスのMACアドレスを得るためのARPリクエストメッセージ送信するコード。
ARP.py
from scapy.all import *
target_ip="192.168.1.1"
frame = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(op=1, pdst=target_ip)
sendp(frame)
ICMPパケットの生成、送信
特定のIPアドレスに対してICMPパケットの送信するコード。
ICMP.py
from scapy.all import *
target_ip="192.168.1.1"
frame = Ether() / IP(dst=target_ip) / ICMP()
sendp(frame)
TCP SYNパケットの生成、送信
特定のIP, 特定のPortに対して、TCP SYNパケットを送信するコード。
ICMP.py
from scapy.all import *
target_ip="192.168.1.1"
dst_port = 5001
src_port = 5002
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
send(frame)
並列処理で受信パケットをSniff
SniffRecPkt.py
import threading
from scapy.all import *
class SniffRecPkt(threading.Thread):
def __init__(self,target_ip):
super(RecPingScan, self).__init__()
self.target_ip = target_ip
self.stop_event = threading.Event() #停止させるかのフラグ
self.thread = threading.Thread(target = self.run)
self.thread.start()
def run(self):
while not self.stop_event.is_set():
sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)
def stop(self):
"""スレッドを停止させる"""
self.stop_event.set()
self.thread.join() #スレッドが停止するのを待つ
def packet_show(packet):
if packet[TCP].flags==18: #SYN/ACKパケットだった時のみ
print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)
if __name__ == '__main__':
target_ip = "192.168.1.1"
Rec_thread=SniffRecPkt(target_ip)
target_ip="192.168.1.1"
dst_port = 5001
src_port = 5002
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=src_port,dport=dst_port)
send(frame)
Rec_thread.stop()
ポートSCANプログラミング
port_scan_v1.0.py
# encoding: utf-8
from scapy.all import *
import netifaces
import threading
import time
import sys
import random
class RecPingScan(threading.Thread):
def __init__(self,target_ip):
super(RecPingScan, self).__init__()
self.target_ip = target_ip
self.stop_event = threading.Event() #停止させるかのフラグ
self.thread = threading.Thread(target = self.run)
self.thread.start()
def run(self):
while not self.stop_event.is_set():
sniff(filter="tcp and ip src host " + self.target_ip,prn=packet_show, count=1)
def stop(self):
"""スレッドを停止させる"""
self.stop_event.set()
self.thread.join() #スレッドが停止するのを待つ
def packet_show(packet):
if packet[TCP].flags==18:
print "IP : " + str(packet[IP].src) + " | TCP PORT : " + str(packet[TCP].sport)
def send_tcpsyn(target_ip):
sport = random.randint(50000,51000)
for i in range(0,65535):
frame = IP(dst=target_ip)/TCP(flags = 'S',sport=sport,dport=i)
send(frame)
send(frame)
if __name__ == '__main__':
target_ip = "192.168.1.1"
Rec_thread=RecPingScan(target_ip)
send_tcpsyn(target_ip)
time.sleep(2)
Rec_thread.stop()
sys.exit()