<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">

  <!-- Producer side: raw ActiveMQ connection factory. The broker URL is read
       from the ${mq_url} placeholder, which must be resolved by a property
       placeholder configurer defined outside this file. -->
  <bean id="mqFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${mq_url}"/>
    <property name="sendTimeout" value="1000"/>
    <property name="closeTimeout" value="1000"/>
    <property name="useAsyncSend" value="true"/>
  </bean>

  <!-- Caching wrapper around mqFactory; this is the factory handed to the
       JMS template below. -->
  <bean id="jmsFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="mqFactory"/>
    <!-- Error listener; the messageExceptionListener bean is defined outside this file. -->
    <property name="exceptionListener" ref="messageExceptionListener"/>
    <property name="reconnectOnException" value="true"/>
    <!-- Size of the session cache (sessions, not messages). -->
    <property name="sessionCacheSize" value="100"/>
  </bean>

  <!-- Spring JMS template (project subclass of JmsTemplate). -->
  <bean id="jmsTemplate" class="com.iflashbuy.base.queue.MyJmsTemplate">
    <property name="connectionFactory" ref="jmsFactory"/>
    <!-- Destination used when a send call does not name one. -->
    <property name="defaultDestinationName" value="DEFAULT_QUEUE"/>
    <!-- Non-persistent delivery by default: deliveryMode 1 is
         DeliveryMode.NON_PERSISTENT, and deliveryPersistent=false says the same. -->
    <property name="deliveryMode" value="1"/>
    <property name="deliveryPersistent" value="false"/>
    <!-- explicitQosEnabled must be true for deliveryMode and timeToLive to be
         applied when sending. -->
    <property name="explicitQosEnabled" value="true"/>
    <property name="timeToLive" value="5000"/>
  </bean>

</beans>
package com.iflashbuy.base.queue; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Session; import org.springframework.jms.core.JmsTemplate; public class MyJmsTemplate extends JmsTemplate { public MyJmsTemplate() { super(); } public MyJmsTemplate(ConnectionFactory connectionFactory) { super(connectionFactory); } public void doSend(MessageProducer producer, Message message) throws JMSException { if (isExplicitQosEnabled()) { producer.send(message, getDeliveryMode(), getPriority(), getTimeToLive()); } else { producer.send(message); } } public Session getSession() { try { return getConnectionFactory().createConnection().createSession(false,Session.AUTO_ACKNOWLEDGE); } catch (JMSException e) { e.printStackTrace(); return null; } } }
Session session = jmsTemplate.getSession(); Destination msgDes = DestinationFactory.createDestination(QueueConstant.SAVE_QUEUE); MessageProducer producer = session.createProducer(msgDes); ObjectMessage message = session.createObjectMessage(orderQueue); //设置持久化队列 producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(message);
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:p="http://www.springframework.org/schema/p"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
                           http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
                           http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">

  <!-- Consumer side: ActiveMQ connection factory. The broker URL is read from
       the ${mq_url} placeholder, which must be resolved by a property
       placeholder configurer defined outside this file. -->
  <bean id="mqFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
    <property name="brokerURL" value="${mq_url}"/>
    <property name="sendTimeout" value="1000"/>
    <property name="closeTimeout" value="1000"/>
    <property name="useAsyncSend" value="true"/>
  </bean>

  <!-- Destination the listener consumes from; the comma-separated name lists
       both save_queue and cancel_queue. -->
  <bean id="operate" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="save_queue,cancel_queue"/>
  </bean>

  <!-- Message listener container. -->
  <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="mqFactory"/>
    <property name="destination" ref="operate"/>
    <!-- Messages are consumed by the messageReceiver bean. This note used to
         be written as a "//" line, which XML treats as text content inside
         the bean element rather than as a comment. -->
    <property name="messageListener" ref="messageReceiver"/>
  </bean>

</beans>
/**
 * JMS listener registered as the {@code messageReceiver} bean. It unwraps
 * {@link ObjectMessage} payloads and recognises {@code OrderQueue} jobs;
 * every other message type is only bracketed by the start/end log lines.
 */
@Component("messageReceiver")
public class MessageReceiver implements MessageListener {

    private static final Logger log = LoggerFactory.getLogger(MessageReceiver.class);

    // Injected service; not used by the code shown in this class.
    @Autowired
    private DataSnyService dataSynService;

    /**
     * Handles one incoming message. Any exception raised while reading the
     * payload is logged and swallowed, so it never propagates to the caller.
     *
     * @param message the message delivered to this listener
     */
    public void onMessage(Message message) {
        log.info("获取queueJob开始");
        try {
            if (message instanceof ObjectMessage) {
                Serializable m = ((ObjectMessage) message).getObject();
                log.info("------------------------------------");
                if (m instanceof OrderQueue) {
                    // NOTE(review): the job is extracted but not processed here;
                    // presumably it should be handed to dataSynService. Confirm.
                    OrderQueue job = (OrderQueue) m;
                }
                log.info("------------------------------------");
            }
            log.info("获取queueJob结束");
        } catch (Exception e) {
            // Route the failure through the class logger (with stack trace)
            // instead of printing it to stderr.
            log.error("Failed to process queue message", e);
        }
    }
}
原文:http://my.oschina.net/fengshuzi/blog/503597