1
0

【AWS】CloudWatchLogs→Lambda→SQS連携方法

Last updated at Posted at 2024-01-23

はじめに

本記事は Lambda を使ったことが無い自分の備忘として、
Lambda 構築までの流れをメモした記事となります。

構成

「CloudwatchLogs」→「Lambda」→「SQS」のシンプルな構成とします。
■処理フロー
image.png

実装してみた

Amazon SQS 作成

今回の構成で SQS は Lambda からのメッセージをキューに保持するだけなので、
以下 CLI コマンドで簡易的に作成。

aws sqs create-queue --queue-name <SQS名>
# 例:aws sqs create-queue --queue-name testqueue

AWS Lambda 作成

今回は AWSCLI を利用して Lambda の作成を行う。
なお、AWSCLI 実行環境は CloudShell を利用する。

Lambda 用実行ファイルの作成

本ファイルは作成後に function.zip という形式で圧縮を行う。
Lambda が呼び出されるとコード内関数lambda_handlerが呼び出される。
引数としてeventにはサブスクリプションで検出する CloudWatchLogs が格納される。

#----------------------------------
# 【セクション1】パッケージimportとLog出力初期設定
#----------------------------------
import boto3
import json
import logging
import os
import traceback
import base64
import zlib

# モジュールごと(pythonファイルごと)にログ出力するloggerインスタンス作成
logger = logging.getLogger(__name__)
# ログレベル[INFO]を設定
logger.setLevel(logging.INFO)

############################
## Lambdaメイン処理
############################
def lambda_handler(event, context):
    try:
        logger.info("process start")
        #----------------------------------
        #【セクション2】base64デコード/gzipの展開
        #----------------------------------
        decoded_data = base64.b64decode(event['awslogs']['data'])
        data = zlib.decompress(decoded_data, 16+zlib.MAX_WBITS)
        logger.info("decode: " + str(data))

        data_json = json.loads(data)
        logger.info("json: " + str(data_json))

        #----------------------------------
        #【セクション3】複数レコード(logEvents)の数だけ繰り返し
        #----------------------------------
        for logEvent in data_json["logEvents"]:
            # Unicode文字列は\uXXXX形式でエンコードされる(日本語入力が文字化けする)から、「ensure_ascii=False 」を引数に設定して非アスキー文字をそのまま保持するようにする。
            logEvent_json = json.loads(json.dumps(logEvent, ensure_ascii=False))
            data_json.update({'logEvents': logEvent_json})

            #----------------------------------
            #【セクション4】SQSへの書き込み
            #----------------------------------
            response = boto3.client('sqs').send_message(
                QueueUrl = os.environ['QUEUE_URL'],
                MessageBody = json.dumps(data_json)
            )
        logger.info(response)
        return response
    except Exception as e:
        logger.error(e)
        traceback.print_exc()
        raise e

内容メモ
import boto3:Lambda 関数の中で AWS リソースにアクセスするために使用。このコードでは SQS との連携に使用
import logging:ログイベントを処理するために使用。このコードではランタイム情報(INFO)、デバッグ(DEBUG)情報、およびエラー情報を記録
import os:OS とのインタラクション(対話プロセス)を提供します。このコードでは OS 情報の取得を目的とし、環境変数(ACCOUNT、REGION、QUEUE_URL)を取得するために使用
import traceback:例外が発生した場合、スタックトレース情報を表示する機能を提供。このコードでは、エラーハンドリング時に、スタックトレースを記録しているのに使用しています。
logger = logging.getLogger(__name__): モジュールごと(pythonファイルごと)にログ出力するloggerインスタンス作成
zlib.decompress:16+zlib.MAX_WBITS」は16+zlib.MAX_WBITSとは、zlib.decompress()関数に対して、処理対象のデータがgzip形式であることを伝える値
for文の意図:Logs-Lmabda連携はある程度のログのまとまりが連携されるため複数レコード(logEvents)を1レコード(logEvent)に置き換えている

Lambda 用の SQS へのアクセス権限を作成

Lambda との信頼関係を持つ IAM ロールと SQS ヘのアクセス権限を持つポリシーを作成
※作成手順については割愛

Lambda を作成

AWSCLI を利用して Lambda の作成を行う。
事前に CloudShell 上の以下コマンド実行ディレクトリに、
「AWS Lambda 作成」で作成した zip ファイルを格納しておくこと

aws lambda create-function --function-name <作成したいLambda関数名> \
--runtime python3.12 \
--role <Lambda 用IAMロールArn> \
--handler <作成したいLambda関数名>.lambda_handler \
--zip-file fileb://function.zip \
--timeout 300 \
--memory-size 128 \
--environment "Variables={QUEUE_URL=<送付先SQSのURL>, REGION=ap-northeast-1}" \
--description "lambda-sqs"

