Help us understand the problem. What is going on with this article?

APIGatewayのログのタイムスタンプをLambdaで加工してElasticsearch Serviceに送る

はじめに

APIGatewayのログをElasticsearch Serviceに送ってKibanaで可視化しようとしたときの手順。
ただ、kibanaで対応しているタイムスタンプの形式がAPIGatewayが出力するタイムスタンプの型と一致していなかった。
そのため、Lambdaでタイムスタンプの形式を変換してKibanaでタイムスタンプとして扱えるようにした。

構成

APIGatewayのアクセスログは、Kinesisにしか出力できないため、Kinesisを利用している。
aaa.png

手順

1.Lambdaの作成
2.Kinesis Data Firehoseの作成
3.APIGatewayでアクセスログを出力する設定追加
※Elasticsearch Serviceのドメインは作成済の前提

1.Lambdaの作成

Lamndaの環境変数にログレベルとして「LOG_LEVEL」を設定する必要がある。
下記は、ログレベルがデバッグの場合
debag.jpg

タイムスタンプのキーは、APIGatewayで設定した「requestTime」。

lambda_function.py
import json
import base64
import logging
from os import environ
from datetime import datetime as dt
from datetime import timedelta, timezone as tz

# 環境変数ログレベルのキー
LOG_LEVEL = 'LOG_LEVEL'
# タイムスタンプのキー
REQUEST_TIME_KEY = 'requestTime'

# ログオブジェクト作成
def get_logger(workername):
    # ログの出力名を設定
    logger = logging.getLogger(workername)
    # ログレベルの設定
    logger.setLevel(int(environ[LOG_LEVEL]))

    return logger

# ロガー
logger = get_logger(__name__)

# タイムスタンプの形式をKibana用に変換
def conv_timestamp(timestamp):
    # date型に変換
    date = dt.strptime(timestamp, '%d/%b/%Y:%H:%M:%S +0000')
    # 9時間進める(力技なのでイマイチ・・・)
    date = date + timedelta(hours=+9)
    # String型に変換して返却
    return dt.strftime(date, '%Y-%m-%dT%H:%M:%S+0900')

def lambda_handler(event, context):
    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼event▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(event)
    output_list = []
    for record in event['records']:
        data = record['data']
        # デコード
        data_decode = base64.b64decode(data)
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換前)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換前)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

        # byte型をString型に変換
        data_str = data_decode.decode("ascii")
        # Json文字列を辞書型に変換
        data = json.loads(data_str)

        # タイムスタンプの形式をkibana用に変換
        data[REQUEST_TIME_KEY] = conv_timestamp(data[REQUEST_TIME_KEY])

        # 辞書型をjson文字列→base64エンコード→デコード
        record['data'] = base64.b64encode(json.dumps(data).encode('ascii')).decode("ascii")
        # resultを追加
        record['result'] = 'Ok'
        output_list.append(record)

        data_decode = base64.b64decode(record['data'])
        logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼data(変換後)▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
        logger.debug(data_decode)
        logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲data(変換後)▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    logger.debug('▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼返却するLIST▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼▼')
    logger.debug(output_list)
    logger.debug('▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲返却するLIST▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲▲')

    return {'records': output_list}

Kinesisにデータを戻すために、AWS管理ポリシーの「AWSLambdaKinesisExecutionRole」を追加

2.Kinesis Data Firehoseの作成

「Choose a Souce」は「Direct PUT or other sources」を選択する。
01.png

「record transformation」は「Enabled」を選択し、作成した加工用のLambdaを選択する。
02.png

「Amazon Elasticsearch Service destination」のドメインで作成済のドメインを選択する。
「Index」は任意の名前。
「Index rotation」、日ごとにする場合は、「Every day」を選択する。
03.png

また、Elasticsearch Serviceにログを送るためとLambdaを実行するために、下記の権限をポリシーで追加する。

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "es:ESHttpPost",
                "es:ESHttpPut"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "lambda:GetFunctionConfiguration"
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

以上の設定をして作成する。

3.APIGatewayでアクセスログを出力する設定追加

アクセスログを出力するAPIのステージ > ログトレースタブ
Access Log Destination ARN:1.で作成したKinesisのARN
ログの形式:JSONボタンを押下して設定

gateway.png

これで構成の図の通りに完成です。

最後に

Kinesisのサービスを今回初めて触りました。
なんか難しいのかと思っていましたが、データを溜め込んでくれる単純なサービスなんだと理解できました。
ただ東京リージョンでも全て英語なので、初学者のハードルを少し上げていると思いました。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away