Spring 整合JMS 基於ActiveMQ 實現消息的發送接收

一、下載並安裝ActiveMQ

首先我們到 Apache 官網下載 ActiveMQ(Apache ActiveMQ™ -- Download),解壓後運行其 bin 目錄下的 activemq.bat 文件即可啟動 ActiveMQ。

二、Spring中加入ActiveMQ的配置

首先將相關的 jar 包拷貝到項目的 lib 文件夾下

配置之前先看一下相關目錄以便於理解

下面開始配置

<!-- ActiveMQ connection factory: the vendor-supplied ConnectionFactory that actually creates JMS connections. -->
<!-- The broker URL is read from the ${mqUrl} placeholder. Hard-coded alternative kept for reference: tcp://192.168.1.79:61616 -->
<bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${mqUrl}"/>
</bean>

<!-- Spring caching connection factory: the ConnectionFactory Spring uses to manage the real one. -->
<bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <!-- Target factory: the real ConnectionFactory that produces JMS connections. -->
    <property name="targetConnectionFactory" ref="connectinFactory"/>
    <!-- Number of sessions to cache. -->
    <property name="sessionCacheSize" value="10"/>
</bean>

<!-- Message destinations. -->
<!-- Queue: only one subscriber receives a given message, and a message no longer stays in the queue once handled. -->
<bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="q.notify"/>
</bean>

<!-- Topic destination (one-to-many): every subscriber receives each published message. -->
<bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="t.notify"/>
</bean>

<!-- Spring JmsTemplate, backed by the caching connection factory. -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachingConnectionFactory"/>
</bean>

<!-- Message producers built on the Spring JmsTemplate. -->
<!-- Queue producer: receives the template, the queue destination and the message converter. -->
<bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="notifyQueue" ref="notifyQueue"/>
    <property name="messageConverter" ref="messageConverter"/>
</bean>

<!-- Topic producer: receives the template, the topic destination and the message converter. -->
<bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer">
    <property name="jmsTemplate" ref="jmsTemplate"/>
    <property name="notifyTopic" ref="notifyTopic"/>
    <property name="messageConverter" ref="messageConverter"/>
</bean>

<!-- Message consumers: normally Spring message-driven POJOs (MDP) that receive asynchronously. -->
<!-- Listener container for the queue destination. -->
<bean id="queueContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectinFactory"/>
    <property name="destination" ref="notifyQueue"/>
    <property name="messageListener" ref="queueMessageListener"/>
</bean>

<!-- Listener container for the topic destination. -->
<bean id="topicContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectinFactory"/>
    <property name="destination" ref="notifyTopic"/>
    <property name="messageListener" ref="topicMessageListener"/>
    <!-- Publish/subscribe mode. -->
    <property name="pubSubDomain" value="true"/>
</bean>

<!-- Handler classes for asynchronously received messages. -->
<!-- Queue listener, wired with the shared message converter. -->
<bean id="queueMessageListener" class="com.common.jms.QueueMessageListener">
    <property name="messageConverter" ref="messageConverter"/>
</bean>

<!-- Topic listener, wired with the shared message converter. -->
<bean id="topicMessageListener" class="com.common.jms.TopicMessageListener">
    <property name="messageConverter" ref="messageConverter"/>
</bean>

<!-- Message converter bean referenced by the producers and listeners. -->
<bean id="messageConverter" class="com.common.jms.NotifyMessageConverter"/>

下面展示一下Sender

public class Sender {

private static ServletContext servletContext;

private static WebApplicationContext ctx;

/**

* 發送點對點信息

* @param noticeInfo

*/

public static void setQueueSender(){

servletContext = ServletActionContext.getServletContext();

ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);

QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer"));

PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();

(下面先展示PhoneNoticeInfo 然後是 QueueMessageProducer )

noticeInfo.setNoticeContent("Hello Word");

noticeInfo.setNoticeTitle("hello Word");

noticeInfo.setReceiver("hello");

noticeInfo.setReceiverPhone("1111111");

notifyMessageProducer.sendQueue(noticeInfo);

}

public static ServletContext getServletContext() {

return servletContext;

}

public static void setServletContext(ServletContext servletContext) {

Sender.servletContext = servletContext;

}