CloudwatchLogs のサブスクリプションフィルタ設定

ロググループとログストリームの作成

  1. コンソールから「Cloudwatch」を選択し、「ロググループ」を選択する。

  2. 「ロググループを作成」を選択する。

  3. 以下設定を入力し、作成を選択する。

    • ロググループ名:/aws/testLogGroup
    • 保持期間の設定:デフォルト
    • ログクラス:デフォルト
    • KMS キー:デフォルト
    • タグ:お好みで
  4. 「/aws/testLogGroup」を選択する。

  5. 「ログストリームの作成」を選択する。

  6. 「ログストリーム名:testLogStream」と入力し「Create」を選択

サブスクリプションフィルタの設定

  1. 「/aws/testLogGroup」を選択し、Lmabda サブスクリプションフィルターを作成を選択する。

  2. 以下値を入力し、ストリーミング開始を押下する。

    • Lambda 関数:前段で構築した Lambda 関数名
    • 保持期間の設定:デフォルト
    • ログの形式:デフォルト
    • サブスクリプションフィルターのパターン:[message=Test]
    • サブスクリプションフィルター名:testLogGroup

動作確認

CloudwatchLogs に試験的にログを格納してみる。

  1. 「CloudShell」画面にて以下のコマンドを実行し、CloudwatchLog を格納する。
aws logs put-log-events --log-group-name "/aws/testLogGroup" --log-stream-name "testLogStream" --log-events timestamp=(タイムスタンプ),message="Test Message"

timestamp:CloudwatchLogs は仕様上、ログ時点より 14 日以上経過したタイムスタンプを持つログの格納を拒否する。
そのため、UNIX time, GMT, ローカル時刻 を相互に変換より現在の unix 時間を確認し入力する。
※msec までの単位が必要なため、13 桁を入力する

なお、ログ分割処理を検証するためには以下のようなjsonファイルを作成し、
ログを 1 つのファイルに集約し CloudwatchLog への格納コマンドを実行する。

eventtest.json
[
  {
    "timestamp": 1705896639533,
    "message": "Test MessageA"
  },
  {
    "timestamp": 1705896639533,
    "message": "Test MessageB"
  },
  {
    "timestamp": 1705896639533,
    "message": "Test MessageC"
  }
]
aws logs put-log-events --log-group-name "/aws/testLogGroup" --log-stream-name "testLogStream" --log-events file://testevent.json

2. LambdaのログでLogsから転送処理されているメッセージを確認
2024-01-23_09h14_51.png

3. SQS キューにメッセージ送付されていることを確認
コンソールより本作業内で作成したSQSを確認。
image.png
2024-01-23_09h17_30.png

その他疑問点の確認

except Exception as e:について

except Exception as e は try 句で 発生した 例外オブジェクト は、
except Exception as e: の e に格納される。
例外オブジェクトとは、以下例における情報「ZeroDivisionError: division by zero」部分になる。
例外オブジェクトサンプル

errorSample
>1/0
# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# ZeroDivisionError: division by zero

つまり print(e)を実行すると以下出力結果を受領できる。

try:
    1/0
except Exception as e:
    print(e) # division by zero

ログ調査について

本リソースでは「logging」「traceback」パッケージにてログの調査を行えるようにした。
本パッケージを使用している理由としては以下の通りである。

logging

正常動作時でも処理の箇所がわかるように設定。
また、正常時にはログレベルを低く(logger.info)とすることで、
ログ確認時や監視ツールに検出されないように設定する。

異常時にはログレベルを低く(logger.error)設定することで、
ログ確認時の強調や監視ツールに検出するように設定する。

traceback

traceback モジュールの print_exc()関数を使用して、例外に関する詳細な情報をスタックトレースとして標準エラー出力に表示します。これにより、例外が発生した場所や例外の原因がわかりやすくなります。

上記例外オブジェクトの Traceback ... の部分を取得することができます。
基本的に戻り値は strのため、トレースの結果に応じて処理を分岐させるためには細かな文字列操作が必要となります。

なお、以下のように設定することで、
traceback 以下の内容が全てターミナルに出力することができます。

try:
    1/0
except Exception as e:
    traceback.print_exc()

# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# ZeroDivisionError: division by zero

つまり、関数の場合は「logger.error(e)」として簡潔なログメッセージを出力させた後、
「traceback.print_exc()」で例外オブジェクトの内容全文を出力させています。

終わりに

「CloudWatchLogs→Lambda→SQS連携」に悩んでいる誰かの助けになれば幸いです。

参考元

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0