16
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

AWSで短い間隔で処理を繰り返す

Posted at

#概要

  • 「リアルタイムに数字を反映してほしい」という気が短い顧客の要望に応えるためAWSで集計処理を行うLambda関数を短い間隔で定期的に実行する方法を試してみた。結果を共有する。

課題

  • 収集したデータを集計してリアルタイムに反映するような処理を、非常に短い間隔(たとえば 5秒おき)に実行したい。
  • 頻繁に実行すれば目的達成で、実行間隔の時間精度はそれほど高くなくてもよい。

既存の方法

  • Cloudwatch のルールで定期実行する。これが間違いなく便利。ただし、この方法は現時点では1分以下を指定できないので、残念ながら今回の要件には適さない。
  • ec2インスタンス上でsleepをはさんで無限にループするシェルスクリプトのようなもので処理をキックする。ただし、すでにec2インスタンスがあれば利用できるが、これだけのためにインスタンスをたてるのはもったいない。また管理の手間が面倒だ。
  • もっと楽で安い方法はないだろうか?

試した方法

  • 処理を実行後、SQSのメッセージ遅延機能を使い、次回処理をキックする。
  • この方法のメリットは、高頻度で繰り返してもコストが安い点。計算上は、ec2やStep Functionよりも安い。(頻度や処理によっては、ほかの方法がベターな場合もあるかもしれない。PFコストは時期によっても変化する)

構築手順

  • 今回は5秒毎にLamda関数を実行する例である。

IAM Role

  • Lambda関数を実行するためのIAM Roleを以下で作成する
  • 管理ポリシーとして、AWSLambdaBasicExecutionRole をアタッチ
  • カスタムポリシーとして以下の権限を付与する。もちろん実行する処理に応じて追加の権限を付与。
IAM_policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
            ],
            "Resource": "*"
        }
    ]
}

SQS キュー作成

  • queue_waiting というキューを作成する。
  • スタンダードキューを指定
  • ロングポーリングを有効にする。メッセージ受信待機時間を5秒に設定。

Lambda関数本体

  • ランタイムに Python3.7 を指定して新規関数を作成
  • ロールに上記で作成したロールを指定する
lambda_function.py
import json
import boto3
import datetime

# SQSのセットアップ
queue = boto3.client('sqs')
# キューのURLを指定
queue_url ='https://sqs.us-west-2.amazonaws.com/************/queue_waiting'

def process(records):
    count_message = 0
    # 受信メッセージを削除する
    for item in records:
        try:
            receiptHandle = item.get('receiptHandle','dummy')
            body = item.get('body','')
            queue.delete_message(QueueUrl=queue_url,ReceiptHandle=receiptHandle)
            count_message += 1
        except:
            print('catch excception while cleaning message')
    print(datetime.datetime.now())
    #---------------------
    # ここに処理を記述する
    #---------------------
    if count_message > 0 :
        response = queue.send_message(
            QueueUrl=queue_url,
            DelaySeconds=5,
            MessageBody=json.dumps({"exec_after_wait":"1"})
        )
    return {"status":"OK"}

def lambda_handler(event, context):
    if 'Records' in event.keys() :
        return(process(event['Records']))
    return {"Unknown Inputs"}

  • 関数ができたら、トリガーにSQSを割り当てる
  • 関数の定期実行をスタートするには、マネジメントコンソールなどを用いて SQS にてキュー queue_waiting に1つメッセージを送ればよい。
  • 関数を停止するには、Lambda関数でSQSトリガを停止し、SQSメッセージを削除すればよい。

まとめ

実行結果の確認

  • 以下はCloudWatchのログから抜粋
START RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d Version: $LATEST
2019-08-14 13:44:45.874013
END RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d
REPORT RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d	Duration: 115.69 ms	Billed Duration: 200 ms Memory Size: 128 MB	Max Memory Used: 72 MB	
START RequestId: 49401749-e181-5455-800e-4287f3097775 Version: $LATEST
2019-08-14 13:44:50.954724
END RequestId: 49401749-e181-5455-800e-4287f3097775
REPORT RequestId: 49401749-e181-5455-800e-4287f3097775	Duration: 91.06 ms	Billed Duration: 100 ms Memory Size: 128 MB	Max Memory Used: 72 MB	
START RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950 Version: $LATEST
2019-08-14 13:44:56.034734
END RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950
REPORT RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950	Duration: 129.14 ms	Billed Duration: 200 ms Memory Size: 128 MB	Max Memory Used: 72 MB	
  • この程度のLamda関数であれば メモリ128MB,実行時間 100ms-200msで軽量である。
  • 実行間隔は 5秒+アルファである。
  • 正確に5秒毎でなくてもよい要件であれば、この方法で問題ないと考える

今後の課題

  • なんらかの原因でSQSメッセージが破損して停止した場合、再度キューにメッセージを送るような復旧方法があるとよい。Cloudwatchのアラームで一定時間Lamda関数が起動しないことを検知 -> SNS → Lambda でできそうである。

参考文献

16
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
14

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?