public static WebApplicationContext getCtx() {

return ctx;

}

public static void setCtx(WebApplicationContext ctx) {

Sender.ctx = ctx;

}

}

PhoneNoticeInfo

/**
 * Serializable notice payload carried inside a JMS object message.
 *
 * <p>The fields remain public for backward compatibility with code that reads
 * them directly; new code should go through the accessors.
 */
public class PhoneNoticeInfo implements Serializable {

    /**
     * Explicit serialization version so that producer and consumer builds agree
     * on the stream format instead of relying on a compiler-computed default.
     */
    private static final long serialVersionUID = 1L;

    /** Notice title. */
    public String noticeTitle;

    /** Notice content. */
    public String noticeContent;

    /** Receiver. */
    public String receiver;

    /** Receiver phone number. */
    public String receiverPhone;

    /** @return the notice title */
    public String getNoticeTitle() {
        return noticeTitle;
    }

    /** @param noticeTitle the notice title */
    public void setNoticeTitle(String noticeTitle) {
        this.noticeTitle = noticeTitle;
    }

    /** @return the notice content */
    public String getNoticeContent() {
        return noticeContent;
    }

    /** @param noticeContent the notice content */
    public void setNoticeContent(String noticeContent) {
        this.noticeContent = noticeContent;
    }

    /** @return the receiver */
    public String getReceiver() {
        return receiver;
    }

    /** @param receiver the receiver */
    public void setReceiver(String receiver) {
        this.receiver = receiver;
    }

    /** @return the receiver phone number */
    public String getReceiverPhone() {
        return receiverPhone;
    }

    /** @param receiverPhone the receiver phone number */
    public void setReceiverPhone(String receiverPhone) {
        this.receiverPhone = receiverPhone;
    }
}

QueueMessageProducer

/**
 * Message producer service: sends {@link PhoneNoticeInfo} notices to the
 * configured queue destination through a {@link JmsTemplate}.
 */
public class QueueMessageProducer {

    private JmsTemplate jmsTemplate;

    private Destination notifyQueue;

    private NotifyMessageConverter messageConverter;

    /**
     * Sends the given notice to the configured queue.
     *
     * @param noticeInfo the notice to send; must not be {@code null}
     * @throws IllegalArgumentException if {@code noticeInfo} is {@code null}
     * @throws IllegalStateException if the template, destination or converter has not been set
     */
    public void sendQueue(PhoneNoticeInfo noticeInfo) {
        sendMessage(noticeInfo);
    }

    /** Validates the wiring and the argument, then converts and sends the notice. */
    private void sendMessage(PhoneNoticeInfo noticeInfo) {
        if (noticeInfo == null) {
            throw new IllegalArgumentException("noticeInfo must not be null");
        }
        if (jmsTemplate == null || notifyQueue == null || messageConverter == null) {
            throw new IllegalStateException(
                    "jmsTemplate, notifyQueue and messageConverter must all be set before sending");
        }

        jmsTemplate.setMessageConverter(messageConverter);
        // The destination is passed as an explicit Destination object, so the template's
        // pubSubDomain flag is not needed for this send. The template may be shared with
        // other producers, so it is no longer flipped on every call.
        jmsTemplate.convertAndSend(notifyQueue, noticeInfo);
    }

    /** @return the JMS template used for sending */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /** @param jmsTemplate the JMS template used for sending */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    /** @return the queue destination notices are sent to */
    public Destination getNotifyQueue() {
        return notifyQueue;
    }

    /** @param notifyQueue the queue destination notices are sent to */
    public void setNotifyQueue(Destination notifyQueue) {
        this.notifyQueue = notifyQueue;
    }

    /** @return the converter applied to outgoing notices */
    public NotifyMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /** @param messageConverter the converter applied to outgoing notices */
    public void setMessageConverter(NotifyMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}

NotifyMessageConverter

/**
 * Message converter between {@link PhoneNoticeInfo} objects and JMS object messages.
 */
public class NotifyMessageConverter implements MessageConverter {

    private static final Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);

