LoginSignup
2

More than 1 year has passed since last update.

posted at

updated at

Organization

LambdaをSQSのコンシューマーにして、処理はECSのタスクで実行する

はじめに

Lambdaを使うと簡単にSQSのコンシューマーが構築できます。
しかし、Lambdaは最大15分しか実行できないため、時間のかかる可能性のある処理には向かない上、RDSとの相性が悪かったり等、色々制限があります。
そこで、ECSのタスク経由で処理させるようにします。

ここではコードはpython、フレームワークはDjangoで記載しています

構成図

例としてユーザがCSVファイルをアップロードし、非同期でCSVの内容を登録する処理とすると、こんな感じです。
Untitled Diagram.png

ここでは基本的にこの図の4,5,6,9についての話になります。

前準備

キューの作成

  • タイプはやりたいこと次第でスタンダートキューかFIFOキューか選びましょう。(同時実行して欲しく無い場合はFIFOキューにするなど)
  • デッドレターキューはデフォルト無効になってますが、正しく処理できなかった場合に通知したりできるので、設定しておいたほうが無難だと思います。
  • キューを作成したら、詳細にあるURLをメモっときましょう。

LambdaのトリガーにSQSを設定

  • 「トリガーを追加」から、上記で作成したキュー名を選択するだけです。

流れ

4. キューテーブルのIDをつけてメッセージ送信

こんな感じのSQS用のクラスを作りました。

sqs.py
import boto3
import json
from django.conf import settings


class Sqs:

    def __init__(self, url):
        self.sqs = boto3.client('sqs',
                                region_name='ap-northeast-1',
                                aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
                                aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY)
        self.url = url

    def send_message(self, message, message_group_id=None):
        """
        キューにメッセージを送信する
        :param message:
        :param message_group_id:
        :return:
        """
        try:
            data = {
                'MessageBody': json.dumps(message), 'QueueUrl': self.url,
            }
            if message_group_id:
                data['MessageGroupId'] = message_group_id
            response = self.sqs.send_message(**data)

        except Exception as e:
            print(e)
            return None

        return response

    def change_message_visibility(self, receipt_handle, timeout):
        """
        可視性タイムアウトを延長する(秒で指定)
        :param receipt_handle:
        :param timeout:
        :return:
        """
        self.sqs.change_message_visibility(
            QueueUrl=self.url,
            ReceiptHandle=receipt_handle,
            VisibilityTimeout=timeout
        )

    def delete_message(self, receipt_handle):
        """
        メッセージを削除する
        :param receipt_handle:
        :return:
        """
        self.sqs.delete_message(
            QueueUrl=self.url,
            ReceiptHandle=receipt_handle
        )

メッセージ送信はこんな感じ

sqs = Sqs('キュー作成時にメモったURL')
res = sqs.send_message({'que_id': que_id})

5. メッセージ取得 / 6. ECSのタスク実行(キューテーブルのIDを渡す)

Lambdaに以下のようなコードをデプロイします。

import json
import boto3


ecs = boto3.client('ecs')

def lambda_handler(event, context):

    records = event['Records']
    for record in records:
        body = json.loads(record['body'])
        que_id = body['que_id']
        receipt_handle = record['receiptHandle']
        response = run_task(que_id, receipt_handle)

    # lambdaの実行が成功するとメッセージを削除してしまうので、あえて例外エラーにする
    raise Exception('完了')

def run_task(que_id, receipt_handle):
    response = ecs.run_task(
        cluster='[クラスター名]',
        taskDefinition='[タスク名:リビジョン]',
        launchType='EC2',
        overrides={
            'containerOverrides': [
                {
                    'name': 'app',
                    'command': ["python","/app/manage.py","[実行コマンド]",str(que_id),'-r',receipt_handle],
                }
            ]
        },
    )
    return response

boto3のECSサービスのrun_taskでECSのタスクが実行できます。
実行パラメータは自分の環境に合わせてください。
EC2だけでなくFARGATEも実行できます。

Exceptionについて

run_taskは非同期でタスクを実行します。(完了を待たない)
また、LambdaをSQSのコンシューマーにした場合、Lambdaの実行が完了すると自動的にメッセージを削除します
そのため、ECSのタスクが完了してないのにメッセージが削除されてしまい、FIFOキューなのに多重実行されるといったことが発生してしまいます。

# lambdaの実行が成功するとメッセージを削除してしまうので、あえて例外エラーにする
raise Exception('完了')

なので、ここであえて例外エラーにすることでメッセージを削除されないようにしています。(もっといいやり方があれば知りたい・・・)

receipt_handleについて

receipt_handleはメッセージごとのIDみたいなものです。
メッセージを削除するのに使うので、必ずECSのタスクに渡しましょう。

9. CSVファイルの内容を登録する

class Command(BaseCommand):

    # 可視性タイムアウト(3時間)
    TIMEOUT = 3 * 60 * 60

    def add_arguments(self, parser):
        parser.add_argument('que_id', nargs='+', type=str)
        parser.add_argument('-r', '--receipt-handle', nargs='*', dest='receipt_handle')

    def handle(self, *args, **options):
        que_id = options['que_id'][0]
        receipt_handle = None
        if 'receipt_handle' in options and options['receipt_handle']:
            receipt_handle = options['receipt_handle'][0]

        sqs = Sqs('キュー作成時にメモったURL')
        # キューテーブルからキューを取得
        que = QueTable.objects.filter(id=que_id).first()
        if not que:
            logger.error(f"レコードが存在しませんでした que_id: {que_id}")
            if receipt_handle:
                sqs.delete_message(receipt_handle)
        elif que.status == "RUNNING":
            # 可視性タイムアウト以上に時間がかかってる場合
            print(f"実行中のためスキップします que_id: {que_id}")
        elif que.status == "SUCCESS" or que.status == "FAILURE":
            # 完了してるのにメッセージが削除されていない場合
            print(f"実行済みのためスキップします que_id: {que_id}")
            if receipt_handle:
                sqs.delete_message(receipt_handle)
        else:
            try:
                if receipt_handle:
                    sqs.change_message_visibility(receipt_handle, self.TIMEOUT)
                # 処理実行(S3からCSVファイルをダウンロードして内容を登録する)
                que.run()
                if receipt_handle:
                    sqs.delete_message(receipt_handle)
            except Exception as e:
                print(e)

処理を実行開始したらキューテーブルのレコードのstatusカラムをSUCCESS、完了したらSUCCESS、もしエラーなどがあった場合はFAILUREを想定してます。
二重で実行してしまわないように処理実行前にステータスを確認するようにしています。

可視性タイムアウトについて

可視性タイムアウトはメッセージを取得してから、メッセージが削除されなかった場合、この時間経過したら再度メッセージが取得できるまでの時間です。
この場合、このタイミングで可視性タイムアウトを変更してますが、AWSのコンソールであらかじめ、想定される実行時間よりも余裕のある時間に変更しておいてもいいと思います。

メッセージの削除について

必ずメッセージは削除しましょう。削除処理をさすれていると再度実行されてしまいます。

おわりに

SQSコンシューマーを作ったり管理するのは結構面倒だったりしますが、Lambdaを使えば簡単にできて便利です。
また、実際の処理をECSで実行できるようにすることで疎結合になり、もしSQS意外のキューイングシステムを使いたくなっても移行は比較的楽になりますし、SQSやLambdaに何か問題があって実行できなくても、最悪手動実行もできるといメリットがあります。

以上です、ありがとうございました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
2