Posted at

SQSに蓄積したDeep SecurityのSNSトピックをLambda(python)でS3に保存する

More than 1 year has passed since last update.

こんにちは、ひろかずです。

前回では、Deep Security as a Service(以下、DSaaS)で発生するイベントを全てSNSトピックでSQSへキューイングするところまで行いました。

今回は、主な需要のうち、DSaaSイベントの長期保存について一筆書きます。

Juyo_01.png


今回実装すること


  • Lambdaを5分毎に定期実行する。

  • SQSに蓄積したSNSトピックからDSaaSイベント情報を抜き出し、S3へ保存する

  • 処理したキューは削除する。


ざっくり構成(今回の分)

Diagram_02.png


参考情報

今回は、こちらが非常に役に立ちました。

Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする


工程


  1. 格納先となるs3バケットを作成する。

  2. Lambda Functionを作成する。

  3. IAM権限を設定する。

  4. 実行結果


1. 格納先となるs3バケットを作成する。

バケット名をdsaas-snsと設定しました。

リージョンは、SQSの場所と合せました。

S3_01.png

できましたね。

S3_02.png


2. Lambda Functionを作成する。

dsaas-sns-sqs-s3という名前のLambda Functionを作成しました。

こんな感じで設定しています。

Lambda_01.png

定期実行設定は、トリガー画面でCloudWatch Eventsで設定します。

Lambda_02.png

コードはこのように作成しました。

s3に保存されるファイル名は、DSaaSイベントのタイムスタンプを設定するようにしています。

import json

import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'dsaas-sns'
maxNumberOfMessages = 10
AWS_S3_BUCKET_NAME = 'dsaas-sns'

def lambda_handler(event, context):

try:
logger.info(event)

queue = boto3.resource('sqs').get_queue_by_name(
QueueName = queueName
)

messages = queue.receive_messages(
MaxNumberOfMessages = maxNumberOfMessages
)

entries = []
items = []
for message in messages:
entries.append({
"Id": message.message_id,
"ReceiptHandle": message.receipt_handle
})
items.append({
"UnsubscribeURL": json.loads(message.body)['UnsubscribeURL'],
"SigningCertURL": json.loads(message.body)['SigningCertURL'],
"Signature": json.loads(message.body)['Signature'],
"SignatureVersion": json.loads(message.body)['SignatureVersion'],
"Timestamp": json.loads(message.body)['Timestamp'],
"Message": json.loads(message.body)['Message'],
"TopicArn": json.loads(message.body)['TopicArn'],
"MessageId": json.loads(message.body)['MessageId'],
"Type": json.loads(message.body)['Type']
})
s3 = boto3.resource('s3')
bucket = s3.Bucket(AWS_S3_BUCKET_NAME)
for item in items:
PUT_OBJECT_KEY_NAME = item['Timestamp']
obj = bucket.Object(PUT_OBJECT_KEY_NAME)
body = item['Message']

response = obj.put(
Body=body.encode('utf-8'),
ContentEncoding='utf-8',
ContentType='text/plane'
)

response = {}
if len(entries) != 0:
response = queue.delete_messages(
Entries = entries
)

logger.info(response)
return response

except Exception as e:
logger.error(e)
raise e


3. IAM権限を追加する。

sns-sqs-s3.lambda_roleにSQSとs3の権限を追加します。

IAM_11.png


4. 実行結果

SQSのメトリクスでキューが順調に処理されているのがわかりますね。

SQS_21.png

s3にもイベントがオブジェクトとして保存されています。

S3_11.png

オブジェクトの内容もDSaaSのイベントだけになっていますね。

S3_12.png

既存のイベントをSNSトピックとして飛ばすことはできませんが、この実装によってDSaaS上で新たに発生したイベントは全てs3に保存できるようになりました。

今日はここまでです。

お疲れ様でした。