Spring 整合JMS 基於ActiveMQ 實現消息的發送接收
一、下載並安裝ActiveMQ
首先我們到apache官網上下載activeMQ(Apache ActiveMQ ? -- Download),進行解壓後運行其bin目錄下面的activemq.bat文件啟動activeMQ。
二、Spring中加入ActiveMQ的配置
首先將相關的jar拷貝到項目的lib文件下
配置之前先看一下相關目錄以便於理解
下面開始配置
<!-- ActiveMQ 連接工廠 -->
<!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <!-- <property name="brokerURL" value="tcp://192.168.1.79:61616" /> --> <property name="brokerURL" value="${mqUrl}" /> </bean> <!-- Spring Caching連接工廠 --> <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"><!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
<property name="targetConnectionFactory" ref="connectinFactory"></property> <!-- Session緩存數量 --> <property name="sessionCacheSize" value="10"></property> </bean><!-- 配置消息發送目的地方式 -->
<!-- Queue隊列:僅有一個訂閱者會收到消息,消息一旦被處理就不會存在隊列中 --><bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="q.notify"></constructor-arg> </bean><!-- 目的地:Topic主題 :放入一個消息,所有訂閱者都會收到 -->
<!--這個是主題目的地,一對多的--> <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic"> <constructor-arg value="t.notify"></constructor-arg> </bean> <!-- Spring JMS Template 配置JMS模版 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory" /> </bean> <!-- 使用Spring JmsTemplate 的消息生產者 --><bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
<property name="jmsTemplate" ref="jmsTemplate"></property> <property name="notifyQueue" ref="notifyQueue"></property> <property name="messageConverter" ref="messageConverter"></property> </bean> <bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer"> <property name="jmsTemplate" ref="jmsTemplate"></property> <property name="notifyTopic" ref="notifyTopic"></property> <property name="messageConverter" ref="messageConverter"></property> </bean><!-- 消息消費者 一般使用spring的MDP非同步接收Queue模式 -->
<!-- 消息監聽容器 --> <bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectinFactory"></property> <property name="destination" ref="notifyQueue"></property> <property name="messageListener" ref="queueMessageListener"></property> </bean> <!-- 消息監聽容器 --> <bean id="topicContainer"class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectinFactory"></property> <property name="destination" ref="notifyTopic"></property> <property name="messageListener" ref="topicMessageListener"></property> <!-- 發布訂閱模式 --> <property name="pubSubDomain" value="true" /></bean>
<!-- 非同步接收消息處理類 --> <bean id="queueMessageListener" class="com.common.jms.QueueMessageListener"> <property name="messageConverter" ref="messageConverter"></property></bean>
<bean id="topicMessageListener" class="com.common.jms.TopicMessageListener"> <property name="messageConverter" ref="messageConverter"></property> </bean> <bean id="messageConverter" class="com.common.jms.NotifyMessageConverter"> </bean>下面展示一下Sender
public class Sender {
private static ServletContext servletContext; private static WebApplicationContext ctx;/**
* 發送點對點信息 * @param noticeInfo */ public static void setQueueSender(){ servletContext = ServletActionContext.getServletContext(); ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext); QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer")); PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();(下面先展示PhoneNoticeInfo 然後是 QueueMessageProducer )
noticeInfo.setNoticeContent("Hello Word");
noticeInfo.setNoticeTitle("hello Word"); noticeInfo.setReceiver("hello"); noticeInfo.setReceiverPhone("1111111"); notifyMessageProducer.sendQueue(noticeInfo); }public static ServletContext getServletContext() {
return servletContext; } public static void setServletContext(ServletContext servletContext) { Sender.servletContext = servletContext; } public static WebApplicationContext getCtx() { return ctx; } public static void setCtx(WebApplicationContext ctx) { Sender.ctx = ctx; } }PhoneNoticeInfo
public class PhoneNoticeInfo implements Serializable {
/** 消息標題 */ public String noticeTitle; /** 消息內容 */ public String noticeContent; /** 接收者 */ public String receiver; /** 接收手機號 */ public String receiverPhone; public String getNoticeTitle() { return noticeTitle; } public void setNoticeTitle(String noticeTitle) { this.noticeTitle = noticeTitle; } public String getNoticeContent() { return noticeContent; } public void setNoticeContent(String noticeContent) { this.noticeContent = noticeContent; } public String getReceiver() { return receiver; } public void setReceiver(String receiver) { this.receiver = receiver; }public String getReceiverPhone() {
return receiverPhone; } public void setReceiverPhone(String receiverPhone) { this.receiverPhone = receiverPhone; }}QueueMessageProducer
/**
* 消息生產者服務類 */public class QueueMessageProducer { private JmsTemplate jmsTemplate; private Destination notifyQueue; private NotifyMessageConverter messageConverter; public void sendQueue(PhoneNoticeInfo noticeInfo){ sendMessage(noticeInfo); } private void sendMessage(PhoneNoticeInfo noticeInfo) { // TODO Auto-generated method stub jmsTemplate.setMessageConverter(messageConverter); jmsTemplate.setPubSubDomain(false); jmsTemplate.convertAndSend(notifyQueue,noticeInfo); } public JmsTemplate getJmsTemplate() { return jmsTemplate; } public void setJmsTemplate(JmsTemplate jmsTemplate) { this.jmsTemplate = jmsTemplate; } public Destination getNotifyQueue() { return notifyQueue; } public void setNotifyQueue(Destination notifyQueue) { this.notifyQueue = notifyQueue; } public NotifyMessageConverter getMessageConverter() { return messageConverter; } public void setMessageConverter(NotifyMessageConverter messageConverter) { this.messageConverter = messageConverter; }}NotifyMessageConverter
/**
* 消息轉換 */public class NotifyMessageConverter implements MessageConverter { private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class); @Override /** * 轉換接收到的消息為NoticeInfo對象 */ public Object fromMessage(Message message) throws JMSException, MessageConversionException { // TODO Auto-generated method stub if (logger.isDebugEnabled()) { logger.debug("Receive JMS message :"+message); } if (message instanceof ObjectMessage) { ObjectMessage oMsg = (ObjectMessage)message; if (oMsg instanceof ActiveMQObjectMessage) { ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)oMsg; try { PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)aMsg.getObject(); return noticeInfo; } catch (Exception e) { // TODO: handle exception logger.error("Message:${} is not a instance of NoticeInfo."+message.toString()); throw new JMSException("Message:"+message.toString()+"is not a instance of NoticeInfo."+message.toString()); } }else{ logger.error("Message:${} is not a instance of ActiveMQObjectMessage."+message.toString()); throw new JMSException("Message:"+message.toString()+"is not a instance of ActiveMQObjectMessage."+message.toString()); } }else { logger.error("Message:${} is not a instance of ObjectMessage."+message.toString()); throw new JMSException("Message:"+message.toString()+"is not a instance of ObjectMessage."+message.toString()); } }@Override
/** * 轉換NoticeInfo對象到消息 */ public Message toMessage(Object obj, Session session) throws JMSException, MessageConversionException { // TODO Auto-generated method stub if (logger.isDebugEnabled()) { logger.debug("Convert Notify object to JMS message:${}"+obj.toString()); } if (obj instanceof PhoneNoticeInfo) { ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage(); msg.setObject((PhoneNoticeInfo)obj); return msg; }else { logger.debug("Convert Notify object to JMS message:${}"+obj.toString()); } return null; }}
QueueMessageListener
public class QueueMessageListener implements MessageListener {
private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class); private NotifyMessageConverter messageConverter; /** * 接收消息 */ @Override public void onMessage(Message message) { // TODO Auto-generated method stub try { ObjectMessage objectMessage = (ObjectMessage)message; PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)messageConverter.fromMessage(objectMessage); System.out.println("queue收到消息"+noticeInfo.getNoticeContent()); System.out.println("model:"+objectMessage.getJMSDeliveryMode()); System.out.println("destination:"+objectMessage.getJMSDestination()); System.out.println("type:"+objectMessage.getJMSType()); System.out.println("messageId:"+objectMessage.getJMSMessageID()); System.out.println("time:"+objectMessage.getJMSTimestamp()); System.out.println("expiredTime:"+objectMessage.getJMSExpiration()); System.out.println("priority:"+objectMessage.getJMSPriority()); } catch (Exception e) { // TODO: handle exception logger.error("處理信息時發生異常",e); } } public NotifyMessageConverter getMessageConverter() { return messageConverter; } public void setMessageConverter(NotifyMessageConverter messageConverter) { this.messageConverter = messageConverter; }}推薦閱讀: