1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Helmで導入したRabbitMQをRuby(Bunny)から利用した時のメモ

Last updated at Posted at 2020-04-03

はじめに

アプリケーション同士を連携させるのに、必ずしもAPIをHTTPで呼び出すことが適切でない場合もあります。
特にQueueに入れた処理を逐次処理させる場合には、アプリケーションが依頼を直接受け取ってしまうと、アプリ側で永続化の処理を考えなければいけませんし、負荷分散も難しくなります。

とりあえずPub/Subよりも単純なQueueをRabbitMQで実現する予定なので、その作業のメモを残しておきます。

前提

  • Kubernetes v1.15.11 (by Kubespray v2.11.2)
  • Rook/Cephが導入されている
  • MetalLBが導入されている

References

次の資料を参考にしました。

HelmでのRabbitMQの導入

やり直すことを考えて、処理のためにMakefileを準備しておきます。

Makefile

REPO_NAME = stable/rabbitmq
NAMESPACE = rabbitmq
REL_NAME = myrabbitmq

RMQ_OPTIONS = --set persistence.storageClass=rook-ceph-block \
    --set replicas=2 \
    --set service.type=LoadBalancer \
    --set service.loadBalancerIP=192.168.1.120 \
    --set persistence.storageClass=rook-ceph-block \
    --set rabbitmq.erlangCookie=9a63d47049016fd933371a76af08fc8f \
    --set rabbitmq.password=70550b0ac43a2e5c \
    --set metrics.enabled=true

.PHONY: init update fetch install upgrade 

init:
        kubectl create ns $(NAMESPACE)

update:
        helm repo update

fetch:
        helm fetch $(REPO_NAME)

install:
        helm install $(REPO_NAME) --name $(REL_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)

upgrade:
        helm upgrade ${REL_NAME} $(REPO_NAME) --namespace $(NAMESPACE) $(RMQ_OPTIONS)

導入の際の手順はおおむね次ようなものです。

k8s-masterノードでの作業
$ make fetch
$ ls
Makefile rabbitmq-6.18.2.tgz
## tgzファイルを展開し、values.yamlの内容を確認する
$ tar xvzf rabbitmq-6.18.2.tgz
$ less rabbitmq/values.yaml
## 他に変更する点がなければ導入する、あればMakefileのRMQ_OPTIONSに追記
$ make install

RMQ_OPTIONSに設定していた"service.type=LoadBalancer"の部分は環境に合わせて変更が必要だと思います。設定可能な項目については、rabbitmq/values.yamlを確認してください。

service.typeにLoadBalancerを指定しているので、4369,5672,15672の全ポートが公開されてしまっています。これで問題なければ良いですが、選択的にポートを絞って公開したい場合には、service.type=ClusterIPを指定したまま次のようなServiceを指定することもできます。

metadata.name,各namespace等は環境に合わせて変更してください
apiVersion: v1
kind: Service
metadata:
  name: my-release-rabbitmq-lb  
  labels:
    app: rabbitmq
  namespace: rabbitmq
spec:
  ports:
  - name: amqp
    port: 5672
    protocol: TCP
    targetPort: amqp
  - name: stats
    port: 15672
    protocol: TCP
    targetPort: stats
  selector:
    app: rabbitmq
    release: myrabbitmq
  type: LoadBalancer

RabbitMQの準備作業

現在のサービスは次のように公開されています。

svcの状態
$ kubectl -n rabbitmq get svc
NAME                  TYPE           CLUSTER-IP      EXTERNAL-IP    PORT(S)                                                         AGE
myrabbitmq            LoadBalancer   10.233.25.181   192.168.1.120   4369:31028/TCP,5672:32031/TCP,25672:31175/TCP,15672:32214,9419:32022/TCP   35m
myrabbitmq-headless   ClusterIP      None            <none>         4369/TCP,5672/TCP,25672/TCP,15672/TCP                           35m

http://192.168.1.120:15672/ から、RabbitMQのWeb UIにアクセスします。
Helmから導入したRabbitMQにログインする場合は、それぞれ、--setで、rabbitmq.username (default:user), rabbitmq.password に指定したものを利用します。今回は (username,password) = (user, 70550b0ac43a2e5c) を使用します。

