SQS拡張クライアントライブラリ
SQSのペイロード256KB制限を超えたいときに利用できる拡張ライブラリ、最近Pythonでもライブラリが公開された。
S3を経由してメッセージを伝えているが、それらを隠蔽しているだけなので自作したほうがいいかもしれない
公式のチュートリアル
import boto3
import sqs_extended_client
sqs_extended_client = boto3.client("sqs", region_name="us-east-1")
sqs_extended_client.large_payload_support = "S3_BUCKET_NAME"
sqs_extended_client.use_legacy_attribute = False
これでもいいが、分かりにくいのとLinterに引っかかるためboto3.Sessionクラスを継承したSQSExtendedClientSessionを使うほうが好き。
from sqs_extended_client.client import SQSExtendedClientSession
sqs_extended_client = SQSExtendedClientSession().client("sqs")
sqs_extended_client.large_payload_support = "<BUCKET_NAME>"
sqs_extended_client.use_legacy_attribute = False
#送信
send_message_response = sqs_extended_client.send_message(
QueueUrl="<QUEUE_URL>",
MessageBody=large_message
)
assert send_message_response['ResponseMetadata']['HTTPStatusCode'] == 200
#受信
receive_message_response = sqs_extended_client.receive_message(
QueueUrl="<QUEUE_URL>",
MessageAttributeNames=['All']
)
Lambdaでポーリングしている場合は以下のようにしてメッセージを取得している
from aws_lambda_powertools.utilities.batch import (
BatchProcessor,
EventType,
process_partial_response,
)
from aws_lambda_powertools.utilities.data_classes.sqs_event import SQSRecord
from aws_lambda_powertools.utilities.typing import LambdaContext
from sqs_extended_client.client import SQSExtendedClientSession
processor = BatchProcessor(event_type=EventType.SQS)
sqs_extend_client = SQSExtendedClientSession().client("sqs", region_name="ap-northeast-1")
sqs_extend_client.large_payload_support = "<BUCKET_NAME>"
sqs_extend_client.use_legacy_attribute = False
sqs_extend_client.delete_payload_from_s3 = True
def record_handler(record: SQSRecord) -> str:
body = sqs_extend_client.retrieve_message_from_s3(record.body)
#
def lambda_handler(event, context: LambdaContext):
return process_partial_response(
event=event,
record_handler=record_handler,
processor=processor,
context=context,
)