Edited at

VPC FlowLogsをLogstashで正規化してみた


はじめに

AWS VPCFlowLogsの通信ログをEC2で構築したElasticStackに取り込む方法をまとめてみました。


利用環境


実施内容


  1. IAM Role作成

  2. CloudWatchLogs LogGroup作成

  3. VPCFlowLogs作成

  4. ElasticStackマシン作成

  5. 正規化フィルタ作成

  6. Kibana画面


1. IAM Role作成

以下、2つのIAM Roleを事前に作成します。


  • VPCFlowLogsがCloudWatchLogsにログを出力するために必要なIAM Role


flowlogs_role

{

"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}


  • ElasticStackがCloudWatchLogsからログを取得するために必要なIAM Role


cloudwatchlogs_readonly_role

{

"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:Describe*",
"logs:Get*",
"logs:List*",
"logs:StartQuery",
"logs:StopQuery",
"logs:TestMetricFilter",
"logs:FilterLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}


2. CloudWatchLogs LogGroup作成


  • AWS Management Consoleから[CloudWatch] > [ログ] > [アクション]で[ロググループの作成]を選択します。



  • [ロググループ名]に任意の名前を付けて、[ロググループの作成]を選択します。(今回は、vpcflowlogs)



  • 以下のように作成されていればOKです。




3. VPCFlowLogs作成


  • 次に、[VPC] > [VPC] > [アクション]で[Create fow log]を選択します。



  • 全ての通信ログを対象(FilterをAll)に作成したロググループIAM Roleを指定して、[Create]を選択します。



  • CloudWatchLogsの作成したvpcflowlogsのロググループにENIごとの通信ログが出力されていればOKです。



【参考】vpcflowlogsのログフォーマット

VPC フローログレコード

No
Field名
説明
Value(Sample)
Grok Pattern

1
version
VPCフローログバージョン
2
NUMBER

2
account-id
フローログのAWSアカウントID
123456789010
NOTSPACE

3
interface-id
トラフィックが記録されるENIのID
eni-abc123de
NOTSPACE

4
srcaddr
送信元IPアドレス(IPv4/IPv6)
172.31.16.139 (空の場合は「-」)
IP

5
dstaddr
宛先IPアドレス(IPv4/IPv6)
172.31.16.21 (空の場合は「-」)
IP

6
srcport
送信元ポート番号
20641 (空の場合は「-」)
NOTSPACE

7
dstport
宛先ポート番号
22 (空の場合は「-」)
NOTSPACE

8
protocol
トラフィックのIANAプロトコル番号
6 (空の場合は「-」)
NOTSPACE

9
packets
キャプチャウィンドウ中に転送されたパケット数
20 (空の場合は「-」)
NUMBER

10
bytes
キャプチャウィンドウ中に転送されたバイト数
4249 (空の場合は「-」)
NUMBER

11
start_time
キャプチャウィンドウの開始時刻(Unix時間)
1418530010 (空の場合は「-」)
NOTSPACE

12
end_time
キャプチャウィンドウの終了時刻(Unix時間)
1418530070 (空の場合は「-」)
NOTSPACE

13
action
トラフィックに関連付けられたアクション(許可/拒否)
ACCEPT (空の場合は「-」)
NOTSPACE

14
log-status
フローログのロギングステータス(OK/NODATA/SKIPDATA)
OK (空の場合は「NODATA」)
NOTSPACE

※各Field間は半角スペースで区切り(TSVフォーマット)


4. ElasticStackマシン作成


  • EC2インスタンスにJava1.8.0、logstash、Kibana、Elasticsearchを導入します。

  • JVMヒープサイズの設定は省略しています。

[root@ip-172-31-34-49]# vi /etc/yum.repos.d/elastic.repo

------
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
------
[root@ip-172-31-34-49]# yum install -y java-1.8.0-openjdk
[root@ip-172-31-34-49]# yum install -y logstash kibana elasticsearch
[root@ip-172-31-34-49]# systemctl start elasticsearch



  • logstash-input-cloudwatch_logspluginを導入します。

[root@ip-172-31-34-49]# /usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs

Validating logstash-input-cloudwatch_logs
Installing logstash-input-cloudwatch_logs
Installation successful


5. 正規化フィルタ作成


  • logstashのgrok filterで正規化するためのパターン(grok patterns)を作成します。

[root@ip-172-31-34-49]# cd /etc/logstash

[root@ip-172-31-34-49 logstash]# mkdir patterns
[root@ip-172-31-34-49 logstash]# cd patterns/
[root@ip-172-31-34-49 patterns]# vi vpcflowlogs_patterns
# VPC_Flow_Logs
VPCFLOWLOG %{NUMBER:version} %{NOTSPACE:account-id} %{NOTSPACE:interface-id} %{IP:srcaddr} %{IP:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NUMBER:packets:float} %{NUMBER:bytes:float} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}

【参考】Grok Patternの標準セット

No
Grok Pattern
説明
利用シーン
正規表現

1
WORD
単一の単語に一致するパターン
-
\b\w+\b

2
NUMBER
正または負の整数または浮動小数点数に一致するパターン
正や負、小数点を含む数値
(?:%{BASE10NUM})

3
POSINT
正の整数にマッチするパターン
ポート番号等の正の整数
\b(?:[1-9][0-9]*)\b

4
IP
IPv4またはIPv6のIPアドレスと一致するパターン
IPアドレス
(?:%{IPV6}l%{IPV4})

5
NOTSPACE
スペース以外のものに一致するパターン
スペースやタブ区切りのデータ
\S+

6
SPACE
連続する任意の数のスペースに一致するパターン
スペースの回数が読めない場合
\s*

7
DATA
限られた量のあらゆる種類のデータとパターンマッチング
-
.*?

8
GREEDYDATA
残りのすべてのデータに一致するパターン
何が来るか読めない全文字列
.*

※浮動小数点数…1より小さい数値を2進数で分かりやすく表現する方式


  • 次にlogstash.confを作成します。

  • cloudwatchlogsから取得した通信ログを加工し、Elasticsreachに保存します。


/etc/logstash/conf.d/logstash.conf

input {

cloudwatch_logs {
region => "ap-southeast-1"
log_group => [ "vpcflowlogs" ]
sincedb_path => "/var/lib/logstash/sincedb"
}
}

filter {
if "OK" in [message] {
grok {
patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
match => { "message" => "%{VPCFLOWLOG}"}
}
date {
match => [ "start","UNIX" ]
target => "@timestamp"
}
date {
match => [ "start","UNIX" ]
target => "start_time"
}
date {
match => [ "end","UNIX" ]
target => "end_time"
}
geoip {
source => "srcaddr"
target => "src_geoip"
tag_on_failure => "src_geoip_lookup_failure"
}
geoip {
source => "dstaddr"
target => "dst_geoip"
tag_on_failure => "dst_geoip_lookup_failure"
}
mutate {
remove_field => [ "start", 'end' ]
}
}
}

output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "vpcflowlogs-%{+YYYY.MM.dd}"
}
}



  • Logstashのサービスを起動します。

