はじめに
バッググラウンドでバッチ的に動作する処理をk8sで行なう場合には、kind:CronJobでスケジュールすることができますが、処理が重い場合に、プロセスを分散させるといったことは簡単ではありません。
また、1つのプロセス(Pod)でしか処理ができない場合には、定期的にスケジュールさせるよりも、リクエストの到着次第、処理を開始したいというニーズもあったりします。
あらかじめHelmを利用してk8sクラスターにRabbitMQデプロイしているので、これを利用するためのルールと構成上の考慮点をまとめておきます。
Kubernetes v1.17以降を利用するのであれば、RabbitMQの公式サイトに記載されているOperatorによるデプロイメントを検討することをお勧めします。→ https://qiita.com/YasuhiroABE/items/7c1e82e006ea37e0fe25
今回の目標
メッセージが適切でなかった場合に、(原因追求や手動再送信のため)通常処理用のQueueとは別のQueueに差し戻す仕組みがdead letterです。RabbitMQではDead Letter Exchange (DLX)の名称で解説されていて、他のMQ処理系では、Dead Letter Queue (DLQ)と呼ばれていたりします。
- dead letterを使うこと
- アプリ間でセパレーションを、セキュリティとパフォーマンスのバランスに配慮して実施すること
- 単純なJobの負荷分散のWork Queueと、利用者を識別するTopic Queueを利用する
まだRabbitMQのベストプラクティスがまとめられる状況ではありませんが、できるだけちゃんと使っていきたいと思います。
References
念のため "rabbitmq best practices" で検索して、いくつかの記事を確認しています。
公式ガイド
公式チュートリアル
- https://www.rabbitmq.com/getstarted.html
- https://www.rabbitmq.com/tutorials/tutorial-two-ruby.html (Work Queues by Ruby)
- https://www.rabbitmq.com/tutorials/tutorial-five-ruby.html (Topics by Ruby)
UserとVirtualHostの使い方について
Userは名前のとおり、接続を許可する際に利用します。今回はvhost毎にユーザーを作成します。
vhostはQueueやExchangeを区分けする論理分割を実現するための機構です。
vhostはユーザー毎の権限に応じて操作を許可するので、個別にユーザーを登録する必要があります。管理者(標準:user)であっても登録されていないユーザーは操作ができなくなるので、permissionsから削除すると一部の操作ができなくなります。
決めた事
- "app-name" の名称で、userを作成する (e.g. myapp)
- "/app-name" の名称で、vhostを作成する (e.g. /myapp)
- 作成したvhostのpermissionには、User: "app-name" を加える (user:app-nameは1つのvhostにのみ所属する)
- 管理者(user等)はvhostのpermissionに加えたままにする (管理者は全てのvhostに所属する)
【Work Queue】Exchangeの使い方について
複数のアプリケーションで、AMQP default (exchange)を利用すると、メッセージが大量に送信された場合に滞留するかもしれません。AMQP defaultはrouting-keyに書かれたQueue名に転送するだけなので気にする必要はないかもしれませんが、念のためアプリケーション毎にExchangeを分けておきます。
当初はQueue毎にExchangeを定義するようにしていましたが、現在は"app-name"毎にExchangeとDLX(Dead Letter eXchange)を1つずつ定義するとしています。
決めた事
- "app-name" と "app-name.dlx" の2つのExchangeを定義する (e.g. myapp, myapp.dlx)
- Durable: trueにする
- Queueを作成したら、Bindingの設定で、To:queue, routing-key: に作成したQueue名を設定する
- DLQを作成したら、Bindingの設定で、To:queue, routing-key: に作成したDLQ名を設定する
【Work Queue】Queueの使い方について
公式ガイドでは冒頭でPolicyで全体にDLXを設定する方法が解説されていたりしますが、複数のアプリケーションが固有のメッセージを送受信している環境では、滞留を避けてデバッグを円滑に行なうため、メッセージの種類毎か、Queue毎にDLXを分けるなど、ある程度の粒度を保つのが良いと思っています。
当初はQueue名に"app-name"を含めていましたが、user名等の接続情報と重複するので、分ける事にしました。userを書き込み用と参照用に分割したい場合や、他の"app-name"にも接続したい場合にはQueue名に"app-name"を含めておいた方が視認性は向上すると思います。DLXはQueue毎に分割したいところですが、メッセージからQueueが分かる場合がほとんどなので、"app-name"で1つとしています。
決めた事
- "app-name"毎にDLQを作成する (e.g. myapp.dlx)
- Queueを作成する際(e.g. queue1)に、'x-dead-letter-exchange' に作成したDLXを指定する (e.g. myapp.dlx)
- Queueを作成する際(e.g. queue1)に、'x-dead-letter-routing-key' に作成したDLQを指定する (e.g. myapp.dlx)
Work Queueを利用して、気になった事
- 最初はBindingの設定を行なわず、アプケーションからのnackに対して、DLQにメッセージが配送されず悩んだ
- Exchange作成時に、alternate-exchangeを設定するべきか迷っている (現在は設定していない)
- vhost毎にdefault_exchangeが作成されるので、無理に分けなくても良かったかなとは感じました
Work Queue サンプル
rubyで動作するサンプルを載せておきます。
準備作業
ライブラリを./libに準備しておきます。
source 'https://rubygems.org'
gem "bunny", "2.15.0"
$ bundle install --path ./lib
test_put.rb
# !/usr/bin/ruby
require 'bundler/setup'
Bundler.require
require 'bunny'
require 'date'
conn = Bunny.new(host: "rabbitmq.example.com", vhost: "/myapp", user: "myapp", password: "secret")
conn.start
ch = conn.create_channel
x = ch.exchange("myapp", { durable: true } )
x.publish("Hello #{Time.now}", persistent: true, routing_key: "queue1")
ch.close
conn.close
test_sub.rb
引数を何か指定すると常にDLXにメッセージが転送されてDLQに蓄積されるので、RabbitMQのWeb UIなどからメッセージを確認・削除する操作が必要になります。
# !/usr/bin/ruby
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "rabbitmq.example.com", vhost: "/myapp", user: "myapp", password: "secret")
conn.start
ch = conn.create_channel
q = ch.queue("queue1",
durable: true,
arguments: { 'x-dead-letter-exchange' => 'myapp.dlx',
'x-dead-letter-routing-key' => 'myapp.dlx',
'x-queue-type' => 'quorum' } )
q.subscribe(manual_ack: true) do |delivery_info, metadata, payload|
puts "Payload: #{payload}"
if ARGV.length == 0
ch.ack(delivery_info.delivery_tag)
else
ch.nack(delivery_info.delivery_tag, false)
end
end
## FYI, you can also use the pop method synchronously.
# delivery_info, metadata, payload = q.pop
# puts "Payload: #{payload}"
begin
loop { sleep 5 }
ensure
ch.close
conn.close
end
【Topics】Exchange & Queueについて
Topicsであっても実際にはQueueに届いたメッセージはWork Queueと同様に順番に取り出され処理されます。受信側の意図するrouting-keyを持ったメッセージであることは保証されないので、routing-keyが意図したものかどうか確認する必要があります。
公式チュートリアルのサンプルでは、subscribe()を呼び出す側が、Queue::bind()を呼び出しているので、選択的に受信したいTopicを指定できると思ってしましますが、いくつか問題があります。
- Queue名を空文字列で呼び出しているので、動的にQueueが作成され、指定したtopicのメッセージだけを受信するものの、作成されたQueueは定義されたまま残ってしまう。またdurableでもない。
- 明示的に事前にtopicメッセージ受信用のdurableなQueueを定義することはできるが、bindされている全てのメッセージを受信し、subscribeするとqueueに入っている全てのメッセージを受信する (選択的に特定topicを受信できるわけではない)
Work Queueとの比較でいえば、Work Queueがsubscribeすることで、メッセージをシリアライズすることができる特徴を持っている点と比較して、Topicsも同様の特徴を持ちつつ、1つのメッセージを単一のExchangeを通して複数のQueueに送信する機能にExchangeの設定を変更することで柔軟性を持たせることができる点で違いがあります。
Work QueueでもExchangeのBindingを工夫すれば、Topicsを使うメリットがあまりないように感じられますが、そのとおりだろうとは思います。ただ、メッセージの形式が複数存在するような環境であれば、routing_keyである程度の処理内容を分岐させられるので、メッセージに応じた処理を省けるという便利な局面はあるのかもしれません。
決めた事
- Queue名を空にして、アプリケーションからQueue::bind()を呼び出すコーディングは禁止する
- 受信側では必ず、Queueから取り出したメッセージが処理するべき(意図している)routing-keyを持っているか確認する
APIを利用したExchange, Queueなどの作成について
別の記事で、RabbitMQのAPIの利用についてまとめています。
以上