LoginSignup
0
0

More than 5 years have passed since last update.

SQSに蓄積したDeep SecurityのSNSトピックをLambda(python)でS3に保存する

Posted at

こんにちは、ひろかずです。
前回では、Deep Security as a Service(以下、DSaaS)で発生するイベントを全てSNSトピックでSQSへキューイングするところまで行いました。

今回は、主な需要のうち、DSaaSイベントの長期保存について一筆書きます。
Juyo_01.png

今回実装すること

  • Lambdaを5分毎に定期実行する。
  • SQSに蓄積したSNSトピックからDSaaSイベント情報を抜き出し、S3へ保存する
  • 処理したキューは削除する。

ざっくり構成(今回の分)

Diagram_02.png

参考情報

今回は、こちらが非常に役に立ちました。
Lambda(Python)でSQSのメッセージの内容をDynamoDBにPUTする

工程

  1. 格納先となるs3バケットを作成する。
  2. Lambda Functionを作成する。
  3. IAM権限を設定する。
  4. 実行結果

1. 格納先となるs3バケットを作成する。

バケット名をdsaas-snsと設定しました。
リージョンは、SQSの場所と合せました。
S3_01.png

できましたね。
S3_02.png

2. Lambda Functionを作成する。

dsaas-sns-sqs-s3という名前のLambda Functionを作成しました。
こんな感じで設定しています。
Lambda_01.png

定期実行設定は、トリガー画面でCloudWatch Eventsで設定します。
Lambda_02.png

コードはこのように作成しました。
s3に保存されるファイル名は、DSaaSイベントのタイムスタンプを設定するようにしています。

import json
import boto3
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

queueName = 'dsaas-sns'
maxNumberOfMessages = 10
AWS_S3_BUCKET_NAME = 'dsaas-sns'

def lambda_handler(event, context):

    try:
         logger.info(event)

         queue = boto3.resource('sqs').get_queue_by_name(
             QueueName = queueName
         )

         messages = queue.receive_messages(
             MaxNumberOfMessages = maxNumberOfMessages
         )

         entries = []
         items = []
         for message in messages:
              entries.append({
                   "Id": message.message_id,
                   "ReceiptHandle": message.receipt_handle
              })
              items.append({
                   "UnsubscribeURL": json.loads(message.body)['UnsubscribeURL'],
                   "SigningCertURL": json.loads(message.body)['SigningCertURL'],
                   "Signature": json.loads(message.body)['Signature'],
                   "SignatureVersion": json.loads(message.body)['SignatureVersion'],
                   "Timestamp": json.loads(message.body)['Timestamp'],
                   "Message": json.loads(message.body)['Message'],
                   "TopicArn": json.loads(message.body)['TopicArn'],
                   "MessageId": json.loads(message.body)['MessageId'],
                   "Type": json.loads(message.body)['Type']
              })
              s3 = boto3.resource('s3')
              bucket = s3.Bucket(AWS_S3_BUCKET_NAME)
              for item in items:
                   PUT_OBJECT_KEY_NAME = item['Timestamp']
                   obj = bucket.Object(PUT_OBJECT_KEY_NAME)
                   body = item['Message']

                   response = obj.put(
                       Body=body.encode('utf-8'),
                       ContentEncoding='utf-8',
                       ContentType='text/plane'
                   )

         response = {}
         if len(entries) != 0:
             response = queue.delete_messages(
                  Entries = entries
             )

         logger.info(response)
         return response

    except Exception as e:
         logger.error(e)
         raise e

3. IAM権限を追加する。

sns-sqs-s3.lambda_roleにSQSとs3の権限を追加します。
IAM_11.png

4. 実行結果

SQSのメトリクスでキューが順調に処理されているのがわかりますね。
SQS_21.png

s3にもイベントがオブジェクトとして保存されています。
S3_11.png

オブジェクトの内容もDSaaSのイベントだけになっていますね。
S3_12.png

既存のイベントをSNSトピックとして飛ばすことはできませんが、この実装によってDSaaS上で新たに発生したイベントは全てs3に保存できるようになりました。

今日はここまでです。
お疲れ様でした。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0