LoginSignup
32
18

More than 3 years have passed since last update.

Rustで実装するNetflow Collector

Last updated at Posted at 2020-12-06

この記事は、NTTコミュニケーションズ Advent Calendar 2020 の7日目の記事です。
昨日は、 @staybuzz さんの 「SIM Toolkitを使ったSIMアプレットを触ってみる」 でした。

はじめに

こんにちは、@suzusuzu です。一年目の新入社員で普段はクラウド・ハイブリッドクラウドや機械学習基盤に関する業務をしています。本記事では、トラヒックの詳細情報を可視化するための技術であるNetflowを受信するソフトウェアであるNetflow CollectorをRustで実装します。一連の実装を通してNetflowの理解を深める内容となっております。

Netflowとは?

Netflow以前には、SNMPというプロトコルがありましたが利用内訳が不明でトラヒックを詳細に分析することが難しいという歴史がありました。

Netflowはその問題を解決するために、Ciscoが開発したトラヒックの詳細情報を可視化するための技術です。Ciscoのルータやスイッチなどに搭載されています。フローデータには、IPアドレス、Port番号、プロトコル、byte数などの様々な情報が載っています。Netflowを出力する機器をexporter、入力する機器をCollectorと言います。

応用分野として以下のようなものが挙げられます。

  • ネットワークの可視化
  • ネットワークの容量計画やコストなどの削減
  • ネットワークの異常検知によるセキュリティ対策(ワームやDDoS攻撃検出)

Netflowなどのトラヒックの詳細情報を扱うsFlow、IPFIXなどはxFlowと総称されています。本記事では、Netflow v5, v9のCollectorを実装します。

前準備

Netflowを実際に扱うのにスイッチ等の物理機器を用意するのは困難なので各種ソフトウェアによるGeneratorを用意します。

Netflow v5 Generator

Netflow v5をシミュレートするために、flowgen を使用します。

git clone https://github.com/mshindo/NetFlow-Generator.git
cd NetFlow-Generator
make
./flowgen 127.0.0.1 --port 2055

Netflow v9 Generator

Netflow v9をシミュレートするために、rust-netflood を使用します。
Rustのnightlyが必要となるのでインストールしておいてください。

git clone https://github.com/4hiziri/rust-netflood.git
cd rust-netflood
rustup run nightly cargo run generate 127.0.0.1 -t sample/template.json  -o sample/option.json -i 1 -c 1000 -s 10001 -p 2055 -n

-tでTemplate FlowSetのフィールドを設定し、-oでOptions Template FlowSetのフィールドを設定しています。

Netflow Collectorの実装

Rustの tokio を用いて非同期で動くNetflow Collectorを実装していきます。Rustを使用する理由として、まず第一に素早くpacketを捌く性能がCollectorに求められるからです。さらにテンプレートベースのプロトコルでは、テンプレートをスレッド間で共有するのでデータ競合がおきないことが保証されているRustを選択しました。

本記事のコードは、main関数などの重複を覗いて一つのfileに上から順にコピペしても動くようになっています(適宜crateは追加してください)。

FlowMessage

以下のように、packetをparseした結果を格納する構造体を定義します。

(多くのfieldが定義されているだけなので読み飛ばして頂いて大丈夫です)

struct FlowMessage
use serde::{Deserialize, Serialize};
use derive_builder::Builder;
use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr};

#[derive(Builder, Debug, Serialize, Deserialize)]
struct FlowMessage {
    #[builder(setter(into, strip_option), default)]
    datetime: Option<String>,

    #[builder(setter(into, strip_option), default)]
    exporter_addr: Option<SocketAddr>,

    #[builder(setter(into, strip_option), default)]
    version: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    sys_up_time: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    unix_secs: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    unix_nsecs: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    flow_sequence: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    engine_type: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    engine_id: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    sampling_interval: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    ipv4_src_addr: Option<Ipv4Addr>,

    #[builder(setter(into, strip_option), default)]
    ipv4_dst_addr: Option<Ipv4Addr>,

    #[builder(setter(into, strip_option), default)]
    ipv4_next_hop: Option<Ipv4Addr>,

    #[builder(setter(into, strip_option), default)]
    input: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    output: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    dpkts: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    d0ctets: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    first: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    last: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    src_port: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    dst_port: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    tcp_flags: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    tos: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    src_as: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    dst_as: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    src_mask: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    dst_mask: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    in_bytes: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    in_pkts: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    flows: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    protocol: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    input_snmp: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    output_snmp: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    bgp_ipv4_next_hop: Option<Ipv4Addr>,

    #[builder(setter(into, strip_option), default)]
    mul_dst_pkts: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    mul_dst_bytes: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    last_switched: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    first_switched: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    out_bytes: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    out_pkts: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    ipv6_src_addr: Option<Ipv6Addr>,

    #[builder(setter(into, strip_option), default)]
    ipv6_dst_addr: Option<Ipv6Addr>,

    #[builder(setter(into, strip_option), default)]
    ipv6_src_mask: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    ipv6_dst_mask: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    ipv6_flow_label: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    icmp_type: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    mul_igmp_type: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    sampling_algorithm: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    flow_active_timeout: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    flow_inactive_timeout: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    total_bytes_exp: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    total_pkts_exp: Option<usize>,

    #[builder(setter(into, strip_option), default)]
    mpls_top_label: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    mpls_top_label_ip_addr: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    flow_sampler_id: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    flow_sampler_mode: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    flow_sampler_random_interval: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    dst_tos: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    src_mac: Option<u64>,

    #[builder(setter(into, strip_option), default)]
    dst_mac: Option<u64>,

    #[builder(setter(into, strip_option), default)]
    src_vlan: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    dst_vlan: Option<u16>,

    #[builder(setter(into, strip_option), default)]
    ip_protocol_version: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    direction: Option<u8>,

    #[builder(setter(into, strip_option), default)]
    ipv6_next_hop: Option<Ipv6Addr>,

    #[builder(setter(into, strip_option), default)]
    bgp_ipv6_next_hop: Option<Ipv6Addr>,

    #[builder(setter(into, strip_option), default)]
    ipv6_option_headers: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_1: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_2: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_3: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_4: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_5: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_6: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_7: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_8: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_9: Option<u32>,

    #[builder(setter(into, strip_option), default)]
    mpls_label_10: Option<u32>,
}

Handler

以下のようにHandler traitを定義します。
このtraitでは、handle関数でpacketを処理してFlowMessageにparseしたものを返すResultを定義します。
Boxのcloneは後述の #Server でわかりますが非同期に動作させるために定義します。

