はじめに
AWS VPCFlowLogsの通信ログをEC2で構築したElasticStackに取り込む方法をまとめてみました。
利用環境
実施内容
- IAM Role作成
- CloudWatchLogs LogGroup作成
- VPCFlowLogs作成
- ElasticStackマシン作成
- 正規化フィルタ作成
- Kibana画面
1. IAM Role作成
以下、2つのIAM Roleを事前に作成します。
- VPCFlowLogsがCloudWatchLogsにログを出力するために必要なIAM Role
flowlogs_role
{
"Statement": [
{
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:DescribeLogGroups",
"logs:DescribeLogStreams",
"logs:PutLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
- ElasticStackがCloudWatchLogsからログを取得するために必要なIAM Role
cloudwatchlogs_readonly_role
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:Describe*",
"logs:Get*",
"logs:List*",
"logs:StartQuery",
"logs:StopQuery",
"logs:TestMetricFilter",
"logs:FilterLogEvents"
],
"Effect": "Allow",
"Resource": "*"
}
]
}
2. CloudWatchLogs LogGroup作成
3. VPCFlowLogs作成
-
全ての通信ログを対象(FilterをAll)に作成した
ロググループ
とIAM Role
を指定して、[Create]を選択します。
-
CloudWatchLogsの作成したvpcflowlogsのロググループにENIごとの通信ログが出力されていればOKです。
【参考】vpcflowlogsのログフォーマット
・ VPC フローログレコード
No | Field名 | 説明 | Value(Sample) | Grok Pattern |
---|---|---|---|---|
1 | version | VPCフローログバージョン | 2 | NUMBER |
2 | account-id | フローログのAWSアカウントID | 123456789010 | NOTSPACE |
3 | interface-id | トラフィックが記録されるENIのID | eni-abc123de | NOTSPACE |
4 | srcaddr | 送信元IPアドレス(IPv4/IPv6) | 172.31.16.139 (空の場合は「-」) | IP |
5 | dstaddr | 宛先IPアドレス(IPv4/IPv6) | 172.31.16.21 (空の場合は「-」) | IP |
6 | srcport | 送信元ポート番号 | 20641 (空の場合は「-」) | NOTSPACE |
7 | dstport | 宛先ポート番号 | 22 (空の場合は「-」) | NOTSPACE |
8 | protocol | トラフィックのIANAプロトコル番号 | 6 (空の場合は「-」) | NOTSPACE |
9 | packets | キャプチャウィンドウ中に転送されたパケット数 | 20 (空の場合は「-」) | NUMBER |
10 | bytes | キャプチャウィンドウ中に転送されたバイト数 | 4249 (空の場合は「-」) | NUMBER |
11 | start_time | キャプチャウィンドウの開始時刻(Unix時間) | 1418530010 (空の場合は「-」) | NOTSPACE |
12 | end_time | キャプチャウィンドウの終了時刻(Unix時間) | 1418530070 (空の場合は「-」) | NOTSPACE |
13 | action | トラフィックに関連付けられたアクション(許可/拒否) | ACCEPT (空の場合は「-」) | NOTSPACE |
14 | log-status | フローログのロギングステータス(OK/NODATA/SKIPDATA) | OK (空の場合は「NODATA」) | NOTSPACE |
※各Field間は半角スペースで区切り(TSVフォーマット) |
4. ElasticStackマシン作成
- EC2インスタンスにJava1.8.0、logstash、Kibana、Elasticsearchを導入します。
- JVMヒープサイズの設定は省略しています。
[root@ip-172-31-34-49]# vi /etc/yum.repos.d/elastic.repo
------
[elasticsearch-6.x]
name=Elasticsearch repository for 6.x packages
baseurl=https://artifacts.elastic.co/packages/6.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
------
[root@ip-172-31-34-49]# yum install -y java-1.8.0-openjdk
[root@ip-172-31-34-49]# yum install -y logstash kibana elasticsearch
[root@ip-172-31-34-49]# systemctl start elasticsearch
-
logstash-input-cloudwatch_logs
pluginを導入します。
[root@ip-172-31-34-49]# /usr/share/logstash/bin/logstash-plugin install logstash-input-cloudwatch_logs
Validating logstash-input-cloudwatch_logs
Installing logstash-input-cloudwatch_logs
Installation successful
5. 正規化フィルタ作成
- logstashの
grok filter
で正規化するためのパターン(grok patterns)を作成します。
[root@ip-172-31-34-49]# cd /etc/logstash
[root@ip-172-31-34-49 logstash]# mkdir patterns
[root@ip-172-31-34-49 logstash]# cd patterns/
[root@ip-172-31-34-49 patterns]# vi vpcflowlogs_patterns
# VPC_Flow_Logs
VPCFLOWLOG %{NUMBER:version} %{NOTSPACE:account-id} %{NOTSPACE:interface-id} %{IP:srcaddr} %{IP:dstaddr} %{NOTSPACE:srcport} %{NOTSPACE:dstport} %{NOTSPACE:protocol} %{NUMBER:packets:float} %{NUMBER:bytes:float} %{NOTSPACE:start} %{NOTSPACE:end} %{NOTSPACE:action} %{NOTSPACE:log-status}
【参考】Grok Patternの標準セット
No | Grok Pattern | 説明 | 利用シーン | 正規表現 |
---|---|---|---|---|
1 | WORD | 単一の単語に一致するパターン | - | \b\w+\b |
2 | NUMBER | 正または負の整数または浮動小数点数に一致するパターン | 正や負、小数点を含む数値 | (?:%{BASE10NUM}) |
3 | POSINT | 正の整数にマッチするパターン | ポート番号等の正の整数 | \b(?:[1-9][0-9]*)\b |
4 | IP | IPv4またはIPv6のIPアドレスと一致するパターン | IPアドレス | (?:%{IPV6}l%{IPV4}) |
5 | NOTSPACE | スペース以外のものに一致するパターン | スペースやタブ区切りのデータ | \S+ |
6 | SPACE | 連続する任意の数のスペースに一致するパターン | スペースの回数が読めない場合 | \s* |
7 | DATA | 限られた量のあらゆる種類のデータとパターンマッチング | - | .*? |
8 | GREEDYDATA | 残りのすべてのデータに一致するパターン | 何が来るか読めない全文字列 | .* |
※浮動小数点数…1より小さい数値を2進数で分かりやすく表現する方式 |
- 次に
logstash.conf
を作成します。 - cloudwatchlogsから取得した通信ログを加工し、Elasticsreachに保存します。
/etc/logstash/conf.d/logstash.conf
input {
cloudwatch_logs {
region => "ap-southeast-1"
log_group => [ "vpcflowlogs" ]
sincedb_path => "/var/lib/logstash/sincedb"
}
}
filter {
if "OK" in [message] {
grok {
patterns_dir => [ "/etc/logstash/patterns/vpcflowlogs_patterns" ]
match => { "message" => "%{VPCFLOWLOG}"}
}
date {
match => [ "start","UNIX" ]
target => "@timestamp"
}
date {
match => [ "start","UNIX" ]
target => "start_time"
}
date {
match => [ "end","UNIX" ]
target => "end_time"
}
geoip {
source => "srcaddr"
target => "src_geoip"
tag_on_failure => "src_geoip_lookup_failure"
}
geoip {
source => "dstaddr"
target => "dst_geoip"
tag_on_failure => "dst_geoip_lookup_failure"
}
mutate {
remove_field => [ "start", 'end' ]
}
}
}
output {
elasticsearch {
hosts => [ "localhost:9200" ]
index => "vpcflowlogs-%{+YYYY.MM.dd}"
}
}
- Logstashのサービスを起動します。
[root@ip-172-31-34-49]# systemctl start logstash
6. Kibana画面
- Kibanaにアクセスするために
kibana.yml
を修正し、サービス起動します。
[root@ip-172-31-34-49]# vi /etc/kibana/kibana.yml
server.host: "0.0.0.0"
[root@ip-172-31-34-49]# systemctl start kibana
- EC2のグローバルIP:5601にWebアクセスします。
- Kibanaの[Dev Tools]を選択し、
PUT _template/vpcflowlogs
でvpcflowlogsというIndex生成時に利用するIndex Template(Mapping定義)を作成します。
_template/vpcflowlogs
PUT _template/vpcflowlogs
{
"index_patterns": ["vpcflowlogs-*"],
"settings": {
"number_of_shards": 1, #shard数はお好みで変更してください
"number_of_replicas" : 1
},
"mappings": {
"doc" : {
"properties": {
"@timestamp": {
"type": "date"
},
"@version" : {
"type" : "keyword"
},
"account-id" : {
"type" : "keyword"
},
"action" : {
"type" : "keyword"
},
"bytes" : {
"type" : "float"
},
"cloudwatch_logs" : {
"properties" : {
"event_id" : {
"type" : "keyword"
},
"ingestion_time" : {
"type" : "date"
},
"log_group" : {
"type" : "keyword"
},
"log_stream" : {
"type" : "keyword"
}
}
},
"dst_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "geo_point"
},
"lon" : {
"type" : "geo_point"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"dstaddr" : {
"type" : "ip"
},
"dstport" : {
"type" : "keyword"
},
"end_time" : {
"type" : "date"
},
"interface-id" : {
"type" : "keyword"
},
"log-status" : {
"type" : "keyword"
},
"message" : {
"type" : "text"
},
"packets" : {
"type" : "float"
},
"protocol" : {
"type" : "keyword"
},
"src_geoip" : {
"properties" : {
"city_name" : {
"type" : "keyword"
},
"continent_code" : {
"type" : "keyword"
},
"country_code2" : {
"type" : "keyword"
},
"country_code3" : {
"type" : "keyword"
},
"country_name" : {
"type" : "keyword"
},
"dma_code" : {
"type" : "long"
},
"ip" : {
"type" : "ip"
},
"latitude" : {
"type" : "float"
},
"location" : {
"properties" : {
"lat" : {
"type" : "geo_point"
},
"lon" : {
"type" : "geo_point"
}
}
},
"longitude" : {
"type" : "float"
},
"postal_code" : {
"type" : "keyword"
},
"region_code" : {
"type" : "keyword"
},
"region_name" : {
"type" : "keyword"
},
"timezone" : {
"type" : "keyword"
}
}
},
"srcaddr" : {
"type" : "ip"
},
"srcport" : {
"type" : "keyword"
},
"start_time" : {
"type" : "date"
},
"tags" : {
"type" : "keyword"
},
"version" : {
"type" : "keyword"
}
}
}
}
}
※Index Settingsで定義したMapping定義でマッチしないと以下のようなエラーがlogstash-plain.log
に出力されます。
[2019-02-25T01:26:41,456][WARN ][logstash.outputs.elasticsearch] Could not index event to Elasticsearch. {:status=>400, :action=>["index", {:_id=>nil, :_index=>"vpcflowlogs-2019.02.24", :_type=>"doc", :routing=>nil}, #<LogStash::Event:0x41fcd605>], :response=>{"index"=>{"_index"=>"vpcflowlogs-2019.02.24", "_type"=>"doc", "_id"=>"yuFUIGkBPPD_NEFre_Eo", "status"=>400, "error"=>{"type"=>"illegal_argument_exception", "reason"=>"Rejecting mapping update to [vpcflowlogs-2019.02.24] as the final mapping would have more than 1 type: [_doc, doc]"}}}}
- Kibanaの[Management] > [Index Patterns]で[Create Index pattern]を選択します。
- [Index pattern]に
vpcflowlogs-*
とIndex名を指定し、[Next step]を選択します。
- [Time Filter field name]に
@timestamp
を選択し、[Create index pattern]を選択します。
- [Discover]画面でログが検索出来るようになっていると思います。
- ログの中身は以下のような感じです。
JSONフォーマット
{
"_index": "vpcflowlogs-2019.02.24",
"_type": "doc",
"_id": "1dnyHmkBvXrxnkIMecco",
"_version": 1,
"_score": null,
"_source": {
"version": "2",
"protocol": "6",
"tags": [
"dst_geoip_lookup_failure"
],
"bytes": 1098,
"log-status": "OK",
"dstport": "5601",
"@timestamp": "2019-02-24T09:58:29.000Z",
"cloudwatch_logs": {
"log_group": "vpcflowlogs",
"event_id": "34588507295341659901605779663883067398582880181540093985",
"ingestion_time": "2019-02-24T09:59:34.678Z",
"log_stream": "eni-0bf6a7b1e940269a3-all"
},
"account-id": "<AWSアカウント>",
"action": "ACCEPT",
"message": "2 <AWSアカウント> eni-0bf6a7b1e940269a3 <SrcIP> 172.31.6.143 63374 5601 6 6 1098 1551002309 1551002342 ACCEPT OK",
"src_geoip": {
"postal_code": "501-0115",
"longitude": 136.7222,
"region_name": "Gifu",
"country_name": "Japan",
"country_code2": "JP",
"ip": "<SrcIP>",
"country_code3": "JP",
"location": {
"lat": 35.3911,
"lon": 136.7222
},
"timezone": "Asia/Tokyo",
"region_code": "21",
"latitude": 35.3911,
"city_name": "Gifu City",
"continent_code": "AS"
},
"dstaddr": "172.31.6.143",
"interface-id": "eni-0bf6a7b1e940269a3",
"start_time": "1551002309",
"dst_geoip": {},
"@version": "1",
"srcport": "63374",
"packets": 6,
"end_time": "1551002342",
"srcaddr": "<SrcIP>"
},
"fields": {
"@timestamp": [
"2019-02-24T09:58:29.000Z"
],
"cloudwatch_logs.ingestion_time": [
"2019-02-24T09:59:34.678Z"
]
},
"sort": [
1551002309000
]
}
まとめ
logstash.confのfilter区でif "OK" in [message]
とすることでlog-status
のOK
のものだけをElasticsearchに入れて分析出来るようにしました。NODATA
やSKIPDATA
のログは多くの項目がハイフン(-)のため、データ型が合わずElasticsearchには入ってこないはずですが、念のため。。