AWS
ElasticsearchService
ALB
CLB

ALB/CLBのアクセスログをElasticsearch Service (6.0/6.2) に取り込むメモ

完全に「他人のふんどしでなんとやら」な記事ですが、クラスメソッドさんの、

を、Elasticsearch Service の Elasticsearch 6.0 環境に取り込む形でチャレンジしてみましたので、メモを残しておきます。

2018/04/08追記:
Elasticsearch 6.2 環境でも同じ方法で取り込むことができることを確認しました。

気を付けること

これも、クラスメソッドさんの、

に書いてありますが、Elasticsearch 6.0 にリクエストを送信するときにはContent-Type: application/jsonをリクエストヘッダに指定する必要があります。

基本的には、ここだけです。

2018/03/16追記:
ログの量が多い場合、Lambda の割り当てメモリサイズを増やす必要があります(後述)。

手順

1. Amazon Elasticsearch Service の起動

基本的には元記事のままですので、手順は省略します(バージョンの選択を変えるだけ)。

なお、VPC 内にクラスタを作成する場合、セキュリティグループではHTTPS(TCP:443)のインバウンドアクセスを有効にしておく必要があります。

2. Ingest Pipeline の設定

実行するcurlコマンドに、前述のリクエストヘッダの指定を加えるだけです。

curlでPipeline設定
$ curl -H "Content-Type: application/json" -XPUT 'https://XXXXX-XXXXXXXXXXXXXXXXXXXXXXXXXX.ap-northeast-1.es.amazonaws.com/_ingest/pipeline/elblog' -d '{
(以下、略)

※ALB 独自のログ項目を取得する必要がなければ、ログフォーマット・項目の設定は元記事のままで ALB でも利用できるようです。

3. AWS Lambda ファンクションの作成

元記事の説明がかなり省略されているので、一部、画面キャプチャで補っておきます。

IAM Role の作成

以下は、Elasticsearch Service クラスタを VPC 内に作成する場合の設定です。
lambda_role.png

S3AccessForES
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:ListAllMyBuckets",
                "s3:ListBucket",
                "s3:GetObject*",
                "s3:GetBucketLocation"
            ],
            "Resource": "*"
        }
    ]
}
ElasticsearchPostAccess
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        }
    ]
}

Lambda ファンクションの作成

元記事のコードに対して、リクエストヘッダでContent-Type: application/x-ndjsonを加える処理を追加します。

2018/03/30追記:
_bulkでデータ投入する場合は(Content-Type: application/jsonでも動きますが)Content-Type: application/x-ndjsonが正しいようですので訂正しました(画面キャプチャは直っていませんが)。

lambda_py_change.png

変更部分
    headers = {'Content-Type': 'application/json'}
    response = request(url, "POST", credentials, 'es', headers=headers, data=doc)

※うまく動かないときは、この変更箇所の直下にprint(response.text)を加えると、CloudWatch Logs のログにエラー等の内容が書き出され、原因究明の手掛かりになりますので試してみてください。

環境変数は以下のように指定します。
lambda_env.png

なお、アクセスログが保存されている S3 バケットのトリガーの設定は以下のように行います。
lambda_s3_add.png
↓保存後
lambda_s3.png

先に作成したロールの指定は以下の通りです。タイムアウト時間も適宜変更しておきます。
lambda_role_time.png

2018/03/16追記:
ALB/CLB のログサイズが大きい場合、Lambda に割り当てるメモリサイズが 128MB では不足します。
URL/パスの長さなどにもよりますが、私が試した環境では、分間 40,000 リクエスト程度で最大 160MB 程度必要になったため、Lambda への割り当てを 192MB に増やしました。

なお、メモリが足りない場合、解凍後のログをまとめて転送する途中でエラーになり、最大3回リトライされて同じログが3つ記録されることがあります(その一方で、記録されないログも存在する可能性あり)。

以下は、VPC 内に作成する場合のネットワーク設定の例です。
lambda_nw.png
※セキュリティグループでアウトバウンド接続を許可しておきます。

設定後、5~10分程度経過すると、ログが Elasticsearch Service に取り込まれ、Kibana で確認できるようになります。

2018/03/16追記:
早速 6.2 が出たので、近いうちに試すことになりそうです。→04/08 試しました。

2018/03/21追記:
CloudFront アクセスログ用の変更点を書きました。