Edited at

PcapファイルをMySQLに入れる

More than 1 year has passed since last update.

PcapファイルをMySQLに格納


TL;DR

大量のPcapファイルを効率的に扱いたい。

日付ごとのtableを作成しpcapをmysqlに入れる。

下記のコードはTCPのみです。


前提


  • ファイル名は<適当な名前>_year-month-day_HH:MM:SS.pcap

  • Pythonからアクセスするためのユーザやデータベースが作成済みであること

ファイル名からaccess_log_yyyymmddという名前のtableを作成するため、get_day() 関数は自分の環境（ファイル名の形式）に合わせて適宜変更してください。


コード

# -*- coding: utf-8 -*-

from glob import glob
import dpkt
import socket
from datetime import datetime, timedelta
import time
from collections import OrderedDict
from os.path import basename, splitext
import MySQLdb

def pcap2csv(file_name, proto="TCP"):
    """Read a pcap file and return one flat row per matching IPv4 packet.

    Only ``proto="TCP"`` is implemented; any other value yields an empty
    list. Frames are decoded as Ethernet, and frames whose payload is not
    IPv4/TCP are skipped.

    Args:
        file_name: Path of the pcap file to read.
        proto: Transport protocol to extract. Only "TCP" produces rows.

    Returns:
        A list of rows. Each row is a list with the values, in order:
        timestamp ("YYYY-MM-DD HH:MM:SS", local time), src_ip, src_port,
        dst_ip, dst_port, header_len, data_len, ttl, urg, ack, psh, rst,
        syn, fin, window, urg_pointer, seq_number.
    """

    def _uts2dt(ts):
        # Unix timestamp -> datetime string (local time zone).
        return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M:%S")

    def _ipheader(header, proto):
        # Build the ordered field mapping for one IP packet, or return None
        # when the packet does not carry the requested protocol.
        if proto != "TCP" or not isinstance(header.data, dpkt.tcp.TCP):
            return None

        tcp = header.data
        flags = tcp.flags

        result = OrderedDict()
        # The timestamp slot is reserved first so it stays the leading
        # column; the caller fills it in from the pcap record header.
        result["yyyy-mm-dd hh:mm:ss"] = None
        result["src_ip"] = socket.inet_ntoa(header.src)
        result["src_port"] = tcp.sport
        result["dst_ip"] = socket.inet_ntoa(header.dst)
        result["dst_port"] = tcp.dport
        result["header_len"] = header.hl
        result["data_len"] = len(tcp.data)
        result["ttl"] = header.ttl
        # Each control bit is stored as 0/1.
        result["urg"] = 1 if flags & dpkt.tcp.TH_URG else 0
        result["ack"] = 1 if flags & dpkt.tcp.TH_ACK else 0
        result["psh"] = 1 if flags & dpkt.tcp.TH_PUSH else 0
        result["rst"] = 1 if flags & dpkt.tcp.TH_RST else 0
        result["syn"] = 1 if flags & dpkt.tcp.TH_SYN else 0
        result["fin"] = 1 if flags & dpkt.tcp.TH_FIN else 0
        result["window"] = tcp.win
        result["urg_pointer"] = tcp.urp
        result["seq_number"] = tcp.seq
        return result

    res = []
    # The context manager guarantees the capture file is closed even when
    # parsing raises part-way through.
    with open(file_name, "rb") as pcap_file:
        for ts, buf in dpkt.pcap.Reader(pcap_file):
            eth = dpkt.ethernet.Ethernet(buf)
            if not isinstance(eth.data, dpkt.ip.IP):
                continue
            ip = _ipheader(eth.data, proto)
            if ip is None:
                continue
            ip["yyyy-mm-dd hh:mm:ss"] = _uts2dt(ts)
            # list(...) so callers get a real list on Python 3 as well,
            # where dict.values() is only a view.
            res.append(list(ip.values()))
    return res

