7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

VPC FlowLogsをLogstashで正規化してみた

Last updated at Posted at 2019-02-11

はじめに

AWS VPCFlowLogsの通信ログをEC2で構築したElasticStackに取り込む方法をまとめてみました。

利用環境

image.png

実施内容

  1. IAM Role作成
  2. CloudWatchLogs LogGroup作成
  3. VPCFlowLogs作成
  4. ElasticStackマシン作成
  5. 正規化フィルタ作成
  6. Kibana画面

1. IAM Role作成

以下、2つのIAM Roleを事前に作成します。

  • VPCFlowLogsがCloudWatchLogsにログを出力するために必要なIAM Role
flowlogs_role
{
    "Statement": [
        {
            "Action": [
                "logs:CreateLogGroup",
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
                "logs:PutLogEvents"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}
  • ElasticStackがCloudWatchLogsからログを取得するために必要なIAM Role
cloudwatchlogs_readonly_role
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "logs:Describe*",
                "logs:Get*",
                "logs:List*",
                "logs:StartQuery",
                "logs:StopQuery",
                "logs:TestMetricFilter",
                "logs:FilterLogEvents"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

2. CloudWatchLogs LogGroup作成

  • AWS Management Consoleから[CloudWatch] > [ログ] > [アクション]で[ロググループの作成]を選択します。
    image.png

  • [ロググループ名]に任意の名前を付けて、[ロググループの作成]を選択します。(今回は、vpcflowlogs)
    image.png

  • 以下のように作成されていればOKです。
    image.png

3. VPCFlowLogs作成

  • 次に、[VPC] > [VPC] > [アクション]で[Create fow log]を選択します。
    image.png

  • 全ての通信ログを対象(FilterをAll)に作成したロググループIAM Roleを指定して、[Create]を選択します。
    image.png

  • CloudWatchLogsの作成したvpcflowlogsのロググループにENIごとの通信ログが出力されていればOKです。
    image.png

【参考】vpcflowlogsのログフォーマット
VPC フローログレコード

No Field名 説明 Value(Sample) Grok Pattern
1 version VPCフローログバージョン 2 NUMBER
2 account-id フローログのAWSアカウントID 123456789010 NOTSPACE
3 interface-id トラフィックが記録されるENIのID eni-abc123de NOTSPACE
4 srcaddr 送信元IPアドレス(IPv4/IPv6) 172.31.16.139 (空の場合は「-」) IP
5 dstaddr 宛先IPアドレス(IPv4/IPv6) 172.31.16.21 (空の場合は「-」) IP
6 srcport 送信元ポート番号 20641 (空の場合は「-」) NOTSPACE
7 dstport 宛先ポート番号 22 (空の場合は「-」) NOTSPACE
8 protocol トラフィックのIANAプロトコル番号 6 (空の場合は「-」) NOTSPACE
9 packets キャプチャウィンドウ中に転送されたパケット数 20 (空の場合は「-」) NUMBER
10 bytes キャプチャウィンドウ中に転送されたバイト数 4249 (空の場合は「-」) NUMBER
11 start_time キャプチャウィンドウの開始時刻(Unix時間) 1418530010 (空の場合は「-」) NOTSPACE
12 end_time キャプチャウィンドウの終了時刻(Unix時間) 1418530070 (空の場合は「-」) NOTSPACE
13 action トラフィックに関連付けられたアクション(許可/拒否) ACCEPT (空の場合は「-」) NOTSPACE
14 log-status フローログのロギングステータス(OK/NODATA/SKIPDATA) OK (空の場合は「NODATA」) NOTSPACE
※各Field間は半角スペースで区切り(TSVフォーマット)

4. ElasticStackマシン作成

  • EC2インスタンスにJava1.8.0、logstash、Kibana、Elasticsearchを導入します。
  • JVMヒープサイズの設定は省略しています。
[root@ip-172-31-34-49]# vi /etc/yum.repos.d/elastic.repo
------
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
------
[root@ip-172-31-34-49]# yum install -y java-1.8.0-openjdk
[root@ip-172-31-34-49]# yum install -y logstash kibana elasticsearch
[root@ip-172-31-34-49]# systemctl start elasticsearch
  • logstash-input-cloudwatch_logspluginを導入します。
[root@ip-172-31-34-49]# /usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs
Validating logstash-input-cloudwatch_logs
Installing logstash-input-cloudwatch_logs
Installation successful

5. 正規化フィルタ作成

  • logstashのgrok filterで正規化するためのパターン(grok patterns)を作成します。
[root@ip-172-31-34-49]# cd /etc/logstash
[root@ip-172-31-34-49 logstash]# mkdir patterns
[root@ip-172-31-34-49 logstash]# cd patterns/
[root@ip-172-31-34-49 patterns]# vi vpcflowlogs_patterns
# VPC_Flow_Logs
VPCFLOWLOG %{NUMBER:version} %{NOTSPACE:account-id} %{NOTSPACE:interface-id} %{IP:srcaddr} %{IP:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NUMBER:packets:float} %{NUMBER:bytes:float} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}

【参考】Grok Patternの標準セット

No Grok Pattern 説明 利用シーン 正規表現
1 WORD 単一の単語に一致するパターン - \b\w+\b
2 NUMBER 正または負の整数または浮動小数点数に一致するパターン 正や負、小数点を含む数値 (?:%{BASE10NUM})
3 POSINT 正の整数にマッチするパターン ポート番号等の正の整数 \b(?:[1-9][0-9]*)\b
4 IP IPv4またはIPv6のIPアドレスと一致するパターン IPアドレス (?:%{IPV6}l%{IPV4})
5 NOTSPACE スペース以外のものに一致するパターン スペースやタブ区切りのデータ \S+
6 SPACE 連続する任意の数のスペースに一致するパターン スペースの回数が読めない場合 \s*
7 DATA 限られた量のあらゆる種類のデータとパターンマッチング - .*?
8 GREEDYDATA 残りのすべてのデータに一致するパターン 何が来るか読めない全文字列 .*
※浮動小数点数…1より小さい数値を2進数で分かりやすく表現する方式
  • 次にlogstash.confを作成します。
  • cloudwatchlogsから取得した通信ログを加工し、Elasticsreachに保存します。
/etc/logstash/conf.d/logstash.conf
input {
  cloudwatch_logs {
    region => "ap-southeast-1"
    log_group => [ "vpcflowlogs" ]
    sincedb_path => "/var/lib/logstash/sincedb"
  }
}

filter {
  if "OK" in [message] {
    grok {
      patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
      match => { "message" => "%{VPCFLOWLOG}"}
    }
    date {
      match => [ "start","UNIX" ]
      target => "@timestamp"
    }
    date {
      match => [ "start","UNIX" ]
      target => "start_time"
    }
    date {
      match => [ "end","UNIX" ]
      target => "end_time"
    }
    geoip {
      source => "srcaddr"
      target => "src_geoip"
      tag_on_failure => "src_geoip_lookup_failure"
    }
    geoip {
      source => "dstaddr"
      target => "dst_geoip"
      tag_on_failure => "dst_geoip_lookup_failure"
    }
    mutate {
      remove_field => [ "start", 'end' ]
    }
  }
}

output {
  elasticsearch {
    hosts => [ "localhost:9200" ]
    index => "vpcflowlogs-%{+YYYY.MM.dd}"
  }
}
  • Logstashのサービスを起動します。
[root@ip-172-31-34-49]# systemctl start logstash

6. Kibana画面

  • Kibanaにアクセスするためにkibana.ymlを修正し、サービス起動します。
[root@ip-172-31-34-49]# vi /etc/kibana/kibana.yml
server.host: "0.0.0.0"
[root@ip-172-31-34-49]# systemctl start kibana
  • EC2のグローバルIP:5601にWebアクセスします。
  • Kibanaの[Dev Tools]を選択し、PUT _template/vpcflowlogsでvpcflowlogsというIndex生成時に利用するIndex Template(Mapping定義)を作成します。
    image.png
_template/vpcflowlogs

PUT _template/vpcflowlogs
{
  "index_patterns": ["vpcflowlogs-*"],
  "settings": {
    "number_of_shards": 1,     #shard数はお好みで変更してください
    "number_of_replicas" : 1
  },
  "mappings": {
    "doc" : {
      "properties": {
        "@timestamp": {
          "type": "date"
        },
        "@version" : {
          "type" : "keyword"
        },
        "account-id" : {
          "type" : "keyword"
        },
        "action" : {
          "type" : "keyword"
          },
        "bytes" : {
          "type" : "float"
        },
        "cloudwatch_logs" : {
          "properties" : {
            "event_id" : {
              "type" : "keyword"
            },
            "ingestion_time" : {
              "type" : "date"
            },
            "log_group" : {
              "type" : "keyword"
            },
            "log_stream" : {
              "type" : "keyword"
            }
          }
        },
        "dst_geoip" : {
          "properties" : {
            "city_name" : {
              "type" : "keyword"
            },
            "continent_code" : {
              "type" : "keyword"
            },
            "country_code2" : {
              "type" : "keyword"
            },
            "country_code3" : {
              "type" : "keyword"
            },
            "country_name" : {
              "type" : "keyword"
            },
            "ip" : {
              "type" : "ip"
            },
            "latitude" : {
              "type" : "float"
            },
            "location" : {
              "properties" : {
                "lat" : {
                  "type" : "geo_point"
                },
                "lon" : {
                  "type" : "geo_point"
                }
              }
            },
            "longitude" : {
              "type" : "float"
            },
            "postal_code" : {
              "type" : "keyword"
            },
            "region_code" : {
              "type" : "keyword"
            },
            "region_name" : {
              "type" : "keyword"
            },
            "timezone" : {
              "type" : "keyword"
            }
          }
        },
        "dstaddr" : {
          "type" : "ip"
        },
        "dstport" : {
          "type" : "keyword"
        },
        "end_time" : {
          "type" : "date"
        },
        "interface-id" : {
          "type" : "keyword"
        },
        "log-status" : {
          "type" : "keyword"
        },
        "message" : {
          "type" : "text"
        },
        "packets" : {
          "type" : "float"
        },
        "protocol" : {
          "type" : "keyword"
        },
        "src_geoip" : {
          "properties" : {
            "city_name" : {
              "type" : "keyword"
            },
            "continent_code" : {
              "type" : "keyword"
            },
            "country_code2" : {
              "type" : "keyword"
            },
            "country_code3" : {
              "type" : "keyword"
            },
            "country_name" : {
              "type" : "keyword"
            },
            "dma_code" : {
              "type" : "long"
            },
            "ip" : {
              "type" : "ip"
            },
            "latitude" : {
              "type" : "float"
            },
            "location" : {
              "properties" : {
                "lat" : {
                  "type" : "geo_point"
                },
                "lon" : {
                  "type" : "geo_point"
                }
              }
            },
            "longitude" : {
              "type" : "float"
            },
            "postal_code" : {
              "type" : "keyword"
            },
            "region_code" : {
              "type" : "keyword"
            },
            "region_name" : {
              "type" : "keyword"
            },
            "timezone" : {
              "type" : "keyword"
            }
          }
        },
        "srcaddr" : {
          "type" : "ip"
        },
        "srcport" : {
          "type" : "keyword"
        },
        "start_time" : {
          "type" : "date"
        },
        "tags" : {
          "type" : "keyword"
        },
        "version" : {
          "type" : "keyword"
        }
      }
    }
  }
}

※Index Settingsで定義したMapping定義でマッチしないと以下のようなエラーがlogstash-plain.logに出力されます。

[2019-02-25T01:26:41,456][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"vpcflowlogs-2019.02.24", :_type=>"doc", :routing=>nil}, #<LogStash::Event:0x41fcd605>], :response=>{"index"=>{"_index"=>"vpcflowlogs-2019.02.24", "_type"=>"doc", "_id"=>"yuFUIGkBPPD_NEFre_Eo", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Rejecting mapping update to [vpcflowlogs-2019.02.24] as the final mapping would have more than 1 type: [_doc, doc]"}}}}
  • Kibanaの[Management] > [Index Patterns]で[Create Index pattern]を選択します。
  • [Index pattern]にvpcflowlogs-*とIndex名を指定し、[Next step]を選択します。

image.png

  • [Time Filter field name]に@timestampを選択し、[Create index pattern]を選択します。

image.png

  • [Discover]画面でログが検索出来るようになっていると思います。
  • ログの中身は以下のような感じです。
JSONフォーマット
{
  "_index": "vpcflowlogs-2019.02.24",
  "_type": "doc",
  "_id": "1dnyHmkBvXrxnkIMecco",
  "_version": 1,
  "_score": null,
  "_source": {
    "version": "2",
    "protocol": "6",
    "tags": [
      "dst_geoip_lookup_failure"
    ],
    "bytes": 1098,
    "log-status": "OK",
    "dstport": "5601",
    "@timestamp": "2019-02-24T09:58:29.000Z",
    "cloudwatch_logs": {
      "log_group": "vpcflowlogs",
      "event_id": "34588507295341659901605779663883067398582880181540093985",
      "ingestion_time": "2019-02-24T09:59:34.678Z",
      "log_stream": "eni-0bf6a7b1e940269a3-all"
    },
    "account-id": "<AWSアカウント>",
    "action": "ACCEPT",
    "message": "2 <AWSアカウント> eni-0bf6a7b1e940269a3 <SrcIP> 172.31.6.143 63374 5601 6 6 1098 1551002309 1551002342 ACCEPT OK",
    "src_geoip": {
      "postal_code": "501-0115",
      "longitude": 136.7222,
      "region_name": "Gifu",
      "country_name": "Japan",
      "country_code2": "JP",
      "ip": "<SrcIP>",
      "country_code3": "JP",
      "location": {
        "lat": 35.3911,
        "lon": 136.7222
      },
      "timezone": "Asia/Tokyo",
      "region_code": "21",
      "latitude": 35.3911,
      "city_name": "Gifu City",
      "continent_code": "AS"
    },
    "dstaddr": "172.31.6.143",
    "interface-id": "eni-0bf6a7b1e940269a3",
    "start_time": "1551002309",
    "dst_geoip": {},
    "@version": "1",
    "srcport": "63374",
    "packets": 6,
    "end_time": "1551002342",
    "srcaddr": "<SrcIP>"
  },
  "fields": {
  "@timestamp": [
    "2019-02-24T09:58:29.000Z"
  ],
  "cloudwatch_logs.ingestion_time": [
    "2019-02-24T09:59:34.678Z"
  ]
  },
  "sort": [
    1551002309000
  ]
}

まとめ

logstash.confのfilter区でif "OK" in [message] とすることでlog-statusOKのものだけをElasticsearchに入れて分析出来るようにしました。NODATASKIPDATAのログは多くの項目がハイフン(-)のため、データ型が合わずElasticsearchには入ってこないはずですが、念のため。。

7
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?