このtraitを実装することで様々なflowのプロトコルに対応するようにします。今回は、Netflow v5, v9のそれぞれのHandlerを実装します。

use anyhow::Result;

trait Handler: Send{
    fn box_clone(&self) -> Box<dyn Handler>;
    fn handle(&self, buf: &Vec<u8>, size: usize, addr: SocketAddr) -> Result<Vec<FlowMessage>>;
}

impl Clone for Box<dyn Handler> {
    fn clone(&self) -> Box<dyn Handler> {
        self.box_clone()
    }
}

Publisher

以下のようにPublisher traitを定義します。
このtraitでは、publish関数でparseしたFlowMessageを受け取り、加工して様々な出力先に送信します。

このtraitを実装することで様々な出力先に対応するようにします。今回は、受け取ったデータをJSONやCSVで出力するように実装します。

trait Publisher: Send {
    fn box_clone(&self) -> Box<dyn Publisher>;
    fn publish(&self, flowmessages: &Vec<FlowMessage>) -> Result<()>;
}

impl Clone for Box<dyn Publisher> {
    fn clone(&self) -> Box<dyn Publisher> {
        self.box_clone()
    }
}

Server

上記のHandlerとPublisher用いて、tokioを使った非同期Serverを以下のように定義します。
Serverは複数のHandlerとPublisherを持ち、複数のプロトコルと出力に対応する設計にします。

use tokio::net::UdpSocket;

struct Server {
    pub socket: UdpSocket,
    pub buf: Vec<u8>,
    pub handlers: Vec<Box<dyn Handler>>,
    pub publishers: Vec<Box<dyn Publisher>>,
}

impl Server {
    async fn run(self) -> Result<()> {
        let Server {
            socket,
            mut buf,
            handlers,
            publishers,
        } = self;

        loop {
            // udp packetをreceivesする
            match socket.recv_from(&mut buf).await {
                Ok((size, addr)) => {
                    let buf_c = buf.clone();
                    let handlers_c = handlers.clone(); // Boxのclone
                    let publishers_c = publishers.clone(); // Boxのclone
                    // spwanして非同期に処理する
                    tokio::spawn(async move {
                        for handler in handlers_c.iter() {
                            match handler.handle(&buf_c, size, addr) {
                                // packetをparseする
                                Ok(flowdatas) => {
                                    for publisher in publishers_c.iter() {
                                        // parseしたflowをpublishする
                                        match publisher.publish(&flowdatas) {
                                            Ok(_) => {}
                                            Err(e) => {
                                                eprintln!("{}", e);
                                            }
                                        }
                                    }
                                    // 成功した場合breakする
                                    break;
                                }
                                Err(e) => {
                                    eprintln!("{}", e);
                                }
                            }
                        }
                    });
                }
                Err(e) => {
                    eprintln!("{}", e);
                }
            };
        }
    }
}

PrintHandler

試しにprintするだけのHandlerを定義してpacketが受け取れているか確かめて見ましょう。

use std::error::Error;

#[derive(Debug, Clone)]
struct PrintHandler {}

impl Handler for PrintHandler {
    fn box_clone(&self) -> Box<dyn Handler> {
        Box::new(self.clone())
    }

    fn handle(&self, buf: &Vec<u8>, size: usize, _addr: SocketAddr) -> Result<Vec<FlowMessage>> {
        println!("{:?}", &buf[0..size]);
        Ok(vec![])
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let socket = UdpSocket::bind("0.0.0.0:2055").await?;
    let server = Server {
        socket: socket,
        buf: vec![0u8; 4096],
        handlers: vec![Box::new(PrintHandler{})],
        publishers: vec![],
    };
    server.run().await?;
    Ok(())
}

上記のコードを動かすと、以下のような出力がされると思います。このpacketをparseするために次からNetflowの仕様を確認していきます。

[0, 9, 0, 2, 0, 1, 134, 160, 95, 205, 16, 111, 0, 0, 39, 17, 0, 0, 4, 0, 0, 0, 1, 100, 4, 0, 0, 21, 0, 21, 0, 4, 0, 22, 0, 4, 0, 1, 0, 4, 0, 2, 0, 4, 0, 60, 0, 1, 0, 10, 0, 2, 0, 14, 0, 2, 0, 61, 0, 1, 0, 3, 0, 4, 0, 8, 0, 4, 0, 12, 0, 4, 0, 7, 0, 2, 0, 11, 0, 2, 0, 5, 0, 1, 0, 6, 0, 1, 0, 4, 0, 1, 0, 56, 0, 6, 0, 80, 0, 6, 0, 58, 0, 2, 0, 201, 0, 4, 0, 48, 0, 1, 4, 1, 0, 21, 0, 21, 0, 4, 0, 22, 0, 4, 0, 1, 0, 4, 0, 2, 0, 4, 0, 60, 0, 1, 0, 10, 0, 2, 0, 14, 0, 2, 0, 61, 0, 1, 0, 3, 0, 4, 0, 8, 0, 4, 0, 12, 0, 4, 0, 7, 0, 2, 0, 11, 0, 2, 0, 5, 0, 1, 0, 6, 0, 1, 0, 4, 0, 1, 0, 81, 0, 6, 0, 57, 0, 6, 0, 59, 0, 2, 0, 201, 0, 4, 0, 48, 0, 1, 8, 0, 0, 21, 0, 21, 0, 4, 0, 22, 0, 4, 0, 1, 0, 4, 0, 2, 0, 4, 0, 60, 0, 1, 0, 10, 0, 2, 0, 14, 0, 2, 0, 61, 0, 1, 0, 3, 0, 4, 0, 27, 0, 16, 0, 28, 0, 16, 0, 5, 0, 1, 0, 7, 0, 2, 0, 11, 0, 2, 0, 6, 0, 1, 0, 4, 0, 1, 0, 56, 0, 6, 0, 80, 0, 6, 0, 58, 0, 2, 0, 201, 0, 4, 0, 48, 0, 1, 8, 1, 0, 21, 0, 21, 0, 4, 0, 22, 0, 4, 0, 1, 0, 4, 0, 2, 0, 4, 0, 60, 0, 1, 0, 10, 0, 2, 0, 14, 0, 2, 0, 61, 0, 1, 0, 3, 0, 4, 0, 27, 0, 16, 0, 28, 0, 16, 0, 5, 0, 1, 0, 7, 0, 2, 0, 11, 0, 2, 0, 6, 0, 1, 0, 4, 0, 1, 0, 81, 0, 6, 0, 57, 0, 6, 0, 59, 0, 2, 0, 201, 0, 4, 0, 48, 0, 1, 0, 1, 0, 28, 16, 0, 0, 4, 0, 12, 0, 1, 0, 4, 0, 48, 0, 1, 0, 49, 0, 1, 0, 50, 0, 4, 0, 0]

Netflow v5 Handler

ここではNetflow v5を受け取りFlowMessageにparseするHandlerを実装します。
Netflow v5は固定のフォーマット形式となっているので、packetを素直にparseすれば大丈夫です。

以下のように、Headerと複数のレコードが連続しています。

