はじめに
本記事は Lambda を使ったことが無い自分の備忘として、
Lambda 構築までの流れをメモした記事となります。
構成
「CloudwatchLogs」→「Lambda」→「SQS」のシンプルな構成とします。
■処理フロー
実装してみた
Amazon SQS 作成
今回の構成で SQS は Lambda からのメッセージをキューに保持するだけなので、
以下 CLI コマンドで簡易的に作成。
aws sqs create-queue --queue-name <SQS名>
# 例:aws sqs create-queue --queue-name testqueue
AWS Lambda 作成
今回は AWSCLI を利用して Lambda の作成を行う。
なお、AWSCLI 実行環境は CloudShell を利用する。
Lambda 用実行ファイルの作成
本ファイルは作成後に function.zip という形式で圧縮を行う。
Lambda が呼び出されるとコード内関数lambda_handlerが呼び出される。
引数としてevent
にはサブスクリプションで検出する CloudWatchLogs が格納される。
#----------------------------------
# 【セクション1】パッケージimportとLog出力初期設定
#----------------------------------
import boto3
import json
import logging
import os
import traceback
import base64
import zlib
# モジュールごと(pythonファイルごと)にログ出力するloggerインスタンス作成
logger = logging.getLogger(__name__)
# ログレベル[INFO]を設定
logger.setLevel(logging.INFO)
############################
## Lambdaメイン処理
############################
def lambda_handler(event, context):
try:
logger.info("process start")
#----------------------------------
#【セクション2】base64デコード/gzipの展開
#----------------------------------
decoded_data = base64.b64decode(event['awslogs']['data'])
data = zlib.decompress(decoded_data, 16+zlib.MAX_WBITS)
logger.info("decode: " + str(data))
data_json = json.loads(data)
logger.info("json: " + str(data_json))
#----------------------------------
#【セクション3】複数レコード(logEvents)の数だけ繰り返し
#----------------------------------
for logEvent in data_json["logEvents"]:
# Unicode文字列は\uXXXX形式でエンコードされる(日本語入力が文字化けする)から、「ensure_ascii=False 」を引数に設定して非アスキー文字をそのまま保持するようにする。
logEvent_json = json.loads(json.dumps(logEvent, ensure_ascii=False))
data_json.update({'logEvents': logEvent_json})
#----------------------------------
#【セクション4】SQSへの書き込み
#----------------------------------
response = boto3.client('sqs').send_message(
QueueUrl = os.environ['QUEUE_URL'],
MessageBody = json.dumps(data_json)
)
logger.info(response)
return response
except Exception as e:
logger.error(e)
traceback.print_exc()
raise e
内容メモ
import boto3
:Lambda 関数の中で AWS リソースにアクセスするために使用。このコードでは SQS との連携に使用
import logging
:ログイベントを処理するために使用。このコードではランタイム情報(INFO)、デバッグ(DEBUG)情報、およびエラー情報を記録
import os
:OS とのインタラクション(対話プロセス)を提供します。このコードでは OS 情報の取得を目的とし、環境変数(ACCOUNT、REGION、QUEUE_URL)を取得するために使用
import traceback
:例外が発生した場合、スタックトレース情報を表示する機能を提供。このコードでは、エラーハンドリング時に、スタックトレースを記録しているのに使用しています。
logger = logging.getLogger(__name__)
: モジュールごと(pythonファイルごと)にログ出力するloggerインスタンス作成
zlib.decompress
:16+zlib.MAX_WBITS」は16+zlib.MAX_WBITSとは、zlib.decompress()関数に対して、処理対象のデータがgzip形式であることを伝える値
for文の意図
:Logs-Lmabda連携はある程度のログのまとまりが連携されるため複数レコード(logEvents)を1レコード(logEvent)に置き換えている
Lambda 用の SQS へのアクセス権限を作成
Lambda との信頼関係を持つ IAM ロールと SQS ヘのアクセス権限を持つポリシーを作成
※作成手順については割愛
Lambda を作成
AWSCLI を利用して Lambda の作成を行う。
事前に CloudShell 上の以下コマンド実行ディレクトリに、
「AWS Lambda 作成」で作成した zip ファイルを格納しておくこと
aws lambda create-function --function-name <作成したいLambda関数名> \
--runtime python3.12 \
--role <Lambda 用IAMロールArn> \
--handler <作成したいLambda関数名>.lambda_handler \
--zip-file fileb://function.zip \
--timeout 300 \
--memory-size 128 \
--environment "Variables={QUEUE_URL=<送付先SQSのURL>, REGION=ap-northeast-1}" \
--description "lambda-sqs"
CloudwatchLogs のサブスクリプションフィルタ設定
ロググループとログストリームの作成
-
コンソールから「Cloudwatch」を選択し、「ロググループ」を選択する。
-
「ロググループを作成」を選択する。
-
以下設定を入力し、作成を選択する。
- ロググループ名:/aws/testLogGroup
- 保持期間の設定:デフォルト
- ログクラス:デフォルト
- KMS キー:デフォルト
- タグ:お好みで
-
「/aws/testLogGroup」を選択する。
-
「ログストリームの作成」を選択する。
-
「ログストリーム名:testLogStream」と入力し「Create」を選択
サブスクリプションフィルタの設定
-
「/aws/testLogGroup」を選択し、Lmabda サブスクリプションフィルターを作成を選択する。
-
以下値を入力し、ストリーミング開始を押下する。
- Lambda 関数:前段で構築した Lambda 関数名
- 保持期間の設定:デフォルト
- ログの形式:デフォルト
- サブスクリプションフィルターのパターン:[message=Test]
- サブスクリプションフィルター名:testLogGroup
動作確認
CloudwatchLogs に試験的にログを格納してみる。
- 「CloudShell」画面にて以下のコマンドを実行し、CloudwatchLog を格納する。
aws logs put-log-events --log-group-name "/aws/testLogGroup" --log-stream-name "testLogStream" --log-events timestamp=(タイムスタンプ),message="Test Message"
timestamp
:CloudwatchLogs は仕様上、ログ時点より 14 日以上経過したタイムスタンプを持つログの格納を拒否する。
そのため、UNIX time, GMT, ローカル時刻 を相互に変換より現在の unix 時間を確認し入力する。
※msec までの単位が必要なため、13 桁を入力する
なお、ログ分割処理を検証するためには以下のようなjsonファイルを作成し、
ログを 1 つのファイルに集約し CloudwatchLog への格納コマンドを実行する。
[
{
"timestamp": 1705896639533,
"message": "Test MessageA"
},
{
"timestamp": 1705896639533,
"message": "Test MessageB"
},
{
"timestamp": 1705896639533,
"message": "Test MessageC"
}
]
aws logs put-log-events --log-group-name "/aws/testLogGroup" --log-stream-name "testLogStream" --log-events file://testevent.json
2. LambdaのログでLogsから転送処理されているメッセージを確認
3. SQS キューにメッセージ送付されていることを確認
コンソールより本作業内で作成したSQSを確認。
その他疑問点の確認
except Exception as e:について
except Exception as e は try 句で 発生した 例外オブジェクト は、
except Exception as e: の e に格納される。
例外オブジェクトとは、以下例における情報「ZeroDivisionError: division by zero」部分になる。
例外オブジェクトサンプル
>1/0
# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# ZeroDivisionError: division by zero
つまり print(e)を実行すると以下出力結果を受領できる。
try:
1/0
except Exception as e:
print(e) # division by zero
ログ調査について
本リソースでは「logging」「traceback」パッケージにてログの調査を行えるようにした。
本パッケージを使用している理由としては以下の通りである。
logging
正常動作時でも処理の箇所がわかるように設定。
また、正常時にはログレベルを低く(logger.info)とすることで、
ログ確認時や監視ツールに検出されないように設定する。
異常時にはログレベルを低く(logger.error)設定することで、
ログ確認時の強調や監視ツールに検出するように設定する。
traceback
traceback モジュールの print_exc()関数を使用して、例外に関する詳細な情報をスタックトレースとして標準エラー出力に表示します。これにより、例外が発生した場所や例外の原因がわかりやすくなります。
上記例外オブジェクトの Traceback ... の部分を取得することができます。
基本的に戻り値は strのため、トレースの結果に応じて処理を分岐させるためには細かな文字列操作が必要となります。
なお、以下のように設定することで、
traceback 以下の内容が全てターミナルに出力することができます。
try:
1/0
except Exception as e:
traceback.print_exc()
# Traceback (most recent call last):
# File "<stdin>", line 1, in <module>
# ZeroDivisionError: division by zero
つまり、関数の場合は「logger.error(e)」として簡潔なログメッセージを出力させた後、
「traceback.print_exc()」で例外オブジェクトの内容全文を出力させています。
終わりに
「CloudWatchLogs→Lambda→SQS連携」に悩んでいる誰かの助けになれば幸いです。
参考元