7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Broadway SQS でキューの動作を確認する

Last updated at Posted at 2023-10-23

はじめに

Broadway は Elixir で堅牢なパイプラインを構築するためのモジュールです

例えばレストランでの注文を考えてみます

ウェイターは次々と客の注文を紙に書いて厨房に渡します

厨房には注文の紙がどんどん溜まっていき、コックは先に来た注文から順に対応していきます

この状況では、注文が多くなるとさまざまな問題が発生します

  • 注文の順番が飛ばされて、ずっと待っている客がいる
  • 誤って同じ注文を二人のコックが別々に作っていた
  • 作り終わっていない注文が何故か提供済になっていた

ちゃんとした仕組みが必要そうですね

上記の例で言うウェイターに当たるものが「キュー」(Queue)になります

Amazon SQS でクラウド上にキューを作成すると、そこにメッセージを送信、蓄積しておいて、別のプロセスがキューからメッセージを受け取り順次処理していくことができます

そして、 Broadway は現場を管理するためのマネージャーです

誰がどの順番でどの注文に対応するのか、どれが対応中でどれが対応済なのか、しっかり管理し、全てを捌ききります

この記事では Livebook 上で Broadway を動かし、 Broadway がキュー上のメッセージを捌いていく様子を確認します

実装したノートブックはこちら

セットアップ

必要なモジュールをインストールします

Mix.install([
  {:ex_aws, "~> 2.5"},
  {:ex_aws_sqs, "~> 3.4"},
  {:kino, "~> 0.11.0"},
  {:hackney, "~> 1.20"},
  {:jason, "~> 1.4"},
  {:sweet_xml, "~> 0.7.4"},
  {:broadway, "~> 1.0"},
  {:broadway_sqs, "~> 0.7.3"}
])

Elixir から AWS のリソースを操作するため、 ExAws を使用します

また、Broadway で SQS のキューからメッセージを受信するためのモジュール Broadway SQS を使用します

Amazon SQS の基本動作確認

キューを作成する

Amazon SQS でキューを作成するため、 AWS の認証情報を入力します

access_key_id_input = Kino.Input.password("ACCESS_KEY_ID")
secret_access_key_input = Kino.Input.password("SECRET_ACCESS_KEY")
region_input = Kino.Input.text("REGION")

[
  access_key_id_input,
  secret_access_key_input,
  region_input
]
|> Kino.Layout.grid(columns: 3)

スクリーンショット 2023-10-20 13.52.51.png

入力した値をキーワードリストに入れておきます
Kino.nothing() は認証情報を出力しないために入れています

auth_config = [
  access_key_id: Kino.Input.read(access_key_id_input),
  secret_access_key: Kino.Input.read(secret_access_key_input),
  region: Kino.Input.read(region_input)
]

Kino.nothing()

キューを作成します
ExAws.request! で AWS の API にリクエストを投げるとき、認証情報を渡しています

sqs_res =
  "sample_queue"
  |> ExAws.SQS.create_queue()
  |> ExAws.request!(auth_config)

SQS のレスポンスからキューの URL を取得します

queue_url = sqs_res.body.queue_url

出力は以下のような形式の URL になります

"https://sqs.ap-northeast-1.amazonaws.com/<AWSのアカウントID>/sample_queue"

キューにメッセージを送信する

1, 2, 3 の文字列を順にキューへ送信します

["1", "2", "3"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(message)
  |> ExAws.request!(auth_config)
end)

キューの状態を確認してみましょう

この後も何回か確認するので、キュー確認用の関数を作っておきます

show_queue_status = fn (queue_url, auth_config) ->
  queue_url
  |> ExAws.SQS.get_queue_attributes()
  |> ExAws.request!(auth_config)
  |> Map.get(:body)
  |> Map.get(:attributes)
end

関数を呼び出します

show_queue_status.(queue_url, auth_config)

出力は以下のようになります

%{
  queue_arn: "arn:aws:sqs:ap-northeast-1:<AWSのアカウントID>:sample_queue",
  approximate_number_of_messages: 3,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  created_timestamp: 1697765450,
  last_modified_timestamp: 1697765450,
  visibility_timeout: 30,
  maximum_message_size: 262144,
  message_retention_period: 345600,
  delay_seconds: 0,
  receive_message_wait_time_seconds: 0,
  sqs_managed_sse_enabled: true
}

approximate_number_of_messages で現在キューに入っているメッセージの数が分かります
3つ投げたので3つ入った状態です

キューからメッセージを受信する

キューからメッセージを受信します

max_number_of_messages で受信する最大数を指定します

