はじめに
アプリケーション同士を連携させるのに、必ずしもAPIをHTTPで呼び出すことが適切でない場合もあります。
特にQueueに入れた処理を逐次処理させる場合には、アプリケーションが依頼を直接受け取ってしまうと、アプリ側で永続化の処理を考えなければいけませんし、負荷分散も難しくなります。
とりあえずPub/Subよりも単純なQueueをRabbitMQで実現する予定なので、その作業のメモを残しておきます。
前提
- Kubernetes v1.15.11 (by Kubespray v2.11.2)
- Rook/Cephが導入されている
- MetalLBが導入されている
References
次の資料を参考にしました。
- https://github.com/ruby-amqp/bunny
- http://rubybunny.info/articles/getting_started.html
- https://qiita.com/suketa/items/bb4a8c294351cb7b9025
- https://qiita.com/suketa/items/147aad2f0f583b1871c2
- https://stackoverflow.com/questions/39646442/using-bunny-how-to-set-x-max-length-when-connecting-to-existing-queue
HelmでのRabbitMQの導入
やり直すことを考えて、処理のためにMakefileを準備しておきます。
REPO_NAME = stable/rabbitmq
NAMESPACE = rabbitmq
REL_NAME = myrabbitmq
RMQ_OPTIONS = --set persistence.storageClass=rook-ceph-block \
--set replicas=2 \
--set service.type=LoadBalancer \
--set service.loadBalancerIP=192.168.1.120 \
--set persistence.storageClass=rook-ceph-block \
--set rabbitmq.erlangCookie=9a63d47049016fd933371a76af08fc8f \
--set rabbitmq.password=70550b0ac43a2e5c \
--set metrics.enabled=true
.PHONY: init update fetch install upgrade
init:
kubectl create ns $(NAMESPACE)
update:
helm repo update
fetch:
helm fetch $(REPO_NAME)
install:
helm install $(REPO_NAME) --name $(REL_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)
upgrade:
helm upgrade ${REL_NAME} $(REPO_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)
導入の際の手順はおおむね次ようなものです。
$ make fetch
$ ls
Makefile rabbitmq-6.18.2.tgz
## tgzファイルを展開し、values.yamlの内容を確認する
$ tar xvzf rabbitmq-6.18.2.tgz
$ less rabbitmq/values.yaml
## 他に変更する点がなければ導入する、あればMakefileのRMQ_OPTIONSに追記
$ make install
RMQ_OPTIONSに設定していた"service.type=LoadBalancer"の部分は環境に合わせて変更が必要だと思います。設定可能な項目については、rabbitmq/values.yamlを確認してください。
service.typeにLoadBalancerを指定しているので、4369,5672,15672の全ポートが公開されてしまっています。これで問題なければ良いですが、選択的にポートを絞って公開したい場合には、service.type=ClusterIPを指定したまま次のようなServiceを指定することもできます。
apiVersion: v1
kind: Service
metadata:
name: my-release-rabbitmq-lb
labels:
app: rabbitmq
namespace: rabbitmq
spec:
ports:
- name: amqp
port: 5672
protocol: TCP
targetPort: amqp
- name: stats
port: 15672
protocol: TCP
targetPort: stats
selector:
app: rabbitmq
release: myrabbitmq
type: LoadBalancer
RabbitMQの準備作業
現在のサービスは次のように公開されています。
$ kubectl -n rabbitmq get svc
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
myrabbitmq LoadBalancer 10.233.25.181 192.168.1.120 4369:31028/TCP,5672:32031/TCP,25672:31175/TCP,15672:32214,9419:32022/TCP 35m
myrabbitmq-headless ClusterIP None <none> 4369/TCP,5672/TCP,25672/TCP,15672/TCP 35m
http://192.168.1.120:15672/ から、RabbitMQのWeb UIにアクセスします。
Helmから導入したRabbitMQにログインする場合は、それぞれ、--setで、rabbitmq.username (default:user), rabbitmq.password に指定したものを利用します。今回は (username,password) = (user, 70550b0ac43a2e5c) を使用します。
Queueの作成
WebUIのQueuesタブをクリックし、新規のQueueを追加します。
- Name: testq
- Durable: yes
接続用Userの作成
Adminタブから新規のユーザーを追加します。
追加したら、そのユーザー名をクリックし、権限を付与します。
- username: user01
- password: secret
今回はテストなので、作成したユーザーにデフォルトで権限を付与しておきます。
- vhost: /
- Configure regexp: .*
- Write regexp: .*
- Read regexp: .*
Bunnyからの接続
Rubyのアプリケーションを作成するディレクトリに、Gemfileを準備し、bundleからlibディレクトリに配置します。
source 'https://rubygems.org'
gem "bunny"
$ bundle install --path lib
Put/Getのテスト
準備はできたので、次のようなRubyスクリプトを配置します。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "secret")
conn.start
ch = conn.create_channel
## x-max-lengthなどをQueueに設定している場合には、次のように:argumentsに設定を加える
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024 ,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
q.publish("Hello", persistent: true)
ch.close
conn.close
ここでは、送信するだけで結果は受け取れません。受信用には次のスクリプトを作成しています。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
puts "Message Count: #{q.message_count}"
delivery_info, metadata, payload = q.pop
puts "Received: #{payload}"
ch.close
conn.close
Subscribeのテスト
実際には、アプリケーション側でメッセージが届くまでWaitしたいので、get.rbのコードを少し変更しました。
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require
require 'bunny'
conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq",
durable: true,
arguments: { 'x-max-length' => 1024,
'x-max-length-bytes' => 1048576,
'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
q.subscribe(manual_ack: true) do |delivery_info, metadata, payload|
puts "-------"
puts "Message Count: #{q.message_count}"
puts "routing_key: #{delivery_info.routing_key}"
puts "Received: #{payload}"
ch.ack(delivery_info.delivery_tag)
sleep 10
end
## waiting for never
loop { sleep 5 }
ch.close
conn.close
Prometheusからの監視
metrics.enabled=trueを設定しているため、9419番ポートにアクセスすることができるようになっています。
$ curl http://192.168.1.120:9419/metrics
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000149472
go_gc_duration_seconds{quantile="0.25"} 0.000268979
...
# HELP rabbitmq_sockets_used File descriptors used as sockets.
# TYPE rabbitmq_sockets_used gauge
rabbitmq_sockets_used{node="rabbit@myrabbitmq-0.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
rabbitmq_sockets_used{node="rabbit@myrabbitmq-1.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
# HELP rabbitmq_up Was the last scrape of rabbitmq successful.
# TYPE rabbitmq_up gauge
rabbitmq_up 1
- job_name: rabbitmq-prod
scheme: http
metrics_path: /metrics
static_configs:
- targets:
- 192.168.1.120:9419
labels:
group: "rabbitmq"
Grafanaから確認する場合には、以下のDashboard(Easy RabbitMQ)が登録されています。
以上