Help us understand the problem. What is going on with this article?

AWSで短い間隔で処理を繰り返す

More than 1 year has passed since last update.

概要

  • 「リアルタイムに数字を反映してほしい」という気が短い顧客の要望に応えるためAWSで集計処理を行うLambda関数を短い間隔で定期的に実行する方法を試してみた。結果を共有する。

課題

  • 収集したデータを集計してリアルタイムに反映するような処理を、非常に短い間隔(たとえば 5秒おき)に実行したい。
  • 頻繁に実行すれば目的達成で、実行間隔の時間精度はそれほど高くなくてもよい。

既存の方法

  • Cloudwatch のルールで定期実行する。これが間違いなく便利。ただし、この方法は現時点では1分以下を指定できないので、残念ながら今回の要件には適さない。
  • ec2インスタンス上でsleepをはさんで無限にループするシェルスクリプトのようなもので処理をキックする。ただし、すでにec2インスタンスがあれば利用できるが、これだけのためにインスタンスをたてるのはもったいない。また管理の手間が面倒だ。
  • もっと楽で安い方法はないだろうか?

試した方法

  • 処理を実行後、SQSのメッセージ遅延機能を使い、次回処理をキックする。
  • この方法のメリットは、高頻度で繰り返してもコストが安い点。計算上は、ec2やStep Functionよりも安い。(頻度や処理によっては、ほかの方法がベターな場合もあるかもしれない。PFコストは時期によっても変化する)

構築手順

  • 今回は5秒毎にLamda関数を実行する例である。

IAM Role

  • Lambda関数を実行するためのIAM Roleを以下で作成する
  • 管理ポリシーとして、AWSLambdaBasicExecutionRole をアタッチ
  • カスタムポリシーとして以下の権限を付与する。もちろん実行する処理に応じて追加の権限を付与。
IAM_policy
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "lambda:InvokeFunction"
            ],
            "Resource": [
                "arn:aws:lambda:*:*:function:*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
            ],
            "Resource": "*"
        }
    ]
}

SQS キュー作成

  • queue_waiting というキューを作成する。
  • スタンダードキューを指定
  • ロングポーリングを有効にする。メッセージ受信待機時間を5秒に設定。

Lambda関数本体

  • ランタイムに Python3.7 を指定して新規関数を作成
  • ロールに上記で作成したロールを指定する
lambda_function.py
import json
import boto3
import datetime

# SQSのセットアップ
queue = boto3.client('sqs')
# キューのURLを指定
queue_url ='https://sqs.us-west-2.amazonaws.com/************/queue_waiting'

def process(records):
    count_message = 0
    # 受信メッセージを削除する
    for item in records:
        try:
            receiptHandle = item.get('receiptHandle','dummy')
            body = item.get('body','')
            queue.delete_message(QueueUrl=queue_url,ReceiptHandle=receiptHandle)
            count_message += 1
        except:
            print('catch excception while cleaning message')
    print(datetime.datetime.now())
    #---------------------
    # ここに処理を記述する
    #---------------------
    if count_message > 0 :
        response = queue.send_message(
            QueueUrl=queue_url,
            DelaySeconds=5,
            MessageBody=json.dumps({"exec_after_wait":"1"})
        )
    return {"status":"OK"}

def lambda_handler(event, context):
    if 'Records' in event.keys() :
        return(process(event['Records']))
    return {"Unknown Inputs"}

  • 関数ができたら、トリガーにSQSを割り当てる
  • 関数の定期実行をスタートするには、マネジメントコンソールなどを用いて SQS にてキュー queue_waiting に1つメッセージを送ればよい。
  • 関数を停止するには、Lambda関数でSQSトリガを停止し、SQSメッセージを削除すればよい。

まとめ

実行結果の確認

  • 以下はCloudWatchのログから抜粋
START RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d Version: $LATEST
2019-08-14 13:44:45.874013
END RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d
REPORT RequestId: 9644aee2-fcfe-5258-8c17-fb2cee76bc8d  Duration: 115.69 ms Billed Duration: 200 ms Memory Size: 128 MB Max Memory Used: 72 MB  
START RequestId: 49401749-e181-5455-800e-4287f3097775 Version: $LATEST
2019-08-14 13:44:50.954724
END RequestId: 49401749-e181-5455-800e-4287f3097775
REPORT RequestId: 49401749-e181-5455-800e-4287f3097775  Duration: 91.06 ms  Billed Duration: 100 ms Memory Size: 128 MB Max Memory Used: 72 MB  
START RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950 Version: $LATEST
2019-08-14 13:44:56.034734
END RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950
REPORT RequestId: 0dbba3cf-6677-5fcb-8adc-f60174faa950  Duration: 129.14 ms Billed Duration: 200 ms Memory Size: 128 MB Max Memory Used: 72 MB  
  • この程度のLamda関数であれば メモリ128MB,実行時間 100ms-200msで軽量である。
  • 実行間隔は 5秒+アルファである。
  • 正確に5秒毎でなくてもよい要件であれば、この方法で問題ないと考える

今後の課題

  • なんらかの原因でSQSメッセージが破損して停止した場合、再度キューにメッセージを送るような復旧方法があるとよい。Cloudwatchのアラームで一定時間Lamda関数が起動しないことを検知 -> SNS → Lambda でできそうである。

参考文献

akimai
GYAOで動画のシステム開発を担当しています
gyao
GYAOは、ヤフーグループのエンターテインメントカンパニーとして、GYAO!サービスを運営しています。
http://www.gyao.co.jp/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした