Thursday, February 27, 2014

How to Publish and Consume messages using RabbitMQ?

As computing becomes more distributed, the processes that make up an application often run on different physical machines. Once processing is distributed, you have to decide how those systems are connected together. This creates the need for a reliable communication mechanism that can send messages between the discrete components of an application. Message Queues fill that role: they act as a broker that accepts messages from one service and holds them until another service retrieves them.
In this post, we will develop a small Producer-Consumer application using RabbitMQ.

A little background...

RabbitMQ is an open source message broker that implements the Advanced Message Queuing Protocol (AMQP).


[Figure: Communication flow using a Message Broker]

 There are three main entities in any Message Oriented Middleware (MOM) architecture:

  • Producer : This entity produces messages and publishes them to a Queue. There can be many producers, producing different messages and publishing to different queues managed by a single Message Broker.
  • Message Broker : This system is responsible for managing the messages in queues and passing them to consumers as and when they become available. There are several Message Queue implementations available, and we will use RabbitMQ for our example.
  • Consumer : A consumer listens for messages on a single queue or on multiple queues and processes them.
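To make the three roles concrete before bringing in RabbitMQ, here is a minimal in-memory sketch. It is only an analogy: a `LinkedBlockingQueue` stands in for a queue managed by the broker, and the class and method names (`InMemoryBrokerSketch`, `produce`, `consume`, `runDemo`) are made up for this illustration and are not part of any RabbitMQ API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: a LinkedBlockingQueue stands in for a queue managed by a broker.
class InMemoryBrokerSketch {

    // Producer role: publishes `count` numbered messages to the queue.
    static void produce(BlockingQueue<String> queue, int count) throws InterruptedException {
        for (int i = 0; i < count; i++) {
            queue.put("message-" + i);
        }
    }

    // Consumer role: takes `count` messages, blocking while the queue is empty.
    static List<String> consume(BlockingQueue<String> queue, int count) throws InterruptedException {
        List<String> received = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            received.add(queue.take());
        }
        return received;
    }

    // Runs the producer on its own thread and consumes on the calling thread.
    static List<String> runDemo(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        Thread producer = new Thread(() -> {
            try {
                produce(queue, count);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            List<String> received = consume(queue, count);
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for messages", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3));
    }
}
```

A real broker adds what this sketch lacks: the producer and consumer can live in different processes or on different machines, and the queue can outlive both of them.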

Setup
  • Download and set up RabbitMQ. It can be downloaded from the RabbitMQ website.
  • Configure the Maven dependency for the Java client in pom.xml as below:
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>3.2.3</version>
    </dependency>
While message brokers such as RabbitMQ can model a variety of schemes, such as one-to-one message delivery or publish/subscribe, our application will be simple and have two basic components: a single producer that produces a message and a single consumer that consumes it.

Get Started
Step 1 : Establish MQ Connection

Both Producer and Consumer use the same endpoint to connect to the MQ. We generalize that implementation in an abstract class which Producer and Consumer extend. Below is the abstract class, which establishes the connection to a specified Queue in the Message Broker.
package com.javabydefault.queue;

import java.io.IOException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * Creates a connection to the Queue
 * 
 */
public abstract class MessageQueueEndPoint {
    protected Connection connection;
    protected Channel channel;
    protected String endPointName;
 
    private static final Logger logger = LoggerFactory.getLogger(MessageQueueEndPoint.class);

    public MessageQueueEndPoint(String queueName) {
        this.endPointName = queueName;
  
        //Create a connection factory
        ConnectionFactory factory = new ConnectionFactory();
        
        //Replace with the correct connection uri
        String uri = "amqp://userName:password@hostName:portNumber/virtualHost"; 
        try {
            factory.setUri(uri);
        
            //getting a connection
            connection = factory.newConnection();
     
            //creating a channel
            channel = connection.createChannel();
      
            //declaring a queue for this channel. If queue does not exist, it will be created on the server.
            //durability (second param) is also set as TRUE (the queue will survive a server restart)
            channel.queueDeclare(queueName, true, false, false, null);
        } catch (Exception e) {
            logger.error("Error connecting to MQ Server.", e);
        }
    }
 
 
    /**
     * Closes the Queue Connection. Call this once the endpoint is no longer needed so that the connection and its channels are released.
     * @throws IOException
     */
     public void close() throws IOException{
         this.connection.close(); //closing connection, closes all the open channels
     }
 
     public int getCurrentMessageCount() throws IOException {
         //passive declare only looks up the existing queue and reports its message count
         return channel.queueDeclarePassive(this.endPointName).getMessageCount();
     }
}
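The connection URI in the constructor above is a template, so every placeholder (userName, password, hostName, portNumber, virtualHost) has to be replaced before it will work. The sketch below is one way to sanity-check a filled-in URI. It uses only `java.net.URI` and does not call the RabbitMQ client; the class name `AmqpUriSketch`, the `parts` helper and the sample values are assumptions made for illustration (5672 is the usual AMQP port).

```java
import java.net.URI;

// Illustration only: shows which pieces an AMQP URI is expected to carry.
class AmqpUriSketch {

    // Returns {userName, password, hostName, portNumber, virtualHost}.
    static String[] parts(String amqpUri) {
        URI uri = URI.create(amqpUri);
        if (uri.getHost() == null || uri.getUserInfo() == null || uri.getPort() == -1) {
            throw new IllegalArgumentException(
                    "URI needs user info, a host and a numeric port: " + amqpUri);
        }
        String[] userInfo = uri.getUserInfo().split(":", 2);
        if (userInfo.length != 2) {
            throw new IllegalArgumentException("User info must look like userName:password");
        }
        // The path carries the virtual host, prefixed with a slash.
        String path = uri.getPath();
        String virtualHost = path.startsWith("/") ? path.substring(1) : path;
        return new String[] {
            userInfo[0], userInfo[1], uri.getHost(), String.valueOf(uri.getPort()), virtualHost
        };
    }

    public static void main(String[] args) {
        // Hypothetical values; substitute the ones for your own broker.
        String[] p = parts("amqp://appUser:secret@localhost:5672/myVhost");
        System.out.println(String.join(", ", p));
    }
}
```

Passing the unmodified template to `parts` raises an `IllegalArgumentException`, because "portNumber" is not a numeric port.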
 
Step 2 : Producer Class

The Producer class is responsible for publishing messages to the queue. We will push an object to the Message Queue, so this object must be Serializable. We will use the Apache Commons Lang library to convert the object to a byte array.
package com.javabydefault.queue;

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Producer extends MessageQueueEndPoint {
    private static final Logger logger = LoggerFactory.getLogger(Producer.class);
    
    public Producer(String queueName) {
        super(queueName);
    }
    
    public void publishMessage(HashMap<String, Integer> msgMap) {
        try {
            channel.basicPublish("", endPointName, null, SerializationUtils.serialize(msgMap));
        } catch (IOException e) {
            logger.error("Error connecting to Queue." , e);
        }
    }
}
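The message body that reaches the broker is just a byte array. The sketch below shows roughly what the object-to-bytes conversion amounts to, using only standard Java serialization from `java.io` rather than Commons Lang. The class and method names (`SerializationRoundTripSketch`, `toBytes`, `fromBytes`) are hypothetical stand-ins, not library API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;

// Illustration only: a plain java.io round trip for a Serializable message.
class SerializationRoundTripSketch {

    // Converts a Serializable object to the byte array that would be published.
    static byte[] toBytes(Serializable value) {
        try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
             ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
            out.flush();
            return buffer.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Rebuilds the object on the consuming side from the received bytes.
    static Object fromBytes(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class of the message is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        HashMap<String, Integer> message = new HashMap<>();
        message.put("My Message", 7);
        Object copy = fromBytes(toBytes(message));
        System.out.println(copy.equals(message));
    }
}
```

Because the format is Java serialization, both sides need the same classes available, and only bytes from a trusted producer should be deserialized this way.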

Step 3 : Consumer Class

The Consumer class is a Runnable implementation, so it can be kept running on a separate thread and processes each message as soon as it is available in the queue.
package com.javabydefault.queue;

import java.io.IOException;
import java.util.HashMap;

import org.apache.commons.lang.SerializationUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rabbitmq.client.QueueingConsumer;

public class Consumer extends MessageQueueEndPoint implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
    
    public Consumer(String queueName) {
        super(queueName);
    }
    
    public void run() {
       QueueingConsumer consumer = new QueueingConsumer(channel);
       try {
          //start consuming messages. Auto acknowledge messages.
          channel.basicConsume(endPointName, true, consumer);
       } catch (IOException e) {
          logger.error("Error connecting to Queue." , e);
          return;
       }
  
       while (true) { //keep listening for the message
          try {
             HashMap<String, Integer> msgMap = consumeMessage(consumer);   
             logger.info("Message #" + msgMap.get("My Message") + " received from Queue.");
          } catch (InterruptedException e) {
             //the wait was interrupted: restore the flag and stop listening
             Thread.currentThread().interrupt();
             return;
          }
       }
    }

    /**
     * Blocking method, returns only when something is available from the Queue 
     * @return the deserialized message map
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @SuppressWarnings("unchecked")
    private HashMap<String, Integer> consumeMessage(QueueingConsumer consumer) throws InterruptedException {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //blocking call
        return (HashMap<String, Integer>) SerializationUtils.deserialize(delivery.getBody());
    }
}
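A consumer that loops forever on a blocking call needs some way to be stopped. One common approach, sketched below under stated assumptions, is to interrupt the consumer thread and treat the resulting `InterruptedException` as the signal to exit. The sketch does not use RabbitMQ: `BlockingQueue.take()` stands in for the blocking delivery call, and `StoppableConsumerSketch` and `runDemo` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: BlockingQueue.take() stands in for a blocking delivery call.
class StoppableConsumerSketch implements Runnable {
    private final BlockingQueue<Integer> queue;
    private final List<Integer> processed =
            Collections.synchronizedList(new ArrayList<Integer>());

    StoppableConsumerSketch(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (true) {
                processed.add(queue.take()); // blocks until a message arrives
            }
        } catch (InterruptedException e) {
            // Interruption is the stop signal: restore the flag and leave the loop.
            Thread.currentThread().interrupt();
        }
    }

    // Queues some messages, lets the consumer drain them, then stops it.
    static List<Integer> runDemo(int messageCount) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messageCount; i++) {
            queue.add(i);
        }
        StoppableConsumerSketch consumer = new StoppableConsumerSketch(queue);
        Thread worker = new Thread(consumer);
        worker.start();
        try {
            long deadline = System.currentTimeMillis() + 5000;
            while (consumer.processed.size() < messageCount
                    && System.currentTimeMillis() < deadline) {
                Thread.sleep(10);
            }
            worker.interrupt();
            worker.join(5000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Demo thread was interrupted", e);
        }
        if (worker.isAlive()) {
            throw new IllegalStateException("Consumer thread did not stop");
        }
        return new ArrayList<>(consumer.processed);
    }

    public static void main(String[] args) {
        System.out.println(runDemo(3));
    }
}
```

With the real client, closing the connection after stopping the thread releases the broker-side resources as well.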

Step 4 : Main Class

This Main class puts the Producer and Consumer together.
package com.javabydefault.queue;

import java.util.HashMap;

public class QueueMain {
    
    public static void main(String[] argv) throws Exception {
    
        final String QUEUE_NAME = "MY_QUEUE";
    
        //Spawn the Consumer thread, which keeps listening for messages to process
        Consumer consumer = new Consumer(QUEUE_NAME);
        Thread consumerThread = new Thread(consumer);
        consumerThread.start();
    
        //Publishes msg in the queue
        Producer producer = new Producer(QUEUE_NAME);
    
        //Produce 100 msgs
        for (int i = 0; i < 100; i++) {
           HashMap<String, Integer> message = new HashMap<String, Integer>();
           message.put("My Message", i);
           producer.publishMessage(message);
           System.out.println("Message #"+ i +" sent to Queue.");
        }
    }
}
Final round up...
This post shows a simple implementation of a Producer and a Consumer using the RabbitMQ message broker. In the real world, you are likely to have multi-threaded Producers and Consumers. The following points need attention in a multi-threaded environment:
  • If multiple threads act as Producers writing to the same Queue and each thread creates its own Producer instance, each instance opens a new connection to the broker. Make Producer a singleton class so that all threads use the same connection to publish messages. The same applies to Consumer.
  • The RabbitMQ Connection object is thread-safe; however, RabbitMQ recommends using a separate Channel object for each thread. In that case, create a Channel object for each thread and store it in a ThreadLocal. Modify the MessageQueueEndPoint class as below. Don't keep Channel as a class-level variable; use getChannel() wherever a channel object is required. It returns the channel object allocated to the calling thread.
    //RabbitMq advises you to use channels per Thread, so pooling channels per thread.
    private final ThreadLocal<Channel> channels = new ThreadLocal<Channel>();
    .....
    /**
     * Maintain and Return Thread specific channel objects
     * @return
     * @throws IOException
     */
    protected final Channel getChannel() throws IOException {
       Channel channel = channels.get();
       if (channel == null) {
          channel = connection.createChannel();
          channels.set(channel);
       }
       return channel;
    }
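To see what the ThreadLocal pattern above buys you, the following sketch applies the same lazy get-or-create logic to a stand-in resource. An integer id takes the place of the Channel, and `created.incrementAndGet()` takes the place of `connection.createChannel()`; the class and method names are hypothetical. Each thread gets its own resource on first use and the same one on every later call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Illustration only: an Integer id stands in for a per-thread Channel.
class PerThreadResourceSketch {
    private final AtomicInteger created = new AtomicInteger();
    private final ThreadLocal<Integer> resources = new ThreadLocal<Integer>();

    // Same lazy pattern as getChannel(): create on first use, reuse afterwards.
    Integer getResource() {
        Integer resource = resources.get();
        if (resource == null) {
            resource = created.incrementAndGet(); // stand-in for connection.createChannel()
            resources.set(resource);
        }
        return resource;
    }

    // Starts `threadCount` threads and collects the resource id each one saw.
    static Set<Integer> idsSeenBy(int threadCount) {
        PerThreadResourceSketch sketch = new PerThreadResourceSketch();
        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < threadCount; i++) {
            Thread t = new Thread(() -> {
                Integer first = sketch.getResource();
                Integer second = sketch.getResource();
                // Record the id only if repeated calls returned the same resource.
                if (first.equals(second)) {
                    seen.add(first);
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while joining", e);
            }
        }
        return new TreeSet<>(seen);
    }

    public static void main(String[] args) {
        System.out.println(idsSeenBy(3));
    }
}
```

One design consequence worth keeping in mind: with long-lived thread pools the per-thread resources stay allocated for as long as the threads do, so they should be closed when the endpoint shuts down.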
    
Refer to this post on using spring-amqp instead of the RabbitMQ client.
Thanks for stopping by. Please drop your comments/suggestions below.

4 comments:

  2. This is a very nicely written article, thank you.

    I'd like to see an article that addresses a RabbitMQ solution for the next level of complexity. I have a use-case that involves multiple Producers sending messages to multiple Consumers using Exchanges and Queues. Here is a stick diagram of the message flow. Let's imagine that each Producer and each Consumer is running on a separate machine (4 machines total). The requirement here is that each Consumer needs to receive the messages from both Producers (The 'X' in the stick figure is meant to show message flow between Producers and Consumers).

    Producer_1 --- Exchange_1 -- Queue_1 -- Consumer_1
    ...............................................................X
    Producer_2 --- Exchange_2 -- Queue_2 -- Consumer_2

    The article above shows how messages are received using a blocking call to consumer.nextDelivery(). What design changes are needed in order to support a Consumer that needs to receive messages from 2 Producers?

    Very interested in seeing suggested design approaches.

  3. Hi, I am getting the following:
    Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketTimeoutException: Timeout during Connection negotiation

  4. Stolen from https://dzone.com/articles/getting-started-rabbitmq-java
