#概要
- 「リアルタイムに数字を反映してほしい」という気が短い顧客の要望に応えるためAWSで集計処理を行うLambda関数を短い間隔で定期的に実行する方法を試してみた。結果を共有する。
課題
- 収集したデータを集計してリアルタイムに反映するような処理を、非常に短い間隔(たとえば 5秒おき)に実行したい。
- 頻繁に実行すれば目的達成で、実行間隔の時間精度はそれほど高くなくてもよい。
既存の方法
- Cloudwatch のルールで定期実行する。これが間違いなく便利。ただし、この方法は現時点では1分以下を指定できないので、残念ながら今回の要件には適さない。
- ec2インスタンス上でsleepをはさんで無限にループするシェルスクリプトのようなもので処理をキックする。ただし、すでにec2インスタンスがあれば利用できるが、これだけのためにインスタンスをたてるのはもったいない。また管理の手間が面倒だ。
- もっと楽で安い方法はないだろうか?
試した方法
- 処理を実行後、SQSのメッセージ遅延機能を使い、次回処理をキックする。
- この方法のメリットは、高頻度で繰り返してもコストが安い点。計算上は、ec2やStep Functionよりも安い。(頻度や処理によっては、ほかの方法がベターな場合もあるかもしれない。PFコストは時期によっても変化する)
構築手順
- 今回は5秒毎にLamda関数を実行する例である。
IAM Role
- Lambda関数を実行するためのIAM Roleを以下で作成する
- 管理ポリシーとして、AWSLambdaBasicExecutionRole をアタッチ
- カスタムポリシーとして以下の権限を付与する。もちろん実行する処理に応じて追加の権限を付与。
IAM_policy
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "VisualEditor0",
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:*:*:function:*"
]
},
{
"Sid": "VisualEditor1",
"Effect": "Allow",
"Action": [
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueAttributes",
],
"Resource": "*"
}
]
}
SQS キュー作成
- queue_waiting というキューを作成する。
- スタンダードキューを指定
- ロングポーリングを有効にする。メッセージ受信待機時間を5秒に設定。
Lambda関数本体
- ランタイムに Python3.7 を指定して新規関数を作成
- ロールに上記で作成したロールを指定する
lambda_function.py
import json
import boto3
import datetime
# SQSのセットアップ
queue = boto3.client('sqs')
# キューのURLを指定
queue_url ='https://sqs.us-west-2.amazonaws.com/************/queue_waiting'
def process(records):
count_message = 0
# 受信メッセージを削除する
for item in records:
try:
receiptHandle = item.get('receiptHandle','dummy')
body = item.get('body','')
queue.delete_message(QueueUrl=queue_url,ReceiptHandle=receiptHandle)
count_message += 1
except:
print('catch excception while cleaning message')
print(datetime.datetime.now())
#---------------------
# ここに処理を記述する
#---------------------
if count_message > 0 :
response = queue.send_message(
QueueUrl=queue_url,
DelaySeconds=5,
MessageBody=json.dumps({"exec_after_wait":"1"})
)
return {"status":"OK"}
def lambda_handler(event, context):
if 'Records' in event.keys() :
return(process(event['Records']))
return {"Unknown Inputs"}
- 関数ができたら、トリガーにSQSを割り当てる
- 関数の定期実行をスタートするには、マネジメントコンソールなどを用いて SQS にてキュー queue_waiting に1つメッセージを送ればよい。
- 関数を停止するには、Lambda関数でSQSトリガを停止し、SQSメッセージを削除すればよい。
まとめ
実行結果の確認
- 以下はCloudWatchのログから抜粋
START RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d Version: $LATEST
2019-08-14 13:44:45.874013
END RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d
REPORT RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d Duration: 115.69 ms Billed Duration: 200 ms Memory Size: 128 MB Max Memory Used: 72 MB
START RequestId: 49401749-e181-5455-800e-4287f3097775 Version: $LATEST
2019-08-14 13:44:50.954724
END RequestId: 49401749-e181-5455-800e-4287f3097775
REPORT RequestId: 49401749-e181-5455-800e-4287f3097775 Duration: 91.06 ms Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 72 MB
START RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950 Version: $LATEST
2019-08-14 13:44:56.034734
END RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950
REPORT RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950 Duration: 129.14 ms Billed Duration: 200 ms Memory Size: 128 MB Max Memory Used: 72 MB
- この程度のLamda関数であれば メモリ128MB,実行時間 100ms-200msで軽量である。
- 実行間隔は 5秒+アルファである。
- 正確に5秒毎でなくてもよい要件であれば、この方法で問題ないと考える
今後の課題
- なんらかの原因でSQSメッセージが破損して停止した場合、再度キューにメッセージを送るような復旧方法があるとよい。Cloudwatchのアラームで一定時間Lamda関数が起動しないことを検知 -> SNS → Lambda でできそうである。