1個ずつ、4回メッセージを受信してみます

1..4
|> Enum.map(fn _ ->
  queue_url
  |> ExAws.SQS.receive_message(max_number_of_messages: 1)
  |> ExAws.request!(auth_config)
  |> Map.get(:body)
  |> Map.get(:messages)
end)

結果は以下のようになります

[
  [
    %{
      attributes: [],
      body: "3",
      message_attributes: [],
      message_id: "860a4fb2-c4bb-4c83-b330-4560a0fefdbb",
      md5_of_body: "eccbc87e4b5ce2fe28308fd9f2a7baf3",
      receipt_handle: "AQEBhiJfGyg/f72QJmtyb7WaN5k..."
    }
  ],
  [
    %{
      attributes: [],
      body: "2",
      message_attributes: [],
      message_id: "1d30385c-8d9a-4ce8-a0e6-98c978d55f88",
      md5_of_body: "c81e728d9d4c2f636f067f89cc14862c",
      receipt_handle: "AQEBdDWM771vEe4UVvZRk9EorKWJ1FTC..."
    }
  ],
  [
    %{
      attributes: [],
      body: "1",
      message_attributes: [],
      message_id: "b5953171-4b89-4069-a5e1-310e89280918",
      md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
      receipt_handle: "AQEBvStMy884OdKlpPbqtAV9loTZG1I..."
    }
  ],
  []
]

1, 2, 3 の順番で投げたはずが、 3, 2, 1 の順番で返ってきました
(4回目はもう受信するものがないので空)

実は SQS は特に指定していない場合、標準キューを作成します

標準キューは FIFO (ファーストイン・ファーストアウト = 先入先出 = 入れた順番と出る順番が同じ)にならないため、順序は保証されません

FIFO キューについては後で作成してみます

このときのキューの状態を確認します

show_queue_status.(queue_url, auth_config)

結果は以下のようになります

%{
  ...
  approximate_number_of_messages: 0,
  approximate_number_of_messages_not_visible: 3,
  approximate_number_of_messages_delayed: 0,
  ...
  visibility_timeout: 30,
  ...
}

approximate_number_of_messages が 0 になり、 approximate_number_of_messages_not_visible が 3 になりました

30秒待ってから、もう一度ステータスを確認すると、以下のようになります

%{
  ...
  approximate_number_of_messages: 3,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  ...
  visibility_timeout: 30,
  ...
}

SQS は受信済のメッセージをそのまま削除せず、不可視状態にします
visibility_timeout の時間が経過した後、メッセージは復活します
復活させないためには、メッセージを明示的に削除する必要があります

1..4
|> Enum.map(fn _ ->
  message =
    queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # 受信したメッセージを削除する
  unless is_nil(message) do
    queue_url
    |>ExAws.SQS.delete_message(message.receipt_handle)
    |> ExAws.request!(auth_config)
  end

  message
end)

受信したメッセージが削除され、 approximate_number_of_messagesapproximate_number_of_messages_not_visible も両方 0 になりました

%{
  ...
  approximate_number_of_messages: 0,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  ...
  visibility_timeout: 30,
  ...
}

遅延メッセージを送信する

ついでに遅延メッセージを送信してみます

queue_url
|> ExAws.SQS.send_message("delayed", delay_seconds: 30)
|> ExAws.request!(auth_config)

show_queue_status.(queue_url, auth_config) の結果は以下のようになります

%{
  ...
  approximate_number_of_messages: 0,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 1,
  ...
  visibility_timeout: 30,
  ...
}

30秒待ってもう一度 show_queue_status.(queue_url, auth_config) を実行すると、結果は以下のようになります

%{
  ...
  approximate_number_of_messages: 1,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  ...
  visibility_timeout: 30,
  ...
}

メッセージに遅延設定をつけることで、指定した時間の間はキューからメッセージが受信できなくなります
受信側の処理が遅延している場合などに利用します

メッセージを全て削除する

purge_queue で全てのメッセージが削除できます

"sample_queue"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)

Broadway によるメッセージ受信

以下のように Broadway の機能を使って SQS からのメッセージを受信するモジュールを作成します

defmodule SampleBroadway do
  use Broadway

  def start_link(queue_url, config, concurrency) do
    Broadway.start_link(__MODULE__,
      name: SamplePipeline,
      producer: [
        module: {
          BroadwaySQS.Producer,
          queue_url: queue_url,
          config: config # AWS の認証情報
        },
        concurrency: concurrency # 何個並列で受信するか
      ],
      processors: [
        default: [
          max_demand: 1, # 何個毎のかたまりで受信するか
          concurrency: concurrency # 何個並列で処理するか
        ]
      ]
    )
  end

  # メッセージ受信時の処理
  def handle_message(_processor_name, message, _context) do
    Process.sleep(1000) # 1秒待つ

    IO.inspect(message.data)

    message
  end
