3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

パケットデータ(pcap)をフィルタしたり集約してJSON出力するツールを作ってみる

Last updated at Posted at 2020-03-23

#はじめに

Wiresharkのコマンド版であるTSharkにはパケットキャプチャデータ(PCAPファイル)をJSON形式で出力するオプション(-T json)があります。パケットデータをテキストとして扱えるので非常に便利です。

また類似のオプションとして「-T ek」がありElasticsearchBulk APIで利用可能な形式で出力します。この機能を使ってElasticsearchへエキスポートし、例えば他の通信ログと対応するパケットデータを横串検索したりなどと活用できます。

この際パケットデータを特定のフィルタルールで絞り込んだり、複数パケットを単一フローへ集約することができるとElasticsearchへ格納するログ量を減らせて便利です。さらにElastic Common Schema (ECS)に対応させたり、Community ID Flow Hashingのフィールド値を追加すると他のログとのピボットがしやすくなります。

これまでそのたびにpythonスクリプトでマッチルールを書いたり、Bulk APIを使うにしても出力されたJSONデータを分割していたりしていたのですが(一度に多すぎる数のJSONデータをBulkPutするとElasticsearchのキャパ超えでエラーになる場合があります・・・)、そういった機能を簡単なツール(tsharkコマンドのラッパーツール)として実装してみました。今回は(持ち運びしやすい?ので)Goを使って実装してみます。

実装したツール(version 0.1)はこちら(github)へ置いておきました。

#実装機能の概要

  • 出力するパケットフィールド名を指定可能
  • パケットフィールド値をフィルタルールで絞り込み可能 (ANDed/ORed, exact/case_ignore/exists/regex)
  • Elastic Common Schema (ECS)のフィールドを自動生成
  • Community ID Flow Hashingのフィールドを自動追加
  • 同一フローの全パケットを単一ログへ集約(Community IDを利用)
  • フローへ集約された各パケットフィールドの差分を抽出可能

#出力形式

  1. 標準出力 (Stdout)
  2. Elasticsearchへエキスポート:Bulk APIを利用。また認証およびTLSもサポートしてみます。

###出力例

以下はHTTPS(TLS)のServer Name Indicationを絞り込み出力する例です。

[
...
{
  "@timestamp": "2020-03-06T12:30:57.813Z",
  "agent": {
    "type": "tshark-filter"
  },
  "destination": {
    "domain": "192.168.0.1",
    "ip": "192.168.0.1",
    "port": 443
  },
  "event": {
    "created": "2020-03-22T16:20:32.000Z",
    "module": "pcap",
    "type": "protocol"
  },
  "file": {
    "name": "examples/http1.pcapng"
  },
  "network": {
    "bytes": 251,
    "community_id": "1:9Us4XWwcOkDl1xAE6xlxxxxxxxx=",
    "iana_number": "6",
    "packets": 1,
    "protocol": "tls",
    "related": {
      "ip": [
        "192.168.1.1",
        "192.168.0.1"
      ]
    },
    "transport": "tcp",
    "type": "ipv4"
  },
  "packet": {
    "frame_interface_name": "ens33",
    "frame_len": "251",
    "frame_number": "2422",
    "frame_protocols": "eth:ethertype:ip:tcp:tls",
    "frame_time_delta": "0.002324240",
    "frame_time_epoch": "1583757057.813936138",
    "frame_time_relative": "62.307191989",
    "ip_dst": "192.168.0.1",
    "ip_dst_host": "192.168.0.1",
    "ip_len": "237",
    "ip_proto": "6",
    "ip_src": "192.168.1.1",
    "ip_src_host": "192.168.1.1",
    "ip_version": "4",
    "tcp_ack": "1",
    "tcp_dstport": "443",
    "tcp_flags": "0x00000018",
    "tcp_len": "197",
    "tcp_nxtseq": "198",
    "tcp_seq": "1",
    "tcp_srcport": "49907",
    "tls_handshake_extensions_server_name": "login.example.com" ★★★
  },
  "source": {
    "domain": "192.168.1.1",
    "ip": "192.168.1.1",
    "port": 49907
  }
},
...
]

※ TShark 2.6.8では「ssl_handshake_extensions_server_name」ですが、TShark 3.2.2では「tls_handshake_extensions_server_name」とフィールド名が変更になったようです。ご注意ください。

#コマンド形式

