0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Logstash Input S3 plugin で CloudFrontログを取り込む

Last updated at Posted at 2019-12-13

アップデート:7.7.0での動作

はじめに

  • CloudFrontのログをいい感じ(全てのフィールドを綺麗にVisualize)にElasticsearchで可視化したい
  • CloudFrontのc_ip or x_forwarded_for をGeo IPで緯度/経度を取得したい。国名、都市名などを分析するためにログに格納したい
  • Elastic Cloudへのログの取り込みをしたい
  • Amazon S3に溜まっているログデータを取り込みたい

前提

  • LogstashがインストールされているEC2/ECS等がすでに動いている
    • 取り込みの耐えられるスペック
  • S3にCloudFrontのログが保存されている
  • ログを取り込むElasticsearchがある
  • Logstash,Elasticsearchをすでに利用していて設定がある程度わかる

バージョン

  • Logstash-7.4.2
  • Elasticsearch-7.4.2
  • Kibana-7.4.2
  • update :: 7.7.0でも動作問題なし

書いてないこと

  • Logstashの設定(pipeline.yml/logstash.yml等)
  • Elasticserachの設定
  • CloudFrontからS3へのログの吐き出し方

動作構成

スクリーンショット 2019-12-13 16.39.55.png

設定

CloudFront Logデータ

date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type x-edge-request-id x-host-header cs-protocol cs-bytes time-taken x-forwarded-for ssl-protocol ssl-cipher x-edge-response-result-type cs-protocol-version fle-status fle-encrypted-fields c-port time-to-first-byte x-edge-detailed-result-type sc-content-type sc-content-len sc-range-start sc-range-end

Grok pattern  %{CF_ACCESS_LOG}として設定する

CF_ACCESS_LOG %{DATE_EU:date}\t%{TIME:time}\t(?<x_edge_location>\b[\w\-]+\b)\t(?:%{NUMBER:sc_bytes:int}|-)\t%{IP:c_ip}\t%{WORD:cs_method}\t%{HOSTNAME:cs_host}\t%{NOTSPACE:cs_uri_stem}\t%{NUMBER:sc_status:int}\t%{GREEDYDATA:referrer}\t%{GREEDYDATA:User_Agent}\t%{GREEDYDATA:cs_uri_stem}\t%{GREEDYDATA:cookies}\t%{WORD:x_edge_result_type}\t%{NOTSPACE:x_edge_request_id}\t%{HOSTNAME:x_host_header}\t%{URIPROTO:cs_protocol}\t%{INT:cs_bytes:int}\t%{GREEDYDATA:time_taken}\t%{GREEDYDATA:x_forwarded_for}\t%{GREEDYDATA:ssl_protocol}\t%{GREEDYDATA:ssl_cipher}\t%{GREEDYDATA:x_edge_response_result_type}\t%{GREEDYDATA:cs_protocal_version}\t%{GREEDYDATA:file_status}\t%{GREEDYDATA:file_encrypted_fields}\t%{GREEDYDATA:c_port}\t%{GREEDYDATA:time_to_first-byte}\t%{GREEDYDATA:x_edge_detailed_result_type}\t%{GREEDYDATA:sc_content_type}\t%{GREEDYDATA:sc_content_len}\t%{GREEDYDATA:sc_range_start}\t%{GREEDYDATA:sc_range_end}

Grok Debuggerを使うといい感じになる

Kibana → Dev Tools → Grok Debuggerを開く。 実際のログデータをSample Dataに設定しGrok Patternをチェックする。間違っていないと、Structured Dataとして結果を確認できる。予期せぬ形でElasticsearchに取り取り込まれなくて済む。
スクリーンショット 2019-12-13 10.24.58.png

cloudfront.conf

LogstashのPipelineでcloudfront.confを読み込ませるようにする。

Input設定

input {
  s3 {
    access_key_id => "XXXXXXXXXXXXXXX"
    secret_access_key => "XXXXXXXXXXXXXXXXXXXXXXXXXX"
    bucket => "s3-bucket-name"
    region => "ap-northeast-1"
    interval => "60"
    watch_for_new_files => true
  }
}

filter設定

Grok patternをパラメータとして読み込ませる設定

patterns_dir => ["/etc/logstash/patterns"]

grokをした後は各種プラグインを利用しログを加工する。x_forwarded_forがない場合はc_ipからgeo_locationを取得し、ある場合はx_forwarded_forを利用する設定。最後に重複するデータを削除する。useragentはUser Agentをいい感じにしてくれるプラグイン

filter {
  grok {
        patterns_dir => ["/etc/logstash/patterns"]
        match => [ "message", "%{CF_ACCESS_LOG}" ]
     }
  mutate {
    add_field => [ "listener_timestamp", "%{date} %{time}" ]
  }

  date {
    match => [ "listener_timestamp", "yy-MM-dd HH:mm:ss" ]
    timezone => "UTC"
    target => "@timestamp"
  }

  if [x_forwarded_for] == "-" {
    geoip {
      source => "c_ip"
    }
  } else {
    geoip {
      source => "x_forwarded_for"
    }
  }

  useragent {
    source => "User_Agent"
    target => "useragent"
  }

  mutate {
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}" ]
  }

  mutate {
    convert => [ "[geoip][coordinates]", "float" ]
  }

  mutate {
    remove_field => ["date", "time", "listener_timestamp", "cloudfront_version", "message", "cloudfront_fields", "User_Agent"]
  }
}

Output設定

Elastic Cloudに転送する設定。user/passwod,hostsは適宜修正。index名は日付単位で設定

output {
       elasticsearch {
           user  => xxxxxxx
           password => xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
           hosts => "https://xxxxxxxxxxxxxxxxxxxxxxxxxxxx:12345"
           index => "cloudfront-%{+yyyy.MM.dd}"
           }
}

sincedbを修正することで取り込みたいところから取り込める

S3をしばらく利用した状態で何も考えず実行すると過去のログが大量に取り込まれてしまうこともある。取り込みたいタイミングは、sincedbを修正することで変更できる。UTCでかくので注意。

# vi /${path}/plugins/inputs/s3/sincedb_8171845adf3152f179a1c22e493a62c4"}
# 2019-12-12 01:22:10 +0000

取り込めた

取り込んだログをVisualizeすると。いい感じに分析ができそう。

スクリーンショット 2019-12-13 16.19.19.png

注意点

CloudFrontのログフォーマットが変わるとGrokが失敗し、tagに _grokparsefailureがセットされログが取り込めなくなる。そんなに頻繁ではないが、ログを監視(Watcher)を入れてtagの値をアラートしておくと便利。

参考

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?