Dumping NetFlow Records Collected with Elasticsearch + Logstash


Overview

I tried to inspect the NetFlow data collected with Elasticsearch and Logstash from the Kibana dashboard, but the GUI was sluggish and tedious to work with, so I wrote a small Python tool that dumps the flow records instead.
The Logstash NetFlow module acts as a flow collector and stores the flow records received from flow exporters in the Elasticsearch "netflow-*" indices. I decided to extract and download the records for a specific time range from those indices using the Elasticsearch REST API.
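
Before extracting anything, it is worth checking that the "netflow-*" indices actually hold records. A minimal sketch using the _count API with Python's Requests (assuming Elasticsearch is reachable at http://localhost:9200 without authentication, as in the sample code later in this article):

import requests

# Count the flow records currently stored in the netflow-* indices
# (assumes Elasticsearch listens on http://localhost:9200 without authentication)
r = requests.get("http://localhost:9200/netflow-*/_count")
r.raise_for_status()
print("stored flow records:", r.json()["count"])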

API

Since the number of stored records is not known in advance, the Scroll API is used for the download.

First, POST a "_search" request with the "scroll" parameter, as shown below.

POST /netflow-*/_search?scroll=1m
{
  "sort": {
    "@timestamp": { "order": "asc" }         # sort by timestamp
  },
  "query": {
    "range": {
      "@timestamp": {                        # timestamp range
        "gte": "2018-09-21T00:00:00+00:00",  # start time
        "lte": "2018-09-21T23:59:59+00:00",  # end time
        "format": "date_time_no_millis"
      }
    }
  },
  "size": 100                                # records per batch
}

Subsequent batches are retrieved by POSTing the "_scroll_id" returned in the response to the initial request.

POST /_search/scroll
{
  "scroll": "1m",
  "scroll_id": "DnF1ZXJ5VGhlbkZldGNoGQAAAAAAAB2DFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYUWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2HFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYgWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2JFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdkBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYoWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2LFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHY0WaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2OFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZMWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2SFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlRZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZEWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2YFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdmxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZQWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2XFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZkWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2aFmhMVnhQT3VBU1V1SWdkcmY3bUZERnc="
}

Once all records have been downloaded, delete the "scroll_id" that is no longer needed (if you skip this step, it is cleared automatically after the timeout).

DELETE /_search/scroll
{
    "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoGQAAAAAAAB2DFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYUWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2HFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdhhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYgWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2JFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdkBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHYoWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2LFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjBZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHY0WaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2OFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdjxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZMWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2SFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlRZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZEWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2YFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdmxZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZQWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2XFmhMVnhQT3VBU1V1SWdkcmY3bUZERncAAAAAAAAdlhZoTFZ4UE91QVNVdUlnZHJmN21GREZ3AAAAAAAAHZkWaExWeFBPdUFTVXVJZ2RyZjdtRkRGdwAAAAAAAB2aFmhMVnhQT3VBU1V1SWdkcmY3bUZERnc="
}

Sample code

Using Python's Requests library, the steps above look like this.

import sys
import json
import requests

headers = { 'Content-Type': 'application/json' }
from_date = "2018-09-21T00:00:00+00:00"
to_date   = "2018-09-21T23:59:59+00:00"
search_url = "http://localhost:9200/netflow-*/_search?scroll=1m"
scroll_url = "http://localhost:9200/_search/scroll"

# Search request body: records in the given time range,
# sorted by timestamp, fetched 100 at a time.
search_json = json.dumps({
    "size": 100,
    "query": {
        "range": {
            "@timestamp": {
                "gte": from_date, "lte": to_date, "format": "date_time_no_millis"
            }
        }
    },
    "sort": { "@timestamp": { "order": "asc" } }
})

#
# POST /netflow-*/_search?scroll=1m
#
try:
    r = requests.post(search_url, headers=headers, data=search_json)
    if r.status_code != 200:
        print("POST Failed: %d" % r.status_code)
        sys.exit(1)
except Exception as e:
    print("POST Failed: %s" % e)
    sys.exit(1)

result = r.json()
flows = []
for flow in result["hits"]["hits"]:
    flows.append(flow["_source"])

#
# POST /_search/scroll
#
scroll_id = result["_scroll_id"]
scroll_json = json.dumps({ "scroll": "1m", "scroll_id": scroll_id })
while True:
    try:
        r = requests.post(scroll_url, headers=headers, data=scroll_json)
        if r.status_code != 200:
            print("POST Failed: %d" % r.status_code)
            sys.exit(1)
    except Exception as e:
        print("POST Failed: %s" % e)
        sys.exit(1)
    result = r.json()
    hits = result["hits"]["hits"]
    if not hits:        # an empty batch means all records have been fetched
        break
    for flow in hits:
        flows.append(flow["_source"])

#
# Show NetFlow Records
#
for source in flows:
    print(json.dumps(source, ensure_ascii=False, indent=2))

sys.exit(0)
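
The console example above also clears the scroll context once the download is finished, which the script does not do. As a minimal sketch, the following lines could be appended after the scroll loop (reusing scroll_url, headers, and scroll_id from the script above) to do the same from Python:

#
# DELETE /_search/scroll
#
# Release the scroll context explicitly instead of waiting for the timeout.
requests.delete(scroll_url, headers=headers,
                data=json.dumps({ "scroll_id": scroll_id }))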

Output

The NetFlow records are printed in JSON format, as shown below.

{
  "type": "netflow",
  "@timestamp": "2018-09-21T07:23:41.231Z",
  "host": "192.168.10.254",
  "geoip_src": {
    "autonomous_system": "PRIVATE"
  },
  "geoip_dst": {
    "autonomous_system": "PRIVATE"
  },
  "netflow": {
    "tcp_flags": 0,
    "flow_locality": "private",
    "src_port_name": "UDP/59990",
    "protocol": 17,
    "sampling_interval": 1,
    "packets": 1,
    "src_locality": "private",
    "flow_records": 17,
    "last_switched": "2018-09-21T07:17:49.230Z",
    "protocol_name": "UDP",
    "src_addr": "192.168.100.65",
    "tos": 0,
    "src_as": 0,
    "first_switched": "2018-09-21T07:17:49.230Z",
    "version": "Netflow v5",
    "engine_type": 0,
    "input_snmp": 3,
    "flow_seq_num": 0,
    "direction": "ingress",
    "engine_id": 0,
    "dst_locality": "private",
    "dst_as": 0,
    "next_hop": "0.0.0.0",
    "src_port": 59990,
    "tcp_flag_tags": [],
    "dst_mask_len": 0,
    "output_snmp": 0,
    "src_mask_len": 0,
    "dst_port_name": "UDP/33442",
    "bytes": 60,
    "tcp_flags_label": "none",
    "dst_port": 33442,
    "ip_version": "IPv4",
    "sampling_algorithm": 1,
    "dst_addr": "192.168.200.66"
  },
  "@version": "1"
}
{
  "type": "netflow",
  "@timestamp": "2018-09-21T07:23:41.231Z",
  "host": "192.168.10.254",
  "geoip_src": {
    "autonomous_system": "PRIVATE"
  },
  "geoip_dst": {
    "autonomous_system": "PRIVATE"
  },
  "netflow": {
    "tcp_flags": 0,
    "flow_locality": "private",
    "src_port_name": "UDP/45928",
    "protocol": 17,
    "sampling_interval": 1,
    "packets": 1,
    "src_locality": "private",
    "flow_records": 17,
    "last_switched": "2018-09-21T07:17:49.230Z",
    "protocol_name": "UDP",
    "src_addr": "192.168.100.65",
    "tos": 0,
    "src_as": 0,
    "first_switched": "2018-09-21T07:17:49.230Z",
    "version": "Netflow v5",
    "engine_type": 0,
    "input_snmp": 3,
    "flow_seq_num": 0,
    "direction": "ingress",
    "engine_id": 0,
    "dst_locality": "private",
    "dst_as": 0,
    "next_hop": "0.0.0.0",
    "src_port": 45928,
    "tcp_flag_tags": [],
    "dst_mask_len": 0,
    "output_snmp": 0,
    "src_mask_len": 0,
    "dst_port_name": "UDP/33446",
    "bytes": 60,
    "tcp_flags_label": "none",
    "dst_port": 33446,
    "ip_version": "IPv4",
    "sampling_algorithm": 1,
    "dst_addr": "192.168.200.66"
  },
  "@version": "1"
}
    ...
    ...
    ...
{
  "type": "netflow",
  "@timestamp": "2018-09-21T12:30:08.445Z",
  "host": "192.168.10.254",
  "geoip_src": {
    "autonomous_system": "PRIVATE"
  },
  "geoip_dst": {
    "autonomous_system": "PRIVATE"
  },
  "netflow": {
    "tcp_flags": 27,
    "flow_locality": "private",
    "src_port_name": "TCP/47228",
    "protocol": 6,
    "sampling_interval": 1,
    "packets": 8,
    "src_locality": "private",
    "flow_records": 29,
    "last_switched": "2018-09-21T12:24:38.445Z",
    "protocol_name": "TCP",
    "src_addr": "192.168.100.65",
    "tos": 0,
    "src_as": 0,
    "first_switched": "2018-09-21T12:24:38.445Z",
    "version": "Netflow v5",
    "engine_type": 0,
    "input_snmp": 3,
    "flow_seq_num": 8120,
    "direction": "ingress",
    "engine_id": 0,
    "dst_locality": "private",
    "dst_as": 0,
    "next_hop": "0.0.0.0",
    "src_port": 47228,
    "tcp_flag_tags": [
      "FIN",
      "SYN",
      "PSH",
      "ACK"
    ],
    "dst_mask_len": 0,
    "output_snmp": 0,
    "src_mask_len": 0,
    "dst_port_name": "TCP/80 (http)",
    "bytes": 502,
    "tcp_flags_label": "FIN-SYN-PSH-ACK",
    "dst_port": 80,
    "ip_version": "IPv4",
    "sampling_algorithm": 1,
    "dst_addr": "192.168.200.66"
  },
  "@version": "1"
}