   Export Packet:
   +--------+--------------------------------------------------------+
   |        | +----------+ +---------+     +-----------+ +---------+ |
   |        | |          | |         |     |           | |         | |
   | Header | |  Record  | | Record  | ... | Record    | | Record  | |
   |        | |          | |         |     |           | |         | |
   |        | +----------+ +---------+     +-----------+ +---------+ |
   +--------+--------------------------------------------------------+

Header Format

Netflow v5のHeaderは以下のように定義されております。それぞれのfieldの内容の説明についてはここでは割愛します。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |           Version             |           Count               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                          SysUptime                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                          unix_secs                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                          unix_nsecs                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                          flow_sequence                        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |  engine_type  |   engine_id   |       sampling_interval       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Record Format

以下のようにflowのレコードデータが定義されています。このレコードがHeaderの Count 数だけ連続して存在します。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                            srcaddr                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                            dstaddr                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                            nexthop                            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |            input              |             output            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             dPkts                             |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             dOctets                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             First                             |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                             Last                              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |           srcport             |            dstport            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   padding(0)  |   tcp_flags   |     prot      |     tos       |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |            src_as             |             dst_as            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |    src_mask   |    dst_mask   |             padding(0)        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

NetflowV5Handler

上記のNetflow v5のHeaderとRecordを読み込むHandlerを以下に定義します。読み込みの際には、エンディアンに注意して実装します。

use byteorder::{BigEndian, ReadBytesExt};
use chrono::Utc;
use std::io::Cursor;
use anyhow::anyhow;

#[derive(Debug, Clone)]
struct NetflowV5Handler {}

impl Handler for NetflowV5Handler {
    fn box_clone(&self) -> Box<dyn Handler> {
        Box::new(self.clone())
    }

    fn handle(
        &self,
        buf: &Vec<u8>,
        _size: usize,
        exporter_addr: SocketAddr,
    ) -> Result<Vec<FlowMessage>> {
        let mut rdr = Cursor::new(buf.as_slice());
        let datetime = Utc::now();
        // Parse Header
        let version = rdr.read_u16::<BigEndian>()?;
        if version != 5 {
            return Err(anyhow!(
                "NetflowV5Handler does not support version {}",
                version
            ));
        }
        let count = rdr.read_u16::<BigEndian>()?;
        let sys_up_time = rdr.read_u32::<BigEndian>()?;
        let unix_secs = rdr.read_u32::<BigEndian>()?;
        let unix_nsecs = rdr.read_u32::<BigEndian>()?;
        let flow_sequence = rdr.read_u32::<BigEndian>()?;
        let engine_type = rdr.read_u8()?;
        let engine_id = rdr.read_u8()?;
        let sampling_interval = rdr.read_u16::<BigEndian>()?;

        let mut flowmessages = Vec::with_capacity(count as usize);
        for _ in 0..count {
            let mut builder = FlowMessageBuilder::default();

            // Parse Record
            let src_addr = Ipv4Addr::from(rdr.read_u32::<BigEndian>()?);
            let dst_addr = Ipv4Addr::from(rdr.read_u32::<BigEndian>()?);
            let next_hop = Ipv4Addr::from(rdr.read_u32::<BigEndian>()?);
            let input = rdr.read_u16::<BigEndian>()?;
            let output = rdr.read_u16::<BigEndian>()?;
            let dpkts = rdr.read_u32::<BigEndian>()?;
            let d0ctets = rdr.read_u32::<BigEndian>()?;
            let first = rdr.read_u32::<BigEndian>()?;
            let last = rdr.read_u32::<BigEndian>()?;
            let src_port = rdr.read_u16::<BigEndian>()?;
            let dst_port = rdr.read_u16::<BigEndian>()?;
            let _ = rdr.read_u8()?;
            let tcp_flags = rdr.read_u8()?;
            let proto = rdr.read_u8()?;
            let tos = rdr.read_u8()?;
            let src_as = rdr.read_u16::<BigEndian>()?;
            let dst_as = rdr.read_u16::<BigEndian>()?;
            let src_mask = rdr.read_u8()?;
            let dst_mask = rdr.read_u8()?;
            let _ = rdr.read_u16::<BigEndian>()?;

            builder
                .datetime(datetime.to_string())
                .exporter_addr(exporter_addr)
                .version(version)
                .sys_up_time(sys_up_time)
                .unix_secs(unix_secs)
                .unix_nsecs(unix_nsecs)
                .flow_sequence(flow_sequence)
                .engine_type(engine_type)
                .engine_id(engine_id)
                .sampling_interval(sampling_interval as u32)
                .ipv4_src_addr(src_addr)
                .ipv4_dst_addr(dst_addr)
                .ipv4_next_hop(next_hop)
                .input(input)
                .output(output)
                .dpkts(dpkts)
                .d0ctets(d0ctets)
                .first(first)
                .last(last)
                .src_port(src_port)
                .dst_port(dst_port)
                .tcp_flags(tcp_flags)
                .protocol(proto)
                .tos(tos)
                .src_as(src_as as u32)
                .dst_as(dst_as as u32)
                .src_mask(src_mask)
                .dst_mask(dst_mask)
                .ip_protocol_version(4u8);

            flowmessages.push(builder.build().unwrap());
        }
        Ok(flowmessages)
    }
}

Netflow v9 Handler

Netflow v9はNetflow v5とは違いTemplate FlowSetによってデータのフォーマットを定義する仕様になっています。定義されたフォーマットに従ってData FlowSetをparseします。さらに、主にサンプリングレートなどの共通の付加情報を定義するためのOptions Template FlowSet、Options Data Recordがあります。TemplateとOptionでは状態を共有して処理をしなければいけないことに気をつけて処理をします。

以下のように、テンプレートとデータの2つのフローを組み合わせることで最終的なフローデータの受け取ります。