class DB(object):
    """Small wrapper around one MySQLdb connection and cursor.

    Rows are stored in per-day tables named ``access_log_<day>``.
    """

    def __init__(self):
        # Connection settings are placeholders: fill in user, passwd and db
        # for your own environment.
        self.conn = MySQLdb.connect(host="localhost", port=3306, user="", passwd="", db="", charset="utf8")
        self.cur = self.conn.cursor()

    @staticmethod
    def _checked_day(day):
        """Return ``day`` as a string after checking it is a safe table suffix.

        Table names cannot be passed as query parameters, so the suffix is
        restricted to letters, digits and underscores before it is spliced
        into the SQL text.

        Raises:
            ValueError: if the suffix is empty or has any other character.
        """
        day = str(day)
        if not day or not all(ch.isalnum() or ch == "_" for ch in day):
            raise ValueError("invalid table suffix: %r" % (day,))
        return day

    def create_table(self, day):
        """Create the ``access_log_<day>`` table.

        Raises:
            ValueError: if ``day`` is not a safe table suffix.
        """
        # `window` is backtick-quoted because WINDOW is a reserved word in
        # MySQL 8.0.
        sql = "create table access_log_%s(id INT AUTO_INCREMENT primary key, date datetime, src_ip varchar(255), src_port int, dst_ip varchar(255), dst_port int, header_len int, data_len int, ttl int, urg int, ack int, psh int, rst int, syn int, fin int, `window` int, urg_pointer varchar(255), seq_number bigint)" % self._checked_day(day)
        self.__execute(sql)

    def insert(self, day, vals):
        """Insert one row into ``access_log_<day>``.

        Args:
            day: Table suffix; validated before use.
            vals: Column values in the order date, src_ip, src_port, dst_ip,
                dst_port, header_len, data_len, ttl, urg, ack, psh, rst, syn,
                fin, window, urg_pointer, seq_number.

        Raises:
            ValueError: if ``day`` is not a safe table suffix.
        """
        params = tuple(vals)
        # One driver placeholder per value: the values travel separately
        # from the SQL text, so quotes in a value cannot alter the statement.
        placeholders = ",".join(["%s"] * len(params))
        sql = (
            "insert into access_log_" + self._checked_day(day) +
            "(date,src_ip,src_port,dst_ip,dst_port,header_len,data_len,ttl,"
            "urg,ack,psh,rst,syn,fin,`window`,urg_pointer,seq_number) "
            "values(" + placeholders + ")"
        )
        self.__execute(sql, params)

    def commit(self):
        """Commit the current transaction."""
        self.conn.commit()

    def __execute(self, sql, params=None):
        # Run one statement on the shared cursor, with optional parameters.
        if params is None:
            self.cur.execute(sql)
        else:
            self.cur.execute(sql, params)

    def close(self):
        """Close the cursor and then the connection."""
        self.cur.close()
        self.conn.close()

def get_day(file_path):
    """Extract the table-name date suffix from a capture file path.

    Takes the file name without its extension, picks the second
    underscore-separated field and strips the hyphens from it, so
    ``<name>_2020-01-02_03:04:05.pcap`` gives ``"20200102"``.

    Adapt this function to your own file naming scheme.
    """
    stem, _extension = splitext(basename(file_path))
    date_field = stem.split("_")[1]
    return "".join(date_field.split("-"))

def main(pcap_dir):
    """Load every file in ``pcap_dir`` into per-day MySQL tables.

    Files are processed in sorted path order. For each file the day is
    derived with ``get_day``, the day's table is created the first time
    that day is seen in this run, every row from ``pcap2csv`` is inserted,
    and the work is committed once per file.

    Args:
        pcap_dir: Directory containing the pcap files.
    """
    created_days = set()
    db = DB()
    try:
        for file_path in sorted(glob(pcap_dir + "/*")):
            # Single-argument form prints the same text on Python 2 and 3.
            print("processing -> %s" % file_path)
            day = get_day(file_path)
            if day not in created_days:
                db.create_table(day)
                created_days.add(day)
            for row in pcap2csv(file_path):
                db.insert(day, row)
            db.commit()
    finally:
        # Release the cursor and connection even if a file fails to load.
        db.close()

# Script entry point: runs the import only when executed directly.
if __name__ == "__main__":
    # The argument is a placeholder (it reads "name of the directory that
    # contains the pcaps"); replace it with the real directory path.
    main("pcapがあるディレクトリ名")