end

並列数 1 で受信を開始します
これで SQS にメッセージが入るのを待っている状態になります

{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

メッセージを送信します

["11", "12", "13"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(message)
  |> ExAws.request!(auth_config)
end)

送信後、少しすると Broadway がメッセージを受信し、順次 1 秒待ってからメッセージを表示します
FIFOキューではないため、順序はランダムになっています

queue.gif

実行後の状態を show_queue_status.(queue_url, auth_config) で確認すると、以下のようになっています

%{
  ...
  approximate_number_of_messages: 0,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  ...
  visibility_timeout: 30,
  ...
}

Broadway SQS では、処理したメッセージは自動的に削除され、メッセージは無くなっています

Broadway の受信を停止しておきます

Broadway.stop(pipeline)

FIFOキュー

やはり順序性は大事にしたいので、 FIFO キューを作ってみます

ちなみに FIFO キューは少しコストが高くなります

東京リージョンの場合

  • 標準キュー: 0.4 USD / 100万リクエスト
  • FIFOキュー: 0.5 USD / 100万リクエスト

FIFOキューの作成

ExAws.SQS.create_queue のオプションに fifo_queue: true を指定すると FIFO キューになります

また、FIFO キューの場合、 SQS では名前の末尾に ".fifo" を付けなければいけません
(付けないと作成時にエラーになります)

sqs_res =
  "sample_queue.fifo"
  |> ExAws.SQS.create_queue(fifo_queue: true)
  |> ExAws.request!(auth_config)

キューの URL を取得します

queue_url = sqs_res.body.queue_url

キューの URL は以下のようになっています

"https://sqs.ap-northeast-1.amazonaws.com/<AWSアカウントID>/sample_queue.fifo"

FIFOキューへのメッセージ送信

FIFO キューにメッセージを送信してみます

FIFO キューの場合、 message_group_idmessage_deduplication_id が必須になります

["1", "2", "3"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(
    message,
    message_group_id: "sample", # とりあえず全て同じグループにする
    message_deduplication_id: Integer.to_string(:os.system_time()) # とりあえず現在時刻を入れる
  )
  |> ExAws.request!(auth_config)
end)

show_queue_status.(queue_url, auth_config) で状態を確認すると、以下のようになっています

%{
  queue_arn: "arn:aws:sqs:ap-northeast-1:<AWSアカウントID>:sample_queue.fifo",
  approximate_number_of_messages: 3,
  approximate_number_of_messages_not_visible: 0,
  approximate_number_of_messages_delayed: 0,
  created_timestamp: 1697766447,
  last_modified_timestamp: 1697780323,
  visibility_timeout: 30,
  maximum_message_size: 262144,
  message_retention_period: 345600,
  delay_seconds: 0,
  receive_message_wait_time_seconds: 0,
  sqs_managed_sse_enabled: true,
  fifo_queue: true,
  content_based_deduplication: false,
  deduplication_scope: "queue",
  fifo_throughput_limit: "perQueue"
}

approximate_number_of_messages が 3 になっていて、 fifo_queue が true になっています

FIFOキューからのメッセージ受信

FIFO キューからメッセージを受信してみます

1..4
|> Enum.map(fn _ ->
  queue_url
  |> ExAws.SQS.receive_message(max_number_of_messages: 1)
  |> ExAws.request!(auth_config)
  |> Map.get(:body)
  |> Map.get(:messages)
end)

結果は以下のようになりました

[
  [
    %{
      attributes: [],
      body: "1",
      message_attributes: [],
      message_id: "0f361e8e-51ad-4288-a753-93e901505b88",
      md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
      receipt_handle: "AQEBc95aNlxEeaDh1hSSHCXPgyTIz/tW70va8hSFXf3VRdYTlpgE+P6Sh5I7sNjVsggv85+RfO95jmLLUcrNE0P1eAELz8zYzBtnotnXXRCQYQ91a+eT3/tkwwDHcKa0Kza81zB2G6ea5/Ou38OL5nGZyFefELssp/E41fQ2nLEHAgmutnMDJdzbXvjT95bWW9cduWVu30fxryROG7nX8+09uFskOP7cMrntFY/DpkDwUVlvXOsG3MicYO613EyI9zPHVRBXoAUZBezxZfE4j8jcU4uq+JKNu+V+goVq/vhzJYA="
    }
  ],
  [],
  [],
  []
]

最初だけ 1 が取得できて、後は取得できていません

これは 1 が削除されないままでは、 2 を受信できないためです

受信したら削除するようにします

1..4
|> Enum.map(fn _ ->
  message =
    queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # 受信したメッセージを削除する
  unless is_nil(message) do
    queue_url
    |>ExAws.SQS.delete_message(message.receipt_handle)
    |> ExAws.request!(auth_config)
  end

  message
end)

すると、以下のような結果になり、順番通りに全て受信できています

[
  %{
    attributes: [],
    body: "1",
    message_attributes: [],
    message_id: "0f361e8e-51ad-4288-a753-93e901505b88",
    md5_of_body: "c4ca4238a0b923820dcc509a6f75849b",
    receipt_handle: "AQEBgkvjLcyEmR8Ik7iwwk/7SUsKcfLq..."
  },
    message_id: "a01d610c-bfec-46fa-8be1-947926ff517c",
    md5_of_body: "c81e728d9d4c2f636f067f89cc14862c",
    receipt_handle: "AQEBtSF0xFCyZ0RE/auwPA/t5S8XAoYtEXD..."
  },
  %{
    attributes: [],
    body: "3",
    message_attributes: [],
    message_id: "3e4d6ad0-e843-421a-8933-82078105013f",
    md5_of_body: "eccbc87e4b5ce2fe28308fd9f2a7baf3",
    receipt_handle: "AQEBkjviG5bnR3BYJrz/KR+iUkaUnH3kzn..."
  },
  nil
]