[root@ip-172-31-34-49]# systemctl start logstash


6. Kibana画面


  • Kibanaにアクセスするためにkibana.ymlを修正し、サービス起動します。

[root@ip-172-31-34-49]# vi /etc/kibana/kibana.yml

server.host: "0.0.0.0"
[root@ip-172-31-34-49]# systemctl start kibana


  • EC2のグローバルIP:5601にWebアクセスします。

  • Kibanaの[Dev Tools]を選択し、PUT _template/vpcflowlogsでvpcflowlogsというIndex生成時に利用するIndex Template(Mapping定義)を作成します。



_template/vpcflowlogs


PUT _template/vpcflowlogs
{
"index_patterns": ["vpcflowlogs-*"],
"settings": {
"number_of_shards": 1, #shard数はお好みで変更してください
"number_of_replicas" : 1
},
"mappings": {
"doc" : {
"properties": {
"@timestamp": {
"type": "date"
},
"@version" : {
"type" : "keyword"
},
"account-id" : {
"type" : "keyword"
},
"action" : {
"type" : "keyword"
},
"bytes" : {
"type" : "float"
},
"cloudwatch_logs" : {
"properties" : {
"event_id" : {
"type" : "keyword"
},
"ingestion_time" : {
"type" : "date"
},
"log_group" : {
"type" : "keyword"
},
"log_stream" : {
"type" : "keyword"
}
}
},
"dst_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "float"
},
"lon" : {
"type" : "float"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"dstaddr" : {
"type" : "ip"
},
"dstport" : {
"type" : "keyword"
},
"end_time" : {
"type" : "date"
},
"interface-id" : {
"type" : "keyword"
},
"log-status" : {
"type" : "keyword"
},
"message" : {
"type" : "text"
},
"packets" : {
"type" : "float"
},
"protocol" : {
"type" : "keyword"
},
"src_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"dma_code" : {
"type" : "long"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "float"
},
"lon" : {
"type" : "float"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"srcaddr" : {
"type" : "ip"
},
"srcport" : {
"type" : "keyword"
},
"start_time" : {
"type" : "date"
},
"tags" : {
"type" : "keyword"
},
"version" : {
"type" : "keyword"
}
}
}
}
}

