
Process Communication using AMQP with RabbitMQ

Posted at 2017-08-09

Setup RabbitMQ

To set up RabbitMQ, follow this tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-java.html
(I am currently developing in Scala, so I converted the tutorial's Java example code into Scala.)

If your project is a Maven project, add the following lines to your pom.xml file.

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.0</version>
</dependency>
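
Since the examples here are written in Scala, the project might be built with sbt instead of Maven. That setup is an assumption (the article only mentions Maven), but in that case the same dependency would likely be declared in build.sbt like this:

```scala
// build.sbt (assumed sbt project; amqp-client is a plain Java artifact, so a single % is used)
libraryDependencies += "com.rabbitmq" % "amqp-client" % "4.2.0"
```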

Then start the RabbitMQ service:

service rabbitmq-server start

RabbitMQ Sender

The following is a simple RabbitMQ sender:

import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}

object rabbitMQ_sender {
  def main(args: Array[String]): Unit = {
    // Connect to the broker running on this machine
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    // Declare the queue (non-durable, non-exclusive, not auto-deleted)
    val queue_to_send = "test"
    channel.queueDeclare(queue_to_send, false, false, false, null)
    val message = "Hello World!"
    // Publish via the default exchange (""), which routes by queue name
    channel.basicPublish("", queue_to_send, null, message.getBytes("UTF-8"))
    println(" [x] Sent '" + message + "'")

    channel.close()
    connection.close()
  }
}

In the figure below, P is the RabbitMQ sender (the producer). When you run the code above, the sender publishes the message "Hello World!" to the queue (the red blocks in the figure). The receiver (C, the consumer) then receives the message.

[Figure: P (producer) -> queue -> C (consumer)]

RabbitMQ Receiver

The following is a simple RabbitMQ receiver:

import com.rabbitmq.client._

object rabbitMQ_receiver {
  def main(args: Array[String]): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    // Declare the same queue as the sender, so it exists whichever side starts first
    val QUEUE_NAME = "test"
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    println(" [*] Waiting for messages. To exit press CTRL+C")

    // handleDelivery is called on a client library thread for each delivered message
    val consumer: Consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        val message: String = new String(body, "UTF-8")
        println(" [x] Received '" + message + "'")
      }
    }
    // autoAck = true: messages are acknowledged as soon as they are delivered
    channel.basicConsume(QUEUE_NAME, true, consumer)
  }
}

While the RabbitMQ receiver is running, it prints the following output each time it receives a message from a sender.

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

Test: Process handling using RabbitMQ

So how can I use the RabbitMQ sender and receiver in my own code?

Here is what I am going to do.

  1. Start two processes (process A and process B) at the same time
  2. Process B pauses at some point and waits until process A finishes its job
  3. Process A sends a message when it has completed its job
  4. Once process B receives the message from process A, it resumes its job
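
The steps above can be sketched without a broker to show the intended ordering. This is only an illustration under assumptions: two threads stand in for processes A and B, a java.util.concurrent.LinkedBlockingQueue stands in for the RabbitMQ queue, and the names HandshakeSketch and simulate are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

// Hypothetical in-process stand-in for the A/B handshake (no RabbitMQ involved).
object HandshakeSketch {
  // Runs the four steps and returns the events in the order they were recorded.
  def simulate(stopMessage: String): List[String] = {
    val queue  = new LinkedBlockingQueue[String]()    // stand-in for the RabbitMQ queue
    val events = new ConcurrentLinkedQueue[String]()  // thread-safe event log

    val processA = new Thread(new Runnable {
      def run(): Unit = {
        events.add("A: job finished")
        queue.put(stopMessage)                        // step 3: notify B
      }
    })

    val processB = new Thread(new Runnable {
      def run(): Unit = {
        events.add("B: waiting")                      // step 2: pause here
        var message = queue.take()                    // blocks until a message arrives
        while (message != stopMessage) message = queue.take()
        events.add("B: resumed")                      // step 4: continue the job
      }
    })

    processB.start(); processA.start()                // step 1: start both
    processA.join(); processB.join()
    events.toArray(new Array[String](0)).toList
  }
}
```

Calling HandshakeSketch.simulate("done") returns the recorded events. "B: resumed" always comes after "A: job finished", while the relative order of the first two events depends on thread scheduling.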

In process A, call the following function at the point where process A should send the AMQP message. In my case that is the end of the main() function, so that process B is notified when the job completes.

  def send_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost(rabbitmq_host)
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    // Declare the queue so it exists even if the receiver has not started yet
    val queue_to_send = queue_name
    channel.queueDeclare(queue_to_send, false, false, false, null)
    channel.basicPublish("", queue_to_send, null, stop_message.getBytes("UTF-8"))
    println(" [x] Sent '" + stop_message + "'")
    channel.close()
    connection.close()
  }

In process B, the RabbitMQ receiver code needs a small modification, shown below. With a while loop added to the receiver, it waits until it gets a specific message from the queue (in this case, the message sent by process A; the stop_message passed to the sender and to the receiver must be identical). Define the function below and call it at the point where process B should pause until process A finishes its job.
Note that messages are stored in the queue whose name is passed to channel.queueDeclare(QUEUE_NAME, false, false, false, null). You can also specify the location of the RabbitMQ server with factory.setHost().

    def get_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
      val factory: ConnectionFactory = new ConnectionFactory()
      factory.setHost(rabbitmq_host)
      val connection: Connection = factory.newConnection()
      val channel: Channel = connection.createChannel()

      val QUEUE_NAME = queue_name
      channel.queueDeclare(QUEUE_NAME, false, false, false, null)
      println(" [*] Waiting for messages. To exit press CTRL+C")

      // Written by the consumer thread and read by this thread, so it must be thread-safe
      val enough = new java.util.concurrent.atomic.AtomicBoolean(false)
      // Register the consumer once; doing it inside the loop would add a new consumer on every iteration
      val consumer: Consumer = new DefaultConsumer(channel) {
        override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
          val message: String = new String(body, "UTF-8")
          println(" [x] Received '" + message + "'")
          if (message == stop_message) enough.set(true)
        }
      }
      channel.basicConsume(QUEUE_NAME, true, consumer)

      // Block the caller until the stop message has arrived
      while (!enough.get()) {
        Thread.sleep(100)
      }
      channel.close()
      connection.close()
    }

In addition, I defined the function below, which removes all messages in a queue by deleting the queue.
Once a message has been sent to the RabbitMQ server and stored in a queue, it stays there until a consumer reads it (this is one of the benefits of an AMQP service). Therefore, before starting the two processes, run the following function to clear out old messages. Otherwise process B might read an old message from process A and move ahead without waiting for process A to complete.

    def clear_ampq_queues(rabbitmq_host: String, queue_name: String): Unit = {
      val factory: ConnectionFactory = new ConnectionFactory()
      factory.setHost(rabbitmq_host)
      val connection: Connection = factory.newConnection()
      val channel: Channel = connection.createChannel()
      // Deletes the queue and any messages in it; the sender and receiver re-declare it later
      channel.queueDelete(queue_name)
      channel.close()
      connection.close()
    }

The code above only applies to sequential processes. What I am actually trying to build is multiple processes that run in containers in parallel, each sending a message to a controller (the receiver in the example above) after completing its job. So I am going to modify the code for parallel processing.
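
As a rough idea of what the controller side might need, here is a minimal sketch of the bookkeeping only, with no RabbitMQ calls. It assumes each worker puts its own id in the message body, which the article does not specify. CompletionTracker, report, and awaitAll are hypothetical names. In a real controller, report would presumably be called from handleDelivery.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}

// Hypothetical helper: tracks which of the expected workers have reported completion.
final class CompletionTracker(expectedWorkers: Set[String]) {
  private var pending: Set[String] = expectedWorkers
  private val latch = new CountDownLatch(expectedWorkers.size)

  // Records a completion message; returns false for unknown or duplicate worker ids.
  def report(workerId: String): Boolean = synchronized {
    if (pending.contains(workerId)) {
      pending -= workerId
      latch.countDown()
      true
    } else false
  }

  // Waits until every expected worker has reported; returns false if the timeout elapses first.
  def awaitAll(timeoutMillis: Long): Boolean =
    latch.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
```

For example, a controller created with new CompletionTracker(Set("worker-1", "worker-2")) would block in awaitAll until both ids have been reported. Duplicates are ignored, so a redelivered or stale message does not release the controller early.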