    /**
     * Converts a received message into a {@link PhoneNoticeInfo}.
     *
     * @param message the received JMS message
     * @return the payload carried by the message; {@code null} if the message carries no object
     * @throws JMSException if the message is not an {@code ObjectMessage}, is not an
     *         {@code ActiveMQObjectMessage}, carries a payload that is not a
     *         {@code PhoneNoticeInfo}, or its payload cannot be read
     */
    @Override
    public Object fromMessage(Message message) throws JMSException,
            MessageConversionException {
        logger.debug("Receive JMS message :{}", message);

        if (!(message instanceof ObjectMessage)) {
            logger.error("Message:{} is not a instance of ObjectMessage.", message);
            throw new JMSException("Message:" + message + " is not a instance of ObjectMessage.");
        }
        if (!(message instanceof ActiveMQObjectMessage)) {
            logger.error("Message:{} is not a instance of ActiveMQObjectMessage.", message);
            throw new JMSException("Message:" + message + " is not a instance of ActiveMQObjectMessage.");
        }

        // A failure while reading the payload propagates as the provider's own
        // JMSException, so the original cause is not lost.
        Object payload = ((ObjectMessage) message).getObject();
        if (payload == null || payload instanceof PhoneNoticeInfo) {
            return payload;
        }

        logger.error("Message:{} is not a instance of NoticeInfo.", message);
        throw new JMSException("Message:" + message + " is not a instance of NoticeInfo.");
    }

    /**
     * Converts a {@link PhoneNoticeInfo} into a JMS object message.
     *
     * @param obj the object to convert; must be a {@code PhoneNoticeInfo}
     * @param session the JMS session used to create the message
     * @return an object message carrying {@code obj}
     * @throws JMSException if the session fails to create the message
     * @throws MessageConversionException if {@code obj} is {@code null} or not a {@code PhoneNoticeInfo}
     */
    @Override
    public Message toMessage(Object obj, Session session) throws JMSException,
            MessageConversionException {
        logger.debug("Convert Notify object to JMS message:{}", obj);

        if (!(obj instanceof PhoneNoticeInfo)) {
            // Fail here rather than handing a null message back to the caller.
            String actualType = (obj == null) ? "null" : obj.getClass().getName();
            throw new MessageConversionException(
                    "Cannot convert object of type " + actualType
                            + " to a JMS message; expected PhoneNoticeInfo");
        }

        // Let the session build the provider's ObjectMessage; no provider-specific cast needed.
        return session.createObjectMessage((PhoneNoticeInfo) obj);
    }
}

QueueMessageListener

/**
 * Queue listener: converts each received message to a {@link PhoneNoticeInfo}
 * and prints its content together with the message's JMS headers.
 */
public class QueueMessageListener implements MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);

    private NotifyMessageConverter messageConverter;

    /**
     * Handles one received message.
     *
     * <p>Any failure during conversion or printing is logged and not rethrown.
     *
     * @param message the received JMS message
     */
    @Override
    public void onMessage(Message message) {
        try {
            // No downcast here: the converter validates the message type itself and
            // reports a descriptive error for unsupported messages.
            PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo) messageConverter.fromMessage(message);

            System.out.println("queue收到消息" + noticeInfo.getNoticeContent());
            System.out.println("model:" + message.getJMSDeliveryMode());
            System.out.println("destination:" + message.getJMSDestination());
            System.out.println("type:" + message.getJMSType());
            System.out.println("messageId:" + message.getJMSMessageID());
            System.out.println("time:" + message.getJMSTimestamp());
            System.out.println("expiredTime:" + message.getJMSExpiration());
            System.out.println("priority:" + message.getJMSPriority());
        } catch (Exception e) {
            // Listener boundary: log with the full cause instead of propagating.
            logger.error("處理信息時發生異常", e);
        }
    }

    /** @return the converter applied to incoming messages */
    public NotifyMessageConverter getMessageConverter() {
        return messageConverter;
    }

    /** @param messageConverter the converter applied to incoming messages */
    public void setMessageConverter(NotifyMessageConverter messageConverter) {
        this.messageConverter = messageConverter;
    }
}
推薦閱讀:

螞蟻消息中間件 (MsgBroker) 在 YGC 優化上的探索

TAG:ActiveMQ | 消息隊列 |