tshark-filter --pcap <pcap_file_path> --config <config_yml_path> --action <action_name> --output <output_name>
コマンド引数 説明
--pcap pcap_file_path PCAPファイル名(パケットキャプチャデータ)
--config config_yml_path フィルタや集約ルール、Elasticsearchのアドレスや認証情報などを記述する設定ファイル名
--action action_name 操作指定。filter(フィルタ)またはagg(集約)を指定可能
--output output_name 出力指定。stdout(標準出力)またはelasticsearch(Elasticsearchへエキスポート)を指定可能
例:パケットデータをフィルタして標準出力
> ./tshark-filter --config ./default.yml --pcap http.pcapng --action filter --output stdout --pretty true
例:パケットデータをフロー毎(Community-ID)に集約して標準出力
> ./tshark-filter --config ./default.yml --pcap http.pcapng --action agg --output stdout --pretty true
例:パケットデータをフィルタしてElasticseachへエキスポート
> ./tshark-filter --config ./default.yml --pcap http.pcapng --action filter --output elasticsearch

#インストール

###依存するコンポーネント

###ビルド方法
以下はLinux (e.g. Ubuntu 18.04) でのビルド手順です。Goで実装しているのでWindowsやMacでもビルドできると思います(たぶん)。

1. cd $GOPATH/src
2. go get -u github.com/adriansr/flowhash
3. cd $GOPATH/src/github.com
4. mkdir elastic (if needed)
5. cd $GOPATH/src/github.com/elastic
6. git clone -b 7.x https://github.com/elastic/go-elasticsearch.git
7. cd $GOPATH/src/
8. git clone https://github.com/rhpenguin/tshark-filter.git
9. cd $GOPATH/src/tshark-filter/tshark-filter
10. go build

ビルドに成功すると「tshark-filter」というコマンドが生成されます。

#設定ファイル
フィルタルールや集約ルール、ElasticseachのURLや認証情報などを指定する設定ファイルはYAML形式で記述します。ここではそれをdefault.ymlとしています。

default.yml
#
# 抽出するパケットフィールド名を指定。
# 指定されたフィールド名は実際にはtsharkコマンドの「-T ek -e <フィールド名>...」
# として渡されるためその形式で指定する。
# それらはtsharkコマンドを-T jsonまたは-T ekを指定して出力することで確認できる。
# 
# ただし、いくつかのフィールド(Frame情報、IP・TCP・UDPヘッダ情報)はデフォルトで
# 出力される(上述の出力例を参照)。
#
pcap_extracted_fields:
  - http.request.method
  - http.request.uri

#
# tsharkコマンドで出力されたフィールドへ適用するフィルタルールを指定。
# 以下の例ではhttp.request.methodフィールドの値が「GET」へ一致するものだけへ絞り
# 込まれる。
#
# ただし「tshark -T ek」の仕様によりフィールド名内の「.」が「_」へ置換される模様
# のため注意。
# (例)http.request.method => http_request_method
#  
pcap_field_conditions:
  - http_request_method: "GET"

###フィルタルール(pcap_field_conditions)のバリエーション

1. マッチさせるフィールド値を複数指定 (exactマッチ)

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # 配列で指定するとORedでマッチされる
  #
  # (http_request_method == "GET" OR http_request_method == "POST")
  # のパケットへマッチ
  # 
  - http_request_method: ["GET", "POST"]

または

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  - http_request_method: 
    - "GET"
    - "POST"

2. マッチさせるフィールド値をANDedで複数指定 (exactマッチ)

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # オブジェクト型で複数フィールド指定するとANDedでマッチ
  #
  # (http_request_method == "GET" AND http_host == "www.example.com")
  # のパケットへマッチ
  #
  - http_request_method: "GET"
    http_host: "www.example.com"

3. マッチさせるフィールド値をORedで複数指定 (exactマッチ)

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # 配列型で複数フィールド指定するとORedでマッチ
  #
  # (http_request_method == "GET" OR http_host == "www.example.com")
  # のパケットへマッチ
  #
  - http_request_method: "GET"
  - http_host: "www.example.com"

4. ANDedマッチさせる複数フィールドルールをORedで複数指定 (exactマッチ)

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # オブジェクト型ルールを配列型で指定
  #
  # (http_request_method == "GET" AND http_host == "www1.example.com")
  #  OR
  # (http_request_method == "POST" AND http_host == "www2.example.com")
  # のルールでパケットへマッチ
  #
  - http_request_method: "GET"
    http_host: "www1.example.com"
  - http_request_method: "POST"
    http_host: "www2.example.com"

5. exactマッチフィルタルールの冗長記述

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # オブジェクト型で複数フィールド指定するとANDedでマッチ
  #
  # match: <マッチタイプ>
  # value: <マッチさせる値またはパターン>
  #
  # (http_request_method == "GET" OR http_request_method == "POST") AND http_host == "www.example.com"
  # のパケットへexactマッチ
  #
  - http_request_method:
      match: exact
      value: ["GET", "POST"]
    http_host: 
      match: exact
      value: "www.example.com"
    