   Export Packet:
   +--------+--------------------------------------------------------+
   |        | +----------+ +---------+     +-----------+ +---------+ |
   | Packet | | Template | | Data    |     | Options   | | Data    | |
   | Header | | FlowSet  | | FlowSet | ... | Template  | | FlowSet | |
   |        | |          | |         |     | FlowSet   | |         | |
   |        | +----------+ +---------+     +-----------+ +---------+ |
   +--------+--------------------------------------------------------+

Header Format

ここでは、Headerの定義を示しています。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |       Version Number          |            Count              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                           sysUpTime                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                           UNIX Secs                           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                       Sequence Number                         |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |                        Source ID                              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Source ID
A 32-bit value that identifies the Exporter Observation Domain.
NetFlow Collectors SHOULD use the combination of the source IP
address and the Source ID field to separate different export
streams originating from the same Exporter.

上記のようにexporterのIPとSource IDをTemplateのCache Keyとして使用する必要があります。このKeyを利用して共通したTemplate情報を扱うCache機構を実装します。

Template FlowSet Format

以下のように、FieldのTypeとLengthをTemplateとして定義します。Source IDとTemplate IDをKeyとしてTemplate Cache機構にフォーマットを保存します。
Netflow v9ではTemplate FlowSetはFlowSet IDが0、Template IDは256以上として設定されます。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |       FlowSet ID = 0          |          Length               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |      Template ID 256          |         Field Count           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type 1           |         Field Length 1        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type 2           |         Field Length 2        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |             ...               |              ...              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type N           |         Field Length N        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |      Template ID 257          |         Field Count           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type 1           |         Field Length 1        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type 2           |         Field Length 2        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |             ...               |              ...              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Field Type M           |         Field Length M        |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |             ...               |              ...              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Template ID K          |         Field Count           |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |             ...               |              ...              |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Data FlowSet Format

256以上のFlowSet IDは事前に受け取ったTemplate IDと一致するように設定されます。事前にCacheしていたTemplate Cache機構からFlowSet IDを用いてRecordのフォーマットを取得してparseします。4の倍数にpaddingされているのでparseする時には気をつけます。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   FlowSet ID = Template ID    |          Length               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 1 - Field Value 1    |   Record 1 - Field Value 2    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 1 - Field Value 3    |             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 2 - Field Value 1    |   Record 2 - Field Value 2    |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 2 - Field Value 3    |             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 3 - Field Value 1    |             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |              ...              |            Padding            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Options Template FlowSet Format

Template FlowSetと同様にSource IDとTemplate IDをKeyとしてTemplate Cache機構にフォーマットを保存します。
Options Template FlowSetには、Template FlowSetと違い共通情報としての適用範囲を指定する1 System, 2 Interface, 3 Line Card, 4 Cache, 5 Template の5段階のOption Scopeというものがあります。しかし、本記事ではoption scopeを利用せずexporter IPが同じあれば全て共通情報とする、つまりSystemのスコープとして扱います。これは、多くの場合Option Scopeが正しく実装されていない問題を回避する方法として知られている方法です。

OSSのNetflow Collectorであるpmacct では、optionとしてoption scopeを考慮しない設定があります。

KEY:            nfacctd_disable_opt_scope_check [GLOBAL, ONLY_NFACCTD]
VALUES:         [ true | false ]
DESC:           Mainly a workaround to implementations not encoding NetFlow v9/IPIFX option scope correctly,
                this knob allows to disable option scope checking. By doing so, options are considered scoped
                to the system level (ie. to the IP address of the expoter).
DEFAULT:        false

https://github.com/pmacct/pmacct/blob/master/CONFIG-KEYS#L1884-L1889

機器によってはインターフェイスによってサンプリングレートを変えている場合などあるのでそれぞれのネットワーク環境に十分に気をつける必要があります。

Netflow v9ではTemplate FlowSetはFlowSet IDが1、Template IDは256以上として設定されます。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |       FlowSet ID = 1          |          Length               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |         Template ID           |      Option Scope Length      |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |        Option Length          |       Scope 1 Field Type      |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |     Scope 1 Field Length      |               ...             |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |     Scope N Field Length      |      Option 1 Field Type      |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |     Option 1 Field Length     |             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |     Option M Field Length     |           Padding             |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Options Data Record Format

256以上のFlowSet IDは事前に受け取ったOption Template FlowSetのTemplate IDと一致するように設定されます。事前にCacheしていたTemplate Cache機構からFlowSet IDを用いてRecordのフォーマットを取得してparseします。4の倍数にpaddingされているのでparseする時には気をつけます。parseしたOption レコードの情報はexporterのIPをKeyとして共通情報を保存するOption Cache機構にいれます。

    0                   1                   2                   3
    0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |    FlowSet ID = Template ID   |          Length               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 1 - Scope 1 Value    |Record 1 - Option Field 1 Value|
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Record 1 - Option Field 2 Value|             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 2 - Scope 1 Value    |Record 2 - Option Field 1 Value|
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Record 2 - Option Field 2 Value|             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |   Record 3 - Scope 1 Value    |Record 3 - Option Field 1 Value|
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |Record 3 - Option Field 2 Value|             ...               |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
   |              ...              |            Padding            |
   +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+

Template Cache

Template Cache機構の実装は以下のようになります。HashMapをwrapした実装にします。

use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq, Hash)]
struct TemplateCacheKey {
    exporter_ip: String,
    source_id: u32,
    template_id: u16,
    version: u16,
}

impl TemplateCacheKey {
    pub fn new(
        exporter_ip: String,
        source_id: u32,
        template_id: u16,
        version: u16,
    ) -> TemplateCacheKey {
        TemplateCacheKey {
            exporter_ip: exporter_ip,
            source_id,
            template_id,
            version,
        }
    }
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct Field {
    pub type_: u16,
    pub length: u16,
}

impl Field {
    pub fn new(type_: u16, length: u16) -> Field {
        Field { type_, length }
    }
}

#[derive(Debug, PartialEq, Eq, Hash)]
struct TemplateCacheValue {
    pub fields: Vec<Field>,
    pub scope_fields: Vec<Field>,
    pub is_option: bool,
}

impl TemplateCacheValue {
    pub fn new(
        fields: Vec<Field>,
        scope_fields: Vec<Field>,
        is_option: bool,
    ) -> TemplateCacheValue {
        TemplateCacheValue {
            fields: fields,
            scope_fields: scope_fields,
            is_option: is_option,
        }
    }
}

#[derive(Debug)]
struct TemplateCache {
    map: HashMap<TemplateCacheKey, TemplateCacheValue>,
}

impl TemplateCache {
    pub fn new() -> TemplateCache {
        TemplateCache {
            map: HashMap::new(),
        }
    }

