AWS
Elasticsearch
kibana
Logstash
VPCFlowLogs

VPC FlowLogsをLogstashで正規化してみた


はじめに

AWS VPCFlowLogsの通信ログをEC2で構築したElasticStackに取り込む方法をまとめてみました。


利用環境

image.png


実施内容


  1. IAM Role作成

  2. CloudWatchLogs LogGroup作成

  3. VPCFlowLogs作成

  4. ElasticStackマシン作成

  5. 正規化フィルタ作成

  6. Kibana画面


1. IAM Role作成

以下、2つのIAM Roleを事前に作成します。


  • VPCFlowLogsがCloudWatchLogsにログを出力するために必要なIAM Role


flowlogs_role

{

"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}


  • ElasticStackがCloudWatchLogsからログを取得するために必要なIAM Role


cloudwatchlogs_readonly_role

{

"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:Describe*",
"logs:Get*",
"logs:List*",
"logs:StartQuery",
"logs:StopQuery",
"logs:TestMetricFilter",
"logs:FilterLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}


2. CloudWatchLogs LogGroup作成


  • AWS Management Consoleから[CloudWatch] > [ログ] > [アクション]で[ロググループの作成]を選択します。

    image.png


  • [ロググループ名]に任意の名前を付けて、[ロググループの作成]を選択します。(今回は、vpcflowlogs)

    image.png


  • 以下のように作成されていればOKです。

    image.png



3. VPCFlowLogs作成


  • 次に、[VPC] > [VPC] > [アクション]で[Create fow log]を選択します。

    image.png


  • 全ての通信ログを対象(FilterをAll)に作成したロググループIAM Roleを指定して、[Create]を選択します。

    image.png


  • CloudWatchLogsの作成したvpcflowlogsのロググループにENIごとの通信ログが出力されていればOKです。

    image.png


【参考】vpcflowlogsのログフォーマット

VPC フローログレコード

image.png


4. ElasticStackマシン作成


  • EC2インスタンスにJava1.8.0、logstash、Kibana、Elasticsearchを導入します。

  • JVMヒープサイズの設定は省略しています。

[root@ip-172-31-34-49]# vi /etc/yum.repos.d/elastic.repo

------
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
------
[root@ip-172-31-34-49]# yum install -y java-1.8.0-openjdk
[root@ip-172-31-34-49]# yum install -y logstash kibana elasticsearch
[root@ip-172-31-34-49]# systemctl start elasticsearch



  • logstash-input-cloudwatch_logspluginを導入します。

[root@ip-172-31-34-49]# /usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs

Validating logstash-input-cloudwatch_logs
Installing logstash-input-cloudwatch_logs
Installation successful


5. 正規化フィルタ作成


  • logstashのgrok filterで正規化するためのパターン(grok patterns)を作成します。

[root@ip-172-31-34-49]# cd /etc/logstash

[root@ip-172-31-34-49 logstash]# mkdir patterns
[root@ip-172-31-34-49 patterns]# vi vpcflowlogs_patterns
# VPC_Flow_Logs
VPCFLOWLOG %{NUMBER:version} %{NUMBER:account-id} %{NOTSPACE:interface-id} %{NOTSPACE:srcaddr} %{NOTSPACE:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NOTSPACE:packets} %{NOTSPACE:bytes} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}

【参考】Grok Patternの標準セット

image.png


  • 次にlogstash.confを作成します。

  • cloudwatchlogsから取得した通信ログを加工し、Elasticsreachに保存します。


/etc/logstash/conf.d/logstash.conf

input {

cloudwatch_logs {
region => "ap-southeast-1"
log_group => [ "vpcflowlogs" ]
sincedb_path => "/var/lib/logstash/sincedb"
}
}

filter {
grok {
patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
match => { "message" => "%{VPCFLOWLOG}"}
}
date {
match => [ "start_time","UNIX" ]
target => "@timestamp"
}
}

output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "vpcflowlogs-%{+YYYY.MM.dd}"
}
}



  • Logstashのサービスを起動します。

[root@ip-172-31-34-49]# systemctl start logstash


6. Kibana画面


  • Kibanaにアクセスするためにkibana.ymlを修正し、サービス起動します。

[root@ip-172-31-34-49]# vi /etc/kibana/kibana.yml

server.host: "0.0.0.0"
[root@ip-172-31-34-49]# systemctl start kibana


  • http://:5601にWebアクセスします。

  • Kibanaの[Management] > [Index Patterns]で[Create Index pattern]を選択します。

  • [Index pattern]にvpcflowlogs-*とIndex名を指定し、[Next step]を選択します。

image.png


  • [Time Filter field name]に@timestampを選択し、[Create index pattern]を選択します。

image.png


  • [Discover]画面でログが検索出来るようになっていると思います。

  • ログの中身は以下のような感じです。

image.png


まとめ

本来はsrcaddrやdstaddrをIP型にしたり、bytesをFloat型にしたりdata.typeを変換したいところではありますが、ログの内容によって、[-(ハイフン)]になる可能性があるため、今回は断念しました。もし良い方法があれば、教えてください!!