Queueの作成

WebUIのQueuesタブをクリックし、新規のQueueを追加します。

  • Name: testq
  • Durable: yes

接続用Userの作成

Adminタブから新規のユーザーを追加します。

追加したら、そのユーザー名をクリックし、権限を付与します。

  • username: user01
  • password: secret

今回はテストなので、作成したユーザーにデフォルトで権限を付与しておきます。

  • vhost: /
  • Configure regexp: .*
  • Write regexp: .*
  • Read regexp: .*

Bunnyからの接続

Rubyのアプリケーションを作成するディレクトリに、Gemfileを準備し、bundleからlibディレクトリに配置します。

Gemfile
source 'https://rubygems.org'

gem "bunny"
bunnyライブラリのダウンロード
$ bundle install --path lib

Put/Getのテスト

準備はできたので、次のようなRubyスクリプトを配置します。

put.rb
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require

require 'bunny'

conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "secret")
conn.start
ch = conn.create_channel
## x-max-lengthなどをQueueに設定している場合には、次のように:argumentsに設定を加える
q = ch.queue("testq",
             durable: true,
             arguments: { 'x-max-length' => 1024 , 
                          'x-max-length-bytes' => 1048576, 
                          'x-queue-type' => 'classic' } )  ## arguments:の設定はQueue定義に応じて要変更

q.publish("Hello", persistent: true)

ch.close
conn.close

ここでは、送信するだけで結果は受け取れません。受信用には次のスクリプトを作成しています。

get.rb
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require

require 'bunny'

conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq", 
             durable: true,
             arguments: { 'x-max-length' => 1024,
                          'x-max-length-bytes' => 1048576,
                          'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更
puts "Message Count: #{q.message_count}"

delivery_info, metadata, payload = q.pop
puts "Received: #{payload}"

ch.close
conn.close

Subscribeのテスト

実際には、アプリケーション側でメッセージが届くまでWaitしたいので、get.rbのコードを少し変更しました。

sub.rb
#!/usr/bin/ruby
#
require 'bundler/setup'
Bundler.require

require 'bunny'

conn = Bunny.new(host: "192.168.1.120", vhost: "/", user: "user01", password: "zaq12wsx")
conn.start
ch = conn.create_channel
q = ch.queue("testq", 
             durable: true,
             arguments: { 'x-max-length' => 1024,
                          'x-max-length-bytes' => 1048576,
                          'x-queue-type' => 'classic' } ) ## arguments:の設定はQueue定義に応じて要変更

q.subscribe(manual_ack: true) do |delivery_info, metadata, payload|
  puts "-------"
  puts "Message Count: #{q.message_count}"
  puts "routing_key: #{delivery_info.routing_key}"
  puts "Received: #{payload}"
  ch.ack(delivery_info.delivery_tag)
   
  sleep 10
end

## waiting for never
loop { sleep 5 }

ch.close
conn.close

Prometheusからの監視

metrics.enabled=trueを設定しているため、9419番ポートにアクセスすることができるようになっています。

$ curl http://192.168.1.120:9419/metrics
# HELP go_gc_duration_seconds A summary of the GC invocation durations.
# TYPE go_gc_duration_seconds summary
go_gc_duration_seconds{quantile="0"} 0.000149472
go_gc_duration_seconds{quantile="0.25"} 0.000268979
...
# HELP rabbitmq_sockets_used File descriptors used as sockets.
# TYPE rabbitmq_sockets_used gauge
rabbitmq_sockets_used{node="rabbit@myrabbitmq-0.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
rabbitmq_sockets_used{node="rabbit@myrabbitmq-1.myrabbitmq-headless.rabbitmq.svc.cluster.local"} 0
# HELP rabbitmq_up Was the last scrape of rabbitmq successful.
# TYPE rabbitmq_up gauge
rabbitmq_up 1
/etc/prometheus/prometheus.ymlから抜粋
- job_name: rabbitmq-prod
  scheme: http
  metrics_path: /metrics
  static_configs:
   - targets:
       - 192.168.1.120:9419
     labels:
       group: "rabbitmq"

Grafanaから確認する場合には、以下のDashboard(Easy RabbitMQ)が登録されています。

以上

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?