AWS SQS
Python で記載
import boto3
import json
import os
# クライアントを作成
sqs_client = boto3.client('sqs')
s3_client = boto3.client('s3')
def process_sqs_queue_to_s3(queue_url, bucket_name):
"""指定されたSQSキューの全メッセージをS3に保存する関数"""
processed_count = 0
while True:
response = sqs_client.receive_message(
QueueUrl=queue_url,
MaxNumberOfMessages=10,
WaitTimeSeconds=10
)
if 'Messages' not in response:
break
for message in response['Messages']:
message_body = message['Body']
s3_key = f"messages/{message['MessageId']}.json"
s3_client.put_object(
Bucket=bucket_name,
Key=s3_key,
Body=json.dumps({"message_body": message_body})
)
sqs_client.delete_message(
QueueUrl=queue_url,
ReceiptHandle=message['ReceiptHandle']
)
processed_count += 1
return processed_count
def lambda_handler(event, context):
# キューリストとS3バケット名を環境変数から取得
queue_urls = os.environ['SQS_QUEUE_URLS'].split(',')
s3_bucket_name = os.environ['S3_BUCKET_NAME']
total_processed = 0
for queue_url in queue_urls:
processed_count = process_sqs_queue_to_s3(queue_url.strip(), s3_bucket_name)
total_processed += processed_count
print(f"Queue {queue_url} processed {processed_count} messages.")
return {'statusCode': 200, 'body': f"Total {total_processed} messages processed from all queues."}
環境変数 SQS_QUEUE_URLS にすべてのキューURLをカンマ区切りで入力(例: https://sqs.region.amazonaws.com/123456789012/Queue1,https://sqs.region.amazonaws.com/123456789012/Queue2