    pub fn insert(
        &mut self,
        k: TemplateCacheKey,
        v: TemplateCacheValue,
    ) -> Option<TemplateCacheValue> {
        self.map.insert(k, v)
    }

    pub fn get(&self, k: &TemplateCacheKey) -> Option<&TemplateCacheValue> {
        self.map.get(k)
    }

    pub fn contains_key(&self, k: &TemplateCacheKey) -> bool {
        self.map.contains_key(k)
    }
}

Option Cache

共通情報のOption Cache機構の実装は以下のようになります。HashMapをwrapした実装にします。

type FlowDatas = HashMap<u16, Vec<u8>>;

#[derive(Debug, PartialEq, Eq, Hash)]
struct OptionCacheKey {
    exporter_ip: String,
}

impl OptionCacheKey {
    pub fn new(exporter_ip: String) -> OptionCacheKey {
        OptionCacheKey {
            exporter_ip: exporter_ip,
        }
    }
}

#[derive(Debug)]
struct OptionCache {
    map: HashMap<OptionCacheKey, FlowDatas>,
}

impl OptionCache {
    pub fn new() -> OptionCache {
        OptionCache {
            map: HashMap::new(),
        }
    }

    pub fn insert(&mut self, k: OptionCacheKey, v: FlowDatas) -> Option<FlowDatas> {
        self.map.insert(k, v)
    }

    pub fn get(&self, k: &OptionCacheKey) -> Option<&FlowDatas> {
        self.map.get(k)
    }

    pub fn contains_key(&self, k: &OptionCacheKey) -> bool {
        self.map.contains_key(k)
    }
}

NetflowV9Handler

以下は、Netflow v9をparseするHandlerの実装になっています。
共有状態であるTemplate Cache機構、Option Cache機構は ArcRwLock を使用します。

use byteorder::{ByteOrder};
use std::sync::{ Arc, RwLock};
use std::io::prelude::*;

fn bytes_to_usize(v: &Vec<u8>) -> Result<usize> {
    let v_len = v.len();
    match v_len {
        1 => Ok(v[0] as usize),
        2 => Ok(BigEndian::read_u16(v.as_slice()) as usize),
        3 => Ok(BigEndian::read_u24(v.as_slice()) as usize),
        4 => Ok(BigEndian::read_u32(v.as_slice()) as usize),
        6 => Ok(BigEndian::read_u48(v.as_slice()) as usize),
        8 => Ok(BigEndian::read_u64(v.as_slice()) as usize),
        16 => Ok(BigEndian::read_u128(v.as_slice()) as usize),
        _ => Err(anyhow!("read error")),
    }
}

#[derive(Debug, Clone)]
struct NetflowV9Handler {
    pub template_cache: Arc<RwLock<TemplateCache>>,
    pub option_cache: Arc<RwLock<OptionCache>>,
}

impl NetflowV9Handler {
    pub fn new() -> NetflowV9Handler {
        NetflowV9Handler {
            template_cache: Arc::new(RwLock::new(TemplateCache::new())),
            option_cache: Arc::new(RwLock::new(OptionCache::new())),
        }
    }
}

impl Handler for NetflowV9Handler {
    fn box_clone(&self) -> Box<dyn Handler> {
        Box::new(self.clone())
    }

