Posted at

Elasticsearch + Logstash で採取したNetFlowをダンプしてみた

More than 1 year has passed since last update.


概要

ElasticsearchとLogstashで採取したNetFlowの中身をKibanaのダッシュボードから確認しようとしたら、GUI操作が重くて面倒くさかったので、Pythonでフロー情報をダンプするツールを作ってみた。

image.png

LogstashのNetFlowモジュールはフローコレクタとして動作し、Elasticsearchの "netflow-*" インデックスにフローエクスポータから受信したフロー情報を蓄積する。その中から特定の期間のレコードをElasticsearchのREST APIを用いて抽出・ダウンロードすることにした。


API

蓄積されているレコード数は不定のため、ダウンロードにはScroll APIを用いる。

まず、以下のように"_search"リクエストを"scroll"パラメタを指定してPOSTする。

POST /netflow-*/_search?scroll=1m

{
"sort": {
"@timestamp": { "order": "asc" } # タイムスタンプでソート
},
"query": {
"range": {
"@timestamp": { # タイムスタンプの範囲指定
"gte": "2018-09-21T00:00:00+00:00", # 開始時刻
"lte": "2018-09-21T23:59:59+00:00", # 終了時刻
"format": "date_time_no_millis"
}
}
},
"size": 100 # レコード数
}

後続のレコードは、最初のPOSTの応答に含まれる "_scroll_id" 利用してPOSTする。

POST /netflow-*/_search?scroll

{
"scroll": "1m",
"scroll_id": "DnF1ZXJ5VGhlbkZldGNoGQAAAAAAAB2DFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYUWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2HFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYgWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2JFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdkBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYoWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2LFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHY0WaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2OFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZMWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2SFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlRZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZEWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2YFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdmxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZQWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2XFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZkWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2aFmhMVnhQT3VBU1V1SWdkcmY3bUZERnc="
}

すべてのレコードをダウンロードしたら、不要となった"scroll_id" を消去する(省略した場合、タイムアウト後に消去される)。

DELETE /_search/scroll

{
"scroll_id" : "DnF1ZXJ5VGhlbkZldGNoGQAAAAAAAB2DFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYUWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2HFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYgWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2JFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdkBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYoWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2LFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHY0WaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2OFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZMWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2SFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlRZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZEWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2YFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdmxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZQWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2XFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZkWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2aFmhMVnhQT3VBU1V1SWdkcmY3bUZERnc="
}


サンプルコード

上記処理をPythonのRequestsを用いて行う場合は、以下のようになる。

import os

import sys
import requests
import urllib
import json

headers = { 'Content-Type' : 'application/json' }
from_date = "2018-09-21T00:00:00+00:00"
to_date = "2018-09-21T23:59:59+00:00"
search_url = "http://localhost:9200/netflow-*/_search?scroll=1m"
scroll_url = "http://localhost:9200/_search/scroll"
search_format = json.dumps(
{
"size": 100,
"query": {
"range" : {
"@timestamp" : {
"gte": "%s", "lte": "%s", "format": "date_time_no_millis"
}
}
},
"sort": { "@timestamp": { "order": "asc" } }
},
encoding='utf8',
)

#
# POST /netflow-*/_search?scroll=1m
#
scroll_format = '{ "scroll": "1m", "scroll_id": "%s" }'
try:
search_json = search_format % (from_date, to_date)
r = requests.post(search_url, headers=headers, data=search_json)
if r.status_code != 200:
print "POST Failed: %d" % r.status_code
exit(1)
except Exception as e:
print "POST Failed: %s" % e
exit(1)

result = r.json()
flows = []
hits = result["hits"]
hits = hits["hits"]
for flow in hits:
source = flow["_source"]
flows.append(source)

#
# POST /netflow-*/scroll
#
scroll_id = result["_scroll_id"]
scroll_json = scroll_format % scroll_id
while True:
finished = True
try:
r = requests.post(scroll_url, headers=headers, data=scroll_json)
if r.status_code != 200:
print "POST Failed: %d" % r.status_code
exit(1)
except Exception as e:
print "POST Failed: %s" % e
exit(1)
result = r.json()
hits = result["hits"]
hits = hits["hits"]
for flow in hits:
#source = flow["_source"]
flows.append(flow)
finished = False
if finished == True:
break

#
# Show NetFlow Records
#
for source in flows:
print json.dumps(source, ensure_ascii=False, encoding='utf8', indent=2)

exit(0)


実行結果

以下のように、JSON形式でNetFlowのレコードが出力される。

