Saturday, November 1, 2014

Publish and Consume messages to RabbitMQ with Spring AMQP

Earlier in this post, I have demonstrated publishing and consuming messages to RabbitMQ message broker using RabbitMQ client library. When we are working on application which uses Spring framework, it makes sense to use spring-amqp library instead. However, it is not to say RabbitMQ client library cannot be used with Spring. Using spring-amqp offers advantages of loose coupling with message broker which enables to switch from one Message Broker to another without any major changes in the code.


Setup
  • Configure maven dependency of Java client in pom.xml as below:
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-amqp</artifactId>
        <version>1.3.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>1.3.6.RELEASE</version>
    </dependency>
Get Started
Step 1 : Setup Spring Application Context for connection with MQ
<bean class="java.net.URI" id="amqpUrl">
        <constructor-arg value="amqp://guest:guest@localhost:5672"/>
</bean>

<!-- Connection Factory -->
<rabbit:connection-factory id="amqpConnectionFactory"
  host="#{@amqpUrl.getHost()}"
  port="#{@amqpUrl.getPort()}"
  username="#{ @amqpUrl.getUserInfo().split(':')[0] }"
  password="#{ @amqpUrl.getUserInfo().split(':')[1] }"
  channel-cache-size="25" />

<!-- Spring AMQP Template -->
<rabbit:template id="amqpTemplate" connection-factory="amqpConnectionFactory" retry-template="retryTemplate"/>

<!-- in case connection is broken then Retry based on the below policy -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
 <property name="backOffPolicy">
  <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
   <property name="initialInterval" value="500" />
   <property name="multiplier" value="2" />
   <property name="maxInterval" value="30000" />
  </bean>
 </property>
</bean>

<!-- Spring AMQP Admin -->
<rabbit:admin id="amqpAdmin" connection-factory="amqpConnectionFactory"/>

Step 2 : Producer Class
import java.util.Date;
import java.util.Random;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class QueuePublishProcess {
    
    public static final String QUEUE_NAME = "test.queue";

    @Autowired 
    private AmqpAdmin admin;

    @Autowired 
    private AmqpTemplate template;

    public static void main(String[] args) throws Exception {
          ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
          QueuePublishProcess publisher = context.getBean(QueuePublishProcess.class);
          publisher.setup();
          publisher.publish();
    }

    private void setup() {
        Exchange exchange = DirectExchange.DEFAULT;
        boolean durable = true;

        for (int i=1; i<=5; i++) {
            String queueName = "test.queue."+i;
            Queue q = new Queue(queueName, durable, false, true);
            admin.declareQueue(q);
            BindingBuilder.bind(q).to(exchange).with(queueName);
            System.out.println("Bounded queue " + queueName);
        }
    }

    private void publish() throws Exception {
        for (int i=0; i<10; i++) {
            try {
                String sent = i + " Catch the rabbit! " + new Date();

                String queueName = generateQueueName();

                // write message
                template.convertAndSend(queueName, sent );
                System.out.println( "Msg Sent to " + queueName + " :  " + sent );
                Thread.sleep(3000);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    private String generateQueueName() {
        Random rand = new Random();

        // nextInt is normally exclusive of the top value,
        // so add 1 to make it inclusive
        int randomNum = rand.nextInt((5 - 1) + 1) + 1;
        return "test.queue."+randomNum ;
    }    
}
Step 3 : Consumer Class
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.stereotype.Component;

@Component
public class QueueConsumeProcess implements MessageListener{
    
    @Autowired
    private CachingConnectionFactory connectionFactory;
    
    public static void main(String[] argv) throws Exception {
          ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
          QueueConsumeProcess consumer = context.getBean(QueueConsumeProcess.class);
            
          consumer.consume();

    }

    private void consume() {
          SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
          container.setConnectionFactory(connectionFactory);
          container.setQueueNames("test.queue.1", "test.queue.2", "test.queue.3", "test.queue.4", "test.queue.5");
          container.setMessageListener(new MessageListenerAdapter(this));
          container.start();
    }

    @Override
    public void onMessage(Message msg) {
        System.out.println(new String(msg.getBody()));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Final round up... This post attempts to demonstrate producing and consuming messages to/from RabbitMQ message broker. The Message consumer implements a MessageListener which calls onMessage() whenever a message is available for consumption.

Hope this post proves to be helpful in your journey with Java.

No comments:

Post a Comment