LoginSignup
17
18

More than 5 years have passed since last update.

トラフィックモニタリング with Kibana, ElasticSearch and Python

Last updated at Posted at 2015-03-09

Motivation

  • tshark, wiresharkよりもトラッフィクを簡単に可視化したい。
  • Kibanaのバージョンが新しくなったので使いたい。

Dashboard

イメージ

  • "リアルタイム"でパケットの総量を可視化
  • パケットのSource IP, Destination IP, Source Port, Destination Portを可視化

ElasticSearch Install

$ brew install elasticsearch
$ elasticsearch -v
Version: 1.4.4, Build: c88f77f/2015-02-19T13:05:36Z, JVM: 1.7.0_72

Download Kibana

$ wget https://download.elasticsearch.org/kibana/kibana/kibana-4.0.1-darwin-x64.tar.gz
$ tar zxvf kibana-4.0.1-darwin-x64.tar.gz

Python Library

$ pip install pyshark elasticsearch requests

Execute

$ python packet_cap_es.py <interface>
  • See localhost:5601
  • Configure index packet*

Python Script

"""
This app captures packets and extract five tupels.
Store these data to elastic search.
Elastic search and kibana creates real time packet monitering
bashbord.
"""
import json
import sys
import datetime
import time

import pyshark
import requests
from elasticsearch import Elasticsearch
from elasticsearch import helpers

URL = "http://localhost:9200"
INDEX_URL = URL + "/packets"
TYPE_URL = INDEX_URL + "/packet"
ACTION = {"_index" : "packets",
          "_type" : "packet",
          "_source": {}
         }


def delete_index():
    """Delete an index in elastic search."""
    requests.delete(INDEX_URL)


def create_index():
    """Create an index in elastic search with timestamp enabled."""
    requests.put(INDEX_URL)
    setting = {"packet" : {
                "_timestamp" : {
                    "enabled" : True,
                    "path" : "capture_timestamp",
                },
                "numeric_detection" : False,
                "properties" : {
                    "dstip" : { "type":"string",
                                "index" : "not_analyzed",
                                "store" : True},
                    "srcip" : { "type":"string",
                                "index" : "not_analyzed",
                                "store" : True}
                }
            }}
    for _ in range(1, 100):
        try:
            r = requests.put(TYPE_URL + "/_mapping", data=json.dumps(setting))
            break
        except:
            time.sleep(1)
            pass

def main():
    """Extract packets and store them to ES"""
    capture = pyshark.LiveCapture(interface=sys.argv[1])
    packet_que = list()
    es = Elasticsearch()

    end_time = None
    for packet in capture.sniff_continuously():
        if packet.transport_layer in ("UDP", "TCP"):
            try:
                # Why does ES add 9 hours automatically?
                localtime = float(packet.sniff_timestamp) - 60 * 60 * 9  # GMT + 9
                row_timestamp = datetime.datetime.fromtimestamp(localtime)
                timestamp = row_timestamp.strftime("%Y-%m-%dT%H:%M:%SZ")
                version = int(packet[1].version)
                # ip v6 does not have protocol. It has next header instead.
                if version == 4:
                    protocol = int(packet[1].proto)
                elif version == 6:
                    protocol = int(packet[1].nxt)
                else:
                    protocol = None

                dstip = packet[1].dst
                srcip = packet[1].src
                dstport = int(packet[2].dstport)
                srcport = int(packet[2].srcport)
                parsed_packet = dict(version=version, protocol=protocol,
                                     dstip=dstip, srcip=srcip,
                                     dstport=dstport, srcport=srcport,
                                     capture_timestamp=timestamp)
                # For historical graph
                parsed_packet["@timestamp"] = timestamp
                action = ACTION.copy()
                action["_source"].update(parsed_packet)
                packet_que.append(action)
                current = time.time()
                while(end_time is None or current - end_time >= 3):
                    helpers.bulk(es, packet_que)
                    del packet_que[0:len(packet_que)]
                    end_time = time.time()
                    break

            except Exception as e:
                time.sleep(1)


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print >>sys.stderr, "python packet_cap_es.py <interface>"
        exit(1)
    delete_index()
    create_index()
    main()

17
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
18