7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

シーエー・アドバンスAdvent Calendar 2018

Day 14

LambdaのイベントソースにSQSが追加されたので使ってみた

Last updated at Posted at 2018-12-13

はじめに

追加されてから半年ほど経過してしまいましたが、LambdaのイベントソースにSQSが追加されたので試してみました。

workerサンプル

今までも、Lambdaをworkerとして定期実行させ、処理を行っていました。
ランタイムはpython2.7です。(古い。。)

handler_before.py
import boto3
import json
import re

# ## globals
sqs = boto3.resource('sqs')
queue = sqs.Queue(QUEUE_URL)

def handle_queue_msg(queue_msg):
    upinfo = json.loads(queue_msg.body)
    if 'url' not in upinfo:
        return
    if not re.search('^http', upinfo['url']):
        return
    upsert_url_mapping(upinfo['url'])

def get_messages():
    return queue.receive_messages(
        MaxNumberOfMessages=10,
        WaitTimeSeconds = 0
    )

def upsert_url_mapping(url):
	DBにurlを登録する処理

# ## main handler
def handler(event, context):
    msgs = get_messages()
    while 0 < len(msgs):
        for msg in msgs:
            handle_queue_msg(msg)
            msg.delete()

        msgs = get_messages()

実装内容は、一般的なSQSポーリング処理です。

  1. 10件ずつメッセージを取得
  2. メイン処理
  3. メッセージ削除処理
  4. 0件になるまで1〜3の繰り返し

SQSイベントを使った処理

SQSイベントリソースを使用した処理に書き換えてみました。

handler_after.py
import boto3
import json
import re

# ## globals
sqs = boto3.resource('sqs')

def handle_queue_msg(queue_msg):
    upinfo = json.loads(queue_msg['body'])
    if 'url' not in upinfo:
        return
    if not re.search('^http', upinfo['url']):
        return
    upsert_url_mapping(upinfo['url'])

def upsert_url_mapping(url):
	DBにurlを登録する処理

# ## main handler
def handler(event, context):
    for record in event['Records']:
        handle_queue_msg(record)

AWSがポーリングをしてくれるので、event引数からメッセージを取得するだけでOKです。
処理が正常に終了するとキューも削除してくれるので、削除処理を記述する必要もありません。

トリガーにSQSを設定

ソースをデプロイ後、トリガー設定をします。
181213-0002.png

バッチ数を設定します。(最大件数10件)

181213-0003.png

同時実行数

RDSと連携している場合、大量のメッセージが登録されると自動で並列処理が行われるため、
RDSへの接続数があっという間に消費されるため、同時実行数の設定で同時実行数を定義しておくことをおすすめします。

181213-0001.png

おわりに

今携わっているプロダクトではバッチ処理はすべてlambdaでおこなっており、
SQSのポーリング処理の実装がまだあるので、時間があるときにSQSイベントに変更したいと思います。
(あとpythonの3系への対応も)

7
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?