LoginSignup
7
2

Lambda 関数を SQS から起動して結果を Broadway で受け取る

Last updated at Posted at 2023-10-23

はじめに

本記事では Elixir と SQS と Lambda を連携させます

Elixir でキューからメッセージを受信するためのモジュール Broadway と、 AWS のキューサービス Amazon SQS の基本的な動作・連携について、先に以下の記事を読んでください

本記事では更に Lambda を連携させ、以下のようなことをしたいとします

画像処理や AI 推論など、メモリを大量に使う処理や長い時間がかかる処理はメインプロセスとは別のところで実行した方が安全です

しかも処理対象が大量に存在し、並列実行しつつ、全体の進捗を管理するような場合では、キューを使うと制御がしやすくなります

そこで、 Elixir からキュー経由で Lambda に処理を依頼し、その結果は Lambda からキュー経由で Elixir に戻るようにします

これが実現できるか確認しました

例によって Livebook 上で検証しています

実装したノートブックはこちら

セットアップ

必要なモジュールをインストールします

Mix.install([
  {:aws, "~> 0.13"},
  {:ex_aws, "~> 2.5"},
  {:ex_aws_lambda, "~> 2.1"},
  {:ex_aws_s3, "~> 2.5"},
  {:ex_aws_sqs, "~> 3.4"},
  {:ex_aws_sts, "~> 2.3"},
  {:kino, "~> 0.11.0"},
  {:hackney, "~> 1.20"},
  {:jason, "~> 1.4"},
  {:sweet_xml, "~> 0.7.4"},
  {:broadway, "~> 1.0"},
  {:broadway_sqs, "~> 0.7.3"}
])

Elixir には AWS を操作するためのモジュールが AWS Elixir と ExAWS の2種類存在しますが、今回は両方を使っています
どちらも一長一短があるためです

Elixir でのキュー受信には Broadway を使用します

認証情報の設定

AWS のリソースを作成するため、認証情報を入力します

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

[
  access_key_id_input,
  secret_access_key_input,
  region_input
]
|> Kino.Layout.grid(columns: 3)

スクリーンショット 2023-10-20 13.52.51.png

入力した認証情報を使って AWS Elixir のクライアントを用意します

client =
  AWS.Client.create(
    Kino.Input.read(access_key_id_input),
    Kino.Input.read(secret_access_key_input),
    Kino.Input.read(region_input)
  )

ExAWS と Broadway 用に認証情報をキーワードリストに入れておきます

auth_config = [
  access_key_id: Kino.Input.read(access_key_id_input),
  secret_access_key: Kino.Input.read(secret_access_key_input),
  region: Kino.Input.read(region_input)
]

Kino.nothing()

後で使うため、 STS から AWS のアカウント ID を取得しておきます

account_id =
  ExAws.STS.get_caller_identity()
  |> ExAws.request!(auth_config)
  |> then(& &1.body.account)

Lambda 関数の作成

Python ランタイムの Lambda 関数を AWS 上にデプロイします

大まかな手順は以下の通り

  • Lambda 関数を実行するための IAM ロール作成
  • Lambda 関数を Python ファイルに実装
  • Python ファイルを ZIP で圧縮
  • 圧縮した ZIP ファイルを指定して Lambda 関数を作成

IAM ロールの作成

Lambda 関数を実行するための IAM ロールを作成します

client
|> AWS.IAM.create_role(%{
  "RoleName" => "sample-lambda-role",
  "AssumeRolePolicyDocument" =>
    Jason.encode!(%{
      "Statement" => [
        %{
          "Sid" => "STS202201051440",
          "Effect" => "Allow",
          "Principal" => %{
            "Service" => ["lambda.amazonaws.com"]
          },
          "Action" => "sts:AssumeRole"
        }
      ]
    })
})

Lambda 関数用ロールにアタッチするためのポリシーを作成します

Cloud Watch にログを出力するための権限と、 SQS のメッセージ送受信用の権限を付与します

client
|> AWS.IAM.create_policy(%{
  "PolicyName" => "sample-lambda-role-policy",
  "PolicyDocument" =>
    Jason.encode!(%{
      "Version" => "2012-10-17",
      "Statement" => [
        %{
          "Effect" => "Allow",
          "Action" => [
            "cloudwatch:PutMetricData",
            "logs:CreateLogStream",
            "logs:PutLogEvents",
            "logs:CreateLogGroup",
            "logs:DescribeLogStreams",
            "sqs:DeleteMessage",
            "sqs:GetQueueAttributes",
            "sqs:ReceiveMessage",
            "sqs:SendMessage"
          ],
          "Resource" => ["*"]
        }
      ]
    })
})

