はじめに
本記事では Elixir と SQS と Lambda を連携させます
Elixir でキューからメッセージを受信するためのモジュール Broadway と、 AWS のキューサービス Amazon SQS の基本的な動作・連携について、先に以下の記事を読んでください
本記事では更に Lambda を連携させ、以下のようなことをしたいとします
画像処理や AI 推論など、メモリを大量に使う処理や長い時間がかかる処理はメインプロセスとは別のところで実行した方が安全です
しかも処理対象が大量に存在し、並列実行しつつ、全体の進捗を管理するような場合では、キューを使うと制御がしやすくなります
そこで、 Elixir からキュー経由で Lambda に処理を依頼し、その結果は Lambda からキュー経由で Elixir に戻るようにします
これが実現できるか確認しました
例によって Livebook 上で検証しています
実装したノートブックはこちら
セットアップ
必要なモジュールをインストールします
Mix.install([
{:aws, "~> 0.13"},
{:ex_aws, "~> 2.5"},
{:ex_aws_lambda, "~> 2.1"},
{:ex_aws_s3, "~> 2.5"},
{:ex_aws_sqs, "~> 3.4"},
{:ex_aws_sts, "~> 2.3"},
{:kino, "~> 0.11.0"},
{:hackney, "~> 1.20"},
{:jason, "~> 1.4"},
{:sweet_xml, "~> 0.7.4"},
{:broadway, "~> 1.0"},
{:broadway_sqs, "~> 0.7.3"}
])
Elixir には AWS を操作するためのモジュールが AWS Elixir と ExAWS の2種類存在しますが、今回は両方を使っています
どちらも一長一短があるためです
Elixir でのキュー受信には Broadway を使用します
認証情報の設定
AWS のリソースを作成するため、認証情報を入力します
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")
[
access_key_id_input,
secret_access_key_input,
region_input
]
|> Kino.Layout.grid(columns: 3)
入力した認証情報を使って AWS Elixir のクライアントを用意します
client =
AWS.Client.create(
Kino.Input.read(access_key_id_input),
Kino.Input.read(secret_access_key_input),
Kino.Input.read(region_input)
)
ExAWS と Broadway 用に認証情報をキーワードリストに入れておきます
auth_config = [
access_key_id: Kino.Input.read(access_key_id_input),
secret_access_key: Kino.Input.read(secret_access_key_input),
region: Kino.Input.read(region_input)
]
Kino.nothing()
後で使うため、 STS から AWS のアカウント ID を取得しておきます
account_id =
ExAws.STS.get_caller_identity()
|> ExAws.request!(auth_config)
|> then(& &1.body.account)
Lambda 関数の作成
Python ランタイムの Lambda 関数を AWS 上にデプロイします
大まかな手順は以下の通り
- Lambda 関数を実行するための IAM ロール作成
- Lambda 関数を Python ファイルに実装
- Python ファイルを ZIP で圧縮
- 圧縮した ZIP ファイルを指定して Lambda 関数を作成
IAM ロールの作成
Lambda 関数を実行するための IAM ロールを作成します
client
|> AWS.IAM.create_role(%{
"RoleName" => "sample-lambda-role",
"AssumeRolePolicyDocument" =>
Jason.encode!(%{
"Statement" => [
%{
"Sid" => "STS202201051440",
"Effect" => "Allow",
"Principal" => %{
"Service" => ["lambda.amazonaws.com"]
},
"Action" => "sts:AssumeRole"
}
]
})
})
Lambda 関数用ロールにアタッチするためのポリシーを作成します
Cloud Watch にログを出力するための権限と、 SQS のメッセージ送受信用の権限を付与します
client
|> AWS.IAM.create_policy(%{
"PolicyName" => "sample-lambda-role-policy",
"PolicyDocument" =>
Jason.encode!(%{
"Version" => "2012-10-17",
"Statement" => [
%{
"Effect" => "Allow",
"Action" => [
"cloudwatch:PutMetricData",
"logs:CreateLogStream",
"logs:PutLogEvents",
"logs:CreateLogGroup",
"logs:DescribeLogStreams",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes",
"sqs:ReceiveMessage",
"sqs:SendMessage"
],
"Resource" => ["*"]
}
]
})
})
ポリシーをロールにアタッチします
client
|> AWS.IAM.attach_role_policy(%{
"RoleName" => "sample-lambda-role",
"PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})
Lambda 関数のデプロイ
Lambda 関数を Python で実装し、ファイルに書き込みます
Python では以下の処理を実行しています
- メッセージを受信したらレコードを取り出す
- 2 秒待つ
- レコードが存在すれば、レコードの内容をメッセージとして結果キューに送信する
File.write!("/tmp/sample.py", """
import boto3
import json
import time
from datetime import datetime
def handler(event, context):
client = boto3.client("sqs")
records = event.get("Records", [])
print("records:", len(records))
time.sleep(2)
response = "no_records"
if records:
for record in records:
request_body = json.loads(record["body"])
response = request_body["message"]
print("response:", response)
callback_queue_url = request_body["callback_queue_url"]
client.send_message(
QueueUrl=callback_queue_url,
MessageBody=response,
MessageGroupId="sample",
MessageDeduplicationId=str(datetime.now().timestamp())
)
return response
""")
書き出した Python ファイルを ZIP 形式で圧縮します
:zip.create("/tmp/sample_lambda.zip", ['sample.py'], cwd: "/tmp")
アップロード先の S3 バケットを入力します
bucket_name_input = Kino.Input.text("BUCKET_ANME")
ZIP を S3 にアップロードします
bucket_name = Kino.Input.read(bucket_name_input)
"/tmp/sample_lambda.zip"
|> ExAws.S3.Upload.stream_file()
|> ExAws.S3.upload(bucket_name, "sample_lambda/sample_lambda.zip")
|> ExAws.request!(auth_config)
アップロードした ZIP ファイルを元として Lambda 関数を作成します
%ExAws.Operation.RestQuery{
service: :lambda,
http_method: :post,
path: "/2015-03-31/functions",
body: %{
"FunctionName" => "sample_lambda",
"Handler" => "sample.handler",
"Code" => %{
"S3Bucket" => bucket_name,
"S3Key" => "sample_lambda/sample_lambda.zip"
},
"Role" => "arn:aws:iam::#{account_id}:role/sample-lambda-role",
"Runtime" => "python3.11"
}
}
|> ExAws.request!(auth_config)
作成した Lambda 関数を呼び出します
"sample_lambda"
|> ExAws.Lambda.invoke(%{}, %{})
|> ExAws.request!(auth_config)
結果は "no_records"
になります(SQSから起動していないため)
キューの作成
SQS で依頼キューを作成します
receive_message_wait_time_seconds
で 1 秒に 1 回キューを見に行くようにします
sqs_res =
"sample_queue.fifo"
|> ExAws.SQS.create_queue(fifo_queue: true, receive_message_wait_time_seconds: 1)
|> ExAws.request!(auth_config)
依頼キューが Lambda を起動するように設定します
ここで BatchSize = 1 回の Lambda 起動で受け取るメッセージの最大数 と MaximumConcurrency = Lambda 関数の最大並列実行数を指定しています
MaximumConcurrency は 2 〜 1,000 の範囲で指定可能です
以下のように設定すると、 Lambda 関数は同時に 2 つまで起動し、1 つづつメッセージを処理していきます
%ExAws.Operation.RestQuery{
service: :lambda,
http_method: :post,
path: "/2015-03-31/event-source-mappings/",
body: %{
"FunctionName" => "sample_lambda",
"EventSourceArn" => "arn:aws:sqs:ap-northeast-1:#{account_id}:sample_queue.fifo",
"BatchSize" => 1,
"ScalingConfig" => %{
"MaximumConcurrency" => 2
}
}
}
|> ExAws.request!(auth_config)
依頼キューの URL を取得しておきます
queue_url = sqs_res.body.queue_url
結果キューを作成します
call_back_sqs_res =
"sample_callback_queue.fifo"
|> ExAws.SQS.create_queue(fifo_queue: true, receive_message_wait_time_seconds: 1)
|> ExAws.request!(auth_config)
結果キューの URL を取得します
callback_queue_url = call_back_sqs_res.body.queue_url
メッセージの送信
結果キューの URL と 1, 2, 3 の文字列を依頼キューに送信します
["1", "2", "3"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
"""
{"callback_queue_url": "#{callback_queue_url}", "message": "#{message}"}
""",
message_group_id: "sample", # とりあえず全て同じグループにする
message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
)
|> ExAws.request!(auth_config)
end)
キューの状態確認用関数を作成します
show_queue_status = fn (queue_url, auth_config) ->
queue_url
|> ExAws.SQS.get_queue_attributes()
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:attributes)
end
キューの状態を確認します
show_queue_status.(queue_url, auth_config)
結果は以下のようになります
%{
...
approximate_number_of_messages: 3,
approximate_number_of_messages_not_visible: 0,
...
}
先ほど 3 つメッセージを送信したので、 approximate_number_of_messages
が 3 になっています
少し経ってからもう一度実行すると、以下のように変化します
%{
...
approximate_number_of_messages: 0,
approximate_number_of_messages_not_visible: 0,
...
}
Lamda が実行され、キューからメッセージが削除されています
(Lambda が正常終了したとき、自動的にメッセージが削除されました)
結果キューの方を確認してみます
show_queue_status.(callback_queue_url, auth_config)
結果キューには 3 つメッセージが入っています
%{
...
approximate_number_of_messages: 3,
approximate_number_of_messages_not_visible: 0,
...
}
結果を受信してみます
1..4
|> Enum.map(fn _ ->
message =
callback_queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
|> Enum.at(0)
# 受信したメッセージを削除する
unless is_nil(message) do
callback_queue_url
|>ExAws.SQS.delete_message(message.receipt_handle)
|> ExAws.request!(auth_config)
message.body
else
nil
end
end)
結果は以下のようになりました
["1", "3", "2", nil]
FIFO キューではなく標準キューなので順序はバラバラですが、 3 つともメッセージが受信できています
Broadway による受信
受信用のモジュールを作成します
メッセージを受信したらデータを表示するだけです
defmodule SampleBroadway do
use Broadway
def start_link(queue_url, config) do
Broadway.start_link(__MODULE__,
name: SamplePipeline,
producer: [
module: {
BroadwaySQS.Producer,
queue_url: queue_url,
config: config
},
concurrency: 1
],
processors: [
default: [concurrency: 1]
]
)
end
def handle_message(_processor_name, message, _context) do
IO.inspect(message.data)
message
end
end
受信を開始します
{:ok, pipeline} = SampleBroadway.start_link(callback_queue_url, auth_config)
メッセージを送信します
["11", "12", "13", "14", "15", "16"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
"""
{"callback_queue_url": "#{callback_queue_url}", "message": "#{message}"}
""",
message_group_id: "sample", # とりあえず全て同じグループにする
message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
)
|> ExAws.request!(auth_config)
end)
ある程度の間隔、ある程度の並列具合で動いているのが分かると思います
受信を停止します
Broadway.stop(pipeline)
リソースの削除
作成した AWS 上のリソースを削除します
Lambda 関数の削除
"sample_lambda"
|> ExAws.Lambda.delete_function()
|> ExAws.request!(auth_config)
IAM ロールポリシーのデタッチ
client
|> AWS.IAM.detach_role_policy(%{
"RoleName" => "sample-lambda-role",
"PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})
IAM ロールポリシーの削除
client
|> AWS.IAM.delete_policy(%{
"PolicyArn" => "arn:aws:iam::#{account_id}:policy/sample-lambda-role-policy"
})
IAM ロールの削除
client
|> AWS.IAM.delete_role(%{
"RoleName" => "sample-lambda-role"
})
キューの削除
"sample_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
"sample_callback_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
まとめ
SQS のキューにメッセージを投げることで Lambda 関数を実行できました
また、 Lambda 関数からキューにメッセージを投げ、 Broadway で受信することもできました
以下の値を変更することで Lambda 関数の実行頻度や実行時間、メモリサイズなどを調整できます
- SQS キューの
receive_message_wait_time_seconds
- Lambda トリガーの BatchSize と MaximumConcurrency
- Broadway の concurrency