对于Consumer的setMessageListener中的MessageListener
ActiveMQMessageConsumer::setMessageListener
public void setMessageListener(MessageListener listener) throws JMSException { checkClosed(); if (info.getPrefetchSize() == 0) { throw new JMSException( "Illegal prefetch size of zero. This setting is not supported for asynchronous consumers please set a value of at least 1"); } if (listener != null) { boolean wasRunning = session.isRunning(); if (wasRunning) { session.stop(); } this.messageListener.set(listener); session.redispatch(this, unconsumedMessages); if (wasRunning) { session.start(); } } else { this.messageListener.set(null); } }
MessageConsumer的receive方法
/** * Receives the next message produced for this message consumer. * <P> * This call blocks indefinitely until a message is produced or until this * message consumer is closed. * <P> * If this <CODE>receive</CODE> is done within a transaction, the consumer * retains the message until the transaction commits. * * @return the next message produced for this message consumer, or null if * this message consumer is concurrently closed */ public Message receive() throws JMSException { checkClosed(); checkMessageListener(); sendPullCommand(0); MessageDispatch md = dequeue(-1); if (md == null) { return null; } beforeMessageIsConsumed(md); afterMessageIsConsumed(md, false); return createActiveMQMessage(md); }
ActiveMQConnection
在session中addConsumer
原文:http://www.cnblogs.com/starfuxks/p/3555574.html