※Index Settingsで定義したMapping定義でマッチしないと以下のようなエラーがlogstash-plain.logに出力されます。

[2019-02-25T01:26:41,456][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"vpcflowlogs-2019.02.24", :_type=>"doc", :routing=>nil}, #<LogStash::Event:0x41fcd605>], :response=>{"index"=>{"_index"=>"vpcflowlogs-2019.02.24", "_type"=>"doc", "_id"=>"yuFUIGkBPPD_NEFre_Eo", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Rejecting mapping update to [vpcflowlogs-2019.02.24] as the final mapping would have more than 1 type: [_doc, doc]"}}}}


  • Kibanaの[Management] > [Index Patterns]で[Create Index pattern]を選択します。

  • [Index pattern]にvpcflowlogs-*とIndex名を指定し、[Next step]を選択します。


  • [Time Filter field name]に@timestampを選択し、[Create index pattern]を選択します。


  • [Discover]画面でログが検索出来るようになっていると思います。

  • ログの中身は以下のような感じです。


JSONフォーマット

{

"_index": "vpcflowlogs-2019.02.24",
"_type": "doc",
"_id": "1dnyHmkBvXrxnkIMecco",
"_version": 1,
"_score": null,
"_source": {
"version": "2",
"protocol": "6",
"tags": [
"dst_geoip_lookup_failure"
],
"bytes": 1098,
"log-status": "OK",
"dstport": "5601",
"@timestamp": "2019-02-24T09:58:29.000Z",
"cloudwatch_logs": {
"log_group": "vpcflowlogs",
"event_id": "34588507295341659901605779663883067398582880181540093985",
"ingestion_time": "2019-02-24T09:59:34.678Z",
"log_stream": "eni-0bf6a7b1e940269a3-all"
},
"account-id": "<AWSアカウント>",
"action": "ACCEPT",
"message": "2 <AWSアカウント> eni-0bf6a7b1e940269a3 <SrcIP> 172.31.6.143 63374 5601 6 6 1098 1551002309 1551002342 ACCEPT OK",
"src_geoip": {
"postal_code": "501-0115",
"longitude": 136.7222,
"region_name": "Gifu",
"country_name": "Japan",
"country_code2": "JP",
"ip": "<SrcIP>",
"country_code3": "JP",
"location": {
"lat": 35.3911,
"lon": 136.7222
},
"timezone": "Asia/Tokyo",
"region_code": "21",
"latitude": 35.3911,
"city_name": "Gifu City",
"continent_code": "AS"
},
"dstaddr": "172.31.6.143",
"interface-id": "eni-0bf6a7b1e940269a3",
"start_time": "1551002309",
"dst_geoip": {},
"@version": "1",
"srcport": "63374",
"packets": 6,
"end_time": "1551002342",
"srcaddr": "<SrcIP>"
},
"fields": {
"@timestamp": [
"2019-02-24T09:58:29.000Z"
],
"cloudwatch_logs.ingestion_time": [
"2019-02-24T09:59:34.678Z"
]
},
"sort": [
1551002309000
]
}


まとめ

logstash.confのfilter区でif "OK" in [message]とすることでlog-statusOKのものだけをElasticsearchに入れて分析出来るようにしました。NODATASKIPDATAのログは多くの項目がハイフン(-)のため、データ型が合わずElasticsearchには入ってこないはずですが、念のため。。