    fn handle(
        &self,
        buf: &Vec<u8>,
        size: usize,
        exporter_addr: SocketAddr,
    ) -> Result<Vec<FlowMessage>> {
        let mut rdr = Cursor::new(buf.as_slice());
        let datetime = Utc::now();
        let version = rdr.read_u16::<BigEndian>()?;
        if version != 9 {
            return Err(anyhow!(
                "NetflowV9Handler does not support version {}",
                version
            ));
        }
        let _count = rdr.read_u16::<BigEndian>()?;
        let sys_up_time = rdr.read_u32::<BigEndian>()?;
        let unix_secs = rdr.read_u32::<BigEndian>()?;
        let seq_number = rdr.read_u32::<BigEndian>()?;
        let source_id = rdr.read_u32::<BigEndian>()?;

        let mut flowmessages = Vec::new();

        while (size as u64 - rdr.position()) > 0 {
            let flowset_id = rdr.read_u16::<BigEndian>()?;
            let length = rdr.read_u16::<BigEndian>()?;
            if flowset_id == 0 {
                // Template FlowSet
                let mut buf_data = vec![0u8; length as usize - 4];
                rdr.read_exact(&mut buf_data)?;
                let mut rdr_data = Cursor::new(buf_data.as_slice());

                let rdr_data_len = buf_data.len() as u64;
                while (rdr_data_len - rdr_data.position()) >= 4 {
                    let template_id = rdr_data.read_u16::<BigEndian>()?;
                    let field_count = rdr_data.read_u16::<BigEndian>()?;
                    let mut fields = Vec::with_capacity(field_count as usize);
                    for _ in 0..field_count {
                        let field_type = rdr_data.read_u16::<BigEndian>()?;
                        let field_length = rdr_data.read_u16::<BigEndian>()?;
                        if field_length == 0 {
                            return Err(anyhow!("field length 0 error"));
                        }
                        fields.push(Field::new(field_type, field_length))
                    }
                    let k = TemplateCacheKey::new(
                        exporter_addr.ip().to_string(),
                        source_id,
                        template_id,
                        version,
                    );

                    // Template Cache機構への書き込み
                    let v = TemplateCacheValue::new(fields, Vec::new(), false);
                    let mut template_cache = self.template_cache.write().unwrap();
                    template_cache.insert(k, v);
                }
            } else if flowset_id == 1 {
                // Options Template FlowSet
                let mut buf_data = vec![0u8; length as usize - 4];
                rdr.read_exact(&mut buf_data)?;
                let mut rdr_data = Cursor::new(buf_data.as_slice());

                let rdr_data_len = buf_data.len() as u64;
                while (rdr_data_len - rdr_data.position()) >= 4 {
                    let template_id = rdr_data.read_u16::<BigEndian>()?;
                    let option_scope_length = rdr_data.read_u16::<BigEndian>()?;
                    let option_length = rdr_data.read_u16::<BigEndian>()?;

                    let option_scope_cnt = (option_scope_length / 4) as usize;
                    let option_cnt = (option_length / 4) as usize;

                    let mut scope_fields = Vec::with_capacity(option_scope_cnt);
                    let mut fields = Vec::with_capacity(option_cnt);

                    for _ in 0..option_scope_cnt {
                        let field_type = rdr_data.read_u16::<BigEndian>()?;
                        let field_length = rdr_data.read_u16::<BigEndian>()?;
                        if field_length == 0 {
                            return Err(anyhow!("field length 0 error"));
                        }
                        scope_fields.push(Field::new(field_type, field_length))
                    }

                    for _ in 0..option_cnt {
                        let field_type = rdr_data.read_u16::<BigEndian>()?;
                        let field_length = rdr_data.read_u16::<BigEndian>()?;
                        if field_length == 0 {
                            return Err(anyhow!("field length 0 error"));
                        }
                        fields.push(Field::new(field_type, field_length))
                    }

                    let k = TemplateCacheKey::new(
                        exporter_addr.ip().to_string(),
                        source_id,
                        template_id,
                        version,
                    );

                    // Template Cache機構への書き込み
                    let v = TemplateCacheValue::new(fields, scope_fields, true);
                    let mut template_cache = self.template_cache.write().unwrap();
                    template_cache.insert(k, v);
                }
            } else if flowset_id >= 256 {
                // Data FlowSet
                let k = TemplateCacheKey::new(
                    exporter_addr.ip().to_string(),
                    source_id,
                    flowset_id,
                    version,
                );

                // Template Cache機構の読み込み
                let template_cache = self.template_cache.read().unwrap();
                if !template_cache.contains_key(&k) {
                    return Err(anyhow!("not found template template_id = {}", flowset_id));
                }
                let v = template_cache.get(&k).unwrap();

                let field_length_sum = v.fields.iter().map(|x| x.length as u64).sum::<u64>();

                let mut buf_data = vec![0; length as usize - 4];
                rdr.read_exact(&mut buf_data)?;
                let buf_data_len = buf_data.len() as u64;
                let mut rdr_data = Cursor::new(buf_data.as_slice());

                while (buf_data_len - rdr_data.position()) >= field_length_sum {
                    let mut scope_datas = FlowDatas::new();
                    for o in v.scope_fields.iter() {
                        let mut buf_data = vec![0u8; o.length as usize];
                        rdr_data.read_exact(&mut buf_data)?;
                        scope_datas.insert(o.type_, buf_data);
                    }

                    let mut datas = FlowDatas::new();
                    for o in v.fields.iter() {
                        let mut buf_data = vec![0u8; o.length as usize];
                        rdr_data.read_exact(&mut buf_data)?;
                        datas.insert(o.type_, buf_data);
                    }

                    if v.is_option {
                        // Option Cache機構への書き込み
                        let mut option_cache = self.option_cache.write().unwrap();
                        let k = OptionCacheKey::new(exporter_addr.ip().to_string());
                        option_cache.insert(k, datas);
                    } else {
                        let mut builder = FlowMessageBuilder::default();
                        builder
                            .datetime(datetime.to_string())
                            .exporter_addr(exporter_addr)
                            .version(version)
                            .sys_up_time(sys_up_time)
                            .flow_sequence(seq_number)
                            .unix_secs(unix_secs);
                        builder = add_builder(builder, &datas);

                        // Option Cache機構の読み込む
                        let option_cache = self.option_cache.read().unwrap();
                        let k = OptionCacheKey::new(exporter_addr.ip().to_string());
                        if option_cache.contains_key(&k) {
                            let option_datas = option_cache.get(&k).unwrap();
                            builder = add_builder(builder, option_datas);
                        }

                        flowmessages.push(builder.build().unwrap());
                    }
                }
            }
        }
        Ok(flowmessages)
    }
}

dataをFlowMessageに対応させる add_builder 関数を定義します。

(多くのfieldへの対応付がされているので読み飛ばして頂いて大丈夫です)

fn add_builder
fn add_builder(mut builder: FlowMessageBuilder, datas: &FlowDatas) -> FlowMessageBuilder {
    for (type_, data) in datas {
        match type_ {
            1u16 => {
                if let Ok(in_bytes) = bytes_to_usize(&data) {
                    builder.in_bytes(in_bytes);
                }
            }
            2u16 => {
                if let Ok(in_pkts) = bytes_to_usize(&data) {
                    builder.in_pkts(in_pkts);
                }
            }
            3u16 => {
                if let Ok(flows) = bytes_to_usize(&data) {
                    builder.flows(flows);
                }
            }
            4u16 => {
                builder.protocol(data[0]);
            }
            5u16 => {
                builder.tos(data[0]);
            }
            6u16 => {
                builder.tcp_flags(data[0]);
            }
            7u16 => {
                builder.src_port(BigEndian::read_u16(data.as_slice()));
            }
            8u16 => {
                builder.ipv4_src_addr(Ipv4Addr::from(BigEndian::read_u32(data.as_slice())));
            }
            9u16 => {
                builder.src_mask(data[0]);
            }
            10u16 => {
                if let Ok(input_snmp) = bytes_to_usize(&data) {
                    builder.input_snmp(input_snmp);
                }
            }
            11u16 => {
                builder.dst_port(BigEndian::read_u16(data.as_slice()));
            }
            12u16 => {
                builder.ipv4_dst_addr(Ipv4Addr::from(BigEndian::read_u32(data.as_slice())));
            }
            13u16 => {
                builder.dst_mask(data[0]);
            }
            14u16 => {
                if let Ok(output_snmp) = bytes_to_usize(&data) {
                    builder.output_snmp(output_snmp);
                }
            }
            15u16 => {
                builder.ipv4_next_hop(Ipv4Addr::from(BigEndian::read_u32(data.as_slice())));
            }
            16u16 => {
                if data.len() == 2 {
                    builder.src_as(BigEndian::read_u16(data.as_slice()) as u32);
                } else if data.len() == 4 {
                    builder.src_as(BigEndian::read_u32(data.as_slice()));
                }
            }
            17u16 => {
                if data.len() == 2 {
                    builder.dst_as(BigEndian::read_u16(data.as_slice()) as u32);
                } else if data.len() == 4 {
                    builder.dst_as(BigEndian::read_u32(data.as_slice()));
                }
            }
            18u16 => {
                builder.bgp_ipv4_next_hop(Ipv4Addr::from(BigEndian::read_u32(data.as_slice())));
            }
            19u16 => {
                if let Ok(mul_dst_pkts) = bytes_to_usize(&data) {
                    builder.mul_dst_pkts(mul_dst_pkts);
                }
            }
            20u16 => {
                if let Ok(mul_dst_bytes) = bytes_to_usize(&data) {
                    builder.mul_dst_bytes(mul_dst_bytes);
                }
            }
            21u16 => {
                builder.last_switched(BigEndian::read_u32(data.as_slice()));
            }
            22u16 => {
                builder.first_switched(BigEndian::read_u32(data.as_slice()));
            }
            23u16 => {
                if let Ok(out_bytes) = bytes_to_usize(&data) {
                    builder.out_bytes(out_bytes);
                }
            }
            24u16 => {
                if let Ok(out_pkts) = bytes_to_usize(&data) {
                    builder.out_pkts(out_pkts);
                }
            }
            27u16 => {
                builder.ipv6_src_addr(Ipv6Addr::from(BigEndian::read_u128(data.as_slice())));
            }
            28u16 => {
                builder.ipv6_dst_addr(Ipv6Addr::from(BigEndian::read_u128(data.as_slice())));
            }
            29u16 => {
                builder.ipv6_src_mask(data[0]);
            }
            30u16 => {
                builder.ipv6_dst_mask(data[0]);
            }
            31u16 => {
                if let Ok(ipv6_flow_label) = bytes_to_usize(&data) {
                    builder.ipv6_flow_label(ipv6_flow_label);
                }
            }
            32u16 => {
                builder.icmp_type(BigEndian::read_u16(data.as_slice()));
            }
            33u16 => {
                builder.mul_igmp_type(data[0]);
            }
            34u16 => {
                builder.sampling_interval(BigEndian::read_u32(data.as_slice()));
            }
            35u16 => {
                builder.sampling_algorithm(data[0]);
            }
            36u16 => {
                builder.flow_active_timeout(BigEndian::read_u16(data.as_slice()));
            }
            37u16 => {
                builder.flow_inactive_timeout(BigEndian::read_u16(data.as_slice()));
            }
            38u16 => {
                builder.engine_type(data[0]);
            }
            39u16 => {
                builder.engine_id(data[0]);
            }
            40u16 => {
                builder.engine_id(data[0]);
            }
            41u16 => {
                if let Ok(total_bytes_exp) = bytes_to_usize(&data) {
                    builder.total_bytes_exp(total_bytes_exp);
                }
            }
            42u16 => {
                if let Ok(total_pkts_exp) = bytes_to_usize(&data) {
                    builder.total_pkts_exp(total_pkts_exp);
                }
            }
            46u16 => {
                builder.mpls_top_label(data[0]);
            }
            47u16 => {
                builder.mpls_top_label_ip_addr(BigEndian::read_u32(data.as_slice()));
            }
            48u16 => {
                builder.flow_sampler_id(data[0]);
            }
            49u16 => {
                builder.flow_sampler_mode(data[0]);
            }
            50u16 => {
                builder.flow_sampler_random_interval(BigEndian::read_u32(data.as_slice()));
            }
            55u16 => {
                builder.dst_tos(data[0]);
            }
            56u16 => {
                builder.src_mac(BigEndian::read_u48(data.as_slice()));
            }
            57u16 => {
                builder.dst_mac(BigEndian::read_u48(data.as_slice()));
            }
            58u16 => {
                builder.src_vlan(BigEndian::read_u16(data.as_slice()));
            }
            59u16 => {
                builder.dst_vlan(BigEndian::read_u16(data.as_slice()));
            }
            60u16 => {
                builder.ip_protocol_version(data[0]);
            }
            61u16 => {
                builder.direction(data[0]);
            }
            62u16 => {
                builder.ipv6_next_hop(Ipv6Addr::from(BigEndian::read_u128(data.as_slice())));
            }
            63u16 => {
                builder.bgp_ipv6_next_hop(Ipv6Addr::from(BigEndian::read_u128(data.as_slice())));
            }
            64u16 => {
                builder.ipv6_option_headers(BigEndian::read_u32(data.as_slice()));
            }
            70u16 => {
                builder.mpls_label_1(BigEndian::read_u24(data.as_slice()));
            }
            71u16 => {
                builder.mpls_label_2(BigEndian::read_u24(data.as_slice()));
            }
            72u16 => {
                builder.mpls_label_3(BigEndian::read_u24(data.as_slice()));
            }
            73u16 => {
                builder.mpls_label_4(BigEndian::read_u24(data.as_slice()));
            }
            74u16 => {
                builder.mpls_label_5(BigEndian::read_u24(data.as_slice()));
            }
            75u16 => {
                builder.mpls_label_6(BigEndian::read_u24(data.as_slice()));
            }
            76u16 => {
                builder.mpls_label_7(BigEndian::read_u24(data.as_slice()));
            }
            77u16 => {
                builder.mpls_label_8(BigEndian::read_u24(data.as_slice()));
            }
            78u16 => {
                builder.mpls_label_9(BigEndian::read_u24(data.as_slice()));
            }
            79u16 => {
                builder.mpls_label_10(BigEndian::read_u24(data.as_slice()));
            }
            _ => {}
        }
    }
    builder
}

JSON Publisher

以下のようにJSONに出力するPublisherを実装します。

#[derive(Debug, Clone)]
struct JsonPublisher {}

impl Publisher for JsonPublisher {
    fn box_clone(&self) -> Box<dyn Publisher> {
        Box::new(self.clone())
    }

