LoginSignup
19
22

More than 5 years have passed since last update.

amqpを使ってRabbitMQのキューを操作する

Last updated at Posted at 2015-03-25

使用するバージョン

  • RabbitMQ 3.3.5
  • amqp 1.5.0

条件

  • vhostは/production
  • ユーザはuser1、パスワードはpassとする

キューにメッセージを追加する(基本)

# -*- coding: utf-8 -*-
require 'amqp'

# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
     # channelを作成
     channel  = AMQP::Channel.new(connection)

     # queue1というキューを作成
     # キューが無い場合は作成(既にある場合もエラーにはならない)
     queue = channel.queue("queue1")

     # exchangeを作成
     exchange = channel.default_exchange

     # queue1にmessageを追加
     exchange.publish("Hello, world!", :routing_key => queue.name)
end

キューを確認すると登録されている

$ sudo /usr/sbin/rabbitmqctl list_queues -p /production

Listing queues ...
queue1  1

ただし、このやり方だとRabbitMQサーバーが再起動 or 落ちた場合、キュー+メッセージは消えてしまう

キューにメッセージを追加する(メッセージを永続)

RabbitMQサーバーが再起動 or 落ちてもキュー+メッセージが残るようにするには、durablepersistentを指定する

# -*- coding: utf-8 -*-
require 'amqp'

# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
  # channelを作成
  channel  = AMQP::Channel.new(connection)

  # queue1というキューを作成
  # :durable => trueとすることで、再起動してもキューは残る
  queue = channel.queue("queue1", :nowait => false, :durable => true)

  # exchangeを作成
  exchange = channel.default_exchange

  # queue1にmessageを追加
  # :persistent => trueとすることで、再起動してもメッセージは残る
  exchange.publish("Hello, world!", :routing_key => queue.name, :persistent => true)
end

キューからメッセージを取得(subscribe)

subscribeを使う場合、キューにメッセージが登録されるごとにsubscribeが実行される(メッセージが登録されない限り実行されない)

# -*- coding: utf-8 -*-
require 'amqp'

# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
  # channelを作成
  channel = AMQP::Channel.new(connection)

  # queue1を作成
  queue = channel.queue("queue1", :nowait => false, :durable => true)

  # queue1からmessageを取得
  # :ack => trueとすることで、ackを返さない限り、messageは削除されない
  # :ack => falseの場合、messageを取得した時点でRabbitMQサーバーからmessageが削除される
  queue.subscribe(:ack => true) do |hdr, msg|
    p "queue  = #{queue.name}" #=> "queue  = queue1"
    p "message= #{msg}"        #=> "message= Hello, world!"

    # RabbitMQのmessageを削除
    hdr.ack
  end
end
  • :ack => trueの場合、RabbitMQのメッセージを削除するにはackが必要。
  • ackする前にアプリが落ちた(RabbitMQとのコネクションが切れた)場合は、メッセージはRabbitMQサーバーに戻るので、再取得が可能(メッセージは消失しない)

キューからメッセージを取得(pop)

キューからメッセージを取得するもう一つの方法はpopがある。
popは明示的に(同期)してRabbitMQのキューからメッセージを取得する。
subscribeと違い、キューにメッセージが登録されても、されなくても処理される。その為、メッセージが無い場合は、nilが返るので、nilチェックが必要。

# -*- coding: utf-8 -*-
require 'amqp'

# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|
  # channelを作成
  channel = AMQP::Channel.new(connection)

  # queue1を作成
  queue = channel.queue("queue1", :nowait => false, :durable => true)

  # queue1から1秒おきにmessageを取得(※timerを使わないと取得できない)
  EventMachine.add_periodic_timer(1) do
    # コネクションが切れるときは処理しない
    next if connection and (connection.closing? || connection.closed?)

    queue.pop(:ack => true) do |hdr, msg|
      if msg
        p "queue  = #{queue.name}"  #=> "queue  = queue1"
        p "message= #{msg}"         #=> "message= Hello, world!"
        # RabbitMQのmessageを削除
        hdr.ack
      end
    end
  end
end

キューからメッセージを取得(コネクション断時の再接続)

キューを取得する場合、RabbitMQをポーリングすることになる。
その場合、RabbitMQとのコネクションが切れる場合があるので、connection.on_tcp_connection_losschannel.auto_recovery
コネクション再接続するようにする

# -*- coding: utf-8 -*-
require 'amqp'

# RabbitMQに接続
AMQP.start(:host => "localhost", :vhost => "/production", :user => "user1", :pass => "pass") do |connection, open_ok|

  # コネクションが切れたら再接続するようにする
  connection.on_tcp_connection_loss do |cl, settings|
    # 再接続
    cl.reconnect
  end

  # channelを作成
  channel = AMQP::Channel.new(connection)
  # channelが切れたら自動回復するようにする
  channel.auto_recovery = true

  # queue1を作成
  queue = channel.queue("queue1", :nowait => false, :durable => true)

  # queue1からmessageを取得
  queue.subscribe(:ack => true) do |hdr, msg|
    p "queue  = #{queue.name}" #=> "queue  = queue1"
    p "message= #{msg}"        #=> "message= Hello, world!"

    # RabbitMQのmessageを削除
    hdr.ack
  end
end
19
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
19
22