6. case_ignoreマッチのフィルタルール

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # match: case_ignore 
  #  => 大文字小文字を無視してマッチさせる 
  #
  # オブジェクト型で複数フィールド指定するとANDedでマッチ
  #
  # (http_request_method == "GET" OR http_request_method == "POST") AND http_host == "www.example.com"
  # のパケットデータへ絞り込み
  #
  - http_request_method:
      match: case_ignore
      value: ["get", "post"]
    http_host: 
      match: case_ignore
      value: "WWW.EXAMPLE.COM"
    

7. フィールド存在のフィルタルール

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # match: exists
  #  => パケットデータに指定されたフィールドが存在する場合にマッチ 
  #
  # http_request_methodフィールドが存在するパケットデータのみへ絞り込み
  #
  - http_request_method:
      match: exists
    

8. 正規表現によるフィルタルール

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  #
  # match: regex
  # value: パターン文字列
  #  => パケットデータに指定されたフィールド値を正規表現マッチ 
  #
  # http_hostフィールドが".*\\.microsoft\\.com"または".*\\.facebook\\.com"へ
  # マッチするパケットデータのみへ絞り込み
  #
  - http_host:
      match: regex
      value:
        - ".*\\.microsoft\\.com"
        - ".*\\.facebook\\.com"    

または

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  - http_host:
      match: regex
      value: [".*\\.microsoft\\.com", ".*\\.facebook\\.com"]

###パケットフィールドのマッチタイプ(まとめ)

マッチタイプ 説明
exact 完全一致
case_ignore 大文字と小文字を区別しない一致
exists 該当の名前のフィールドがパケットデータに存在する場合にマッチ
regex 正規表現マッチ

###集約ルールの設定

フィルタ処理されたパケットデータに対してさらに集約ルールを使うことでCommunity ID Flow Hashingにより複数パケットを単一のイベントログへまとめる(サマリー化する)こともできます。

これにより個々のパケット毎ではなく必要なフィールドだけを含んだ単一ログ(フローログ)として集約(サマリー化)されるため、ログ量を減らしたりまたログをピボットしたりサマライズすることが簡単になります。

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - tls.handshake.extensions_server_name
pcap_field_conditions:
  - tcp_dstport: [80, 443, 3128]
  - tcp_srcport: [80, 443, 3128]

#
# 集約ルールの記述
#
agg:
  #
  # 集約タイプ名。現状はCommunity IDのみサポート。値が一致するパケットを同一フローとして集約。
  # 
  type: "community_id"
  #
  # 抽出されたパケットーフィールドのうち重複しない値を列挙する指定
  #
  extracted_result_fields:
    - http_request_method
    - http_host
    - tls_handshake_extensions_server_name
    # SYN, ACK, FIN, RSTなど
    - tcp_flags
    - frame_protocols

#
# Elasticseach設定(後述)
#
elasticsearch:
  address: http://log.example.com:9200
  document_id: "community_id"
  bulk: 128
コマンド実行例
> ./tshark-filter --config ./default.yml --pcap http.pcapng --action agg --output elasticsearch

以下の例では同一Community IDを持つ12パケットが単一フローのログへ集約されています。

Elasticsearchへの出力例
{
  "_index": "tshark-filter-2020-03-09",
  "_type": "_doc",
  "_id": "27140429a5bd05a708ea719e80a8aa33fa01ec73dfb4f908bca37cb0dc83bf46",
  "_version": 1,
  "_score": null,
  "_source": {
    "@timestamp": "2020-03-09T12:31:12.580Z",
    "agent": {
      "type": "tshark-filter"
    },
    "destination": {
      "domain": "192.168.0.1",
      "ip": "192.168.0.1",
      "port": 80
    },
    "event": {
      "created": "2020-03-23T15:37:42.000Z",
      "module": "pcap",
      "type": "protocol"
    },
    "file": {
      "name": "examples/http1.pcapng"
    },
    "network": {
      # 集約された同一フロー内の全パケットサイズ合計
      "bytes": 967,
      # Community ID Flow Hash
      "community_id": "1:3hslQBzXeSXhqePQ1V6xxxxxxxx=",
      "iana_number": "6",
      # 集約された同一フロー内の全パケット数
      "packets": 12,
      "related": {
        "ip": [
          "192.168.2.150",
          "192.168.0.1"
        ]
      },
      "transport": "tcp",
      "type": "ipv4"
    },
    "packet": {
      "frame_interface_name": "ens33",
      "frame_len": "74",
      "frame_number": "3108",
      # フィールド値が重複しないものを列挙
      "frame_protocols": [
        "eth:ethertype:ip:tcp",
        "eth:ethertype:ip:tcp:http"
      ],
      "frame_time_delta": "0.965645454",
      "frame_time_epoch": "1583757072.580788038",
      "frame_time_relative": "77.074043889",
      # フィールド値が重複しないものを列挙
      "http_host": "www.example.com",
      "http_request_method": "GET",
      "ip_dst": "192.168.0.1",
      "ip_dst_host": "192.168.0.1",
      "ip_len": "60",
      "ip_proto": "6",
      "ip_src": "192.168.2.150",
      "ip_src_host": "192.168.2.150",
      "ip_version": "4",
      "tcp_ack": "0",
      "tcp_dstport": "80",
      # フィールド値が重複しないものを列挙
      "tcp_flags": [
        "0x00000002",
        "0x00000012",
        "0x00000010",
        "0x00000018",
        "0x00000019",
        "0x00000011"
      ],
      "tcp_len": "0",
      "tcp_nxtseq": "1",
      "tcp_seq": "0",
      "tcp_srcport": "50564"
    },
    "source": {
      "domain": "192.168.2.150",
      "ip": "192.168.2.150",
      "port": 50564
    }
  },
  "fields": {
    "@timestamp": [
      "2020-03-09T12:31:12.580Z"
    ],
    "event.created": [
      "2020-03-23T15:37:42.000Z"
    ]
  },
  "sort": [
    1583757072580
  ]
}