    fn publish(&self, flowmessages: &Vec<FlowMessage>) -> Result<()> {
        for flowmessage in flowmessages {
            let serialized = serde_json::to_string(flowmessage)?;
            println!("{}", serialized);
        }
        Ok(())
    }
}

CSV Publisher

以下のようにCSVに出力するPublisherを実装します。

use csv::WriterBuilder;
use std::io::stdout;

#[derive(Debug, Clone)]
struct CsvPublisher {}

impl Publisher for CsvPublisher {
    fn box_clone(&self) -> Box<dyn Publisher> {
        Box::new(self.clone())
    }

    fn publish(&self, flowmessages: &Vec<FlowMessage>) -> Result<()> {
        let mut wtr = WriterBuilder::new()
            .has_headers(false)
            .from_writer(stdout());
        for flowmessage in flowmessages {
            wtr.serialize(flowmessage)?;
        }
        wtr.flush()?;
        Ok(())
    }
}

まとめ

以下のようにmain関数を定義することで一つのportで複数のNetflowプロトコルに対応して、複数の出力をします。

use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    let socket = UdpSocket::bind("0.0.0.0:2055").await?;
    let server = Server {
        socket: socket,
        buf: vec![0u8; 4096],
        handlers: vec![Box::new(NetflowV5Handler {}), Box::new(NetflowV9Handler::new())],
        publishers: vec![Box::new(JsonPublisher {}), Box::new(CsvPublisher{})],
    };
    server.run().await?;
    Ok(())
}