{

"type": "netflow",
"@timestamp": "2018-09-21T07:23:41.231Z",
"host": "192.168.10.254",
"geoip_src": {
"autonomous_system": "PRIVATE"
},
"geoip_dst": {
"autonomous_system": "PRIVATE"
},
"netflow": {
"tcp_flags": 0,
"flow_locality": "private",
"src_port_name": "UDP/59990",
"protocol": 17,
"sampling_interval": 1,
"packets": 1,
"src_locality": "private",
"flow_records": 17,
"last_switched": "2018-09-21T07:17:49.230Z",
"protocol_name": "UDP",
"src_addr": "192.168.100.65",
"tos": 0,
"src_as": 0,
"first_switched": "2018-09-21T07:17:49.230Z",
"version": "Netflow v5",
"engine_type": 0,
"input_snmp": 3,
"flow_seq_num": 0,
"direction": "ingress",
"engine_id": 0,
"dst_locality": "private",
"dst_as": 0,
"next_hop": "0.0.0.0",
"src_port": 59990,
"tcp_flag_tags": [],
"dst_mask_len": 0,
"output_snmp": 0,
"src_mask_len": 0,
"dst_port_name": "UDP/33442",
"bytes": 60,
"tcp_flags_label": "none",
"dst_port": 33442,
"ip_version": "IPv4",
"sampling_algorithm": 1,
"dst_addr": "192.168.200.66"
},
"@version": "1"
}
{
"type": "netflow",
"@timestamp": "2018-09-21T07:23:41.231Z",
"host": "192.168.10.254",
"geoip_src": {
"autonomous_system": "PRIVATE"
},
"geoip_dst": {
"autonomous_system": "PRIVATE"
},
"netflow": {
"tcp_flags": 0,
"flow_locality": "private",
"src_port_name": "UDP/45928",
"protocol": 17,
"sampling_interval": 1,
"packets": 1,
"src_locality": "private",
"flow_records": 17,
"last_switched": "2018-09-21T07:17:49.230Z",
"protocol_name": "UDP",
"src_addr": "192.168.100.65",
"tos": 0,
"src_as": 0,
"first_switched": "2018-09-21T07:17:49.230Z",
"version": "Netflow v5",
"engine_type": 0,
"input_snmp": 3,
"flow_seq_num": 0,
"direction": "ingress",
"engine_id": 0,
"dst_locality": "private",
"dst_as": 0,
"next_hop": "0.0.0.0",
"src_port": 45928,
"tcp_flag_tags": [],
"dst_mask_len": 0,
"output_snmp": 0,
"src_mask_len": 0,
"dst_port_name": "UDP/33446",
"bytes": 60,
"tcp_flags_label": "none",
"dst_port": 33446,
"ip_version": "IPv4",
"sampling_algorithm": 1,
"dst_addr": "192.168.200.66"
},
"@version": "1"
}
...
...
...
{
"sort": [
1537533008445
],
"_type": "doc",
"_source": {
"type": "netflow",
"@timestamp": "2018-09-21T12:30:08.445Z",
"host": "192.168.10.254",
"geoip_src": {
"autonomous_system": "PRIVATE"
},
"geoip_dst": {
"autonomous_system": "PRIVATE"
},
"netflow": {
"tcp_flags": 27,
"flow_locality": "private",
"src_port_name": "TCP/47228",
"protocol": 6,
"sampling_interval": 1,
"packets": 8,
"src_locality": "private",
"flow_records": 29,
"last_switched": "2018-09-21T12:24:38.445Z",
"protocol_name": "TCP",
"src_addr": "192.168.100.65",
"tos": 0,
"src_as": 0,
"first_switched": "2018-09-21T12:24:38.445Z",
"version": "Netflow v5",
"engine_type": 0,
"input_snmp": 3,
"flow_seq_num": 8120,
"direction": "ingress",
"engine_id": 0,
"dst_locality": "private",
"dst_as": 0,
"next_hop": "0.0.0.0",
"src_port": 47228,
"tcp_flag_tags": [
"FIN",
"SYN",
"PSH",
"ACK"
],
"dst_mask_len": 0,
"output_snmp": 0,
"src_mask_len": 0,
"dst_port_name": "TCP/80 (http)",
"bytes": 502,
"tcp_flags_label": "FIN-SYN-PSH-ACK",
"dst_port": 80,
"ip_version": "IPv4",
"sampling_algorithm": 1,
"dst_addr": "192.168.200.66"
},
"@version": "1"
},
"_score": null,
"_index": "netflow-2018.09.21",
"_id": "lU4b_GUBrzI0TEnP3Bds"
}