Wednesday, April 13, 2011

Converting Message of RabbitMQ to JSON using JsonMessageConverter in Async Mode using Spring.


When we talk about RabbitMQ in Async Mode using spring and has the requirement where message is send by application working in different language, in that case it becomes mandatory for us use the default JsonMessageConverter provided by Spring for AMQP. This class helps us to marshal and demarshal the message before sending to the Exchange/Queue and receiving it to client.
Please find the below code TestSendAsycRMQ class send the message into the Exchange testAsyncExchange with routing key testAsyncRK, when the message enter the queue testAsyncQueue as per the configuration in rabbitConfiguration.xml it will invoke messageListener which is our receiver class rabbitMQ.TestReceiverAsycRMQ.
How the Spring flow work here:
We are providing SimpleMessageListenerContainer class with connectionFactory and queuename along with listener. connectionFactory helps SimpleMessageListenerContainer to get the connection. Further in our TestSendAsycRMQ we create the instance of RabbitTemplate which will refer to bean id=”rabbitTemplate” of rabbitConfiguration.xml by initializing it with default messageConverter as JsonMessageConverter.
Note: To work with JsonMessageConverter for AMQP we need to have jackson-mapper-asl-1.5.8.jar and jackson-core-asl-1.5.3.jar in your class path.

TestSendAsycRMQ:
package rabbitMQ;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TestSendAsycRMQ {
Connection conn = null;
ConnectionFactory factory;
Channel chan;
public void sendAsyncRabbitMQMessage(String strQueueName)
{
try{
ConfigurableApplicationContext context = new ClassPathXmlApplicationContext(“/rabbitConfiguration.xml”);
RabbitTemplate rabbitTemplate = (RabbitTemplate)context.getBean(“rabbitTemplate”);
String exchangeName = “testAsyncExchange”;
rabbitTemplate.setExchange(exchangeName);
String routingKey = “”;
if(strQueueName.trim().equals(“testAsyncQueue”))
{
for(int i=0;i<5;i++)
{
routingKey = “testAsyncRK”;
rabbitTemplate.setRoutingKey(routingKey);
rabbitTemplate.setQueue(“testAsyncQueue”);
rabbitTemplate.convertAndSend(“hi”);
}
}System.out.println(“Completed the flow:”);
}
catch(Exception e)
{
e.printStackTrace();
}
}
public static void main(String agrs[])
{
TestSendAsycRMQ objTestSendAsycRabbitMQ = new TestSendAsycRMQ();
objTestSendAsycRabbitMQ.sendAsyncRabbitMQMessage(“testAsyncQueue”);
}
}
TestReceiverAsycRMQ:
package rabbitMQ;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.support.converter.JsonMessageConverter;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class TestReceiverAsycRMQ implements MessageListener{
Connection conn = null;
ConnectionFactory factory;
Channel chan;
@Override
public void onMessage(Message message) {
try{
JsonMessageConverter jmc = new JsonMessageConverter();
String u = (String)jmc.fromMessage(message);
System.out.println(“received: ” + u);
System.out.println(“received: ” + message);
}catch(Exception ex)
{
ex.printStackTrace();
}
}
}
rabbitConfiguration.xml:

 id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">







 id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
p:connectionFactory-ref="connectionFactory"
p:messageConverter-ref="messageConverter"




 id="messageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">





 id="messageListener" class="rabbitMQ.TestReceiverAsycRMQ">





class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:queueName="testAsyncQueue"
p:messageListener-ref="messageListener" 
p:concurrentConsumers="10"
p:channelTransacted="true"
p:prefetchCount="1"


class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:queueName="testQueue"
p:messageListener-ref="messageListener"


rabbitConfiguration.xml with Spring Frame for Exchange/Queue:




 id="connectionFactory" class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory">







 id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate"
p:connectionFactory-ref="connectionFactory"
p:messageConverter-ref="messageConverter"
p:queue="testAsyncQueue, testQueue"
p:routingKey="testAsyncRK"
p:exchange="testAsyncExchange"




 id="messageConverter" class="org.springframework.amqp.support.converter.JsonMessageConverter">




 id="messageListener" class="rabbitMQ.TestReceiverAsycRMQ">




 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:queueName="testAsyncQueue"
p:messageListener-ref="messageListener" 
p:concurrentConsumers="10"
p:channelTransacted="true"
p:prefetchCount="1"


 class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
p:connectionFactory-ref="connectionFactory"
p:queueName="testQueue"
p:messageListener-ref="messageListener"