グループの動作確認

A グループと B グループのメッセージを送った場合の挙動を確認しましょう

[{"A", "A1"}, {"B", "B1"}, {"A", "A2"}, {"B", "B2"}, {"A", "A3"}, {"B", "B3"}]
|> Enum.each(fn {group_id, message} ->
  queue_url
  |> ExAws.SQS.send_message(
    message,
    message_group_id: group_id,
    message_deduplication_id: Integer.to_string(:os.system_time())
  )
  |> ExAws.request!(auth_config)
end)

受信しますが、 A2 の場合だけ削除しないようにします

1..7
|> Enum.map(fn _ ->
  message =
    queue_url
    |> ExAws.SQS.receive_message(max_number_of_messages: 1)
    |> ExAws.request!(auth_config)
    |> Map.get(:body)
    |> Map.get(:messages)
    |> Enum.at(0)

  # 受信したメッセージを削除する
  unless is_nil(message) do
    unless message.body == "A2" do
      queue_url
      |>ExAws.SQS.delete_message(message.receipt_handle)
      |> ExAws.request!(auth_config)
    end

    message.body
  else
    nil
  end
end)

結果は以下のようになります

["A1", "B1", "A2", "B2", "B3", nil, nil]

A2 が削除されていないので同じグループの A3 は受信していませんが、違うグループの B2 B3 はそのまま受信できています

次の処理の邪魔にならないよう、一旦キューを空にしておきます

"sample_queue.fifo"
|> ExAws.SQS.purge_queue()
|> ExAws.request!(auth_config)

Broadway による FIFO キューの受信

FIFO キューでも同じように Broadway で受信できます

{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 1)

受信中にメッセージを送信します

["11", "12", "13"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(
    message,
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(:os.system_time())
  )
  |> ExAws.request!(auth_config)
end)

Broadway でも順番通りに受信できました

fifo_queue.gif

受信を止めておきます

Broadway.stop(pipeline)

Broadway による並列受信

concurrency 並列数に 2 を指定してみましょう

{:ok, pipeline} = SampleBroadway.start_link(queue_url, auth_config, 2)

この状態で以下のように送信します

["21", "22", "23", "24", "25", "26"]
|> Enum.each(fn message ->
  queue_url
  |> ExAws.SQS.send_message(
    message,
    message_group_id: "sample",
    message_deduplication_id: Integer.to_string(:os.system_time())
  )
  |> ExAws.request!(auth_config)
end)

concurrency.gif

21 と 22 、 24 と 25 が同時に出力されました

タイミングによって1並列のときもありますが、最大2並列で処理できています

受信を止めます

Broadway.stop(pipeline)

キューの削除

最後に使ったキューを削除します

"sample_queue"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)
"sample_queue.fifo"
|> ExAws.SQS.delete_queue()
|> ExAws.request!(auth_config)

まとめ

SQS の標準キュー、 FIFO キューの動作が確認できました

また、 Broadway によるキュー受信の動作が確認できました

Broadway を使うことで、並列処理が得意な Elixir らしい動作を簡単に実現できますね

7
3
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?