アップデート:7.7.0での動作
はじめに
- CloudFrontのログをいい感じ(全てのフィールドを綺麗にVisualize)にElasticsearchで可視化したい
- CloudFrontのc_ip or x_forwarded_for をGeo IPで緯度/経度を取得したい。国名、都市名などを分析するためにログに格納したい
- Elastic Cloudへのログの取り込みをしたい
- Amazon S3に溜まっているログデータを取り込みたい
前提
- LogstashがインストールされているEC2/ECS等がすでに動いている
- 取り込みの耐えられるスペック
- S3にCloudFrontのログが保存されている
- ログを取り込むElasticsearchがある
- Logstash,Elasticsearchをすでに利用していて設定がある程度わかる
バージョン
- Logstash-7.4.2
- Elasticsearch-7.4.2
- Kibana-7.4.2
- update :: 7.7.0でも動作問題なし
書いてないこと
- Logstashの設定(pipeline.yml/logstash.yml等)
- Elasticserachの設定
- CloudFrontからS3へのログの吐き出し方
動作構成

設定
CloudFront Logデータ
date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end
Grok pattern %{CF_ACCESS_LOG}として設定する
CF_ACCESS_LOG %{DATE_EU:date}\t%{TIME:time}\t(?<x_edge_location>\b[\w\-]+\b)\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IP:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User_Agent}\t%{GREEDYDATA:cs_uri_stem}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{GREEDYDATA:time_taken}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}\t%{GREEDYDATA:cs_protocal_version}\t%{GREEDYDATA:file_status}\t%{GREEDYDATA:file_encrypted_fields}\t%{GREEDYDATA:c_port}\t%{GREEDYDATA:time_to_first-byte}\t%{GREEDYDATA:x_edge_detailed_result_type}\t%{GREEDYDATA:sc_content_type}\t%{GREEDYDATA:sc_content_len}\t%{GREEDYDATA:sc_range_start}\t%{GREEDYDATA:sc_range_end}
Grok Debuggerを使うといい感じになる
Kibana → Dev Tools → Grok Debuggerを開く。 実際のログデータをSample Dataに設定しGrok Patternをチェックする。間違っていないと、Structured Dataとして結果を確認できる。予期せぬ形でElasticsearchに取り取り込まれなくて済む。
cloudfront.conf
LogstashのPipelineでcloudfront.confを読み込ませるようにする。
Input設定
input {
s3 {
access_key_id => "XXXXXXXXXXXXXXX"
secret_access_key => "XXXXXXXXXXXXXXXXXXXXXXXXXX"
bucket => "s3-bucket-name"
region => "ap-northeast-1"
interval => "60"
watch_for_new_files => true
}
}
filter設定
Grok patternをパラメータとして読み込ませる設定
patterns_dir => ["/etc/logstash/patterns"]
grokをした後は各種プラグインを利用しログを加工する。x_forwarded_forがない場合はc_ipからgeo_locationを取得し、ある場合はx_forwarded_forを利用する設定。最後に重複するデータを削除する。useragentはUser Agentをいい感じにしてくれるプラグイン。
filter {
grok {
patterns_dir => ["/etc/logstash/patterns"]
match => [ "message", "%{CF_ACCESS_LOG}" ]
}
mutate {
add_field => [ "listener_timestamp", "%{date} %{time}" ]
}
date {
match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
timezone => "UTC"
target => "@timestamp"
}
if [x_forwarded_for] == "-" {
geoip {
source => "c_ip"
}
} else {
geoip {
source => "x_forwarded_for"
}
}
useragent {
source => "User_Agent"
target => "useragent"
}
mutate {
add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
}
mutate {
convert => [ "[geoip][coordinates]", "float" ]
}
mutate {
remove_field => ["date", "time", "listener_timestamp", "cloudfront_version", "message", "cloudfront_fields", "User_Agent"]
}
}
Output設定
Elastic Cloudに転送する設定。user/passwod,hostsは適宜修正。index名は日付単位で設定
output {
elasticsearch {
user => xxxxxxx
password => xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
hosts => "https://xxxxxxxxxxxxxxxxxxxxxxxxxxxx:12345"
index => "cloudfront-%{+yyyy.MM.dd}"
}
}
sincedbを修正することで取り込みたいところから取り込める
S3をしばらく利用した状態で何も考えず実行すると過去のログが大量に取り込まれてしまうこともある。取り込みたいタイミングは、sincedbを修正することで変更できる。UTCでかくので注意。
# vi /${path}/plugins/inputs/s3/sincedb_8171845adf3152f179a1c22e493a62c4"}
# 2019-12-12 01:22:10 +0000
取り込めた
取り込んだログをVisualizeすると。いい感じに分析ができそう。
注意点
CloudFrontのログフォーマットが変わるとGrokが失敗し、tagに _grokparsefailureがセットされログが取り込めなくなる。そんなに頻繁ではないが、ログを監視(Watcher)を入れてtagの値をアラートしておくと便利。