はじめに
Broadway は Elixir で堅牢なパイプラインを構築するためのモジュールです
例えばレストランでの注文を考えてみます
ウェイターは次々と客の注文を紙に書いて厨房に渡します
厨房には注文の紙がどんどん溜まっていき、コックは先に来た注文から順に対応していきます
この状況では、注文が多くなるとさまざまな問題が発生します
- 注文の順番が飛ばされて、ずっと待っている客がいる
- 誤って同じ注文を二人のコックが別々に作っていた
- 作り終わっていない注文が何故か提供済になっていた
ちゃんとした仕組みが必要そうですね
上記の例で言うウェイターに当たるものが「キュー」(Queue)になります
Amazon SQS でクラウド上にキューを作成すると、そこにメッセージを送信、蓄積しておいて、別のプロセスがキューからメッセージを受け取り順次処理していくことができます
そして、 Broadway は現場を管理するためのマネージャーです
誰がどの順番でどの注文に対応するのか、どれが対応中でどれが対応済なのか、しっかり管理し、全てを捌ききります
この記事では Livebook 上で Broadway を動かし、 Broadway がキュー上のメッセージを捌いていく様子を確認します
実装したノートブックはこちら
セットアップ
必要なモジュールをインストールします
Mix.install([
{:ex_aws, "~> 2.5"},
{:ex_aws_sqs, "~> 3.4"},
{:kino, "~> 0.11.0"},
{:hackney, "~> 1.20"},
{:jason, "~> 1.4"},
{:sweet_xml, "~> 0.7.4"},
{:broadway, "~> 1.0"},
{:broadway_sqs, "~> 0.7.3"}
])
Elixir から AWS のリソースを操作するため、 ExAws を使用します
また、Broadway で SQS のキューからメッセージを受信するためのモジュール Broadway SQS を使用します
Amazon SQS の基本動作確認
キューを作成する
Amazon SQS でキューを作成するため、 AWS の認証情報を入力します
access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")
[
access_key_id_input,
secret_access_key_input,
region_input
]
|> Kino.Layout.grid(columns: 3)
入力した値をキーワードリストに入れておきます
Kino.nothing()
は認証情報を出力しないために入れています
auth_config = [
access_key_id: Kino.Input.read(access_key_id_input),
secret_access_key: Kino.Input.read(secret_access_key_input),
region: Kino.Input.read(region_input)
]
Kino.nothing()
キューを作成します
ExAws.request!
で AWS の API にリクエストを投げるとき、認証情報を渡しています
sqs_res =
"sample_queue"
|> ExAws.SQS.create_queue()
|> ExAws.request!(auth_config)
SQS のレスポンスからキューの URL を取得します
queue_url = sqs_res.body.queue_url
出力は以下のような形式の URL になります
"https://sqs.ap-northeast-1.amazonaws.com/<AWSのアカウントID>/sample_queue"
キューにメッセージを送信する
1, 2, 3 の文字列を順にキューへ送信します
["1", "2", "3"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(message)
|> ExAws.request!(auth_config)
end)
キューの状態を確認してみましょう
この後も何回か確認するので、キュー確認用の関数を作っておきます
show_queue_status = fn (queue_url, auth_config) ->
queue_url
|> ExAws.SQS.get_queue_attributes()
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:attributes)
end
関数を呼び出します
show_queue_status.(queue_url, auth_config)
出力は以下のようになります
%{
queue_arn: "arn:aws:sqs:ap-northeast-1:<AWSのアカウントID>:sample_queue",
approximate_number_of_messages: 3,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
created_timestamp: 1697765450,
last_modified_timestamp: 1697765450,
visibility_timeout: 30,
maximum_message_size: 262144,
message_retention_period: 345600,
delay_seconds: 0,
receive_message_wait_time_seconds: 0,
sqs_managed_sse_enabled: true
}
approximate_number_of_messages
で現在キューに入っているメッセージの数が分かります
3つ投げたので3つ入った状態です
キューからメッセージを受信する
キューからメッセージを受信します
max_number_of_messages
で受信する最大数を指定します
1個ずつ、4回メッセージを受信してみます
1..4
|> Enum.map(fn _ ->
queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
end)
結果は以下のようになります
[
[
%{
attributes: [],
body: "3",
message_attributes: [],
message_id: "860a4fb2-c4bb-4c83-b330-4560a0fefdbb",
md5_of_body: "eccbc87e4b5ce2fe28308fd9f2a7baf3",
receipt_handle: "AQEBhiJfGyg/f72QJmtyb7WaN5k..."
}
],
[
%{
attributes: [],
body: "2",
message_attributes: [],
message_id: "1d30385c-8d9a-4ce8-a0e6-98c978d55f88",
md5_of_body: "c81e728d9d4c2f636f067f89cc14862c",
receipt_handle: "AQEBdDWM771vEe4UVvZRk9EorKWJ1FTC..."
}
],
[
%{
attributes: [],
body: "1",
message_attributes: [],
message_id: "b5953171-4b89-4069-a5e1-310e89280918",
md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
receipt_handle: "AQEBvStMy884OdKlpPbqtAV9loTZG1I..."
}
],
[]
]
1, 2, 3 の順番で投げたはずが、 3, 2, 1 の順番で返ってきました
(4回目はもう受信するものがないので空)
実は SQS は特に指定していない場合、標準キューを作成します
標準キューは FIFO (ファーストイン・ファーストアウト = 先入先出 = 入れた順番と出る順番が同じ)にならないため、順序は保証されません
FIFO キューについては後で作成してみます
このときのキューの状態を確認します
show_queue_status.(queue_url, auth_config)
結果は以下のようになります
%{
...
approximate_number_of_messages: 0,
approximate_number_of_messages_not_visible: 3,
approximate_number_of_messages_delayed: 0,
...
visibility_timeout: 30,
...
}
approximate_number_of_messages
が 0 になり、 approximate_number_of_messages_not_visible
が 3 になりました
30秒待ってから、もう一度ステータスを確認すると、以下のようになります
%{
...
approximate_number_of_messages: 3,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
...
visibility_timeout: 30,
...
}
SQS は受信済のメッセージをそのまま削除せず、不可視状態にします
visibility_timeout
の時間が経過した後、メッセージは復活します
復活させないためには、メッセージを明示的に削除する必要があります
1..4
|> Enum.map(fn _ ->
message =
queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
|> Enum.at(0)
# 受信したメッセージを削除する
unless is_nil(message) do
queue_url
|>ExAws.SQS.delete_message(message.receipt_handle)
|> ExAws.request!(auth_config)
end
message
end)
受信したメッセージが削除され、 approximate_number_of_messages
も approximate_number_of_messages_not_visible
も両方 0 になりました
%{
...
approximate_number_of_messages: 0,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
...
visibility_timeout: 30,
...
}
遅延メッセージを送信する
ついでに遅延メッセージを送信してみます
queue_url
|> ExAws.SQS.send_message("delayed", delay_seconds: 30)
|> ExAws.request!(auth_config)
show_queue_status.(queue_url, auth_config)
の結果は以下のようになります
%{
...
approximate_number_of_messages: 0,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 1,
...
visibility_timeout: 30,
...
}
30秒待ってもう一度 show_queue_status.(queue_url, auth_config)
を実行すると、結果は以下のようになります
%{
...
approximate_number_of_messages: 1,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
...
visibility_timeout: 30,
...
}
メッセージに遅延設定をつけることで、指定した時間の間はキューからメッセージが受信できなくなります
受信側の処理が遅延している場合などに利用します
メッセージを全て削除する
purge_queue
で全てのメッセージが削除できます
"sample_queue"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)
Broadway によるメッセージ受信
以下のように Broadway の機能を使って SQS からのメッセージを受信するモジュールを作成します
defmodule SampleBroadway do
use Broadway
def start_link(queue_url, config, concurrency) do
Broadway.start_link(__MODULE__,
name: SamplePipeline,
producer: [
module: {
BroadwaySQS.Producer,
queue_url: queue_url,
config: config # AWS の認証情報
},
concurrency: concurrency # 何個並列で受信するか
],
processors: [
default: [
max_demand: 1, # 何個毎のかたまりで受信するか
concurrency: concurrency # 何個並列で処理するか
]
]
)
end
# メッセージ受信時の処理
def handle_message(_processor_name, message, _context) do
Process.sleep(1000) # 1秒待つ
IO.inspect(message.data)
message
end
end
並列数 1 で受信を開始します
これで SQS にメッセージが入るのを待っている状態になります
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)
メッセージを送信します
["11", "12", "13"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(message)
|> ExAws.request!(auth_config)
end)
送信後、少しすると Broadway がメッセージを受信し、順次 1 秒待ってからメッセージを表示します
FIFOキューではないため、順序はランダムになっています
実行後の状態を show_queue_status.(queue_url, auth_config)
で確認すると、以下のようになっています
%{
...
approximate_number_of_messages: 0,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
...
visibility_timeout: 30,
...
}
Broadway SQS では、処理したメッセージは自動的に削除され、メッセージは無くなっています
Broadway の受信を停止しておきます
Broadway.stop(pipeline)
FIFOキュー
やはり順序性は大事にしたいので、 FIFO キューを作ってみます
ちなみに FIFO キューは少しコストが高くなります
東京リージョンの場合
- 標準キュー: 0.4 USD / 100万リクエスト
- FIFOキュー: 0.5 USD / 100万リクエスト
FIFOキューの作成
ExAws.SQS.create_queue
のオプションに fifo_queue: true
を指定すると FIFO キューになります
また、FIFO キューの場合、 SQS では名前の末尾に ".fifo" を付けなければいけません
(付けないと作成時にエラーになります)
sqs_res =
"sample_queue.fifo"
|> ExAws.SQS.create_queue(fifo_queue: true)
|> ExAws.request!(auth_config)
キューの URL を取得します
queue_url = sqs_res.body.queue_url
キューの URL は以下のようになっています
"https://sqs.ap-northeast-1.amazonaws.com/<AWSアカウントID>/sample_queue.fifo"
FIFOキューへのメッセージ送信
FIFO キューにメッセージを送信してみます
FIFO キューの場合、 message_group_id
と message_deduplication_id
が必須になります
["1", "2", "3"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: "sample", # とりあえず全て同じグループにする
message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
)
|> ExAws.request!(auth_config)
end)
show_queue_status.(queue_url, auth_config)
で状態を確認すると、以下のようになっています
%{
queue_arn: "arn:aws:sqs:ap-northeast-1:<AWSアカウントID>:sample_queue.fifo",
approximate_number_of_messages: 3,
approximate_number_of_messages_not_visible: 0,
approximate_number_of_messages_delayed: 0,
created_timestamp: 1697766447,
last_modified_timestamp: 1697780323,
visibility_timeout: 30,
maximum_message_size: 262144,
message_retention_period: 345600,
delay_seconds: 0,
receive_message_wait_time_seconds: 0,
sqs_managed_sse_enabled: true,
fifo_queue: true,
content_based_deduplication: false,
deduplication_scope: "queue",
fifo_throughput_limit: "perQueue"
}
approximate_number_of_messages
が 3 になっていて、 fifo_queue
が true になっています
FIFOキューからのメッセージ受信
FIFO キューからメッセージを受信してみます
1..4
|> Enum.map(fn _ ->
queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
end)
結果は以下のようになりました
[
[
%{
attributes: [],
body: "1",
message_attributes: [],
message_id: "0f361e8e-51ad-4288-a753-93e901505b88",
md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
receipt_handle: "AQEBc95aNlxEeaDh1hSSHCXPgyTIz/tW70va8hSFXf3VRdYTlpgE+P6Sh5I7sNjVsggv85+RfO95jmLLUcrNE0P1eAELz8zYzBtnotnXXRCQYQ91a+eT3/tkwwDHcKa0Kza81zB2G6ea5/Ou38OL5nGZyFefELssp/E41fQ2nLEHAgmutnMDJdzbXvjT95bWW9cduWVu30fxryROG7nX8+09uFskOP7cMrntFY/DpkDwUVlvXOsG3MicYO613EyI9zPHVRBXoAUZBezxZfE4j8jcU4uq+JKNu+V+goVq/vhzJYA="
}
],
[],
[],
[]
]
最初だけ 1 が取得できて、後は取得できていません
これは 1 が削除されないままでは、 2 を受信できないためです
受信したら削除するようにします
1..4
|> Enum.map(fn _ ->
message =
queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
|> Enum.at(0)
# 受信したメッセージを削除する
unless is_nil(message) do
queue_url
|>ExAws.SQS.delete_message(message.receipt_handle)
|> ExAws.request!(auth_config)
end
message
end)
すると、以下のような結果になり、順番通りに全て受信できています
[
%{
attributes: [],
body: "1",
message_attributes: [],
message_id: "0f361e8e-51ad-4288-a753-93e901505b88",
md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
receipt_handle: "AQEBgkvjLcyEmR8Ik7iwwk/7SUsKcfLq..."
},
message_id: "a01d610c-bfec-46fa-8be1-947926ff517c",
md5_of_body: "c81e728d9d4c2f636f067f89cc14862c",
receipt_handle: "AQEBtSF0xFCyZ0RE/auwPA/t5S8XAoYtEXD..."
},
%{
attributes: [],
body: "3",
message_attributes: [],
message_id: "3e4d6ad0-e843-421a-8933-82078105013f",
md5_of_body: "eccbc87e4b5ce2fe28308fd9f2a7baf3",
receipt_handle: "AQEBkjviG5bnR3BYJrz/KR+iUkaUnH3kzn..."
},
nil
]
グループの動作確認
A グループと B グループのメッセージを送った場合の挙動を確認しましょう
[{"A", "A1"}, {"B", "B1"}, {"A", "A2"}, {"B", "B2"}, {"A", "A3"}, {"B", "B3"}]
|> Enum.each(fn {group_id, message} ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: group_id,
message_deduplication_id: Integer.to_string(:os.system_time())
)
|> ExAws.request!(auth_config)
end)
受信しますが、 A2 の場合だけ削除しないようにします
1..7
|> Enum.map(fn _ ->
message =
queue_url
|> ExAws.SQS.receive_message(max_number_of_messages: 1)
|> ExAws.request!(auth_config)
|> Map.get(:body)
|> Map.get(:messages)
|> Enum.at(0)
# 受信したメッセージを削除する
unless is_nil(message) do
unless message.body == "A2" do
queue_url
|>ExAws.SQS.delete_message(message.receipt_handle)
|> ExAws.request!(auth_config)
end
message.body
else
nil
end
end)
結果は以下のようになります
["A1", "B1", "A2", "B2", "B3", nil, nil]
A2 が削除されていないので同じグループの A3 は受信していませんが、違うグループの B2 B3 はそのまま受信できています
次の処理の邪魔にならないよう、一旦キューを空にしておきます
"sample_queue.fifo"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)
Broadway による FIFO キューの受信
FIFO キューでも同じように Broadway で受信できます
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)
受信中にメッセージを送信します
["11", "12", "13"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: "sample",
message_deduplication_id: Integer.to_string(:os.system_time())
)
|> ExAws.request!(auth_config)
end)
Broadway でも順番通りに受信できました
受信を止めておきます
Broadway.stop(pipeline)
Broadway による並列受信
concurrency 並列数に 2 を指定してみましょう
{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 2)
この状態で以下のように送信します
["21", "22", "23", "24", "25", "26"]
|> Enum.each(fn message ->
queue_url
|> ExAws.SQS.send_message(
message,
message_group_id: "sample",
message_deduplication_id: Integer.to_string(:os.system_time())
)
|> ExAws.request!(auth_config)
end)
21 と 22 、 24 と 25 が同時に出力されました
タイミングによって1並列のときもありますが、最大2並列で処理できています
受信を止めます
Broadway.stop(pipeline)
キューの削除
最後に使ったキューを削除します
"sample_queue"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
"sample_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
まとめ
SQS の標準キュー、 FIFO キューの動作が確認できました
また、 Broadway によるキュー受信の動作が確認できました
Broadway を使うことで、並列処理が得意な Elixir らしい動作を簡単に実現できますね