はじめに
追加されてから半年ほど経過してしまいましたが、LambdaのイベントソースにSQSが追加されたので試してみました。
workerサンプル
今までも、Lambdaをworkerとして定期実行させ、処理を行っていました。
ランタイムはpython2.7です。(古い。。)
handler_before.py
import boto3
import json
import re
# ## globals
sqs = boto3.resource('sqs')
queue = sqs.Queue(QUEUE_URL)
def handle_queue_msg(queue_msg):
upinfo = json.loads(queue_msg.body)
if 'url' not in upinfo:
return
if not re.search('^http', upinfo['url']):
return
upsert_url_mapping(upinfo['url'])
def get_messages():
return queue.receive_messages(
MaxNumberOfMessages=10,
WaitTimeSeconds = 0
)
def upsert_url_mapping(url):
〜DBにurlを登録する処理〜
# ## main handler
def handler(event, context):
msgs = get_messages()
while 0 < len(msgs):
for msg in msgs:
handle_queue_msg(msg)
msg.delete()
msgs = get_messages()
実装内容は、一般的なSQSポーリング処理です。
- 10件ずつメッセージを取得
- メイン処理
- メッセージ削除処理
- 0件になるまで1〜3の繰り返し
SQSイベントを使った処理
SQSイベントリソースを使用した処理に書き換えてみました。
handler_after.py
import boto3
import json
import re
# ## globals
sqs = boto3.resource('sqs')
def handle_queue_msg(queue_msg):
upinfo = json.loads(queue_msg['body'])
if 'url' not in upinfo:
return
if not re.search('^http', upinfo['url']):
return
upsert_url_mapping(upinfo['url'])
def upsert_url_mapping(url):
〜DBにurlを登録する処理〜
# ## main handler
def handler(event, context):
for record in event['Records']:
handle_queue_msg(record)
AWSがポーリングをしてくれるので、event引数からメッセージを取得するだけでOKです。
処理が正常に終了するとキューも削除してくれるので、削除処理を記述する必要もありません。
トリガーにSQSを設定
バッチ数を設定します。(最大件数10件)
同時実行数
RDSと連携している場合、大量のメッセージが登録されると自動で並列処理が行われるため、
RDSへの接続数があっという間に消費されるため、同時実行数の設定で同時実行数を定義しておくことをおすすめします。
おわりに
今携わっているプロダクトではバッチ処理はすべてlambdaでおこなっており、
SQSのポーリング処理の実装がまだあるので、時間があるときにSQSイベントに変更したいと思います。
(あとpythonの3系への対応も)