はじめに
AWS VPCFlowLogsの通信ログをEC2で構築したElasticStackに取り込む方法をまとめてみました。
利用環境
実施内容
- IAM Role作成
- CloudWatchLogs LogGroup作成
- VPCFlowLogs作成
- ElasticStackマシン作成
- 正規化フィルタ作成
- Kibana画面
1. IAM Role作成
以下、2つのIAM Roleを事前に作成します。
- VPCFlowLogsがCloudWatchLogsにログを出力するために必要なIAM Role
flowlogs_role
{
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
- ElasticStackがCloudWatchLogsからログを取得するために必要なIAM Role
cloudwatchlogs_readonly_role
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:Describe*",
"logs:Get*",
"logs:List*",
"logs:StartQuery",
"logs:StopQuery",
"logs:TestMetricFilter",
"logs:FilterLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
2. CloudWatchLogs LogGroup作成
3. VPCFlowLogs作成
全ての通信ログを対象(FilterをAll)に作成した
ロググループ
とIAM Role
を指定して、[Create]を選択します。
CloudWatchLogsの作成したvpcflowlogsのロググループにENIごとの通信ログが出力されていればOKです。
【参考】vpcflowlogsのログフォーマット
・ VPC フローログレコード
4. ElasticStackマシン作成
- EC2インスタンスにJava1.8.0、logstash、Kibana、Elasticsearchを導入します。
- JVMヒープサイズの設定は省略しています。
[root@ip-172-31-34-49]# vi /etc/yum.repos.d/elastic.repo
------
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
------
[root@ip-172-31-34-49]# yum install -y java-1.8.0-openjdk
[root@ip-172-31-34-49]# yum install -y logstash kibana elasticsearch
[root@ip-172-31-34-49]# systemctl start elasticsearch
-
logstash-input-cloudwatch_logs
pluginを導入します。
[root@ip-172-31-34-49]# /usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs
Validating logstash-input-cloudwatch_logs
Installing logstash-input-cloudwatch_logs
Installation successful
5. 正規化フィルタ作成
- logstashの
grok filter
で正規化するためのパターン(grok patterns)を作成します。
[root@ip-172-31-34-49]# cd /etc/logstash
[root@ip-172-31-34-49 logstash]# mkdir patterns
[root@ip-172-31-34-49 patterns]# vi vpcflowlogs_patterns
# VPC_Flow_Logs
VPCFLOWLOG %{NUMBER:version} %{NUMBER:account-id} %{NOTSPACE:interface-id} %{NOTSPACE:srcaddr} %{NOTSPACE:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NOTSPACE:packets} %{NOTSPACE:bytes} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}
- 次に
logstash.conf
を作成します。 - cloudwatchlogsから取得した通信ログを加工し、Elasticsreachに保存します。
/etc/logstash/conf.d/logstash.conf
input {
cloudwatch_logs {
region => "ap-southeast-1"
log_group => [ "vpcflowlogs" ]
sincedb_path => "/var/lib/logstash/sincedb"
}
}
filter {
grok {
patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
match => { "message" => "%{VPCFLOWLOG}"}
}
date {
match => [ "start_time","UNIX" ]
target => "@timestamp"
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "vpcflowlogs-%{+YYYY.MM.dd}"
}
}
- Logstashのサービスを起動します。
[root@ip-172-31-34-49]# systemctl start logstash
6. Kibana画面
- Kibanaにアクセスするために
kibana.yml
を修正し、サービス起動します。
[root@ip-172-31-34-49]# vi /etc/kibana/kibana.yml
server.host: "0.0.0.0"
[root@ip-172-31-34-49]# systemctl start kibana
- http://:5601にWebアクセスします。
- Kibanaの[Management] > [Index Patterns]で[Create Index pattern]を選択します。
- [Index pattern]に
vpcflowlogs-*
とIndex名を指定し、[Next step]を選択します。
- [Time Filter field name]に
@timestamp
を選択し、[Create index pattern]を選択します。
- [Discover]画面でログが検索出来るようになっていると思います。
- ログの中身は以下のような感じです。
まとめ
本来はsrcaddrやdstaddrをIP型にしたり、bytesをFloat型にしたりdata.typeを変換したいところではありますが、ログの内容によって、[-(ハイフン)]になる可能性があるため、今回は断念しました。もし良い方法があれば、教えてください!!