AWS
CloudFront
lambda
ElasticsearchService

CloudFrontのアクセスログをElasticsearch Service (6.0/6.2) に取り込むメモ(手抜き編)

この記事は、ALB/CLBのアクセスログをElasticsearch Service (6.0/6.2) に取り込むメモの CloudFront アクセスログ版です。

記事で参照している元記事、クラスメソッドさんの

の終わりに、

Ingest Pipeline の設定を変更するだけで同じ Lambda ファンクションで CloudFront のアクセスログも可視化することができます。

と書かれていますが、少なくとも Elasticsearch Service 6.0(以降)ではうまくいきませんでしたので、そのフォローとして書いています。

※タイトルにある通り、やり方は完全に手抜きです…取り急ぎ用意する必要があったので…。

対応のポイント

ベースは前回の記事通りですが、単純に Ingest Pipeline の設定を変えるだけでは、以下の点で問題が生じます。

  • ログがタブ区切り(TSV?)なので、そのまま Elasticsearch Service の Ingest Pipeline に送るとエラーが出て怒られる。
  • ログの先頭にコメント行が2行ほど入るので、そこの取り込みでもエラーが出る(無視しても良いが、気持ち悪い…)。
  • 日付の形式が、Grok がデフォルトで持っているパターン(DATE_US・DATE_EU など)では取り込めない(定義すれば良いのですが、定義した上で更に時刻と結合し、タイムゾーンを…と考えるのが面倒)。

というわけで、今回はほぼ Python コード側で対応してしまいます(本当は Ingest Pipeline 側で何とかするのが筋でしょうが…)。

Ingest Pipeline 設定

CloudFront のログ形式はこちらです。

また、Grok の設定については、こちらを参考にしました。

Logstash設定(Advanced)ログメッセージのパースの部分です。

設定(CloudFront用)
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/cflog' -d '{
  "processors": [{
    "grok": {
      "field": "message",
      "patterns":[ "%{TIMESTAMP_ISO8601:timestamp} %{NOTSPACE:x_edge_location} (?:%{INT:sc_bytes:int}|-) %{IPORHOST:c_ip} %{WORD:cs_method} %{HOSTNAME:cs_host} %{NOTSPACE:cs_uri_stem} %{INT:sc_status:int} %{GREEDYDATA:referrer} %{GREEDYDATA:User_Agent} %{GREEDYDATA:cs_uri_query} %{GREEDYDATA:cookies} %{WORD:x_edge_result_type} %{NOTSPACE:x_edge_request_id} %{HOSTNAME:x_host_header} %{URIPROTO:cs_protocol} (?:%{INT:cs_bytes:int}|-) %{NUMBER:time_taken:float} %{NOTSPACE:x_forwarded_for} %{NOTSPACE:ssl_protocol} %{NOTSPACE:ssl_cipher} %{WORD:x_edge_response_result_type}" ],
      "ignore_missing": true
    }
  },{
    "remove":{
      "field": "message"
    }
  }, {
    "user_agent": {
      "field": "User_Agent",
      "target_field": "user_agent",
      "ignore_failure": true
    }
  }, {
    "remove": {
      "field": "User_Agent"
    }
  }]
}'

コード変更点

29行目から始まるif文のブロックを書き換えます。

コード変更部分
    if not line.strip().startswith('#'):
        data += '{"index":{"_index":"%s","_type":"log"}}\n' % es_index
        data += '{"message":"%s"}\n' % line.strip().replace('"', '\\"').replace('\t', ' ').replace(' ', 'T', 1).replace(' ', 'Z ', 1)

        if len(data) > 3000000:
            _bulk(es_host, data, credentials)
            data = ""
  • コメント行はスキップする。
  • タブ区切りを半角スペース区切りに変換する。
  • 日付と時刻をTIMESTAMP_ISO8601で取り込める形式に変換する(タイムゾーンはUTC)。

を行っています。

Lambda 変更点

ログの取り込み元バケットのほか、環境変数の Elasticsearch Service に取り込むときの INDEX プレフィクスと、Ingest Pipeline の名前(下図)を変更します。
aws_cflog.png

以上で、CloudFrontのログも取り込まれるようになるはずです。

※おそらく、Elsaticsearch Service 6.2 でも同じ手順で大丈夫だと思います(未検証。近日中に検証予定)。→04/08 大丈夫でした。