こんにちは、ひろかずです。
前回では、Deep Security as a Service(以下、DSaaS)で発生するイベントを全てSNSトピックでSQSへキューイングするところまで行いました。
今回は、主な需要のうち、DSaaSイベントの長期保存
について一筆書きます。
#今回実装すること
- Lambdaを5分毎に定期実行する。
- SQSに蓄積したSNSトピックからDSaaSイベント情報を抜き出し、S3へ保存する
- 処理したキューは削除する。
#参考情報
今回は、こちらが非常に役に立ちました。
Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする
#工程
- 格納先となるs3バケットを作成する。
- Lambda Functionを作成する。
- IAM権限を設定する。
- 実行結果
#1. 格納先となるs3バケットを作成する。
バケット名をdsaas-sns
と設定しました。
リージョンは、SQSの場所と合せました。
#2. Lambda Functionを作成する。
dsaas-sns-sqs-s3
という名前のLambda Functionを作成しました。
こんな感じで設定しています。
定期実行設定は、トリガー画面でCloudWatch Eventsで設定します。
コードはこのように作成しました。
s3に保存されるファイル名は、DSaaSイベントのタイムスタンプを設定するようにしています。
import json
import boto3
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
queueName = 'dsaas-sns'
maxNumberOfMessages = 10
AWS_S3_BUCKET_NAME = 'dsaas-sns'
def lambda_handler(event, context):
try:
logger.info(event)
queue = boto3.resource('sqs').get_queue_by_name(
QueueName = queueName
)
messages = queue.receive_messages(
MaxNumberOfMessages = maxNumberOfMessages
)
entries = []
items = []
for message in messages:
entries.append({
"Id": message.message_id,
"ReceiptHandle": message.receipt_handle
})
items.append({
"UnsubscribeURL": json.loads(message.body)['UnsubscribeURL'],
"SigningCertURL": json.loads(message.body)['SigningCertURL'],
"Signature": json.loads(message.body)['Signature'],
"SignatureVersion": json.loads(message.body)['SignatureVersion'],
"Timestamp": json.loads(message.body)['Timestamp'],
"Message": json.loads(message.body)['Message'],
"TopicArn": json.loads(message.body)['TopicArn'],
"MessageId": json.loads(message.body)['MessageId'],
"Type": json.loads(message.body)['Type']
})
s3 = boto3.resource('s3')
bucket = s3.Bucket(AWS_S3_BUCKET_NAME)
for item in items:
PUT_OBJECT_KEY_NAME = item['Timestamp']
obj = bucket.Object(PUT_OBJECT_KEY_NAME)
body = item['Message']
response = obj.put(
Body=body.encode('utf-8'),
ContentEncoding='utf-8',
ContentType='text/plane'
)
response = {}
if len(entries) != 0:
response = queue.delete_messages(
Entries = entries
)
logger.info(response)
return response
except Exception as e:
logger.error(e)
raise e
#3. IAM権限を追加する。
sns-sqs-s3.lambda_roleにSQSとs3の権限を追加します。
#4. 実行結果
SQSのメトリクスでキューが順調に処理されているのがわかりますね。
オブジェクトの内容もDSaaSのイベントだけになっていますね。
既存のイベントをSNSトピックとして飛ばすことはできませんが、
この実装によってDSaaS上で新たに発生したイベントは全てs3に保存できるようになりました。
今日はここまでです。
お疲れ様でした。