ポリシーをロールにアタッチします

client
|> AWS.IAM.attach_role_policy(%{
  "RoleName" => "sample-lambda-role",
  "PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})

Lambda 関数のデプロイ

Lambda 関数を Python で実装し、ファイルに書き込みます

Python では以下の処理を実行しています

  • メッセージを受信したらレコードを取り出す
  • 2 秒待つ
  • レコードが存在すれば、レコードの内容をメッセージとして結果キューに送信する
File.write!("/tmp/sample.py", """
import boto3
import json
import time

from datetime import datetime

def handler(event, context):
    client = boto3.client("sqs")

    records = event.get("Records", [])

    print("records:", len(records))

    time.sleep(2)

    response = "no_records"
    if records:
        for record in records:
            request_body = json.loads(record["body"])
            response = request_body["message"]
            print("response:", response)
            callback_queue_url = request_body["callback_queue_url"]
            client.send_message(
                QueueUrl=callback_queue_url,
                MessageBody=response,
                MessageGroupId="sample",
                MessageDeduplicationId=str(datetime.now().timestamp())
            )

    return response
""")

書き出した Python ファイルを ZIP 形式で圧縮します

:zip.create("/tmp/sample_lambda.zip", ['sample.py'], cwd: "/tmp")

アップロード先の S3 バケットを入力します

bucket_name_input = Kino.Input.text("BUCKET_ANME")

ZIP を S3 にアップロードします

bucket_name = Kino.Input.read(bucket_name_input)

"/tmp/sample_lambda.zip"
|> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket_name, "sample_lambda/sample_lambda.zip")
|> ExAws.request!(auth_config)

アップロードした ZIP ファイルを元として Lambda 関数を作成します

%ExAws.Operation.RestQuery{
  service: :lambda,
  http_method: :post,
  path: "/2015-03-31/functions",
  body: %{
    "FunctionName" => "sample_lambda",
    "Handler" => "sample.handler",
    "Code" => %{
      "S3Bucket" => bucket_name,
      "S3Key" => "sample_lambda/sample_lambda.zip"
    },
    "Role" => "arn:aws:iam::#{account_id}:role/sample-lambda-role",
    "Runtime" => "python3.11"
  }
}
|> ExAws.request!(auth_config)

作成した Lambda 関数を呼び出します

"sample_lambda"
|> ExAws.Lambda.invoke(%{}, %{})
|> ExAws.request!(auth_config)

結果は "no_records" になります(SQSから起動していないため)

キューの作成

SQS で依頼キューを作成します

receive_message_wait_time_seconds で 1 秒に 1 回キューを見に行くようにします

sqs_res =
  "sample_queue.fifo"
  |> ExAws.SQS.create_queue(fifo_queue: true, receive_message_wait_time_seconds: 1)
  |> ExAws.request!(auth_config)

依頼キューが Lambda を起動するように設定します

ここで BatchSize = 1 回の Lambda 起動で受け取るメッセージの最大数 と MaximumConcurrency = Lambda 関数の最大並列実行数を指定しています

MaximumConcurrency は 2 〜 1,000 の範囲で指定可能です

以下のように設定すると、 Lambda 関数は同時に 2 つまで起動し、1 つづつメッセージを処理していきます

%ExAws.Operation.RestQuery{
  service: :lambda,
  http_method: :post,
  path: "/2015-03-31/event-source-mappings/",
  body: %{
    "FunctionName" => "sample_lambda",
    "EventSourceArn" => "arn:aws:sqs:ap-northeast-1:#{account_id}:sample_queue.fifo",
    "BatchSize" => 1,
    "ScalingConfig" => %{
      "MaximumConcurrency" => 2
    }
  }
}
|> ExAws.request!(auth_config)

依頼キューの URL を取得しておきます

queue_url = sqs_res.body.queue_url

結果キューを作成します

call_back_sqs_res =
  "sample_callback_queue.fifo"
  |> ExAws.SQS.create_queue(fifo_queue: true, receive_message_wait_time_seconds: 1)
  |> ExAws.request!(auth_config)

結果キューの URL を取得します

callback_queue_url = call_back_sqs_res.body.queue_url

メッセージの送信

結果キューの URL と 1, 2, 3 の文字列を依頼キューに送信します

["1", "2", "3"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(
    """
    {"callback_queue_url": "#{callback_queue_url}", "message": "#{message}"}
    """,
    message_group_id: "sample", # とりあえず全て同じグループにする
    message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
  )
  |> ExAws.request!(auth_config)
end)

キューの状態確認用関数を作成します

show_queue_status = fn (queue_url, auth_config) ->
  queue_url
  |> ExAws.SQS.get_queue_attributes()
  |> ExAws.request!(auth_config)
  |> Map.get(:body)
  |> Map.get(:attributes)
end

キューの状態を確認します

show_queue_status.(queue_url, auth_config)

結果は以下のようになります

%{
  ...
  approximate_number_of_messages: 3,
  approximate_number_of_messages_not_visible: 0,
  ...
}

先ほど 3 つメッセージを送信したので、 approximate_number_of_messages が 3 になっています

少し経ってからもう一度実行すると、以下のように変化します

%{
  ...
  approximate_number_of_messages: 0,
  approximate_number_of_messages_not_visible: 0,
  ...
}

Lamda が実行され、キューからメッセージが削除されています
(Lambda が正常終了したとき、自動的にメッセージが削除されました)

結果キューの方を確認してみます

show_queue_status.(callback_queue_url, auth_config)

結果キューには 3 つメッセージが入っています

%{
  ...
  approximate_number_of_messages: 3,
  approximate_number_of_messages_not_visible: 0,
  ...
}

結果を受信してみます

1..4
|> Enum.map(fn _ ->
  message =
    callback_queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # 受信したメッセージを削除する
  unless is_nil(message) do
    callback_queue_url
    |>ExAws.SQS.delete_message(message.receipt_handle)
    |> ExAws.request!(auth_config)

    message.body
  else
    nil
  end
end)

結果は以下のようになりました

["1", "3", "2", nil]

FIFO キューではなく標準キューなので順序はバラバラですが、 3 つともメッセージが受信できています

Broadway による受信

受信用のモジュールを作成します
メッセージを受信したらデータを表示するだけです

defmodule SampleBroadway do
  use Broadway

  def start_link(queue_url, config) do
    Broadway.start_link(__MODULE__,
      name: SamplePipeline,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: queue_url,
          config: config
        },
        concurrency: 1
      ],
      processors: [
        default: [concurrency: 1]
      ]
    )
  end

  def handle_message(_processor_name, message, _context) do
    IO.inspect(message.data)
    message
  end