Netflow v5 出力

{"datetime":"2020-12-06 21:15:48.730703300 UTC","exporter_addr":"127.0.0.1:44979","version":5,"sys_up_time":50168458,"unix_secs":1607289347,"unix_nsecs":204946000,"flow_sequence":41160,"engine_type":1,"engine_id":1,"sampling_interval":0,"ipv4_src_addr":"10.0.0.10","ipv4_dst_addr":"20.0.0.19","ipv4_next_hop":"30.0.0.254","input":1,"output":2,"dpkts":818,"d0ctets":167310,"first":50168124,"last":50168458,"src_port":1160,"dst_port":3160,"tcp_flags":27,"tos":0,"src_as":110,"dst_as":210,"src_mask":24,"dst_mask":24,"in_bytes":null,"in_pkts":null,"flows":null,"protocol":6,"input_snmp":null,"output_snmp":null,"bgp_ipv4_next_hop":null,"mul_dst_pkts":null,"mul_dst_bytes":null,"last_switched":null,"first_switched":null,"out_bytes":null,"out_pkts":null,"ipv6_src_addr":null,"ipv6_dst_addr":null,"ipv6_src_mask":null,"ipv6_dst_mask":null,"ipv6_flow_label":null,"icmp_type":null,"mul_igmp_type":null,"sampling_algorithm":null,"flow_active_timeout":null,"flow_inactive_timeout":null,"total_bytes_exp":null,"total_pkts_exp":null,"mpls_top_label":null,"mpls_top_label_ip_addr":null,"flow_sampler_id":null,"flow_sampler_mode":null,"flow_sampler_random_interval":null,"dst_tos":null,"src_mac":null,"dst_mac":null,"src_vlan":null,"dst_vlan":null,"ip_protocol_version":4,"direction":null,"ipv6_next_hop":null,"bgp_ipv6_next_hop":null,"ipv6_option_headers":null,"mpls_label_1":null,"mpls_label_2":null,"mpls_label_3":null,"mpls_label_4":null,"mpls_label_5":null,"mpls_label_6":null,"mpls_label_7":null,"mpls_label_8":null,"mpls_label_9":null,"mpls_label_10":null}
2020-12-06 21:15:48.728603500 UTC,127.0.0.1:44979,5,50168458,1607289347,204856000,41130,1,1,0,10.0.0.223,20.0.0.247,30.0.0.254,1,2,540,239961,50168188,50168458,1101,3101,27,0,101,201,24,24,,,,6,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,4,,,,,,,,,,,,,,

Netflow v9 出力

{"datetime":"2020-12-06 21:14:04.421696300 UTC","exporter_addr":"127.0.0.1:49152","version":9,"sys_up_time":100000,"unix_secs":1607289244,"unix_nsecs":null,"flow_sequence":10002,"engine_type":null,"engine_id":null,"sampling_interval":null,"ipv4_src_addr":null,"ipv4_dst_addr":null,"ipv4_next_hop":null,"input":null,"output":null,"dpkts":null,"d0ctets":null,"first":null,"last":null,"src_port":18389,"dst_port":56778,"tcp_flags":143,"tos":137,"src_as":null,"dst_as":null,"src_mask":null,"dst_mask":null,"in_bytes":517585210,"in_pkts":77586710,"flows":4284816197,"protocol":125,"input_snmp":47008,"output_snmp":6818,"bgp_ipv4_next_hop":null,"mul_dst_pkts":null,"mul_dst_bytes":null,"last_switched":2485926971,"first_switched":437598202,"out_bytes":null,"out_pkts":null,"ipv6_src_addr":"d5cf:2031:fb1e:6b2a:3882:8e99:a07e:65d7","ipv6_dst_addr":"3c5f:ee4e:4f68:bc01:9cde:dbb3:57b:430","ipv6_src_mask":null,"ipv6_dst_mask":null,"ipv6_flow_label":null,"icmp_type":null,"mul_igmp_type":null,"sampling_algorithm":null,"flow_active_timeout":null,"flow_inactive_timeout":null,"total_bytes_exp":null,"total_pkts_exp":null,"mpls_top_label":null,"mpls_top_label_ip_addr":null,"flow_sampler_id":208,"flow_sampler_mode":null,"flow_sampler_random_interval":null,"dst_tos":null,"src_mac":null,"dst_mac":99995648153052,"src_vlan":null,"dst_vlan":9454,"ip_protocol_version":80,"direction":89,"ipv6_next_hop":null,"bgp_ipv6_next_hop":null,"ipv6_option_headers":null,"mpls_label_1":null,"mpls_label_2":null,"mpls_label_3":null,"mpls_label_4":null,"mpls_label_5":null,"mpls_label_6":null,"mpls_label_7":null,"mpls_label_8":null,"mpls_label_9":null,"mpls_label_10":null}
2020-12-06 21:14:04.421696300 UTC,127.0.0.1:49152,9,100000,1607289244,,10002,,,,223.29.195.136,56.164.102.128,,,,,,,,40390,57485,24,217,,,,,1497831369,2570795131,395010829,17,2697,57646,,,,734749007,522537991,,,,,,,,,,,,,,,,,195,,,,32806267726847,,44242,,126,230,,,,,,,,,,,,,

無事にparseできていることを確認しました。
近年、非同期な仕組みが充実しているRustで実装することができたのでそこそこ早いCollectorに仕上がったと思います。

終わりに

今回は、Netflow v5, v9の対応をしましたが別途Handlerに則って実装することでNetflow v9の拡張であるIPFIXやサンプリングレートベースのsFlowなどの仕様に対応可能です。
PublisherもJSONとCSVのみを実装しましたが、本来であればDB, kafka, Cloudのキューイングシステムなどに出力するPublisherがより一般的です。

今回の実装は適切にモジュールに分けて、こちらに上げておきました。

使用方法

git clone https://github.com/suzusuzu/ferrisflow.git
cd ferrisflow
cargo run -- -p 2055 --netflow-v5 --netflow-v9 --json

Netflow Collectorの実装を通してNetflowについてご理解していただけましたでしょうか。
明日は、 @tetrapod117 さんの記事になります。

参考

32
18
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
18