###Elasticsearchへのエキスポート設定

default.yml
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  - http_host:
      match: regex
      value: [".*\\.microsoft\\.com", ".*\\.facebook\\.com"]
#
# Elasticseach設定
#
elasticsearch:
  #
  # ElashticseachのURL
  #
  address: http://log.example.com:9200
  #
  # Document IDの生成方法
  #  - "auto" : Elasticseachが生成
  #  - "community_id" : 「frame_time_epoch + Community ID」のハッシュ値を利用。
  #                      => 同一パケットやフローデータを投入しても重複を避けることができる。
  #
  document_id: "community_id"
  #
  # 一度のBulkリクエストへ含める最大ログ数(JSONドキュメント数)
  #
  bulk: 128

Elasticseachとの通信に認証およびTLSを利用する場合には以下の通り。

default.yml(認証+TLS)
pcap_extracted_fields:
  - http.host
  - http.request.method
  - http.request.uri
pcap_field_conditions:
  - http_host:
      match: regex
      value: [".*\\.microsoft\\.com", ".*\\.facebook\\.com"]
elasticsearch:
  #
  # ElashticseachのURL (HTTPS)
  #
  address: https://log.example.com:9200
  #
  # 認証ユーザー名とパスワード
  #
  user_name: elastic
  password: elastic
  #
  # CA証明書(PEM形式)
  #
  ssl_ca_certificate: "./certs/ca/ca.crt"
  #
  # trueを指定すると証明書の検証をスキップ。
  #
  ssl_verification_disabled: false
  document_id: "community_id"
  bulk: 128

###その他の設定

default.yml

#
# デフォルトの出力指定。記述することでコマンド実行で「--output」を省略できる。
#
# stdout または elasticsearch
#
#default_output: "elasticsearch"

#
# デフォルトの操作指定。記述することでコマンド実行で「--action」を省略できる。
#
# filter または agg
#
#default_action: "filter"

#
# フィールド値の最大サイズ。このサイズを超えるデータは切り捨てられます。
# (デフォルト(0)は全データを出力)
#
#max_field_len: 1024

#
# TSHarkの設定
#
#tshark:
  #
  # EXEのパス
  #
  #exe: "/usr/bin/tshark"
  #
  # tsharkへ渡す追加の引数がある場合には列挙。
  # 
  #args:
    #- "-d"
    #- "tcp.port==3128,http"

#設定ファイルのサンプル

ファイル名 説明
http.yml HTTPパケットをフィルタするサンプル設定ファイル
agg_by_community_id.yml Community IDによって単一フローへ集約するサンプル設定ファイル
elasticsearch.yml Elasticseachへエキスポートするサンプル設定ファイル
download_winexe.yml Windows EXEファイルをダウンロードするHTTPパケットを抽出するサンプル設定。tcp_reassembled_dataフィールド値に「\r\n\r\nMZ」というパターンマッチするフィルタルールが定義されています
CVE_2020_0796.yml 最近話題のMicrosoft SMBv3 Wormable Vulnerability(CVE-2020-0796)のシグネチャにマッチするフィルタルールが定義されています。TShark 3.2.2でテスト済み

#まとめ
tsharkコマンドのJSON出力を加工する簡単なラッパーツールを実装してみました。

Linux以外での動作実績レポートやバグレポート、その他PRなどありましたらこちらのコメント欄ではなくgithubの方へ送っていただけると助かります。

3
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?