end

受信を開始します

{:ok, pipeline} = SampleBroadway.start_link(callback_queue_url, auth_config)

メッセージを送信します

["11", "12", "13", "14", "15", "16"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(
    """
    {"callback_queue_url": "#{callback_queue_url}", "message": "#{message}"}
    """,
    message_group_id: "sample", # とりあえず全て同じグループにする
    message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
  )
  |> ExAws.request!(auth_config)
end)

lambda_queue.gif

ある程度の間隔、ある程度の並列具合で動いているのが分かると思います

受信を停止します

Broadway.stop(pipeline)

リソースの削除

作成した AWS 上のリソースを削除します

Lambda 関数の削除

"sample_lambda"
|> ExAws.Lambda.delete_function()
|> ExAws.request!(auth_config)

IAM ロールポリシーのデタッチ

client
|> AWS.IAM.detach_role_policy(%{
  "RoleName" => "sample-lambda-role",
  "PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})

IAM ロールポリシーの削除

client
|> AWS.IAM.delete_policy(%{
  "PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})

IAM ロールの削除

client
|> AWS.IAM.delete_role(%{
  "RoleName" => "sample-lambda-role"
})

キューの削除

"sample_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
"sample_callback_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)

まとめ

SQS のキューにメッセージを投げることで Lambda 関数を実行できました

また、 Lambda 関数からキューにメッセージを投げ、 Broadway で受信することもできました

以下の値を変更することで Lambda 関数の実行頻度や実行時間、メモリサイズなどを調整できます

  • SQS キューの receive_message_wait_time_seconds
  • Lambda トリガーの BatchSize と MaximumConcurrency
  • Broadway の concurrency
7
2
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
2