Java
Scala
RabbitMQ
amqp
container

Process Communication using AMQP with RabbitMQ


Setup RabbitMQ

To set up RabbitMQ, follow this tutorial: https://www.rabbitmq.com/tutorials/tutorial-one-java.html
(I am currently developing in Scala, so I converted the tutorial's Java example code into Scala.)

If your project is a Maven project, add the following lines to your pom.xml file.

<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>4.2.0</version>
</dependency>
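
Since the examples here are written in Scala, the project may be built with sbt rather than Maven. In that case the same artifact could be declared roughly as follows. The coordinates are copied from the Maven snippet above; the use of sbt and the file name build.sbt are assumptions, not something the original setup states.

```scala
// build.sbt (assumed build file; same coordinates as the Maven dependency above)
// A single % is used because amqp-client is a plain Java library.
libraryDependencies += "com.rabbitmq" % "amqp-client" % "4.2.0"
```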

Then start the RabbitMQ service:

service rabbitmq-server start

RabbitMQ Sender

The following is a simple RabbitMQ sender.

import com.rabbitmq.client.{Channel, Connection, ConnectionFactory}
object rabbitMQ_sender {
  def main(args: Array[String]): Unit = {
    // Connect to the broker running on this machine
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    // Declare the queue (created if it does not exist yet) and publish to it
    // through the default exchange, using the queue name as the routing key
    val queue_to_send = "test"
    channel.queueDeclare(queue_to_send, false, false, false, null)
    val message = "Hello World!"
    channel.basicPublish("", queue_to_send, null, message.getBytes("UTF-8"))
    System.out.println(" [x] Sent '" + message + "'")

    channel.close()
    connection.close()
  }
}

In the figure below, P represents the RabbitMQ sender (P stands for producer). When you run the code above, the sender publishes the message "Hello World!" to the queue (the red blocks in the figure). The receiver (C, for consumer) then receives the message.

[Figure: producer P, queue (red blocks), consumer C]

RabbitMQ Receiver

The following is a simple RabbitMQ receiver.

import com.rabbitmq.client._
object rabbitMQ_receiver {
  def main(args: Array[String]): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost("localhost")
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    val QUEUE_NAME = "test"
    channel.queueDeclare(QUEUE_NAME, false, false, false, null)
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C")
    // handleDelivery is called by the client library for each message that arrives
    val consumer: Consumer = new DefaultConsumer(channel) {
      override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
        val message: String = new String(body, "UTF-8")
        System.out.println(" [x] Received '" + message + "'")
      }
    }
    // autoAck = true: messages are acknowledged as soon as they are delivered
    channel.basicConsume(QUEUE_NAME, true, consumer)
  }
}

While the RabbitMQ receiver is running, it prints the output below every time it gets a message from a sender.

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'

Test: Process handling using RabbitMQ

So how can I use this RabbitMQ sender/receiver in my own code?

Here is what I am going to do.

  1. Start 2 processes (process A and B) at the same time
  2. Process B stops at some point and waits until process A finishes its job
  3. Process A sends a message when it has completed its job
  4. Once process B gets the message from process A, it resumes its job
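
Before wiring this up to RabbitMQ, the four steps can be tried out in a single JVM. The sketch below is only an illustration under stated assumptions: two threads stand in for processes A and B, and an in-memory LinkedBlockingQueue stands in for the RabbitMQ queue. The names HandshakeSketch and run are hypothetical and not part of any library.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

object HandshakeSketch {
  // Runs the four steps once and returns the recorded events in order.
  def run(stopMessage: String = "A finished"): List[String] = {
    val queue = new LinkedBlockingQueue[String]()   // stand-in for the RabbitMQ queue
    val events = new CopyOnWriteArrayList[String]() // thread-safe event log

    val processA = new Thread(() => {
      Thread.sleep(100)            // pretend to do some work
      events.add("A: job done")
      queue.put(stopMessage)       // step 3: notify completion
    })

    val processB = new Thread(() => {
      events.add("B: waiting")     // step 2: pause here
      var msg = queue.take()       // blocks until a message is available
      while (msg != stopMessage) msg = queue.take() // skip unrelated messages
      events.add("B: resumed")     // step 4: continue the job
    })

    // Step 1: start both "processes" at the same time
    processB.start()
    processA.start()
    processA.join()
    processB.join()
    events.toArray(new Array[String](0)).toList
  }
}
```

Calling HandshakeSketch.run() returns the three events, with "B: resumed" always recorded after "A: job done", because B can only take the message after A has put it on the queue.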

In process A, call the following function at the point where process A should send the AMQP message. In my case this is the end of the main() function, so that process B is notified that the job is complete.

  def send_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
    val factory: ConnectionFactory = new ConnectionFactory()
    factory.setHost(rabbitmq_host)
    val connection: Connection = factory.newConnection()
    val channel: Channel = connection.createChannel()

    val queue_to_send = queue_name
    channel.queueDeclare(queue_to_send, false, false, false, null)
    // Encode explicitly as UTF-8, matching how the receiver decodes the body
    channel.basicPublish("", queue_to_send, null, stop_message.getBytes("UTF-8"))
    System.out.println(" [x] Sent '" + stop_message + "'")
    channel.close()
    connection.close()
  }

In process B, the "RabbitMQ receiver" code needs a small modification, shown below. The consumer is registered once, and the calling thread then waits on a CountDownLatch until a specific message arrives in the queue (in this case, the message from process A; the message sent and the message expected need to be identical). Define the function below and call it at the point where process B should pause until process A finishes its job.
Note that messages are held in the queue whose name is passed to channel.queueDeclare(QUEUE_NAME, false, false, false, null). You can also specify the location of the RabbitMQ server with factory.setHost().

    import java.util.concurrent.CountDownLatch
    import com.rabbitmq.client._

    def get_ampq_task_completion(rabbitmq_host: String, queue_name: String, stop_message: String): Unit = {
      val factory: ConnectionFactory = new ConnectionFactory()
      factory.setHost(rabbitmq_host)
      val connection: Connection = factory.newConnection()
      val channel: Channel = connection.createChannel()
      try {
        val QUEUE_NAME = queue_name
        channel.queueDeclare(QUEUE_NAME, false, false, false, null)
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C")

        // Released by the consumer thread once the expected message arrives
        val done = new CountDownLatch(1)
        // Register the consumer only once; the client library calls
        // handleDelivery on its own thread for every message
        val consumer: Consumer = new DefaultConsumer(channel) {
          override def handleDelivery(consumerTag: String, envelope: Envelope, properties: AMQP.BasicProperties, body: Array[Byte]): Unit = {
            val message: String = new String(body, "UTF-8")
            System.out.println(" [x] Received '" + message + "'")
            if (message == stop_message) done.countDown()
          }
        }
        channel.basicConsume(QUEUE_NAME, true, consumer)
        // Block the caller until the stop message has been received
        done.await()
      } finally {
        channel.close()
        connection.close()
      }
    }

In addition, I defined the function below, which removes all messages in a queue by deleting the queue.
Once a message has been sent to the RabbitMQ server and stored in a queue, it stays there until a consumer reads it (this is one of the benefits of an AMQP service). Therefore, before starting the 2 processes, run the following function to clear all old messages, so that process B does not mistakenly read an old message from process A and move ahead without waiting for process A to complete.

    def clear_ampq_queues(rabbitmq_host: String, queue_name: String): Unit = {
      val factory: ConnectionFactory  = new ConnectionFactory()
      factory.setHost(rabbitmq_host)
      val connection: Connection  = factory.newConnection()
      val channel: Channel  = connection.createChannel()
      // Deletes the queue itself together with any messages it still holds;
      // the sender and receiver re-create it with queueDeclare
      channel.queueDelete(queue_name)
      channel.close()
      connection.close()
    }

The code above only applies to sequential processes. What I am trying to develop is multiple processes that run in containers in parallel and each send a message to a controller (the receiver in the example above) after completing their job. So I am going to modify the code for parallel processing.
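
One possible building block for that controller, offered only as a sketch and not as the final design: a small thread-safe tracker that records which workers have reported completion. It assumes each worker puts its own id in the message body, so the controller's handleDelivery could call markDone with the decoded message. The class CompletionTracker and its methods are hypothetical names introduced here for illustration.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, TimeUnit}

// Hypothetical helper: tracks which of the expected workers have finished.
class CompletionTracker(expectedWorkers: Set[String]) {
  // Workers that have not reported yet (concurrent set, safe across threads)
  private val pending = ConcurrentHashMap.newKeySet[String]()
  expectedWorkers.foreach(w => pending.add(w))
  private val allDone = new CountDownLatch(expectedWorkers.size)

  // Intended to be called from handleDelivery with the worker id from the message.
  // Returns true only the first time a known worker reports; duplicates and
  // unknown ids are ignored so the latch is never counted down twice.
  def markDone(workerId: String): Boolean = {
    val first = pending.remove(workerId)
    if (first) allDone.countDown()
    first
  }

  // Number of workers that have not reported yet
  def remaining: Int = pending.size

  // Blocks until every expected worker has reported or the timeout expires;
  // returns true if all workers finished in time.
  def awaitAll(timeoutMillis: Long): Boolean =
    allDone.await(timeoutMillis, TimeUnit.MILLISECONDS)
}
```

The controller would create one tracker with the ids of the containers it started, register a single consumer that forwards each message to markDone, and then call awaitAll before moving on.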